blob: c5081404a5a0da618319e1eebab0c6b6022f4eaa [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
2#define AOS_EVENTS_EVENT_SCHEDULER_H_
3
#include <algorithm>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <optional>
#include <ostream>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>

#include "aos/events/epoll.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/boot_timestamp.h"
#include "aos/logging/implementations.h"
#include "aos/time/time.h"
#include "glog/logging.h"
17
18namespace aos {
19
// The common time base shared by all nodes; distributed time is synchronized
// against it. It is its own clock type on purpose, so that moving between it
// and the monotonic / realtime clocks always needs an explicit conversion.
class distributed_clock {
 public:
  using rep = ::std::chrono::nanoseconds::rep;
  using period = ::std::chrono::nanoseconds::period;
  using duration = ::std::chrono::nanoseconds;
  using time_point = ::std::chrono::time_point<distributed_clock>;

  // Everything in the simulation is synchronized to this clock, and it never
  // jumps.
  static constexpr bool is_steady = true;

  // A zero-length duration.
  static constexpr duration zero() { return duration(0); }

  // The time point at zero, i.e. the clock's epoch.
  static constexpr time_point epoch() { return time_point(zero()); }

  // The smallest and largest representable time points.
  static constexpr time_point min_time{
      time_point(duration(::std::numeric_limits<rep>::min()))};
  static constexpr time_point max_time{
      time_point(duration(::std::numeric_limits<rep>::max()))};
};

// Streams a human-readable form of a distributed time point.
std::ostream &operator<<(std::ostream &stream,
                         const distributed_clock::time_point &now);
47
Austin Schuha9abc032021-01-01 16:46:19 -080048// Interface to handle converting time on a node to and from the distributed
49// clock accurately.
50class TimeConverter {
51 public:
52 virtual ~TimeConverter() {}
53
Austin Schuh58646e22021-08-23 23:51:46 -070054 // Returns the boot UUID for a node and boot. Note: the boot UUID for
55 // subsequent calls needs to be the same each time.
56 virtual UUID boot_uuid(size_t node_index, size_t boot_count) = 0;
57
58 void set_reboot_found(
59 std::function<void(distributed_clock::time_point,
60 const std::vector<logger::BootTimestamp> &)>
61 fn) {
62 reboot_found_ = fn;
63 }
64
Austin Schuha9abc032021-01-01 16:46:19 -080065 // Converts a time to the distributed clock for scheduling and cross-node
66 // time measurement.
67 virtual distributed_clock::time_point ToDistributedClock(
Austin Schuh58646e22021-08-23 23:51:46 -070068 size_t node_index, logger::BootTimestamp time) = 0;
Austin Schuha9abc032021-01-01 16:46:19 -080069
70 // Takes the distributed time and converts it to the monotonic clock for this
71 // node.
Austin Schuh58646e22021-08-23 23:51:46 -070072 virtual logger::BootTimestamp FromDistributedClock(
73 size_t node_index, distributed_clock::time_point time,
74 size_t boot_count) = 0;
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070075
76 // Called whenever time passes this point and we can forget about it.
77 virtual void ObserveTimePassed(distributed_clock::time_point time) = 0;
Austin Schuh58646e22021-08-23 23:51:46 -070078
79 protected:
80 std::function<void(distributed_clock::time_point,
81 const std::vector<logger::BootTimestamp> &)>
82 reboot_found_;
Austin Schuha9abc032021-01-01 16:46:19 -080083};
84
Austin Schuh8bd96322020-02-13 21:18:22 -080085class EventSchedulerScheduler;
86
Alex Perrycb7da4b2019-08-28 19:35:56 -070087class EventScheduler {
88 public:
Austin Schuhef8f1ae2021-12-11 12:35:05 -080089 class Event {
90 public:
91 virtual void Handle() noexcept = 0;
92 virtual ~Event() {}
93 };
94
95 using ChannelType = std::multimap<monotonic_clock::time_point, Event *>;
Alex Perrycb7da4b2019-08-28 19:35:56 -070096 using Token = ChannelType::iterator;
Austin Schuh58646e22021-08-23 23:51:46 -070097 EventScheduler(size_t node_index) : node_index_(node_index) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -070098
Austin Schuh87dd3832021-01-01 23:07:31 -080099 // Sets the time converter in use for this scheduler (and the corresponding
100 // node index)
101 void SetTimeConverter(size_t node_index, TimeConverter *converter) {
Austin Schuh58646e22021-08-23 23:51:46 -0700102 CHECK_EQ(node_index_, node_index);
Austin Schuh87dd3832021-01-01 23:07:31 -0800103 converter_ = converter;
104 }
105
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800106 UUID boot_uuid() { return converter_->boot_uuid(node_index_, boot_count_); }
Austin Schuh58646e22021-08-23 23:51:46 -0700107
Alex Perrycb7da4b2019-08-28 19:35:56 -0700108 // Schedule an event with a callback function
109 // Returns an iterator to the event
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800110 Token Schedule(monotonic_clock::time_point time, Event *callback);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700111
James Kuszmaul86e86c32022-07-21 17:39:47 -0700112 // Schedules a callback whenever the event scheduler starts, after we have
113 // entered the running state. Callbacks are cleared after being called once.
114 // Will not get called until a node starts (a node does not start until its
115 // monotonic clock has reached at least monotonic_clock::epoch()).
Austin Schuh39788ff2019-12-01 18:22:57 -0800116 void ScheduleOnRun(std::function<void()> callback) {
117 on_run_.emplace_back(std::move(callback));
118 }
119
James Kuszmaul86e86c32022-07-21 17:39:47 -0700120 // Schedules a callback whenever the event scheduler starts, before we have
121 // entered the running state. Callbacks are cleared after being called once.
122 // Will not get called until a node starts (a node does not start until its
123 // monotonic clock has reached at least monotonic_clock::epoch()).
Austin Schuh057d29f2021-08-21 23:05:15 -0700124 void ScheduleOnStartup(std::function<void()> callback) {
125 on_startup_.emplace_back(std::move(callback));
126 }
127
James Kuszmaul86e86c32022-07-21 17:39:47 -0700128 // Schedules a callback for whenever a node reboots, after we have exited the
129 // running state. Does not get called when the event scheduler stops (unless
130 // it is stopping to execute the reboot).
Austin Schuh58646e22021-08-23 23:51:46 -0700131 void set_on_shutdown(std::function<void()> callback) {
132 on_shutdown_ = std::move(callback);
133 }
134
James Kuszmaul86e86c32022-07-21 17:39:47 -0700135 // Identical to ScheduleOnStartup, except that only one callback may get set
136 // and it will not be cleared after being called.
Austin Schuh58646e22021-08-23 23:51:46 -0700137 void set_started(std::function<void()> callback) {
138 started_ = std::move(callback);
139 }
140
James Kuszmaul86e86c32022-07-21 17:39:47 -0700141 // Schedules a callback for whenever the scheduler exits the running state
142 // (running will be false during the callback). This includes both node
143 // reboots and the end of regular execution. Will not be called if the node
144 // never started.
Austin Schuhe33c08d2022-02-03 18:15:21 -0800145 void set_stopped(std::function<void()> callback) {
146 stopped_ = std::move(callback);
147 }
148
Alex Perrycb7da4b2019-08-28 19:35:56 -0700149 Token InvalidToken() { return events_list_.end(); }
150
151 // Deschedule an event by its iterator
152 void Deschedule(Token token);
153
Austin Schuh58646e22021-08-23 23:51:46 -0700154 // Runs the Started callback.
James Kuszmaul86e86c32022-07-21 17:39:47 -0700155 void MaybeRunStopped();
Austin Schuh58646e22021-08-23 23:51:46 -0700156
Austin Schuh8bd96322020-02-13 21:18:22 -0800157 // Returns true if events are being handled.
158 inline bool is_running() const;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700159
Austin Schuh8bd96322020-02-13 21:18:22 -0800160 // Returns the timestamp of the next event to trigger.
Austin Schuhe12b5eb2022-08-29 12:39:27 -0700161 std::pair<distributed_clock::time_point, monotonic_clock::time_point>
162 OldestEvent();
Austin Schuh8bd96322020-02-13 21:18:22 -0800163 // Handles the next event.
164 void CallOldestEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700165
Austin Schuh8bd96322020-02-13 21:18:22 -0800166 // Converts a time to the distributed clock for scheduling and cross-node time
167 // measurement.
168 distributed_clock::time_point ToDistributedClock(
169 monotonic_clock::time_point time) const {
Austin Schuh58646e22021-08-23 23:51:46 -0700170 return converter_->ToDistributedClock(node_index_,
171 {.boot = boot_count_, .time = time});
Austin Schuh8bd96322020-02-13 21:18:22 -0800172 }
173
174 // Takes the distributed time and converts it to the monotonic clock for this
175 // node.
Austin Schuh58646e22021-08-23 23:51:46 -0700176 logger::BootTimestamp FromDistributedClock(
Austin Schuh8bd96322020-02-13 21:18:22 -0800177 distributed_clock::time_point time) const {
Austin Schuh58646e22021-08-23 23:51:46 -0700178 return converter_->FromDistributedClock(node_index_, time, boot_count_);
Austin Schuh8bd96322020-02-13 21:18:22 -0800179 }
180
181 // Returns the current monotonic time on this node calculated from the
182 // distributed clock.
183 inline monotonic_clock::time_point monotonic_now() const;
184
Austin Schuh58646e22021-08-23 23:51:46 -0700185 // Returns the current monotonic time on this node calculated from the
186 // distributed clock.
187 inline distributed_clock::time_point distributed_now() const;
188
189 size_t boot_count() const { return boot_count_; }
190
191 size_t node_index() const { return node_index_; }
192
James Kuszmaul86e86c32022-07-21 17:39:47 -0700193 private:
194 friend class EventSchedulerScheduler;
195
196 // Runs the OnRun callbacks.
197 void RunOnRun();
198
199 // Runs the OnStartup callbacks.
200 void RunOnStartup() noexcept;
201
202 // Runs the Started callback.
203 void RunStarted();
204
Austin Schuh58646e22021-08-23 23:51:46 -0700205 // For implementing reboots.
206 void Shutdown();
207 void Startup();
208
James Kuszmaul86e86c32022-07-21 17:39:47 -0700209 void MaybeRunOnStartup();
210 void MaybeRunOnRun();
Austin Schuh58646e22021-08-23 23:51:46 -0700211
Alex Perrycb7da4b2019-08-28 19:35:56 -0700212 // Current execution time.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700213 monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700214
Austin Schuh58646e22021-08-23 23:51:46 -0700215 size_t boot_count_ = 0;
216
Austin Schuh8bd96322020-02-13 21:18:22 -0800217 // List of functions to run (once) when running.
Austin Schuh39788ff2019-12-01 18:22:57 -0800218 std::vector<std::function<void()>> on_run_;
Austin Schuh057d29f2021-08-21 23:05:15 -0700219 std::vector<std::function<void()>> on_startup_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800220
Alex Perrycb7da4b2019-08-28 19:35:56 -0700221 // Multimap holding times to run functions. These are stored in order, and
222 // the order is the callback tree.
223 ChannelType events_list_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800224
225 // Pointer to the actual scheduler.
226 EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
Austin Schuh87dd3832021-01-01 23:07:31 -0800227
228 // Node index handle to be handed back to the TimeConverter. This lets the
229 // same time converter be used for all the nodes, and the node index
230 // distinguish which one.
231 size_t node_index_ = 0;
232
James Kuszmaul86e86c32022-07-21 17:39:47 -0700233 // Whether this individual scheduler is currently running.
234 bool is_running_ = false;
235 // Whether we have called all the startup handlers during this boot.
236 bool called_started_ = false;
Austin Schuhe12b5eb2022-08-29 12:39:27 -0700237 std::optional<distributed_clock::time_point> cached_epoch_;
238 monotonic_clock::time_point cached_event_list_monotonic_time_ =
239 monotonic_clock::max_time;
240 distributed_clock::time_point cached_event_list_time_ =
241 distributed_clock::max_time;
James Kuszmaul86e86c32022-07-21 17:39:47 -0700242
243 std::function<void()> started_;
244 std::function<void()> stopped_;
245 std::function<void()> on_shutdown_;
246
Austin Schuh87dd3832021-01-01 23:07:31 -0800247 // Converts time by doing nothing to it.
248 class UnityConverter final : public TimeConverter {
249 public:
250 distributed_clock::time_point ToDistributedClock(
Austin Schuh58646e22021-08-23 23:51:46 -0700251 size_t /*node_index*/, logger::BootTimestamp time) override {
252 CHECK_EQ(time.boot, 0u) << ": Reboots unsupported by default.";
253 return distributed_clock::epoch() + time.time.time_since_epoch();
Austin Schuh87dd3832021-01-01 23:07:31 -0800254 }
255
Austin Schuh58646e22021-08-23 23:51:46 -0700256 logger::BootTimestamp FromDistributedClock(
257 size_t /*node_index*/, distributed_clock::time_point time,
258 size_t boot_count) override {
259 CHECK_EQ(boot_count, 0u);
260 return logger::BootTimestamp{
261 .boot = boot_count,
262 .time = monotonic_clock::epoch() + time.time_since_epoch()};
Austin Schuh87dd3832021-01-01 23:07:31 -0800263 }
Austin Schuhb7c8d2a2021-07-19 19:22:12 -0700264
265 void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
Austin Schuh58646e22021-08-23 23:51:46 -0700266
267 UUID boot_uuid(size_t /*node_index*/, size_t boot_count) override {
268 CHECK_EQ(boot_count, 0u);
269 return uuid_;
270 }
271
272 private:
273 const UUID uuid_ = UUID::Random();
Austin Schuh87dd3832021-01-01 23:07:31 -0800274 };
275
276 UnityConverter unity_converter_;
277
278 TimeConverter *converter_ = &unity_converter_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700279};
280
Austin Schuh8bd96322020-02-13 21:18:22 -0800281// We need a heap of heaps...
282//
283// Events in a node have a very well defined progression of time. It is linear
284// and well represented by the monotonic clock.
285//
286// Events across nodes don't follow this well. Time skews between the two nodes
287// all the time. We also don't know the function ahead of time which converts
288// from each node's monotonic clock to the distributed clock (our unified base
289// time which is likely the average time between nodes).
290//
291// This pushes us towards merge sort. Sorting each node's events with a heap
292// like we used to be doing, and then sorting each of those nodes independently.
293class EventSchedulerScheduler {
294 public:
295 // Adds an event scheduler to the list.
296 void AddEventScheduler(EventScheduler *scheduler);
297
298 // Runs until there are no more events or Exit is called.
299 void Run();
300
301 // Stops running.
302 void Exit() { is_running_ = false; }
303
Austin Schuh8bd96322020-02-13 21:18:22 -0800304 // Runs for a duration on the distributed clock. Time on the distributed
305 // clock should be very representative of time on each node, but won't be
306 // exactly the same.
307 void RunFor(distributed_clock::duration duration);
308
James Kuszmaulb67409b2022-06-20 16:25:03 -0700309 // Sets the realtime replay rate. A value of 1.0 will cause the scheduler to
310 // try to play events in realtime. 0.5 will run at half speed. Use infinity
311 // (the default) to run as fast as possible. This can be changed during
312 // run-time.
313 void SetReplayRate(double replay_rate) { replay_rate_ = replay_rate; }
314 internal::EPoll *epoll() { return &epoll_; }
315
James Kuszmaul86e86c32022-07-21 17:39:47 -0700316 // Run until time. fn_realtime_offset is a function that returns the
317 // realtime offset.
318 // Returns true if it ran until time (i.e., Exit() was not called before
319 // end_time).
320 bool RunUntil(realtime_clock::time_point end_time, EventScheduler *scheduler,
321 std::function<std::chrono::nanoseconds()> fn_realtime_offset);
322
Austin Schuh8bd96322020-02-13 21:18:22 -0800323 // Returns the current distributed time.
324 distributed_clock::time_point distributed_now() const { return now_; }
325
Austin Schuh58646e22021-08-23 23:51:46 -0700326 void SetTimeConverter(TimeConverter *time_converter) {
327 time_converter->set_reboot_found(
328 [this](distributed_clock::time_point reboot_time,
329 const std::vector<logger::BootTimestamp> &node_times) {
330 if (!reboots_.empty()) {
331 CHECK_GT(reboot_time, std::get<0>(reboots_.back()));
332 }
333 reboots_.emplace_back(reboot_time, node_times);
334 });
Austin Schuh057d29f2021-08-21 23:05:15 -0700335 }
336
Austin Schuhe33c08d2022-02-03 18:15:21 -0800337 // Runs the provided callback now. Stops everything, runs the callback, then
338 // starts it all up again. This lets us do operations like starting and
339 // stopping applications while running.
340 void TemporarilyStopAndRun(std::function<void()> fn);
341
Austin Schuh8bd96322020-02-13 21:18:22 -0800342 private:
Austin Schuh58646e22021-08-23 23:51:46 -0700343 void Reboot();
344
James Kuszmaul86e86c32022-07-21 17:39:47 -0700345 void MaybeRunStopped();
346 void MaybeRunOnStartup();
347
Austin Schuh8bd96322020-02-13 21:18:22 -0800348 // Returns the next event time and scheduler on which to run it.
349 std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
350
James Kuszmaulb67409b2022-06-20 16:25:03 -0700351 // Handles running loop_body repeatedly until complete. loop_body should
352 // return the next time at which it wants to be called, and set is_running_ to
353 // false once we should stop.
354 template <typename F>
355 void RunMaybeRealtimeLoop(F loop_body);
356
Austin Schuh8bd96322020-02-13 21:18:22 -0800357 // True if we are running.
358 bool is_running_ = false;
359 // The current time.
360 distributed_clock::time_point now_ = distributed_clock::epoch();
361 // List of schedulers to run in sync.
362 std::vector<EventScheduler *> schedulers_;
Austin Schuh58646e22021-08-23 23:51:46 -0700363
364 // List of when to reboot each node.
365 std::vector<std::tuple<distributed_clock::time_point,
366 std::vector<logger::BootTimestamp>>>
367 reboots_;
James Kuszmaulb67409b2022-06-20 16:25:03 -0700368
369 double replay_rate_ = std::numeric_limits<double>::infinity();
370 internal::EPoll epoll_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800371};
372
Austin Schuh58646e22021-08-23 23:51:46 -0700373inline distributed_clock::time_point EventScheduler::distributed_now() const {
374 return scheduler_scheduler_->distributed_now();
375}
Austin Schuh8bd96322020-02-13 21:18:22 -0800376inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
Austin Schuh58646e22021-08-23 23:51:46 -0700377 const logger::BootTimestamp t =
378 FromDistributedClock(scheduler_scheduler_->distributed_now());
Austin Schuh60e77942022-05-16 17:48:24 -0700379 CHECK_EQ(t.boot, boot_count_)
380 << ": "
381 << " " << t << " d " << scheduler_scheduler_->distributed_now();
Austin Schuh58646e22021-08-23 23:51:46 -0700382 return t.time;
Austin Schuh8bd96322020-02-13 21:18:22 -0800383}
384
James Kuszmaul86e86c32022-07-21 17:39:47 -0700385inline bool EventScheduler::is_running() const { return is_running_; }
Austin Schuh8bd96322020-02-13 21:18:22 -0800386
Alex Perrycb7da4b2019-08-28 19:35:56 -0700387} // namespace aos
388
389#endif // AOS_EVENTS_EVENT_SCHEDULER_H_