blob: e7b9641d3426f4f557922160ed9a926a0ae51d1c [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/event_scheduler.h"
2
3#include <algorithm>
4#include <deque>
5
6#include "aos/events/event_loop.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08007#include "aos/logging/implementations.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -07008
9namespace aos {
10
Austin Schuhef8f1ae2021-12-11 12:35:05 -080011EventScheduler::Token EventScheduler::Schedule(monotonic_clock::time_point time,
12 Event *callback) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070013 return events_list_.emplace(time, callback);
14}
15
16void EventScheduler::Deschedule(EventScheduler::Token token) {
Brian Silverman7026e2d2021-11-11 16:15:35 -080017 // We basically want to DCHECK some nontrivial logic. Guard it with NDEBUG to
18 // ensure the compiler realizes it's all unnecessary when not doing debug
19 // checks.
Brian Silvermanbd405c02020-06-23 16:25:23 -070020#ifndef NDEBUG
21 {
22 bool found = false;
23 auto i = events_list_.begin();
24 while (i != events_list_.end()) {
25 if (i == token) {
26 CHECK(!found) << ": The same iterator is in the multimap twice??";
27 found = true;
28 }
29 ++i;
30 }
31 CHECK(found) << ": Trying to deschedule an event which is not scheduled";
32 }
33#endif
Alex Perrycb7da4b2019-08-28 19:35:56 -070034 events_list_.erase(token);
35}
36
Austin Schuh8bd96322020-02-13 21:18:22 -080037aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
38 if (events_list_.empty()) {
39 return monotonic_clock::max_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080040 }
Austin Schuh8bd96322020-02-13 21:18:22 -080041
42 return events_list_.begin()->first;
Alex Perrycb7da4b2019-08-28 19:35:56 -070043}
44
Brian Silverman7026e2d2021-11-11 16:15:35 -080045void EventScheduler::Shutdown() { on_shutdown_(); }
Austin Schuh58646e22021-08-23 23:51:46 -070046
47void EventScheduler::Startup() {
48 ++boot_count_;
49 RunOnStartup();
50}
51
Austin Schuh8bd96322020-02-13 21:18:22 -080052void EventScheduler::CallOldestEvent() {
53 CHECK_GT(events_list_.size(), 0u);
54 auto iter = events_list_.begin();
Austin Schuh58646e22021-08-23 23:51:46 -070055 const logger::BootTimestamp t =
56 FromDistributedClock(scheduler_scheduler_->distributed_now());
Austin Schuhc1ee1b62022-03-22 17:09:52 -070057 VLOG(2) << "Got time back " << t;
Austin Schuh58646e22021-08-23 23:51:46 -070058 CHECK_EQ(t.boot, boot_count_);
59 CHECK_EQ(t.time, iter->first) << ": Time is wrong on node " << node_index_;
Austin Schuh8bd96322020-02-13 21:18:22 -080060
Austin Schuhef8f1ae2021-12-11 12:35:05 -080061 Event *callback = iter->second;
Austin Schuh8bd96322020-02-13 21:18:22 -080062 events_list_.erase(iter);
Austin Schuhef8f1ae2021-12-11 12:35:05 -080063 callback->Handle();
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070064
65 converter_->ObserveTimePassed(scheduler_scheduler_->distributed_now());
Austin Schuh8bd96322020-02-13 21:18:22 -080066}
67
68void EventScheduler::RunOnRun() {
Austin Schuhe33c08d2022-02-03 18:15:21 -080069 while (!on_run_.empty()) {
70 std::function<void()> fn = std::move(*on_run_.begin());
71 on_run_.erase(on_run_.begin());
72 fn();
Austin Schuh39788ff2019-12-01 18:22:57 -080073 }
Alex Perrycb7da4b2019-08-28 19:35:56 -070074}
75
Austin Schuhe33c08d2022-02-03 18:15:21 -080076void EventScheduler::RunOnStartup() noexcept {
77 while (!on_startup_.empty()) {
78 std::function<void()> fn = std::move(*on_startup_.begin());
79 on_startup_.erase(on_startup_.begin());
80 fn();
Austin Schuh057d29f2021-08-21 23:05:15 -070081 }
Austin Schuh057d29f2021-08-21 23:05:15 -070082}
83
Austin Schuhe33c08d2022-02-03 18:15:21 -080084void EventScheduler::RunStarted() {
85 if (started_) {
86 started_();
87 }
88}
89
90void EventScheduler::RunStopped() {
91 if (stopped_) {
92 stopped_();
93 }
94}
Austin Schuh58646e22021-08-23 23:51:46 -070095
Austin Schuhac0771c2020-01-07 18:36:30 -080096std::ostream &operator<<(std::ostream &stream,
97 const aos::distributed_clock::time_point &now) {
98 // Print it the same way we print a monotonic time. Literally.
99 stream << monotonic_clock::time_point(now.time_since_epoch());
100 return stream;
101}
102
Austin Schuh8bd96322020-02-13 21:18:22 -0800103void EventSchedulerScheduler::AddEventScheduler(EventScheduler *scheduler) {
104 CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
105 schedulers_.end());
106 CHECK(scheduler->scheduler_scheduler_ == nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -0700107 CHECK_EQ(scheduler->node_index(), schedulers_.size());
Austin Schuh8bd96322020-02-13 21:18:22 -0800108
109 schedulers_.emplace_back(scheduler);
110 scheduler->scheduler_scheduler_ = this;
111}
112
Austin Schuh58646e22021-08-23 23:51:46 -0700113void EventSchedulerScheduler::Reboot() {
114 const std::vector<logger::BootTimestamp> &times =
115 std::get<1>(reboots_.front());
116 CHECK_EQ(times.size(), schedulers_.size());
117
118 VLOG(1) << "Rebooting at " << now_;
119 for (const auto &time : times) {
120 VLOG(1) << " " << time;
121 }
122
123 is_running_ = false;
124
125 // Shut everything down.
126 std::vector<size_t> rebooted;
127 for (size_t node_index = 0; node_index < schedulers_.size(); ++node_index) {
128 if (schedulers_[node_index]->boot_count() == times[node_index].boot) {
129 continue;
130 } else {
131 rebooted.emplace_back(node_index);
132 CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
133 times[node_index].boot);
Austin Schuhe33c08d2022-02-03 18:15:21 -0800134 schedulers_[node_index]->RunStopped();
Austin Schuh58646e22021-08-23 23:51:46 -0700135 schedulers_[node_index]->Shutdown();
136 }
137 }
138
139 // And start it back up again to reboot. When something starts back up
140 // (especially message_bridge), it could try to send stuff out. We want
141 // to move everything over to the new boot before doing that.
142 for (const size_t node_index : rebooted) {
143 CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
144 schedulers_[node_index]->Startup();
145 }
146
147 for (const size_t node_index : rebooted) {
148 schedulers_[node_index]->RunStarted();
149 }
150
151 for (const size_t node_index : rebooted) {
152 schedulers_[node_index]->RunOnRun();
153 }
154 is_running_ = true;
155}
156
Austin Schuh8bd96322020-02-13 21:18:22 -0800157void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
158 distributed_clock::time_point end_time = now_ + duration;
159 logging::ScopedLogRestorer prev_logger;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800160 RunOnStartup();
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 RunOnRun();
162
163 // Run all the sub-event-schedulers.
James Kuszmaulb67409b2022-06-20 16:25:03 -0700164 RunMaybeRealtimeLoop([this, end_time]() {
Austin Schuh8bd96322020-02-13 21:18:22 -0800165 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
166 OldestEvent();
Austin Schuh58646e22021-08-23 23:51:46 -0700167 if (!reboots_.empty() &&
168 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
169 // Reboot is next.
170 if (std::get<0>(reboots_.front()) > end_time) {
171 // Reboot is after our end time, give up.
172 is_running_ = false;
James Kuszmaulb67409b2022-06-20 16:25:03 -0700173 return;
Austin Schuh58646e22021-08-23 23:51:46 -0700174 }
175
176 CHECK_LE(now_,
177 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
178 << ": Simulated time went backwards by too much. Please "
179 "investigate.";
180 now_ = std::get<0>(reboots_.front());
181 Reboot();
182 reboots_.erase(reboots_.begin());
James Kuszmaulb67409b2022-06-20 16:25:03 -0700183 return;
Austin Schuh58646e22021-08-23 23:51:46 -0700184 }
185
Austin Schuh8bd96322020-02-13 21:18:22 -0800186 // No events left, bail.
187 if (std::get<0>(oldest_event) == distributed_clock::max_time ||
188 std::get<0>(oldest_event) > end_time) {
189 is_running_ = false;
James Kuszmaulb67409b2022-06-20 16:25:03 -0700190 return;
Austin Schuh8bd96322020-02-13 21:18:22 -0800191 }
192
193 // We get to pick our tradeoffs here. Either we assume that there are no
194 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700195 // let time go backwards. We currently only really see this happen when 2
196 // events are scheduled for "now", time changes, and there is a nanosecond
197 // or two of rounding due to integer math.
198 //
199 // //aos/events/logging:logger_test triggers this.
200 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800201 << ": Simulated time went backwards by too much. Please investigate.";
202 now_ = std::get<0>(oldest_event);
203
204 std::get<1>(oldest_event)->CallOldestEvent();
James Kuszmaulb67409b2022-06-20 16:25:03 -0700205 });
Austin Schuh8bd96322020-02-13 21:18:22 -0800206
207 now_ = end_time;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800208
209 RunStopped();
Austin Schuh8bd96322020-02-13 21:18:22 -0800210}
211
212void EventSchedulerScheduler::Run() {
213 logging::ScopedLogRestorer prev_logger;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800214 RunOnStartup();
Austin Schuh8bd96322020-02-13 21:18:22 -0800215 RunOnRun();
James Kuszmaulb67409b2022-06-20 16:25:03 -0700216 RunMaybeRealtimeLoop([this]() {
217 // Run all the sub-event-schedulers.
Austin Schuh8bd96322020-02-13 21:18:22 -0800218 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
219 OldestEvent();
Austin Schuh58646e22021-08-23 23:51:46 -0700220 if (!reboots_.empty() &&
221 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
222 // Reboot is next.
223 CHECK_LE(now_,
224 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
225 << ": Simulated time went backwards by too much. Please "
226 "investigate.";
227 now_ = std::get<0>(reboots_.front());
228 Reboot();
229 reboots_.erase(reboots_.begin());
James Kuszmaulb67409b2022-06-20 16:25:03 -0700230 return;
Austin Schuh58646e22021-08-23 23:51:46 -0700231 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800232 // No events left, bail.
233 if (std::get<0>(oldest_event) == distributed_clock::max_time) {
James Kuszmaulb67409b2022-06-20 16:25:03 -0700234 is_running_ = false;
235 return;
Austin Schuh8bd96322020-02-13 21:18:22 -0800236 }
237
238 // We get to pick our tradeoffs here. Either we assume that there are no
239 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700240 // let time go backwards. We currently only really see this happen when 2
241 // events are scheduled for "now", time changes, and there is a nanosecond
242 // or two of rounding due to integer math.
243 //
244 // //aos/events/logging:logger_test triggers this.
245 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800246 << ": Simulated time went backwards by too much. Please investigate.";
247 now_ = std::get<0>(oldest_event);
248
249 std::get<1>(oldest_event)->CallOldestEvent();
James Kuszmaulb67409b2022-06-20 16:25:03 -0700250 });
Austin Schuhe33c08d2022-02-03 18:15:21 -0800251
252 RunStopped();
Austin Schuh8bd96322020-02-13 21:18:22 -0800253}
254
James Kuszmaulb67409b2022-06-20 16:25:03 -0700255template <typename F>
256void EventSchedulerScheduler::RunMaybeRealtimeLoop(F loop_body) {
257 internal::TimerFd timerfd;
258 CHECK_LT(0.0, replay_rate_) << "Replay rate must be positive.";
259 distributed_clock::time_point last_distributed_clock =
260 std::get<0>(OldestEvent());
261 monotonic_clock::time_point last_monotonic_clock = monotonic_clock::now();
262 timerfd.SetTime(last_monotonic_clock, std::chrono::seconds(0));
263 epoll_.OnReadable(
264 timerfd.fd(), [this, &last_distributed_clock, &last_monotonic_clock,
265 &timerfd, loop_body]() {
266 const uint64_t read_result = timerfd.Read();
267 if (!is_running_) {
268 epoll_.Quit();
269 return;
270 }
271 CHECK_EQ(read_result, 1u);
272 // Call loop_body() at least once; if we are in infinite-speed replay,
273 // we don't actually want/need the context switches from the epoll
274 // setup, so just loop.
275 // Note: The performance impacts of this code have not been carefully
276 // inspected (e.g., how much does avoiding the context-switch help; does
277 // the timerfd_settime call matter).
278 // This is deliberately written to support the user changing replay
279 // rates dynamically.
280 do {
281 loop_body();
282 if (is_running_) {
283 const monotonic_clock::time_point next_trigger =
284 last_monotonic_clock +
285 std::chrono::duration_cast<std::chrono::nanoseconds>(
286 (now_ - last_distributed_clock) / replay_rate_);
287 timerfd.SetTime(next_trigger, std::chrono::seconds(0));
288 last_monotonic_clock = next_trigger;
289 last_distributed_clock = now_;
290 } else {
291 epoll_.Quit();
292 }
293 } while (replay_rate_ == std::numeric_limits<double>::infinity() &&
294 is_running_);
295 });
296
297 epoll_.Run();
298 epoll_.DeleteFd(timerfd.fd());
299}
300
Austin Schuh8bd96322020-02-13 21:18:22 -0800301std::tuple<distributed_clock::time_point, EventScheduler *>
302EventSchedulerScheduler::OldestEvent() {
303 distributed_clock::time_point min_event_time = distributed_clock::max_time;
304 EventScheduler *min_scheduler = nullptr;
305
306 // TODO(austin): Don't linearly search... But for N=3, it is probably the
307 // fastest way to do this.
308 for (EventScheduler *scheduler : schedulers_) {
309 const monotonic_clock::time_point monotonic_event_time =
310 scheduler->OldestEvent();
311 if (monotonic_event_time != monotonic_clock::max_time) {
312 const distributed_clock::time_point event_time =
313 scheduler->ToDistributedClock(monotonic_event_time);
314 if (event_time < min_event_time) {
315 min_event_time = event_time;
316 min_scheduler = scheduler;
317 }
318 }
319 }
320
Austin Schuh87dd3832021-01-01 23:07:31 -0800321 if (min_scheduler) {
Austin Schuhc1ee1b62022-03-22 17:09:52 -0700322 VLOG(2) << "Oldest event " << min_event_time << " on scheduler "
Austin Schuh87dd3832021-01-01 23:07:31 -0800323 << min_scheduler->node_index_;
324 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800325 return std::make_tuple(min_event_time, min_scheduler);
326}
327
Austin Schuhe33c08d2022-02-03 18:15:21 -0800328void EventSchedulerScheduler::TemporarilyStopAndRun(std::function<void()> fn) {
329 const bool was_running = is_running_;
330 if (is_running_) {
331 is_running_ = false;
332 RunStopped();
333 }
334 fn();
335 if (was_running) {
336 RunOnStartup();
337 RunOnRun();
338 }
339}
340
Alex Perrycb7da4b2019-08-28 19:35:56 -0700341} // namespace aos