blob: 959caea16baede9999873a4c949459a223158916 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
2#define AOS_EVENTS_EVENT_SCHEDULER_H_
3
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <ostream>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>

#include "aos/events/event_loop.h"
#include "aos/logging/implementations.h"
#include "aos/time/time.h"
#include "glog/logging.h"
15
16namespace aos {
17
// This clock is the basis for distributed time. It is used to synchronize time
// between multiple nodes. This is a new type so conversions to and from the
// monotonic and realtime clocks aren't implicit.
class distributed_clock {
 public:
  // The clock ticks in nanoseconds; rep and period are taken from
  // std::chrono::nanoseconds.  duration is declared first because time_point's
  // default duration argument looks it up in this class.
  using duration = ::std::chrono::nanoseconds;
  using rep = duration::rep;
  using period = duration::period;
  using time_point = ::std::chrono::time_point<distributed_clock>;

  // This clock is the base clock for the simulation and everything is synced to
  // it. It never jumps.
  static constexpr bool is_steady = true;

  // Returns the epoch (0).
  static constexpr time_point epoch() { return time_point(zero()); }

  // Returns a duration of zero nanoseconds.
  static constexpr duration zero() { return duration(0); }

  // Smallest and largest representable time points.  time_point::min()/max()
  // wrap the extreme values of rep, so these equal the numeric_limits-based
  // values without needing <limits>.
  static constexpr time_point min_time = time_point::min();
  static constexpr time_point max_time = time_point::max();
};
42
43std::ostream &operator<<(std::ostream &stream,
44 const aos::distributed_clock::time_point &now);
45
Austin Schuha9abc032021-01-01 16:46:19 -080046// Interface to handle converting time on a node to and from the distributed
47// clock accurately.
48class TimeConverter {
49 public:
50 virtual ~TimeConverter() {}
51
52 // Converts a time to the distributed clock for scheduling and cross-node
53 // time measurement.
54 virtual distributed_clock::time_point ToDistributedClock(
55 size_t node_index, monotonic_clock::time_point time) = 0;
56
57 // Takes the distributed time and converts it to the monotonic clock for this
58 // node.
59 virtual monotonic_clock::time_point FromDistributedClock(
60 size_t node_index, distributed_clock::time_point time) = 0;
61};
62
Austin Schuh8bd96322020-02-13 21:18:22 -080063class EventSchedulerScheduler;
64
Alex Perrycb7da4b2019-08-28 19:35:56 -070065class EventScheduler {
66 public:
67 using ChannelType =
Austin Schuh8bd96322020-02-13 21:18:22 -080068 std::multimap<monotonic_clock::time_point, std::function<void()>>;
Alex Perrycb7da4b2019-08-28 19:35:56 -070069 using Token = ChannelType::iterator;
70
71 // Schedule an event with a callback function
72 // Returns an iterator to the event
Austin Schuh8bd96322020-02-13 21:18:22 -080073 Token Schedule(monotonic_clock::time_point time,
Alex Perrycb7da4b2019-08-28 19:35:56 -070074 std::function<void()> callback);
75
Austin Schuh39788ff2019-12-01 18:22:57 -080076 // Schedules a callback when the event scheduler starts.
77 void ScheduleOnRun(std::function<void()> callback) {
78 on_run_.emplace_back(std::move(callback));
79 }
80
Alex Perrycb7da4b2019-08-28 19:35:56 -070081 Token InvalidToken() { return events_list_.end(); }
82
83 // Deschedule an event by its iterator
84 void Deschedule(Token token);
85
Austin Schuh8bd96322020-02-13 21:18:22 -080086 // Runs the OnRun callbacks.
87 void RunOnRun();
Alex Perrycb7da4b2019-08-28 19:35:56 -070088
Austin Schuh8bd96322020-02-13 21:18:22 -080089 // Returns true if events are being handled.
90 inline bool is_running() const;
Alex Perrycb7da4b2019-08-28 19:35:56 -070091
Austin Schuh8bd96322020-02-13 21:18:22 -080092 // Returns the timestamp of the next event to trigger.
93 aos::monotonic_clock::time_point OldestEvent();
94 // Handles the next event.
95 void CallOldestEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -070096
Austin Schuh8bd96322020-02-13 21:18:22 -080097 // Converts a time to the distributed clock for scheduling and cross-node time
98 // measurement.
99 distributed_clock::time_point ToDistributedClock(
100 monotonic_clock::time_point time) const {
Austin Schuhbe69cf32020-08-27 11:38:33 -0700101 return distributed_clock::epoch() +
102 std::chrono::duration_cast<std::chrono::nanoseconds>(
103 (time.time_since_epoch() - distributed_offset_) /
104 distributed_slope_);
Austin Schuh8bd96322020-02-13 21:18:22 -0800105 }
106
107 // Takes the distributed time and converts it to the monotonic clock for this
108 // node.
109 monotonic_clock::time_point FromDistributedClock(
110 distributed_clock::time_point time) const {
Austin Schuhbe69cf32020-08-27 11:38:33 -0700111 return monotonic_clock::epoch() +
112 std::chrono::duration_cast<std::chrono::nanoseconds>(
113 time.time_since_epoch() * distributed_slope_) +
114 distributed_offset_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800115 }
116
117 // Returns the current monotonic time on this node calculated from the
118 // distributed clock.
119 inline monotonic_clock::time_point monotonic_now() const;
120
121 // Sets the offset between the distributed and monotonic clock.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700122 // monotonic = distributed * slope + offset;
123 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
124 double distributed_slope) {
125 // TODO(austin): Use a starting point to improve precision.
126 // TODO(austin): Make slope be the slope of the offset, not the input,
127 // throught the calculation process.
128 distributed_offset_ = distributed_offset;
129 distributed_slope_ = distributed_slope;
Austin Schuh8bd96322020-02-13 21:18:22 -0800130
Austin Schuhbe69cf32020-08-27 11:38:33 -0700131 // Once we update the offset, now isn't going to be valid anymore.
132 // TODO(austin): Probably should instead use the piecewise linear function
133 // and evaluate it correctly.
134 monotonic_now_valid_ = false;
Austin Schuh8bd96322020-02-13 21:18:22 -0800135 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700136
137 private:
Austin Schuh8bd96322020-02-13 21:18:22 -0800138 friend class EventSchedulerScheduler;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700139 // Current execution time.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700140 bool monotonic_now_valid_ = false;
141 monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700142
Austin Schuh8bd96322020-02-13 21:18:22 -0800143 // Offset to the distributed clock.
144 // distributed = monotonic + offset;
Austin Schuhbe69cf32020-08-27 11:38:33 -0700145 std::chrono::nanoseconds distributed_offset_ = std::chrono::seconds(0);
146 double distributed_slope_ = 1.0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800147
148 // List of functions to run (once) when running.
Austin Schuh39788ff2019-12-01 18:22:57 -0800149 std::vector<std::function<void()>> on_run_;
150
Alex Perrycb7da4b2019-08-28 19:35:56 -0700151 // Multimap holding times to run functions. These are stored in order, and
152 // the order is the callback tree.
153 ChannelType events_list_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800154
155 // Pointer to the actual scheduler.
156 EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700157};
158
Austin Schuh8bd96322020-02-13 21:18:22 -0800159// We need a heap of heaps...
160//
161// Events in a node have a very well defined progression of time. It is linear
162// and well represented by the monotonic clock.
163//
164// Events across nodes don't follow this well. Time skews between the two nodes
165// all the time. We also don't know the function ahead of time which converts
166// from each node's monotonic clock to the distributed clock (our unified base
167// time which is likely the average time between nodes).
168//
169// This pushes us towards merge sort. Sorting each node's events with a heap
170// like we used to be doing, and then sorting each of those nodes independently.
171class EventSchedulerScheduler {
172 public:
173 // Adds an event scheduler to the list.
174 void AddEventScheduler(EventScheduler *scheduler);
175
176 // Runs until there are no more events or Exit is called.
177 void Run();
178
179 // Stops running.
180 void Exit() { is_running_ = false; }
181
182 bool is_running() const { return is_running_; }
183
184 // Runs for a duration on the distributed clock. Time on the distributed
185 // clock should be very representative of time on each node, but won't be
186 // exactly the same.
187 void RunFor(distributed_clock::duration duration);
188
189 // Returns the current distributed time.
190 distributed_clock::time_point distributed_now() const { return now_; }
191
192 private:
193 // Handles running the OnRun functions.
194 void RunOnRun() {
195 CHECK(!is_running_);
196 is_running_ = true;
197 for (EventScheduler *scheduler : schedulers_) {
198 scheduler->RunOnRun();
199 }
200 }
201
202 // Returns the next event time and scheduler on which to run it.
203 std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
204
205 // True if we are running.
206 bool is_running_ = false;
207 // The current time.
208 distributed_clock::time_point now_ = distributed_clock::epoch();
209 // List of schedulers to run in sync.
210 std::vector<EventScheduler *> schedulers_;
211};
212
213inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
214 // Make sure we stay in sync.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700215 if (monotonic_now_valid_) {
Austin Schuh2f8fd752020-09-01 22:38:28 -0700216 // We want time to be smooth, so confirm that it doesn't change too much
217 // while handling an event.
218 //
219 // There are 2 sources of error. There are numerical precision and interger
220 // rounding problems going from the monotonic clock to the distributed clock
221 // and back again. When we update the time function as well to transition
222 // line segments, we have a slight jump as well.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700223 CHECK_NEAR(monotonic_now_,
224 FromDistributedClock(scheduler_scheduler_->distributed_now()),
Austin Schuh2f8fd752020-09-01 22:38:28 -0700225 std::chrono::nanoseconds(2));
Austin Schuhbe69cf32020-08-27 11:38:33 -0700226 return monotonic_now_;
227 } else {
228 return FromDistributedClock(scheduler_scheduler_->distributed_now());
229 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800230}
231
232inline bool EventScheduler::is_running() const {
233 return scheduler_scheduler_->is_running();
234}
235
Alex Perrycb7da4b2019-08-28 19:35:56 -0700236} // namespace aos
237
238#endif // AOS_EVENTS_EVENT_SCHEDULER_H_