blob: 848a1cf1244454d35a87b8a5782672ebecbe1607 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
2#define AOS_EVENTS_EVENT_SCHEDULER_H_
3
#include <algorithm>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <ostream>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>

#include "glog/logging.h"

#include "aos/events/epoll.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/boot_timestamp.h"
#include "aos/logging/implementations.h"
#include "aos/time/time.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070018
19namespace aos {
20
// distributed_clock is the shared time base that every node's clock is related
// to, and is what synchronizes time between multiple nodes. It is a distinct
// type so that converting to or from the monotonic and realtime clocks always
// has to be spelled out explicitly.
class distributed_clock {
 public:
  using rep = std::chrono::nanoseconds::rep;
  using period = std::chrono::nanoseconds::period;
  using duration = std::chrono::nanoseconds;
  using time_point = std::chrono::time_point<distributed_clock>;

  // The simulation is expressed on this clock and everything is synced to it.
  // It never jumps, so it is steady.
  static constexpr bool is_steady = true;

  // The zero-length duration.
  static constexpr duration zero() { return duration(0); }

  // The epoch: the time_point whose time_since_epoch() is zero().
  static constexpr time_point epoch() { return time_point(zero()); }

  // The smallest and largest representable distributed times.
  static constexpr time_point min_time{
      time_point(duration(std::numeric_limits<rep>::min()))};
  static constexpr time_point max_time{
      time_point(duration(std::numeric_limits<rep>::max()))};
};

// Prints a distributed timestamp to the stream.
std::ostream &operator<<(std::ostream &stream,
                         const distributed_clock::time_point &now);
48
Austin Schuha9abc032021-01-01 16:46:19 -080049// Interface to handle converting time on a node to and from the distributed
50// clock accurately.
51class TimeConverter {
52 public:
53 virtual ~TimeConverter() {}
54
Austin Schuh58646e22021-08-23 23:51:46 -070055 // Returns the boot UUID for a node and boot. Note: the boot UUID for
56 // subsequent calls needs to be the same each time.
57 virtual UUID boot_uuid(size_t node_index, size_t boot_count) = 0;
58
59 void set_reboot_found(
60 std::function<void(distributed_clock::time_point,
61 const std::vector<logger::BootTimestamp> &)>
62 fn) {
63 reboot_found_ = fn;
64 }
65
Austin Schuha9abc032021-01-01 16:46:19 -080066 // Converts a time to the distributed clock for scheduling and cross-node
67 // time measurement.
68 virtual distributed_clock::time_point ToDistributedClock(
Austin Schuh58646e22021-08-23 23:51:46 -070069 size_t node_index, logger::BootTimestamp time) = 0;
Austin Schuha9abc032021-01-01 16:46:19 -080070
71 // Takes the distributed time and converts it to the monotonic clock for this
72 // node.
Austin Schuh58646e22021-08-23 23:51:46 -070073 virtual logger::BootTimestamp FromDistributedClock(
74 size_t node_index, distributed_clock::time_point time,
75 size_t boot_count) = 0;
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070076
77 // Called whenever time passes this point and we can forget about it.
78 virtual void ObserveTimePassed(distributed_clock::time_point time) = 0;
Austin Schuh58646e22021-08-23 23:51:46 -070079
80 protected:
81 std::function<void(distributed_clock::time_point,
82 const std::vector<logger::BootTimestamp> &)>
83 reboot_found_;
Austin Schuha9abc032021-01-01 16:46:19 -080084};
85
Austin Schuh8bd96322020-02-13 21:18:22 -080086class EventSchedulerScheduler;
87
Alex Perrycb7da4b2019-08-28 19:35:56 -070088class EventScheduler {
89 public:
Austin Schuhef8f1ae2021-12-11 12:35:05 -080090 class Event {
91 public:
92 virtual void Handle() noexcept = 0;
93 virtual ~Event() {}
94 };
95
96 using ChannelType = std::multimap<monotonic_clock::time_point, Event *>;
Alex Perrycb7da4b2019-08-28 19:35:56 -070097 using Token = ChannelType::iterator;
Austin Schuh58646e22021-08-23 23:51:46 -070098 EventScheduler(size_t node_index) : node_index_(node_index) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -070099
Austin Schuh87dd3832021-01-01 23:07:31 -0800100 // Sets the time converter in use for this scheduler (and the corresponding
101 // node index)
102 void SetTimeConverter(size_t node_index, TimeConverter *converter) {
Austin Schuh58646e22021-08-23 23:51:46 -0700103 CHECK_EQ(node_index_, node_index);
Austin Schuh87dd3832021-01-01 23:07:31 -0800104 converter_ = converter;
105 }
106
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800107 UUID boot_uuid() { return converter_->boot_uuid(node_index_, boot_count_); }
Austin Schuh58646e22021-08-23 23:51:46 -0700108
Alex Perrycb7da4b2019-08-28 19:35:56 -0700109 // Schedule an event with a callback function
110 // Returns an iterator to the event
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800111 Token Schedule(monotonic_clock::time_point time, Event *callback);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700112
James Kuszmaul86e86c32022-07-21 17:39:47 -0700113 // Schedules a callback whenever the event scheduler starts, after we have
114 // entered the running state. Callbacks are cleared after being called once.
115 // Will not get called until a node starts (a node does not start until its
116 // monotonic clock has reached at least monotonic_clock::epoch()).
Austin Schuh39788ff2019-12-01 18:22:57 -0800117 void ScheduleOnRun(std::function<void()> callback) {
118 on_run_.emplace_back(std::move(callback));
119 }
120
James Kuszmaul86e86c32022-07-21 17:39:47 -0700121 // Schedules a callback whenever the event scheduler starts, before we have
122 // entered the running state. Callbacks are cleared after being called once.
123 // Will not get called until a node starts (a node does not start until its
124 // monotonic clock has reached at least monotonic_clock::epoch()).
Austin Schuh057d29f2021-08-21 23:05:15 -0700125 void ScheduleOnStartup(std::function<void()> callback) {
126 on_startup_.emplace_back(std::move(callback));
127 }
128
James Kuszmaul86e86c32022-07-21 17:39:47 -0700129 // Schedules a callback for whenever a node reboots, after we have exited the
130 // running state. Does not get called when the event scheduler stops (unless
131 // it is stopping to execute the reboot).
Austin Schuh58646e22021-08-23 23:51:46 -0700132 void set_on_shutdown(std::function<void()> callback) {
133 on_shutdown_ = std::move(callback);
134 }
135
James Kuszmaul86e86c32022-07-21 17:39:47 -0700136 // Identical to ScheduleOnStartup, except that only one callback may get set
137 // and it will not be cleared after being called.
Austin Schuh58646e22021-08-23 23:51:46 -0700138 void set_started(std::function<void()> callback) {
139 started_ = std::move(callback);
140 }
141
James Kuszmaul86e86c32022-07-21 17:39:47 -0700142 // Schedules a callback for whenever the scheduler exits the running state
143 // (running will be false during the callback). This includes both node
144 // reboots and the end of regular execution. Will not be called if the node
145 // never started.
Austin Schuhe33c08d2022-02-03 18:15:21 -0800146 void set_stopped(std::function<void()> callback) {
147 stopped_ = std::move(callback);
148 }
149
Alex Perrycb7da4b2019-08-28 19:35:56 -0700150 Token InvalidToken() { return events_list_.end(); }
151
152 // Deschedule an event by its iterator
153 void Deschedule(Token token);
154
Austin Schuh58646e22021-08-23 23:51:46 -0700155 // Runs the Started callback.
James Kuszmaul86e86c32022-07-21 17:39:47 -0700156 void MaybeRunStopped();
Austin Schuh58646e22021-08-23 23:51:46 -0700157
Austin Schuh8bd96322020-02-13 21:18:22 -0800158 // Returns true if events are being handled.
159 inline bool is_running() const;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700160
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 // Returns the timestamp of the next event to trigger.
Austin Schuhe12b5eb2022-08-29 12:39:27 -0700162 std::pair<distributed_clock::time_point, monotonic_clock::time_point>
163 OldestEvent();
Austin Schuh8bd96322020-02-13 21:18:22 -0800164 // Handles the next event.
165 void CallOldestEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700166
Austin Schuh8bd96322020-02-13 21:18:22 -0800167 // Converts a time to the distributed clock for scheduling and cross-node time
168 // measurement.
169 distributed_clock::time_point ToDistributedClock(
170 monotonic_clock::time_point time) const {
Austin Schuh58646e22021-08-23 23:51:46 -0700171 return converter_->ToDistributedClock(node_index_,
172 {.boot = boot_count_, .time = time});
Austin Schuh8bd96322020-02-13 21:18:22 -0800173 }
174
175 // Takes the distributed time and converts it to the monotonic clock for this
176 // node.
Austin Schuh58646e22021-08-23 23:51:46 -0700177 logger::BootTimestamp FromDistributedClock(
Austin Schuh8bd96322020-02-13 21:18:22 -0800178 distributed_clock::time_point time) const {
Austin Schuh58646e22021-08-23 23:51:46 -0700179 return converter_->FromDistributedClock(node_index_, time, boot_count_);
Austin Schuh8bd96322020-02-13 21:18:22 -0800180 }
181
182 // Returns the current monotonic time on this node calculated from the
183 // distributed clock.
184 inline monotonic_clock::time_point monotonic_now() const;
185
Austin Schuh58646e22021-08-23 23:51:46 -0700186 // Returns the current monotonic time on this node calculated from the
187 // distributed clock.
188 inline distributed_clock::time_point distributed_now() const;
189
190 size_t boot_count() const { return boot_count_; }
191
192 size_t node_index() const { return node_index_; }
193
James Kuszmaul86e86c32022-07-21 17:39:47 -0700194 private:
195 friend class EventSchedulerScheduler;
196
197 // Runs the OnRun callbacks.
198 void RunOnRun();
199
200 // Runs the OnStartup callbacks.
201 void RunOnStartup() noexcept;
202
203 // Runs the Started callback.
204 void RunStarted();
205
Austin Schuh58646e22021-08-23 23:51:46 -0700206 // For implementing reboots.
207 void Shutdown();
208 void Startup();
209
James Kuszmaul86e86c32022-07-21 17:39:47 -0700210 void MaybeRunOnStartup();
211 void MaybeRunOnRun();
Austin Schuh58646e22021-08-23 23:51:46 -0700212
Alex Perrycb7da4b2019-08-28 19:35:56 -0700213 // Current execution time.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700214 monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700215
Austin Schuh58646e22021-08-23 23:51:46 -0700216 size_t boot_count_ = 0;
217
Austin Schuh8bd96322020-02-13 21:18:22 -0800218 // List of functions to run (once) when running.
Austin Schuh39788ff2019-12-01 18:22:57 -0800219 std::vector<std::function<void()>> on_run_;
Austin Schuh057d29f2021-08-21 23:05:15 -0700220 std::vector<std::function<void()>> on_startup_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800221
Alex Perrycb7da4b2019-08-28 19:35:56 -0700222 // Multimap holding times to run functions. These are stored in order, and
223 // the order is the callback tree.
224 ChannelType events_list_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800225
226 // Pointer to the actual scheduler.
227 EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
Austin Schuh87dd3832021-01-01 23:07:31 -0800228
229 // Node index handle to be handed back to the TimeConverter. This lets the
230 // same time converter be used for all the nodes, and the node index
231 // distinguish which one.
232 size_t node_index_ = 0;
233
James Kuszmaul86e86c32022-07-21 17:39:47 -0700234 // Whether this individual scheduler is currently running.
235 bool is_running_ = false;
236 // Whether we have called all the startup handlers during this boot.
237 bool called_started_ = false;
Austin Schuhe12b5eb2022-08-29 12:39:27 -0700238 std::optional<distributed_clock::time_point> cached_epoch_;
239 monotonic_clock::time_point cached_event_list_monotonic_time_ =
240 monotonic_clock::max_time;
241 distributed_clock::time_point cached_event_list_time_ =
242 distributed_clock::max_time;
James Kuszmaul86e86c32022-07-21 17:39:47 -0700243
244 std::function<void()> started_;
245 std::function<void()> stopped_;
246 std::function<void()> on_shutdown_;
247
Austin Schuh87dd3832021-01-01 23:07:31 -0800248 // Converts time by doing nothing to it.
249 class UnityConverter final : public TimeConverter {
250 public:
251 distributed_clock::time_point ToDistributedClock(
Austin Schuh58646e22021-08-23 23:51:46 -0700252 size_t /*node_index*/, logger::BootTimestamp time) override {
253 CHECK_EQ(time.boot, 0u) << ": Reboots unsupported by default.";
254 return distributed_clock::epoch() + time.time.time_since_epoch();
Austin Schuh87dd3832021-01-01 23:07:31 -0800255 }
256
Austin Schuh58646e22021-08-23 23:51:46 -0700257 logger::BootTimestamp FromDistributedClock(
258 size_t /*node_index*/, distributed_clock::time_point time,
259 size_t boot_count) override {
260 CHECK_EQ(boot_count, 0u);
261 return logger::BootTimestamp{
262 .boot = boot_count,
263 .time = monotonic_clock::epoch() + time.time_since_epoch()};
Austin Schuh87dd3832021-01-01 23:07:31 -0800264 }
Austin Schuhb7c8d2a2021-07-19 19:22:12 -0700265
266 void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
Austin Schuh58646e22021-08-23 23:51:46 -0700267
268 UUID boot_uuid(size_t /*node_index*/, size_t boot_count) override {
269 CHECK_EQ(boot_count, 0u);
270 return uuid_;
271 }
272
273 private:
274 const UUID uuid_ = UUID::Random();
Austin Schuh87dd3832021-01-01 23:07:31 -0800275 };
276
277 UnityConverter unity_converter_;
278
279 TimeConverter *converter_ = &unity_converter_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700280};
281
Austin Schuh8bd96322020-02-13 21:18:22 -0800282// We need a heap of heaps...
283//
284// Events in a node have a very well defined progression of time. It is linear
285// and well represented by the monotonic clock.
286//
287// Events across nodes don't follow this well. Time skews between the two nodes
288// all the time. We also don't know the function ahead of time which converts
289// from each node's monotonic clock to the distributed clock (our unified base
290// time which is likely the average time between nodes).
291//
292// This pushes us towards merge sort. Sorting each node's events with a heap
293// like we used to be doing, and then sorting each of those nodes independently.
294class EventSchedulerScheduler {
295 public:
296 // Adds an event scheduler to the list.
297 void AddEventScheduler(EventScheduler *scheduler);
298
299 // Runs until there are no more events or Exit is called.
300 void Run();
301
302 // Stops running.
303 void Exit() { is_running_ = false; }
304
Austin Schuh8bd96322020-02-13 21:18:22 -0800305 // Runs for a duration on the distributed clock. Time on the distributed
306 // clock should be very representative of time on each node, but won't be
307 // exactly the same.
308 void RunFor(distributed_clock::duration duration);
309
James Kuszmaulb67409b2022-06-20 16:25:03 -0700310 // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
311 // try to play events in realtime. 0.5 will run at half speed. Use infinity
312 // (the default) to run as fast as possible. This can be changed during
313 // run-time.
314 void SetReplayRate(double replay_rate) { replay_rate_ = replay_rate; }
315 internal::EPoll *epoll() { return &epoll_; }
316
James Kuszmaul86e86c32022-07-21 17:39:47 -0700317 // Run until time. fn_realtime_offset is a function that returns the
318 // realtime offset.
319 // Returns true if it ran until time (i.e., Exit() was not called before
320 // end_time).
321 bool RunUntil(realtime_clock::time_point end_time, EventScheduler *scheduler,
322 std::function<std::chrono::nanoseconds()> fn_realtime_offset);
323
Austin Schuh8bd96322020-02-13 21:18:22 -0800324 // Returns the current distributed time.
325 distributed_clock::time_point distributed_now() const { return now_; }
326
Austin Schuh58646e22021-08-23 23:51:46 -0700327 void SetTimeConverter(TimeConverter *time_converter) {
328 time_converter->set_reboot_found(
329 [this](distributed_clock::time_point reboot_time,
330 const std::vector<logger::BootTimestamp> &node_times) {
331 if (!reboots_.empty()) {
332 CHECK_GT(reboot_time, std::get<0>(reboots_.back()));
333 }
334 reboots_.emplace_back(reboot_time, node_times);
335 });
Austin Schuh057d29f2021-08-21 23:05:15 -0700336 }
337
Austin Schuhe33c08d2022-02-03 18:15:21 -0800338 // Runs the provided callback now. Stops everything, runs the callback, then
339 // starts it all up again. This lets us do operations like starting and
340 // stopping applications while running.
341 void TemporarilyStopAndRun(std::function<void()> fn);
342
Austin Schuh8bd96322020-02-13 21:18:22 -0800343 private:
Austin Schuh58646e22021-08-23 23:51:46 -0700344 void Reboot();
345
James Kuszmaul86e86c32022-07-21 17:39:47 -0700346 void MaybeRunStopped();
347 void MaybeRunOnStartup();
348
Austin Schuh8bd96322020-02-13 21:18:22 -0800349 // Returns the next event time and scheduler on which to run it.
350 std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
351
James Kuszmaulb67409b2022-06-20 16:25:03 -0700352 // Handles running loop_body repeatedly until complete. loop_body should
353 // return the next time at which it wants to be called, and set is_running_ to
354 // false once we should stop.
355 template <typename F>
356 void RunMaybeRealtimeLoop(F loop_body);
357
Austin Schuh8bd96322020-02-13 21:18:22 -0800358 // True if we are running.
359 bool is_running_ = false;
360 // The current time.
361 distributed_clock::time_point now_ = distributed_clock::epoch();
362 // List of schedulers to run in sync.
363 std::vector<EventScheduler *> schedulers_;
Austin Schuh58646e22021-08-23 23:51:46 -0700364
365 // List of when to reboot each node.
366 std::vector<std::tuple<distributed_clock::time_point,
367 std::vector<logger::BootTimestamp>>>
368 reboots_;
James Kuszmaulb67409b2022-06-20 16:25:03 -0700369
370 double replay_rate_ = std::numeric_limits<double>::infinity();
371 internal::EPoll epoll_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800372};
373
Austin Schuh58646e22021-08-23 23:51:46 -0700374inline distributed_clock::time_point EventScheduler::distributed_now() const {
375 return scheduler_scheduler_->distributed_now();
376}
Austin Schuh8bd96322020-02-13 21:18:22 -0800377inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
Austin Schuh58646e22021-08-23 23:51:46 -0700378 const logger::BootTimestamp t =
379 FromDistributedClock(scheduler_scheduler_->distributed_now());
Austin Schuh60e77942022-05-16 17:48:24 -0700380 CHECK_EQ(t.boot, boot_count_)
381 << ": "
382 << " " << t << " d " << scheduler_scheduler_->distributed_now();
Austin Schuh58646e22021-08-23 23:51:46 -0700383 return t.time;
Austin Schuh8bd96322020-02-13 21:18:22 -0800384}
385
James Kuszmaul86e86c32022-07-21 17:39:47 -0700386inline bool EventScheduler::is_running() const { return is_running_; }
Austin Schuh8bd96322020-02-13 21:18:22 -0800387
Alex Perrycb7da4b2019-08-28 19:35:56 -0700388} // namespace aos
389
390#endif // AOS_EVENTS_EVENT_SCHEDULER_H_