blob: b64728c20f932b50b4c0094e456535d592b79c89 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
2#define AOS_EVENTS_EVENT_SCHEDULER_H_
3
4#include <algorithm>
5#include <map>
6#include <memory>
7#include <unordered_set>
8#include <utility>
9#include <vector>
10
11#include "aos/events/event_loop.h"
Austin Schuh58646e22021-08-23 23:51:46 -070012#include "aos/events/logging/boot_timestamp.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080013#include "aos/logging/implementations.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070014#include "aos/time/time.h"
15#include "glog/logging.h"
16
17namespace aos {
18
// Common time base for distributed (multi-node) time. Every node's clock is
// synchronized against it. It is deliberately its own type so that conversions
// to and from the monotonic and realtime clocks must be written explicitly and
// never happen implicitly.
class distributed_clock {
 public:
  // Same representation and resolution as std::chrono::nanoseconds.
  using rep = std::chrono::nanoseconds::rep;
  using period = std::chrono::nanoseconds::period;
  using duration = std::chrono::nanoseconds;
  using time_point = std::chrono::time_point<distributed_clock>;

  // The simulation is driven off this clock and it never jumps, so it is
  // steady.
  static constexpr bool is_steady = true;

  // The clock's epoch, i.e. the time_point at a zero duration.
  static constexpr time_point epoch() { return time_point{zero()}; }

  // A zero-length duration.
  static constexpr duration zero() { return duration(0); }

  // Smallest and largest representable time_points on this clock.
  static constexpr time_point min_time{
      time_point(duration(std::numeric_limits<duration::rep>::min()))};
  static constexpr time_point max_time{
      time_point(duration(std::numeric_limits<duration::rep>::max()))};
};
43
44std::ostream &operator<<(std::ostream &stream,
45 const aos::distributed_clock::time_point &now);
46
Austin Schuha9abc032021-01-01 16:46:19 -080047// Interface to handle converting time on a node to and from the distributed
48// clock accurately.
49class TimeConverter {
50 public:
51 virtual ~TimeConverter() {}
52
Austin Schuh58646e22021-08-23 23:51:46 -070053 // Returns the boot UUID for a node and boot. Note: the boot UUID for
54 // subsequent calls needs to be the same each time.
55 virtual UUID boot_uuid(size_t node_index, size_t boot_count) = 0;
56
57 void set_reboot_found(
58 std::function<void(distributed_clock::time_point,
59 const std::vector<logger::BootTimestamp> &)>
60 fn) {
61 reboot_found_ = fn;
62 }
63
Austin Schuha9abc032021-01-01 16:46:19 -080064 // Converts a time to the distributed clock for scheduling and cross-node
65 // time measurement.
66 virtual distributed_clock::time_point ToDistributedClock(
Austin Schuh58646e22021-08-23 23:51:46 -070067 size_t node_index, logger::BootTimestamp time) = 0;
Austin Schuha9abc032021-01-01 16:46:19 -080068
69 // Takes the distributed time and converts it to the monotonic clock for this
70 // node.
Austin Schuh58646e22021-08-23 23:51:46 -070071 virtual logger::BootTimestamp FromDistributedClock(
72 size_t node_index, distributed_clock::time_point time,
73 size_t boot_count) = 0;
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070074
75 // Called whenever time passes this point and we can forget about it.
76 virtual void ObserveTimePassed(distributed_clock::time_point time) = 0;
Austin Schuh58646e22021-08-23 23:51:46 -070077
78 protected:
79 std::function<void(distributed_clock::time_point,
80 const std::vector<logger::BootTimestamp> &)>
81 reboot_found_;
Austin Schuha9abc032021-01-01 16:46:19 -080082};
83
Austin Schuh8bd96322020-02-13 21:18:22 -080084class EventSchedulerScheduler;
85
Alex Perrycb7da4b2019-08-28 19:35:56 -070086class EventScheduler {
87 public:
Austin Schuhef8f1ae2021-12-11 12:35:05 -080088 class Event {
89 public:
90 virtual void Handle() noexcept = 0;
91 virtual ~Event() {}
92 };
93
94 using ChannelType = std::multimap<monotonic_clock::time_point, Event *>;
Alex Perrycb7da4b2019-08-28 19:35:56 -070095 using Token = ChannelType::iterator;
Austin Schuh58646e22021-08-23 23:51:46 -070096 EventScheduler(size_t node_index) : node_index_(node_index) {}
Alex Perrycb7da4b2019-08-28 19:35:56 -070097
Austin Schuh87dd3832021-01-01 23:07:31 -080098 // Sets the time converter in use for this scheduler (and the corresponding
99 // node index)
100 void SetTimeConverter(size_t node_index, TimeConverter *converter) {
Austin Schuh58646e22021-08-23 23:51:46 -0700101 CHECK_EQ(node_index_, node_index);
Austin Schuh87dd3832021-01-01 23:07:31 -0800102 converter_ = converter;
103 }
104
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800105 UUID boot_uuid() { return converter_->boot_uuid(node_index_, boot_count_); }
Austin Schuh58646e22021-08-23 23:51:46 -0700106
Alex Perrycb7da4b2019-08-28 19:35:56 -0700107 // Schedule an event with a callback function
108 // Returns an iterator to the event
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800109 Token Schedule(monotonic_clock::time_point time, Event *callback);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700110
Austin Schuh39788ff2019-12-01 18:22:57 -0800111 // Schedules a callback when the event scheduler starts.
112 void ScheduleOnRun(std::function<void()> callback) {
113 on_run_.emplace_back(std::move(callback));
114 }
115
Austin Schuh057d29f2021-08-21 23:05:15 -0700116 // Schedules a callback when the event scheduler starts.
117 void ScheduleOnStartup(std::function<void()> callback) {
118 on_startup_.emplace_back(std::move(callback));
119 }
120
Austin Schuh58646e22021-08-23 23:51:46 -0700121 void set_on_shutdown(std::function<void()> callback) {
122 on_shutdown_ = std::move(callback);
123 }
124
125 void set_started(std::function<void()> callback) {
126 started_ = std::move(callback);
127 }
128
Austin Schuhe33c08d2022-02-03 18:15:21 -0800129 void set_stopped(std::function<void()> callback) {
130 stopped_ = std::move(callback);
131 }
132
Austin Schuh58646e22021-08-23 23:51:46 -0700133 std::function<void()> started_;
Austin Schuhe33c08d2022-02-03 18:15:21 -0800134 std::function<void()> stopped_;
Austin Schuh58646e22021-08-23 23:51:46 -0700135 std::function<void()> on_shutdown_;
136
Alex Perrycb7da4b2019-08-28 19:35:56 -0700137 Token InvalidToken() { return events_list_.end(); }
138
139 // Deschedule an event by its iterator
140 void Deschedule(Token token);
141
Austin Schuh8bd96322020-02-13 21:18:22 -0800142 // Runs the OnRun callbacks.
143 void RunOnRun();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700144
Austin Schuh057d29f2021-08-21 23:05:15 -0700145 // Runs the OnStartup callbacks.
Austin Schuhe33c08d2022-02-03 18:15:21 -0800146 void RunOnStartup() noexcept;
Austin Schuh057d29f2021-08-21 23:05:15 -0700147
Austin Schuh58646e22021-08-23 23:51:46 -0700148 // Runs the Started callback.
149 void RunStarted();
Austin Schuhe33c08d2022-02-03 18:15:21 -0800150 // Runs the Started callback.
151 void RunStopped();
Austin Schuh58646e22021-08-23 23:51:46 -0700152
Austin Schuh8bd96322020-02-13 21:18:22 -0800153 // Returns true if events are being handled.
154 inline bool is_running() const;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700155
Austin Schuh8bd96322020-02-13 21:18:22 -0800156 // Returns the timestamp of the next event to trigger.
Austin Schuh58646e22021-08-23 23:51:46 -0700157 monotonic_clock::time_point OldestEvent();
Austin Schuh8bd96322020-02-13 21:18:22 -0800158 // Handles the next event.
159 void CallOldestEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700160
Austin Schuh8bd96322020-02-13 21:18:22 -0800161 // Converts a time to the distributed clock for scheduling and cross-node time
162 // measurement.
163 distributed_clock::time_point ToDistributedClock(
164 monotonic_clock::time_point time) const {
Austin Schuh58646e22021-08-23 23:51:46 -0700165 return converter_->ToDistributedClock(node_index_,
166 {.boot = boot_count_, .time = time});
Austin Schuh8bd96322020-02-13 21:18:22 -0800167 }
168
169 // Takes the distributed time and converts it to the monotonic clock for this
170 // node.
Austin Schuh58646e22021-08-23 23:51:46 -0700171 logger::BootTimestamp FromDistributedClock(
Austin Schuh8bd96322020-02-13 21:18:22 -0800172 distributed_clock::time_point time) const {
Austin Schuh58646e22021-08-23 23:51:46 -0700173 return converter_->FromDistributedClock(node_index_, time, boot_count_);
Austin Schuh8bd96322020-02-13 21:18:22 -0800174 }
175
176 // Returns the current monotonic time on this node calculated from the
177 // distributed clock.
178 inline monotonic_clock::time_point monotonic_now() const;
179
Austin Schuh58646e22021-08-23 23:51:46 -0700180 // Returns the current monotonic time on this node calculated from the
181 // distributed clock.
182 inline distributed_clock::time_point distributed_now() const;
183
184 size_t boot_count() const { return boot_count_; }
185
186 size_t node_index() const { return node_index_; }
187
188 // For implementing reboots.
189 void Shutdown();
190 void Startup();
191
Alex Perrycb7da4b2019-08-28 19:35:56 -0700192 private:
Austin Schuh8bd96322020-02-13 21:18:22 -0800193 friend class EventSchedulerScheduler;
Austin Schuh58646e22021-08-23 23:51:46 -0700194
Alex Perrycb7da4b2019-08-28 19:35:56 -0700195 // Current execution time.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700196 monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700197
Austin Schuh58646e22021-08-23 23:51:46 -0700198 size_t boot_count_ = 0;
199
Austin Schuh8bd96322020-02-13 21:18:22 -0800200 // List of functions to run (once) when running.
Austin Schuh39788ff2019-12-01 18:22:57 -0800201 std::vector<std::function<void()>> on_run_;
Austin Schuh057d29f2021-08-21 23:05:15 -0700202 std::vector<std::function<void()>> on_startup_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800203
Alex Perrycb7da4b2019-08-28 19:35:56 -0700204 // Multimap holding times to run functions. These are stored in order, and
205 // the order is the callback tree.
206 ChannelType events_list_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800207
208 // Pointer to the actual scheduler.
209 EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
Austin Schuh87dd3832021-01-01 23:07:31 -0800210
211 // Node index handle to be handed back to the TimeConverter. This lets the
212 // same time converter be used for all the nodes, and the node index
213 // distinguish which one.
214 size_t node_index_ = 0;
215
216 // Converts time by doing nothing to it.
217 class UnityConverter final : public TimeConverter {
218 public:
219 distributed_clock::time_point ToDistributedClock(
Austin Schuh58646e22021-08-23 23:51:46 -0700220 size_t /*node_index*/, logger::BootTimestamp time) override {
221 CHECK_EQ(time.boot, 0u) << ": Reboots unsupported by default.";
222 return distributed_clock::epoch() + time.time.time_since_epoch();
Austin Schuh87dd3832021-01-01 23:07:31 -0800223 }
224
Austin Schuh58646e22021-08-23 23:51:46 -0700225 logger::BootTimestamp FromDistributedClock(
226 size_t /*node_index*/, distributed_clock::time_point time,
227 size_t boot_count) override {
228 CHECK_EQ(boot_count, 0u);
229 return logger::BootTimestamp{
230 .boot = boot_count,
231 .time = monotonic_clock::epoch() + time.time_since_epoch()};
Austin Schuh87dd3832021-01-01 23:07:31 -0800232 }
Austin Schuhb7c8d2a2021-07-19 19:22:12 -0700233
234 void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
Austin Schuh58646e22021-08-23 23:51:46 -0700235
236 UUID boot_uuid(size_t /*node_index*/, size_t boot_count) override {
237 CHECK_EQ(boot_count, 0u);
238 return uuid_;
239 }
240
241 private:
242 const UUID uuid_ = UUID::Random();
Austin Schuh87dd3832021-01-01 23:07:31 -0800243 };
244
245 UnityConverter unity_converter_;
246
247 TimeConverter *converter_ = &unity_converter_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700248};
249
Austin Schuh8bd96322020-02-13 21:18:22 -0800250// We need a heap of heaps...
251//
252// Events in a node have a very well defined progression of time. It is linear
253// and well represented by the monotonic clock.
254//
255// Events across nodes don't follow this well. Time skews between the two nodes
256// all the time. We also don't know the function ahead of time which converts
257// from each node's monotonic clock to the distributed clock (our unified base
258// time which is likely the average time between nodes).
259//
260// This pushes us towards merge sort. Sorting each node's events with a heap
261// like we used to be doing, and then sorting each of those nodes independently.
262class EventSchedulerScheduler {
263 public:
264 // Adds an event scheduler to the list.
265 void AddEventScheduler(EventScheduler *scheduler);
266
267 // Runs until there are no more events or Exit is called.
268 void Run();
269
270 // Stops running.
271 void Exit() { is_running_ = false; }
272
273 bool is_running() const { return is_running_; }
274
275 // Runs for a duration on the distributed clock. Time on the distributed
276 // clock should be very representative of time on each node, but won't be
277 // exactly the same.
278 void RunFor(distributed_clock::duration duration);
279
280 // Returns the current distributed time.
281 distributed_clock::time_point distributed_now() const { return now_; }
282
Austin Schuh057d29f2021-08-21 23:05:15 -0700283 void RunOnStartup() {
284 CHECK(!is_running_);
285 for (EventScheduler *scheduler : schedulers_) {
286 scheduler->RunOnStartup();
287 }
Austin Schuh58646e22021-08-23 23:51:46 -0700288 for (EventScheduler *scheduler : schedulers_) {
289 scheduler->RunStarted();
290 }
291 }
292
Austin Schuhe33c08d2022-02-03 18:15:21 -0800293 void RunStopped() {
294 CHECK(!is_running_);
295 for (EventScheduler *scheduler : schedulers_) {
296 scheduler->RunStopped();
297 }
298 }
299
Austin Schuh58646e22021-08-23 23:51:46 -0700300 void SetTimeConverter(TimeConverter *time_converter) {
301 time_converter->set_reboot_found(
302 [this](distributed_clock::time_point reboot_time,
303 const std::vector<logger::BootTimestamp> &node_times) {
304 if (!reboots_.empty()) {
305 CHECK_GT(reboot_time, std::get<0>(reboots_.back()));
306 }
307 reboots_.emplace_back(reboot_time, node_times);
308 });
Austin Schuh057d29f2021-08-21 23:05:15 -0700309 }
310
Austin Schuhe33c08d2022-02-03 18:15:21 -0800311 // Runs the provided callback now. Stops everything, runs the callback, then
312 // starts it all up again. This lets us do operations like starting and
313 // stopping applications while running.
314 void TemporarilyStopAndRun(std::function<void()> fn);
315
Austin Schuh8bd96322020-02-13 21:18:22 -0800316 private:
317 // Handles running the OnRun functions.
318 void RunOnRun() {
319 CHECK(!is_running_);
320 is_running_ = true;
321 for (EventScheduler *scheduler : schedulers_) {
322 scheduler->RunOnRun();
323 }
324 }
325
Austin Schuh58646e22021-08-23 23:51:46 -0700326 void Reboot();
327
Austin Schuh8bd96322020-02-13 21:18:22 -0800328 // Returns the next event time and scheduler on which to run it.
329 std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
330
331 // True if we are running.
332 bool is_running_ = false;
333 // The current time.
334 distributed_clock::time_point now_ = distributed_clock::epoch();
335 // List of schedulers to run in sync.
336 std::vector<EventScheduler *> schedulers_;
Austin Schuh58646e22021-08-23 23:51:46 -0700337
338 // List of when to reboot each node.
339 std::vector<std::tuple<distributed_clock::time_point,
340 std::vector<logger::BootTimestamp>>>
341 reboots_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800342};
343
Austin Schuh58646e22021-08-23 23:51:46 -0700344inline distributed_clock::time_point EventScheduler::distributed_now() const {
345 return scheduler_scheduler_->distributed_now();
346}
Austin Schuh8bd96322020-02-13 21:18:22 -0800347inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
Austin Schuh58646e22021-08-23 23:51:46 -0700348 const logger::BootTimestamp t =
349 FromDistributedClock(scheduler_scheduler_->distributed_now());
350 CHECK_EQ(t.boot, boot_count_) << ": " << " " << t << " d "
351 << scheduler_scheduler_->distributed_now();
352 return t.time;
Austin Schuh8bd96322020-02-13 21:18:22 -0800353}
354
355inline bool EventScheduler::is_running() const {
356 return scheduler_scheduler_->is_running();
357}
358
Alex Perrycb7da4b2019-08-28 19:35:56 -0700359} // namespace aos
360
361#endif // AOS_EVENTS_EVENT_SCHEDULER_H_