// Source blob: d6ab858068ecfd87deb8ab8554ad5aa73033dabd
#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
#define AOS_EVENTS_EVENT_SCHEDULER_H_

#include <algorithm>
#include <chrono>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <ostream>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>

#include "aos/events/event_loop.h"
#include "aos/logging/implementations.h"
#include "aos/time/time.h"
#include "glog/logging.h"

namespace aos {

// This clock is the basis for distributed time. It is used to synchronize time
// between multiple nodes. This is a new type so conversions to and from the
// monotonic and realtime clocks aren't implicit.
class distributed_clock {
 public:
  // Nanosecond-resolution representation, mirroring ::std::chrono::nanoseconds.
  using rep = ::std::chrono::nanoseconds::rep;
  using period = ::std::chrono::nanoseconds::period;
  using duration = ::std::chrono::nanoseconds;
  using time_point = ::std::chrono::time_point<distributed_clock>;

  // This clock is the base clock for the simulation and everything is synced to
  // it. It never jumps.
  static constexpr bool is_steady = true;

  // Returns the epoch (0).
  static constexpr time_point epoch() { return time_point(zero()); }

  // Returns a zero-length duration.
  static constexpr duration zero() { return duration(0); }

  // Time points built from the smallest and largest values that duration::rep
  // can hold.
  static constexpr time_point min_time{
      time_point(duration(::std::numeric_limits<duration::rep>::min()))};
  static constexpr time_point max_time{
      time_point(duration(::std::numeric_limits<duration::rep>::max()))};
};

43std::ostream &operator<<(std::ostream &stream,
44 const aos::distributed_clock::time_point &now);
45
Austin Schuha9abc032021-01-01 16:46:19 -080046// Interface to handle converting time on a node to and from the distributed
47// clock accurately.
48class TimeConverter {
49 public:
50 virtual ~TimeConverter() {}
51
52 // Converts a time to the distributed clock for scheduling and cross-node
53 // time measurement.
54 virtual distributed_clock::time_point ToDistributedClock(
55 size_t node_index, monotonic_clock::time_point time) = 0;
56
57 // Takes the distributed time and converts it to the monotonic clock for this
58 // node.
59 virtual monotonic_clock::time_point FromDistributedClock(
60 size_t node_index, distributed_clock::time_point time) = 0;
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070061
62 // Called whenever time passes this point and we can forget about it.
63 virtual void ObserveTimePassed(distributed_clock::time_point time) = 0;
Austin Schuha9abc032021-01-01 16:46:19 -080064};
65
Austin Schuh8bd96322020-02-13 21:18:22 -080066class EventSchedulerScheduler;
67
Alex Perrycb7da4b2019-08-28 19:35:56 -070068class EventScheduler {
69 public:
70 using ChannelType =
Austin Schuh8bd96322020-02-13 21:18:22 -080071 std::multimap<monotonic_clock::time_point, std::function<void()>>;
Alex Perrycb7da4b2019-08-28 19:35:56 -070072 using Token = ChannelType::iterator;
73
Austin Schuh87dd3832021-01-01 23:07:31 -080074 // Sets the time converter in use for this scheduler (and the corresponding
75 // node index)
76 void SetTimeConverter(size_t node_index, TimeConverter *converter) {
77 node_index_ = node_index;
78 converter_ = converter;
79 }
80
Alex Perrycb7da4b2019-08-28 19:35:56 -070081 // Schedule an event with a callback function
82 // Returns an iterator to the event
Austin Schuh8bd96322020-02-13 21:18:22 -080083 Token Schedule(monotonic_clock::time_point time,
Alex Perrycb7da4b2019-08-28 19:35:56 -070084 std::function<void()> callback);
85
Austin Schuh39788ff2019-12-01 18:22:57 -080086 // Schedules a callback when the event scheduler starts.
87 void ScheduleOnRun(std::function<void()> callback) {
88 on_run_.emplace_back(std::move(callback));
89 }
90
Austin Schuh057d29f2021-08-21 23:05:15 -070091 // Schedules a callback when the event scheduler starts.
92 void ScheduleOnStartup(std::function<void()> callback) {
93 on_startup_.emplace_back(std::move(callback));
94 }
95
Alex Perrycb7da4b2019-08-28 19:35:56 -070096 Token InvalidToken() { return events_list_.end(); }
97
98 // Deschedule an event by its iterator
99 void Deschedule(Token token);
100
Austin Schuh8bd96322020-02-13 21:18:22 -0800101 // Runs the OnRun callbacks.
102 void RunOnRun();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700103
Austin Schuh057d29f2021-08-21 23:05:15 -0700104 // Runs the OnStartup callbacks.
105 void RunOnStartup();
106
Austin Schuh8bd96322020-02-13 21:18:22 -0800107 // Returns true if events are being handled.
108 inline bool is_running() const;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700109
Austin Schuh8bd96322020-02-13 21:18:22 -0800110 // Returns the timestamp of the next event to trigger.
111 aos::monotonic_clock::time_point OldestEvent();
112 // Handles the next event.
113 void CallOldestEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700114
Austin Schuh8bd96322020-02-13 21:18:22 -0800115 // Converts a time to the distributed clock for scheduling and cross-node time
116 // measurement.
117 distributed_clock::time_point ToDistributedClock(
118 monotonic_clock::time_point time) const {
Austin Schuh87dd3832021-01-01 23:07:31 -0800119 return converter_->ToDistributedClock(node_index_, time);
Austin Schuh8bd96322020-02-13 21:18:22 -0800120 }
121
122 // Takes the distributed time and converts it to the monotonic clock for this
123 // node.
124 monotonic_clock::time_point FromDistributedClock(
125 distributed_clock::time_point time) const {
Austin Schuh87dd3832021-01-01 23:07:31 -0800126 return converter_->FromDistributedClock(node_index_, time);
Austin Schuh8bd96322020-02-13 21:18:22 -0800127 }
128
129 // Returns the current monotonic time on this node calculated from the
130 // distributed clock.
131 inline monotonic_clock::time_point monotonic_now() const;
132
Alex Perrycb7da4b2019-08-28 19:35:56 -0700133 private:
Austin Schuh8bd96322020-02-13 21:18:22 -0800134 friend class EventSchedulerScheduler;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700135 // Current execution time.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700136 monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700137
Austin Schuh8bd96322020-02-13 21:18:22 -0800138 // List of functions to run (once) when running.
Austin Schuh39788ff2019-12-01 18:22:57 -0800139 std::vector<std::function<void()>> on_run_;
Austin Schuh057d29f2021-08-21 23:05:15 -0700140 std::vector<std::function<void()>> on_startup_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800141
Alex Perrycb7da4b2019-08-28 19:35:56 -0700142 // Multimap holding times to run functions. These are stored in order, and
143 // the order is the callback tree.
144 ChannelType events_list_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800145
146 // Pointer to the actual scheduler.
147 EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
Austin Schuh87dd3832021-01-01 23:07:31 -0800148
149 // Node index handle to be handed back to the TimeConverter. This lets the
150 // same time converter be used for all the nodes, and the node index
151 // distinguish which one.
152 size_t node_index_ = 0;
153
154 // Converts time by doing nothing to it.
155 class UnityConverter final : public TimeConverter {
156 public:
157 distributed_clock::time_point ToDistributedClock(
158 size_t /*node_index*/, monotonic_clock::time_point time) override {
159 return distributed_clock::epoch() + time.time_since_epoch();
160 }
161
162 monotonic_clock::time_point FromDistributedClock(
163 size_t /*node_index*/, distributed_clock::time_point time) override {
164 return monotonic_clock::epoch() + time.time_since_epoch();
165 }
Austin Schuhb7c8d2a2021-07-19 19:22:12 -0700166
167 void ObserveTimePassed(distributed_clock::time_point /*time*/) override {}
Austin Schuh87dd3832021-01-01 23:07:31 -0800168 };
169
170 UnityConverter unity_converter_;
171
172 TimeConverter *converter_ = &unity_converter_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700173};
174
Austin Schuh8bd96322020-02-13 21:18:22 -0800175// We need a heap of heaps...
176//
177// Events in a node have a very well defined progression of time. It is linear
178// and well represented by the monotonic clock.
179//
180// Events across nodes don't follow this well. Time skews between the two nodes
181// all the time. We also don't know the function ahead of time which converts
182// from each node's monotonic clock to the distributed clock (our unified base
183// time which is likely the average time between nodes).
184//
185// This pushes us towards merge sort. Sorting each node's events with a heap
186// like we used to be doing, and then sorting each of those nodes independently.
187class EventSchedulerScheduler {
188 public:
189 // Adds an event scheduler to the list.
190 void AddEventScheduler(EventScheduler *scheduler);
191
192 // Runs until there are no more events or Exit is called.
193 void Run();
194
195 // Stops running.
196 void Exit() { is_running_ = false; }
197
198 bool is_running() const { return is_running_; }
199
200 // Runs for a duration on the distributed clock. Time on the distributed
201 // clock should be very representative of time on each node, but won't be
202 // exactly the same.
203 void RunFor(distributed_clock::duration duration);
204
205 // Returns the current distributed time.
206 distributed_clock::time_point distributed_now() const { return now_; }
207
Austin Schuh057d29f2021-08-21 23:05:15 -0700208 void RunOnStartup() {
209 CHECK(!is_running_);
210 for (EventScheduler *scheduler : schedulers_) {
211 scheduler->RunOnStartup();
212 }
213 }
214
Austin Schuh8bd96322020-02-13 21:18:22 -0800215 private:
216 // Handles running the OnRun functions.
217 void RunOnRun() {
218 CHECK(!is_running_);
219 is_running_ = true;
220 for (EventScheduler *scheduler : schedulers_) {
221 scheduler->RunOnRun();
222 }
223 }
224
225 // Returns the next event time and scheduler on which to run it.
226 std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
227
228 // True if we are running.
229 bool is_running_ = false;
230 // The current time.
231 distributed_clock::time_point now_ = distributed_clock::epoch();
232 // List of schedulers to run in sync.
233 std::vector<EventScheduler *> schedulers_;
234};
235
236inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
Austin Schuh87dd3832021-01-01 23:07:31 -0800237 return FromDistributedClock(scheduler_scheduler_->distributed_now());
Austin Schuh8bd96322020-02-13 21:18:22 -0800238}
239
240inline bool EventScheduler::is_running() const {
241 return scheduler_scheduler_->is_running();
242}
243
}  // namespace aos

#endif  // AOS_EVENTS_EVENT_SCHEDULER_H_