blob: 468d90401c4361b24b0454cb06b5bb31ed70cd47 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
2#define AOS_EVENTS_EVENT_SCHEDULER_H_
3
4#include <algorithm>
5#include <map>
6#include <memory>
7#include <unordered_set>
8#include <utility>
9#include <vector>
10
11#include "aos/events/event_loop.h"
Austin Schuh8bd96322020-02-13 21:18:22 -080012#include "aos/logging/implementations.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070013#include "aos/time/time.h"
14#include "glog/logging.h"
15
16namespace aos {
17
// This clock is the basis for distributed time.  It is used to synchronize time
// between multiple nodes.  This is a new type so conversions to and from the
// monotonic and realtime clocks aren't implicit.
//
// The class only provides type aliases and constants; it does not declare a
// now() function.
class distributed_clock {
 public:
  // Time on this clock is stored as an integer number of nanoseconds.
  using rep = ::std::chrono::nanoseconds::rep;
  using period = ::std::chrono::nanoseconds::period;
  using duration = ::std::chrono::nanoseconds;
  using time_point = ::std::chrono::time_point<distributed_clock>;

  // This clock is the base clock for the simulation and everything is synced to
  // it.  It never jumps.
  static constexpr bool is_steady = true;

  // Returns the epoch (0).
  static constexpr time_point epoch() { return time_point(zero()); }

  // Returns a duration of length 0.
  static constexpr duration zero() { return duration(0); }

  // Smallest and largest time points which this clock can represent.
  static constexpr time_point min_time{
      time_point(duration(::std::numeric_limits<duration::rep>::min()))};
  static constexpr time_point max_time{
      time_point(duration(::std::numeric_limits<duration::rep>::max()))};
};
42
43std::ostream &operator<<(std::ostream &stream,
44 const aos::distributed_clock::time_point &now);
45
// Forward declaration.  EventScheduler befriends this class and stores a
// pointer to it before the full definition is available.
class EventSchedulerScheduler;
47
Alex Perrycb7da4b2019-08-28 19:35:56 -070048class EventScheduler {
49 public:
50 using ChannelType =
Austin Schuh8bd96322020-02-13 21:18:22 -080051 std::multimap<monotonic_clock::time_point, std::function<void()>>;
Alex Perrycb7da4b2019-08-28 19:35:56 -070052 using Token = ChannelType::iterator;
53
54 // Schedule an event with a callback function
55 // Returns an iterator to the event
Austin Schuh8bd96322020-02-13 21:18:22 -080056 Token Schedule(monotonic_clock::time_point time,
Alex Perrycb7da4b2019-08-28 19:35:56 -070057 std::function<void()> callback);
58
Austin Schuh39788ff2019-12-01 18:22:57 -080059 // Schedules a callback when the event scheduler starts.
60 void ScheduleOnRun(std::function<void()> callback) {
61 on_run_.emplace_back(std::move(callback));
62 }
63
Alex Perrycb7da4b2019-08-28 19:35:56 -070064 Token InvalidToken() { return events_list_.end(); }
65
66 // Deschedule an event by its iterator
67 void Deschedule(Token token);
68
Austin Schuh8bd96322020-02-13 21:18:22 -080069 // Runs the OnRun callbacks.
70 void RunOnRun();
Alex Perrycb7da4b2019-08-28 19:35:56 -070071
Austin Schuh8bd96322020-02-13 21:18:22 -080072 // Returns true if events are being handled.
73 inline bool is_running() const;
Alex Perrycb7da4b2019-08-28 19:35:56 -070074
Austin Schuh8bd96322020-02-13 21:18:22 -080075 // Returns the timestamp of the next event to trigger.
76 aos::monotonic_clock::time_point OldestEvent();
77 // Handles the next event.
78 void CallOldestEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -070079
Austin Schuh8bd96322020-02-13 21:18:22 -080080 // Converts a time to the distributed clock for scheduling and cross-node time
81 // measurement.
82 distributed_clock::time_point ToDistributedClock(
83 monotonic_clock::time_point time) const {
Austin Schuhbe69cf32020-08-27 11:38:33 -070084 return distributed_clock::epoch() +
85 std::chrono::duration_cast<std::chrono::nanoseconds>(
86 (time.time_since_epoch() - distributed_offset_) /
87 distributed_slope_);
Austin Schuh8bd96322020-02-13 21:18:22 -080088 }
89
90 // Takes the distributed time and converts it to the monotonic clock for this
91 // node.
92 monotonic_clock::time_point FromDistributedClock(
93 distributed_clock::time_point time) const {
Austin Schuhbe69cf32020-08-27 11:38:33 -070094 return monotonic_clock::epoch() +
95 std::chrono::duration_cast<std::chrono::nanoseconds>(
96 time.time_since_epoch() * distributed_slope_) +
97 distributed_offset_;
Austin Schuh8bd96322020-02-13 21:18:22 -080098 }
99
100 // Returns the current monotonic time on this node calculated from the
101 // distributed clock.
102 inline monotonic_clock::time_point monotonic_now() const;
103
104 // Sets the offset between the distributed and monotonic clock.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700105 // monotonic = distributed * slope + offset;
106 void SetDistributedOffset(std::chrono::nanoseconds distributed_offset,
107 double distributed_slope) {
108 // TODO(austin): Use a starting point to improve precision.
109 // TODO(austin): Make slope be the slope of the offset, not the input,
110 // throught the calculation process.
111 distributed_offset_ = distributed_offset;
112 distributed_slope_ = distributed_slope;
Austin Schuh8bd96322020-02-13 21:18:22 -0800113
Austin Schuhbe69cf32020-08-27 11:38:33 -0700114 // Once we update the offset, now isn't going to be valid anymore.
115 // TODO(austin): Probably should instead use the piecewise linear function
116 // and evaluate it correctly.
117 monotonic_now_valid_ = false;
Austin Schuh8bd96322020-02-13 21:18:22 -0800118 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700119
120 private:
Austin Schuh8bd96322020-02-13 21:18:22 -0800121 friend class EventSchedulerScheduler;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700122 // Current execution time.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700123 bool monotonic_now_valid_ = false;
124 monotonic_clock::time_point monotonic_now_ = monotonic_clock::epoch();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700125
Austin Schuh8bd96322020-02-13 21:18:22 -0800126 // Offset to the distributed clock.
127 // distributed = monotonic + offset;
Austin Schuhbe69cf32020-08-27 11:38:33 -0700128 std::chrono::nanoseconds distributed_offset_ = std::chrono::seconds(0);
129 double distributed_slope_ = 1.0;
Austin Schuh8bd96322020-02-13 21:18:22 -0800130
131 // List of functions to run (once) when running.
Austin Schuh39788ff2019-12-01 18:22:57 -0800132 std::vector<std::function<void()>> on_run_;
133
Alex Perrycb7da4b2019-08-28 19:35:56 -0700134 // Multimap holding times to run functions. These are stored in order, and
135 // the order is the callback tree.
136 ChannelType events_list_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800137
138 // Pointer to the actual scheduler.
139 EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700140};
141
Austin Schuh8bd96322020-02-13 21:18:22 -0800142// We need a heap of heaps...
143//
144// Events in a node have a very well defined progression of time. It is linear
145// and well represented by the monotonic clock.
146//
147// Events across nodes don't follow this well. Time skews between the two nodes
148// all the time. We also don't know the function ahead of time which converts
149// from each node's monotonic clock to the distributed clock (our unified base
150// time which is likely the average time between nodes).
151//
152// This pushes us towards merge sort. Sorting each node's events with a heap
153// like we used to be doing, and then sorting each of those nodes independently.
154class EventSchedulerScheduler {
155 public:
156 // Adds an event scheduler to the list.
157 void AddEventScheduler(EventScheduler *scheduler);
158
159 // Runs until there are no more events or Exit is called.
160 void Run();
161
162 // Stops running.
163 void Exit() { is_running_ = false; }
164
165 bool is_running() const { return is_running_; }
166
167 // Runs for a duration on the distributed clock. Time on the distributed
168 // clock should be very representative of time on each node, but won't be
169 // exactly the same.
170 void RunFor(distributed_clock::duration duration);
171
172 // Returns the current distributed time.
173 distributed_clock::time_point distributed_now() const { return now_; }
174
175 private:
176 // Handles running the OnRun functions.
177 void RunOnRun() {
178 CHECK(!is_running_);
179 is_running_ = true;
180 for (EventScheduler *scheduler : schedulers_) {
181 scheduler->RunOnRun();
182 }
183 }
184
185 // Returns the next event time and scheduler on which to run it.
186 std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
187
188 // True if we are running.
189 bool is_running_ = false;
190 // The current time.
191 distributed_clock::time_point now_ = distributed_clock::epoch();
192 // List of schedulers to run in sync.
193 std::vector<EventScheduler *> schedulers_;
194};
195
196inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
197 // Make sure we stay in sync.
Austin Schuhbe69cf32020-08-27 11:38:33 -0700198 if (monotonic_now_valid_) {
199 CHECK_NEAR(monotonic_now_,
200 FromDistributedClock(scheduler_scheduler_->distributed_now()),
201 std::chrono::nanoseconds(1));
202 return monotonic_now_;
203 } else {
204 return FromDistributedClock(scheduler_scheduler_->distributed_now());
205 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800206}
207
208inline bool EventScheduler::is_running() const {
209 return scheduler_scheduler_->is_running();
210}
211
Alex Perrycb7da4b2019-08-28 19:35:56 -0700212} // namespace aos
213
214#endif // AOS_EVENTS_EVENT_SCHEDULER_H_