blob: 97c1e8306b8a798a38235c16ad8a8695ce2c09e1 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/event_scheduler.h"
2
3#include <algorithm>
4#include <deque>
5
6#include "aos/events/event_loop.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08007#include "aos/logging/implementations.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -07008
9namespace aos {
10
Austin Schuhef8f1ae2021-12-11 12:35:05 -080011EventScheduler::Token EventScheduler::Schedule(monotonic_clock::time_point time,
12 Event *callback) {
James Kuszmaul86e86c32022-07-21 17:39:47 -070013 CHECK_LE(monotonic_clock::epoch(), time);
Alex Perrycb7da4b2019-08-28 19:35:56 -070014 return events_list_.emplace(time, callback);
15}
16
17void EventScheduler::Deschedule(EventScheduler::Token token) {
Brian Silverman7026e2d2021-11-11 16:15:35 -080018 // We basically want to DCHECK some nontrivial logic. Guard it with NDEBUG to
19 // ensure the compiler realizes it's all unnecessary when not doing debug
20 // checks.
Brian Silvermanbd405c02020-06-23 16:25:23 -070021#ifndef NDEBUG
22 {
23 bool found = false;
24 auto i = events_list_.begin();
25 while (i != events_list_.end()) {
26 if (i == token) {
27 CHECK(!found) << ": The same iterator is in the multimap twice??";
28 found = true;
29 }
30 ++i;
31 }
32 CHECK(found) << ": Trying to deschedule an event which is not scheduled";
33 }
34#endif
Alex Perrycb7da4b2019-08-28 19:35:56 -070035 events_list_.erase(token);
36}
37
Austin Schuh8bd96322020-02-13 21:18:22 -080038aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
James Kuszmaul86e86c32022-07-21 17:39:47 -070039 // If we haven't started yet, schedule a special event for the epoch to allow
40 // ourselves to boot.
41 if (!called_started_) {
42 return aos::monotonic_clock::epoch();
43 }
44
Austin Schuh8bd96322020-02-13 21:18:22 -080045 if (events_list_.empty()) {
46 return monotonic_clock::max_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080047 }
Austin Schuh8bd96322020-02-13 21:18:22 -080048
49 return events_list_.begin()->first;
Alex Perrycb7da4b2019-08-28 19:35:56 -070050}
51
James Kuszmaul86e86c32022-07-21 17:39:47 -070052void EventScheduler::Shutdown() {
53 CHECK(!is_running_);
54 on_shutdown_();
55}
Austin Schuh58646e22021-08-23 23:51:46 -070056
57void EventScheduler::Startup() {
58 ++boot_count_;
James Kuszmaul86e86c32022-07-21 17:39:47 -070059 CHECK(!is_running_);
60 MaybeRunOnStartup();
61 CHECK(called_started_);
Austin Schuh58646e22021-08-23 23:51:46 -070062}
63
Austin Schuh8bd96322020-02-13 21:18:22 -080064void EventScheduler::CallOldestEvent() {
James Kuszmaul86e86c32022-07-21 17:39:47 -070065 if (!called_started_) {
66 // If we haven't started, start.
67 MaybeRunOnStartup();
68 MaybeRunOnRun();
69 CHECK(called_started_);
70 return;
71 }
72 CHECK(is_running_);
Austin Schuh8bd96322020-02-13 21:18:22 -080073 CHECK_GT(events_list_.size(), 0u);
74 auto iter = events_list_.begin();
Austin Schuh58646e22021-08-23 23:51:46 -070075 const logger::BootTimestamp t =
76 FromDistributedClock(scheduler_scheduler_->distributed_now());
Austin Schuhc1ee1b62022-03-22 17:09:52 -070077 VLOG(2) << "Got time back " << t;
Austin Schuh58646e22021-08-23 23:51:46 -070078 CHECK_EQ(t.boot, boot_count_);
79 CHECK_EQ(t.time, iter->first) << ": Time is wrong on node " << node_index_;
Austin Schuh8bd96322020-02-13 21:18:22 -080080
Austin Schuhef8f1ae2021-12-11 12:35:05 -080081 Event *callback = iter->second;
Austin Schuh8bd96322020-02-13 21:18:22 -080082 events_list_.erase(iter);
Austin Schuhef8f1ae2021-12-11 12:35:05 -080083 callback->Handle();
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070084
85 converter_->ObserveTimePassed(scheduler_scheduler_->distributed_now());
Austin Schuh8bd96322020-02-13 21:18:22 -080086}
87
88void EventScheduler::RunOnRun() {
James Kuszmaul86e86c32022-07-21 17:39:47 -070089 CHECK(is_running_);
Austin Schuhe33c08d2022-02-03 18:15:21 -080090 while (!on_run_.empty()) {
91 std::function<void()> fn = std::move(*on_run_.begin());
92 on_run_.erase(on_run_.begin());
93 fn();
Austin Schuh39788ff2019-12-01 18:22:57 -080094 }
Alex Perrycb7da4b2019-08-28 19:35:56 -070095}
96
Austin Schuhe33c08d2022-02-03 18:15:21 -080097void EventScheduler::RunOnStartup() noexcept {
98 while (!on_startup_.empty()) {
James Kuszmaul86e86c32022-07-21 17:39:47 -070099 CHECK(!is_running_);
Austin Schuhe33c08d2022-02-03 18:15:21 -0800100 std::function<void()> fn = std::move(*on_startup_.begin());
101 on_startup_.erase(on_startup_.begin());
102 fn();
Austin Schuh057d29f2021-08-21 23:05:15 -0700103 }
Austin Schuh057d29f2021-08-21 23:05:15 -0700104}
105
Austin Schuhe33c08d2022-02-03 18:15:21 -0800106void EventScheduler::RunStarted() {
James Kuszmaul86e86c32022-07-21 17:39:47 -0700107 CHECK(!is_running_);
Austin Schuhe33c08d2022-02-03 18:15:21 -0800108 if (started_) {
109 started_();
110 }
James Kuszmaul86e86c32022-07-21 17:39:47 -0700111 is_running_ = true;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800112}
113
James Kuszmaul86e86c32022-07-21 17:39:47 -0700114void EventScheduler::MaybeRunStopped() {
115 CHECK(is_running_);
116 is_running_ = false;
117 if (called_started_) {
118 called_started_ = false;
119 if (stopped_) {
120 stopped_();
121 }
122 }
123}
124
125void EventScheduler::MaybeRunOnStartup() {
126 CHECK(!called_started_);
127 CHECK(!is_running_);
128 const logger::BootTimestamp t =
129 FromDistributedClock(scheduler_scheduler_->distributed_now());
130 if (t.boot == boot_count_ && t.time >= monotonic_clock::epoch()) {
131 called_started_ = true;
132 RunOnStartup();
133 }
134}
135
136void EventScheduler::MaybeRunOnRun() {
137 if (called_started_) {
138 RunStarted();
139 RunOnRun();
Austin Schuhe33c08d2022-02-03 18:15:21 -0800140 }
141}
Austin Schuh58646e22021-08-23 23:51:46 -0700142
Austin Schuhac0771c2020-01-07 18:36:30 -0800143std::ostream &operator<<(std::ostream &stream,
144 const aos::distributed_clock::time_point &now) {
145 // Print it the same way we print a monotonic time. Literally.
146 stream << monotonic_clock::time_point(now.time_since_epoch());
147 return stream;
148}
149
Austin Schuh8bd96322020-02-13 21:18:22 -0800150void EventSchedulerScheduler::AddEventScheduler(EventScheduler *scheduler) {
151 CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
152 schedulers_.end());
153 CHECK(scheduler->scheduler_scheduler_ == nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -0700154 CHECK_EQ(scheduler->node_index(), schedulers_.size());
Austin Schuh8bd96322020-02-13 21:18:22 -0800155
156 schedulers_.emplace_back(scheduler);
157 scheduler->scheduler_scheduler_ = this;
158}
159
James Kuszmaul86e86c32022-07-21 17:39:47 -0700160void EventSchedulerScheduler::MaybeRunStopped() {
161 CHECK(!is_running_);
162 for (EventScheduler *scheduler : schedulers_) {
163 if (scheduler->is_running()) {
164 scheduler->MaybeRunStopped();
165 }
166 }
167}
168
169bool EventSchedulerScheduler::RunUntil(
170 realtime_clock::time_point end_time, EventScheduler *scheduler,
171 std::function<std::chrono::nanoseconds()> fn_realtime_offset) {
172 logging::ScopedLogRestorer prev_logger;
173 MaybeRunOnStartup();
174
175 bool reached_end_time = false;
176
177 RunMaybeRealtimeLoop([this, scheduler, end_time, fn_realtime_offset,
178 &reached_end_time]() {
179 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
180 OldestEvent();
181 aos::distributed_clock::time_point oldest_event_time_distributed =
182 std::get<0>(oldest_event);
183 logger::BootTimestamp test_time_monotonic =
184 scheduler->FromDistributedClock(oldest_event_time_distributed);
185 realtime_clock::time_point oldest_event_realtime(
186 test_time_monotonic.time_since_epoch() + fn_realtime_offset());
187
188 if ((std::get<0>(oldest_event) == distributed_clock::max_time) ||
189 (oldest_event_realtime > end_time &&
190 (reboots_.empty() ||
191 std::get<0>(reboots_.front()) > oldest_event_time_distributed))) {
192 is_running_ = false;
193 reached_end_time = true;
194
195 // We have to nudge our time back to the distributed time
196 // corresponding to our desired realtime time.
197 const aos::monotonic_clock::time_point end_monotonic =
198 aos::monotonic_clock::epoch() + end_time.time_since_epoch() -
199 fn_realtime_offset();
200 const aos::distributed_clock::time_point end_time_distributed =
201 scheduler->ToDistributedClock(end_monotonic);
202
203 now_ = end_time_distributed;
204
205 return;
206 }
207
208 if (!reboots_.empty() &&
209 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
210 // Reboot is next.
211 CHECK_LE(now_,
212 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
213 << ": Simulated time went backwards by too much. Please "
214 "investigate.";
215 now_ = std::get<0>(reboots_.front());
216 Reboot();
217 reboots_.erase(reboots_.begin());
218 return;
219 }
220
221 // We get to pick our tradeoffs here. Either we assume that there are
222 // no backward step changes in our time function for each node, or we
223 // have to let time go backwards. We currently only really see this
224 // happen when 2 events are scheduled for "now", time changes, and
225 // there is a nanosecond or two of rounding due to integer math.
226 //
227 // //aos/events/logging:logger_test triggers this.
228 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
229 << ": Simulated time went backwards by too much. Please "
230 "investigate.";
231
232 now_ = std::get<0>(oldest_event);
233
234 std::get<1>(oldest_event)->CallOldestEvent();
235 });
236
237 MaybeRunStopped();
238
239 return reached_end_time;
240}
241
Austin Schuh58646e22021-08-23 23:51:46 -0700242void EventSchedulerScheduler::Reboot() {
243 const std::vector<logger::BootTimestamp> &times =
244 std::get<1>(reboots_.front());
245 CHECK_EQ(times.size(), schedulers_.size());
246
247 VLOG(1) << "Rebooting at " << now_;
248 for (const auto &time : times) {
249 VLOG(1) << " " << time;
250 }
251
252 is_running_ = false;
253
254 // Shut everything down.
255 std::vector<size_t> rebooted;
256 for (size_t node_index = 0; node_index < schedulers_.size(); ++node_index) {
257 if (schedulers_[node_index]->boot_count() == times[node_index].boot) {
258 continue;
259 } else {
260 rebooted.emplace_back(node_index);
261 CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
262 times[node_index].boot);
James Kuszmaul86e86c32022-07-21 17:39:47 -0700263 schedulers_[node_index]->MaybeRunStopped();
Austin Schuh58646e22021-08-23 23:51:46 -0700264 schedulers_[node_index]->Shutdown();
265 }
266 }
267
268 // And start it back up again to reboot. When something starts back up
269 // (especially message_bridge), it could try to send stuff out. We want
270 // to move everything over to the new boot before doing that.
271 for (const size_t node_index : rebooted) {
Austin Schuh58646e22021-08-23 23:51:46 -0700272 schedulers_[node_index]->Startup();
273 }
Austin Schuh58646e22021-08-23 23:51:46 -0700274 for (const size_t node_index : rebooted) {
James Kuszmaul86e86c32022-07-21 17:39:47 -0700275 schedulers_[node_index]->MaybeRunOnRun();
Austin Schuh58646e22021-08-23 23:51:46 -0700276 }
277 is_running_ = true;
278}
279
Austin Schuh8bd96322020-02-13 21:18:22 -0800280void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
281 distributed_clock::time_point end_time = now_ + duration;
282 logging::ScopedLogRestorer prev_logger;
James Kuszmaul86e86c32022-07-21 17:39:47 -0700283 MaybeRunOnStartup();
Austin Schuh8bd96322020-02-13 21:18:22 -0800284
285 // Run all the sub-event-schedulers.
James Kuszmaulb67409b2022-06-20 16:25:03 -0700286 RunMaybeRealtimeLoop([this, end_time]() {
Austin Schuh8bd96322020-02-13 21:18:22 -0800287 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
288 OldestEvent();
Austin Schuh58646e22021-08-23 23:51:46 -0700289 if (!reboots_.empty() &&
290 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
291 // Reboot is next.
292 if (std::get<0>(reboots_.front()) > end_time) {
293 // Reboot is after our end time, give up.
294 is_running_ = false;
James Kuszmaulb67409b2022-06-20 16:25:03 -0700295 return;
Austin Schuh58646e22021-08-23 23:51:46 -0700296 }
297
298 CHECK_LE(now_,
299 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
300 << ": Simulated time went backwards by too much. Please "
301 "investigate.";
302 now_ = std::get<0>(reboots_.front());
303 Reboot();
304 reboots_.erase(reboots_.begin());
James Kuszmaulb67409b2022-06-20 16:25:03 -0700305 return;
Austin Schuh58646e22021-08-23 23:51:46 -0700306 }
307
Austin Schuh8bd96322020-02-13 21:18:22 -0800308 // No events left, bail.
309 if (std::get<0>(oldest_event) == distributed_clock::max_time ||
310 std::get<0>(oldest_event) > end_time) {
311 is_running_ = false;
James Kuszmaulb67409b2022-06-20 16:25:03 -0700312 return;
Austin Schuh8bd96322020-02-13 21:18:22 -0800313 }
314
315 // We get to pick our tradeoffs here. Either we assume that there are no
316 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700317 // let time go backwards. We currently only really see this happen when 2
318 // events are scheduled for "now", time changes, and there is a nanosecond
319 // or two of rounding due to integer math.
320 //
321 // //aos/events/logging:logger_test triggers this.
322 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800323 << ": Simulated time went backwards by too much. Please investigate.";
James Kuszmaul86e86c32022-07-21 17:39:47 -0700324 // push time forwards
Austin Schuh8bd96322020-02-13 21:18:22 -0800325 now_ = std::get<0>(oldest_event);
326
327 std::get<1>(oldest_event)->CallOldestEvent();
James Kuszmaulb67409b2022-06-20 16:25:03 -0700328 });
Austin Schuh8bd96322020-02-13 21:18:22 -0800329
330 now_ = end_time;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800331
James Kuszmaul86e86c32022-07-21 17:39:47 -0700332 MaybeRunStopped();
Austin Schuh8bd96322020-02-13 21:18:22 -0800333}
334
335void EventSchedulerScheduler::Run() {
336 logging::ScopedLogRestorer prev_logger;
James Kuszmaul86e86c32022-07-21 17:39:47 -0700337 MaybeRunOnStartup();
338
339 // Run all the sub-event-schedulers.
James Kuszmaulb67409b2022-06-20 16:25:03 -0700340 RunMaybeRealtimeLoop([this]() {
Austin Schuh8bd96322020-02-13 21:18:22 -0800341 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
342 OldestEvent();
Austin Schuh58646e22021-08-23 23:51:46 -0700343 if (!reboots_.empty() &&
344 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
345 // Reboot is next.
346 CHECK_LE(now_,
347 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
348 << ": Simulated time went backwards by too much. Please "
349 "investigate.";
350 now_ = std::get<0>(reboots_.front());
351 Reboot();
352 reboots_.erase(reboots_.begin());
James Kuszmaulb67409b2022-06-20 16:25:03 -0700353 return;
Austin Schuh58646e22021-08-23 23:51:46 -0700354 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800355 // No events left, bail.
356 if (std::get<0>(oldest_event) == distributed_clock::max_time) {
James Kuszmaulb67409b2022-06-20 16:25:03 -0700357 is_running_ = false;
358 return;
Austin Schuh8bd96322020-02-13 21:18:22 -0800359 }
360
361 // We get to pick our tradeoffs here. Either we assume that there are no
362 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700363 // let time go backwards. We currently only really see this happen when 2
364 // events are scheduled for "now", time changes, and there is a nanosecond
365 // or two of rounding due to integer math.
366 //
367 // //aos/events/logging:logger_test triggers this.
368 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800369 << ": Simulated time went backwards by too much. Please investigate.";
370 now_ = std::get<0>(oldest_event);
371
372 std::get<1>(oldest_event)->CallOldestEvent();
James Kuszmaulb67409b2022-06-20 16:25:03 -0700373 });
Austin Schuhe33c08d2022-02-03 18:15:21 -0800374
James Kuszmaul86e86c32022-07-21 17:39:47 -0700375 MaybeRunStopped();
Austin Schuh8bd96322020-02-13 21:18:22 -0800376}
377
James Kuszmaulb67409b2022-06-20 16:25:03 -0700378template <typename F>
379void EventSchedulerScheduler::RunMaybeRealtimeLoop(F loop_body) {
380 internal::TimerFd timerfd;
381 CHECK_LT(0.0, replay_rate_) << "Replay rate must be positive.";
382 distributed_clock::time_point last_distributed_clock =
383 std::get<0>(OldestEvent());
384 monotonic_clock::time_point last_monotonic_clock = monotonic_clock::now();
385 timerfd.SetTime(last_monotonic_clock, std::chrono::seconds(0));
386 epoll_.OnReadable(
387 timerfd.fd(), [this, &last_distributed_clock, &last_monotonic_clock,
388 &timerfd, loop_body]() {
389 const uint64_t read_result = timerfd.Read();
390 if (!is_running_) {
391 epoll_.Quit();
392 return;
393 }
394 CHECK_EQ(read_result, 1u);
395 // Call loop_body() at least once; if we are in infinite-speed replay,
396 // we don't actually want/need the context switches from the epoll
397 // setup, so just loop.
398 // Note: The performance impacts of this code have not been carefully
399 // inspected (e.g., how much does avoiding the context-switch help; does
400 // the timerfd_settime call matter).
401 // This is deliberately written to support the user changing replay
402 // rates dynamically.
403 do {
404 loop_body();
405 if (is_running_) {
406 const monotonic_clock::time_point next_trigger =
407 last_monotonic_clock +
408 std::chrono::duration_cast<std::chrono::nanoseconds>(
409 (now_ - last_distributed_clock) / replay_rate_);
410 timerfd.SetTime(next_trigger, std::chrono::seconds(0));
411 last_monotonic_clock = next_trigger;
412 last_distributed_clock = now_;
413 } else {
414 epoll_.Quit();
415 }
416 } while (replay_rate_ == std::numeric_limits<double>::infinity() &&
417 is_running_);
418 });
419
420 epoll_.Run();
421 epoll_.DeleteFd(timerfd.fd());
422}
423
Austin Schuh8bd96322020-02-13 21:18:22 -0800424std::tuple<distributed_clock::time_point, EventScheduler *>
425EventSchedulerScheduler::OldestEvent() {
426 distributed_clock::time_point min_event_time = distributed_clock::max_time;
427 EventScheduler *min_scheduler = nullptr;
428
429 // TODO(austin): Don't linearly search... But for N=3, it is probably the
430 // fastest way to do this.
431 for (EventScheduler *scheduler : schedulers_) {
432 const monotonic_clock::time_point monotonic_event_time =
433 scheduler->OldestEvent();
434 if (monotonic_event_time != monotonic_clock::max_time) {
435 const distributed_clock::time_point event_time =
436 scheduler->ToDistributedClock(monotonic_event_time);
437 if (event_time < min_event_time) {
438 min_event_time = event_time;
439 min_scheduler = scheduler;
440 }
441 }
442 }
443
Austin Schuh87dd3832021-01-01 23:07:31 -0800444 if (min_scheduler) {
Austin Schuhc1ee1b62022-03-22 17:09:52 -0700445 VLOG(2) << "Oldest event " << min_event_time << " on scheduler "
Austin Schuh87dd3832021-01-01 23:07:31 -0800446 << min_scheduler->node_index_;
447 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800448 return std::make_tuple(min_event_time, min_scheduler);
449}
450
Austin Schuhe33c08d2022-02-03 18:15:21 -0800451void EventSchedulerScheduler::TemporarilyStopAndRun(std::function<void()> fn) {
452 const bool was_running = is_running_;
453 if (is_running_) {
454 is_running_ = false;
James Kuszmaul86e86c32022-07-21 17:39:47 -0700455 MaybeRunStopped();
Austin Schuhe33c08d2022-02-03 18:15:21 -0800456 }
457 fn();
458 if (was_running) {
James Kuszmaul86e86c32022-07-21 17:39:47 -0700459 MaybeRunOnStartup();
460 }
461}
462
463void EventSchedulerScheduler::MaybeRunOnStartup() {
464 is_running_ = true;
465 for (EventScheduler *scheduler : schedulers_) {
466 scheduler->MaybeRunOnStartup();
467 }
468 // We must trigger all the OnRun's *after* all the OnStartup callbacks are
469 // triggered because that is the contract that we have stated.
470 for (EventScheduler *scheduler : schedulers_) {
471 scheduler->MaybeRunOnRun();
Austin Schuhe33c08d2022-02-03 18:15:21 -0800472 }
473}
474
Alex Perrycb7da4b2019-08-28 19:35:56 -0700475} // namespace aos