blob: fdeaf1e93b4e9ae9d9efea0e58e1cf0bcb68e96c [file] [log] [blame]
#include "aos/events/event_scheduler.h"

#include <algorithm>
#include <deque>
#include <utility>

#include "aos/events/event_loop.h"
#include "aos/logging/implementations.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -07008
9namespace aos {
10
11EventScheduler::Token EventScheduler::Schedule(
Austin Schuh8bd96322020-02-13 21:18:22 -080012 monotonic_clock::time_point time, ::std::function<void()> callback) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070013 return events_list_.emplace(time, callback);
14}
15
16void EventScheduler::Deschedule(EventScheduler::Token token) {
Brian Silvermanbd405c02020-06-23 16:25:23 -070017 // We basically want to DCHECK some nontrivial logic. Guard it with NDEBUG to ensure the compiler
18 // realizes it's all unnecessary when not doing debug checks.
19#ifndef NDEBUG
20 {
21 bool found = false;
22 auto i = events_list_.begin();
23 while (i != events_list_.end()) {
24 if (i == token) {
25 CHECK(!found) << ": The same iterator is in the multimap twice??";
26 found = true;
27 }
28 ++i;
29 }
30 CHECK(found) << ": Trying to deschedule an event which is not scheduled";
31 }
32#endif
Alex Perrycb7da4b2019-08-28 19:35:56 -070033 events_list_.erase(token);
34}
35
Austin Schuh8bd96322020-02-13 21:18:22 -080036aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
37 if (events_list_.empty()) {
38 return monotonic_clock::max_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080039 }
Austin Schuh8bd96322020-02-13 21:18:22 -080040
41 return events_list_.begin()->first;
Alex Perrycb7da4b2019-08-28 19:35:56 -070042}
43
Austin Schuh58646e22021-08-23 23:51:46 -070044void EventScheduler::Shutdown() {
45 on_shutdown_();
46}
47
48void EventScheduler::Startup() {
49 ++boot_count_;
50 RunOnStartup();
51}
52
Austin Schuh8bd96322020-02-13 21:18:22 -080053void EventScheduler::CallOldestEvent() {
54 CHECK_GT(events_list_.size(), 0u);
55 auto iter = events_list_.begin();
Austin Schuh58646e22021-08-23 23:51:46 -070056 const logger::BootTimestamp t =
57 FromDistributedClock(scheduler_scheduler_->distributed_now());
58 VLOG(1) << "Got time back " << t;
59 CHECK_EQ(t.boot, boot_count_);
60 CHECK_EQ(t.time, iter->first) << ": Time is wrong on node " << node_index_;
Austin Schuh8bd96322020-02-13 21:18:22 -080061
62 ::std::function<void()> callback = ::std::move(iter->second);
63 events_list_.erase(iter);
64 callback();
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070065
66 converter_->ObserveTimePassed(scheduler_scheduler_->distributed_now());
Austin Schuh8bd96322020-02-13 21:18:22 -080067}
68
69void EventScheduler::RunOnRun() {
Austin Schuh39788ff2019-12-01 18:22:57 -080070 for (std::function<void()> &on_run : on_run_) {
71 on_run();
72 }
73 on_run_.clear();
Alex Perrycb7da4b2019-08-28 19:35:56 -070074}
75
Austin Schuh057d29f2021-08-21 23:05:15 -070076void EventScheduler::RunOnStartup() {
77 for (size_t i = 0; i < on_startup_.size(); ++i) {
78 on_startup_[i]();
79 }
80 on_startup_.clear();
81}
82
Austin Schuh58646e22021-08-23 23:51:46 -070083void EventScheduler::RunStarted() { started_(); }
84
Austin Schuhac0771c2020-01-07 18:36:30 -080085std::ostream &operator<<(std::ostream &stream,
86 const aos::distributed_clock::time_point &now) {
87 // Print it the same way we print a monotonic time. Literally.
88 stream << monotonic_clock::time_point(now.time_since_epoch());
89 return stream;
90}
91
Austin Schuh8bd96322020-02-13 21:18:22 -080092void EventSchedulerScheduler::AddEventScheduler(EventScheduler *scheduler) {
93 CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
94 schedulers_.end());
95 CHECK(scheduler->scheduler_scheduler_ == nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -070096 CHECK_EQ(scheduler->node_index(), schedulers_.size());
Austin Schuh8bd96322020-02-13 21:18:22 -080097
98 schedulers_.emplace_back(scheduler);
99 scheduler->scheduler_scheduler_ = this;
100}
101
Austin Schuh58646e22021-08-23 23:51:46 -0700102void EventSchedulerScheduler::Reboot() {
103 const std::vector<logger::BootTimestamp> &times =
104 std::get<1>(reboots_.front());
105 CHECK_EQ(times.size(), schedulers_.size());
106
107 VLOG(1) << "Rebooting at " << now_;
108 for (const auto &time : times) {
109 VLOG(1) << " " << time;
110 }
111
112 is_running_ = false;
113
114 // Shut everything down.
115 std::vector<size_t> rebooted;
116 for (size_t node_index = 0; node_index < schedulers_.size(); ++node_index) {
117 if (schedulers_[node_index]->boot_count() == times[node_index].boot) {
118 continue;
119 } else {
120 rebooted.emplace_back(node_index);
121 CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
122 times[node_index].boot);
123 schedulers_[node_index]->Shutdown();
124 }
125 }
126
127 // And start it back up again to reboot. When something starts back up
128 // (especially message_bridge), it could try to send stuff out. We want
129 // to move everything over to the new boot before doing that.
130 for (const size_t node_index : rebooted) {
131 CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
132 schedulers_[node_index]->Startup();
133 }
134
135 for (const size_t node_index : rebooted) {
136 schedulers_[node_index]->RunStarted();
137 }
138
139 for (const size_t node_index : rebooted) {
140 schedulers_[node_index]->RunOnRun();
141 }
142 is_running_ = true;
143}
144
Austin Schuh8bd96322020-02-13 21:18:22 -0800145void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
146 distributed_clock::time_point end_time = now_ + duration;
147 logging::ScopedLogRestorer prev_logger;
148 RunOnRun();
149
150 // Run all the sub-event-schedulers.
151 while (is_running_) {
152 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
153 OldestEvent();
Austin Schuh58646e22021-08-23 23:51:46 -0700154 if (!reboots_.empty() &&
155 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
156 // Reboot is next.
157 if (std::get<0>(reboots_.front()) > end_time) {
158 // Reboot is after our end time, give up.
159 is_running_ = false;
160 break;
161 }
162
163 CHECK_LE(now_,
164 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
165 << ": Simulated time went backwards by too much. Please "
166 "investigate.";
167 now_ = std::get<0>(reboots_.front());
168 Reboot();
169 reboots_.erase(reboots_.begin());
170 continue;
171 }
172
Austin Schuh8bd96322020-02-13 21:18:22 -0800173 // No events left, bail.
174 if (std::get<0>(oldest_event) == distributed_clock::max_time ||
175 std::get<0>(oldest_event) > end_time) {
176 is_running_ = false;
177 break;
178 }
179
180 // We get to pick our tradeoffs here. Either we assume that there are no
181 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700182 // let time go backwards. We currently only really see this happen when 2
183 // events are scheduled for "now", time changes, and there is a nanosecond
184 // or two of rounding due to integer math.
185 //
186 // //aos/events/logging:logger_test triggers this.
187 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800188 << ": Simulated time went backwards by too much. Please investigate.";
189 now_ = std::get<0>(oldest_event);
190
191 std::get<1>(oldest_event)->CallOldestEvent();
192 }
193
194 now_ = end_time;
195}
196
197void EventSchedulerScheduler::Run() {
198 logging::ScopedLogRestorer prev_logger;
199 RunOnRun();
200 // Run all the sub-event-schedulers.
201 while (is_running_) {
202 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
203 OldestEvent();
Austin Schuh58646e22021-08-23 23:51:46 -0700204 if (!reboots_.empty() &&
205 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
206 // Reboot is next.
207 CHECK_LE(now_,
208 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
209 << ": Simulated time went backwards by too much. Please "
210 "investigate.";
211 now_ = std::get<0>(reboots_.front());
212 Reboot();
213 reboots_.erase(reboots_.begin());
214 continue;
215 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800216 // No events left, bail.
217 if (std::get<0>(oldest_event) == distributed_clock::max_time) {
218 break;
219 }
220
221 // We get to pick our tradeoffs here. Either we assume that there are no
222 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700223 // let time go backwards. We currently only really see this happen when 2
224 // events are scheduled for "now", time changes, and there is a nanosecond
225 // or two of rounding due to integer math.
226 //
227 // //aos/events/logging:logger_test triggers this.
228 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800229 << ": Simulated time went backwards by too much. Please investigate.";
230 now_ = std::get<0>(oldest_event);
231
232 std::get<1>(oldest_event)->CallOldestEvent();
233 }
234
235 is_running_ = false;
236}
237
238std::tuple<distributed_clock::time_point, EventScheduler *>
239EventSchedulerScheduler::OldestEvent() {
240 distributed_clock::time_point min_event_time = distributed_clock::max_time;
241 EventScheduler *min_scheduler = nullptr;
242
243 // TODO(austin): Don't linearly search... But for N=3, it is probably the
244 // fastest way to do this.
245 for (EventScheduler *scheduler : schedulers_) {
246 const monotonic_clock::time_point monotonic_event_time =
247 scheduler->OldestEvent();
248 if (monotonic_event_time != monotonic_clock::max_time) {
249 const distributed_clock::time_point event_time =
250 scheduler->ToDistributedClock(monotonic_event_time);
251 if (event_time < min_event_time) {
252 min_event_time = event_time;
253 min_scheduler = scheduler;
254 }
255 }
256 }
257
Austin Schuh87dd3832021-01-01 23:07:31 -0800258 if (min_scheduler) {
259 VLOG(1) << "Oldest event " << min_event_time << " on scheduler "
260 << min_scheduler->node_index_;
261 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800262 return std::make_tuple(min_event_time, min_scheduler);
263}
264
Alex Perrycb7da4b2019-08-28 19:35:56 -0700265} // namespace aos