blob: e6c732b8cce48077f94b99e7a782c9212b9703a7 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#ifndef AOS_EVENTS_EVENT_SCHEDULER_H_
2#define AOS_EVENTS_EVENT_SCHEDULER_H_
3
#include <algorithm>
#include <chrono>
#include <functional>
#include <iosfwd>
#include <limits>
#include <map>
#include <memory>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>

#include "aos/events/event_loop.h"
#include "aos/logging/implementations.h"
#include "aos/time/time.h"
#include "glog/logging.h"
15
16namespace aos {
17
// This clock is the basis for distributed time. It is used to synchronize time
// between multiple nodes. It is a distinct type so that conversions to and
// from the monotonic and realtime clocks are never implicit.
class distributed_clock {
 public:
  // Clock traits: this clock counts in nanoseconds.
  using rep = ::std::chrono::nanoseconds::rep;
  using period = ::std::chrono::nanoseconds::period;
  using duration = ::std::chrono::nanoseconds;
  using time_point = ::std::chrono::time_point<distributed_clock>;

  // This clock is the base clock for the simulation and everything is synced to
  // it. It never jumps.
  static constexpr bool is_steady = true;

  // Returns the epoch (0).
  static constexpr time_point epoch() { return time_point(zero()); }

  // Returns a zero-length duration.
  static constexpr duration zero() { return duration(0); }

  // Smallest and largest time points representable by this clock.
  static constexpr time_point min_time{
      time_point(duration(::std::numeric_limits<duration::rep>::min()))};
  static constexpr time_point max_time{
      time_point(duration(::std::numeric_limits<duration::rep>::max()))};
};
42
43std::ostream &operator<<(std::ostream &stream,
44 const aos::distributed_clock::time_point &now);
45
Austin Schuh8bd96322020-02-13 21:18:22 -080046class EventSchedulerScheduler;
47
Alex Perrycb7da4b2019-08-28 19:35:56 -070048class EventScheduler {
49 public:
50 using ChannelType =
Austin Schuh8bd96322020-02-13 21:18:22 -080051 std::multimap<monotonic_clock::time_point, std::function<void()>>;
Alex Perrycb7da4b2019-08-28 19:35:56 -070052 using Token = ChannelType::iterator;
53
54 // Schedule an event with a callback function
55 // Returns an iterator to the event
Austin Schuh8bd96322020-02-13 21:18:22 -080056 Token Schedule(monotonic_clock::time_point time,
Alex Perrycb7da4b2019-08-28 19:35:56 -070057 std::function<void()> callback);
58
Austin Schuh39788ff2019-12-01 18:22:57 -080059 // Schedules a callback when the event scheduler starts.
60 void ScheduleOnRun(std::function<void()> callback) {
61 on_run_.emplace_back(std::move(callback));
62 }
63
Alex Perrycb7da4b2019-08-28 19:35:56 -070064 Token InvalidToken() { return events_list_.end(); }
65
66 // Deschedule an event by its iterator
67 void Deschedule(Token token);
68
Austin Schuh8bd96322020-02-13 21:18:22 -080069 // Runs the OnRun callbacks.
70 void RunOnRun();
Alex Perrycb7da4b2019-08-28 19:35:56 -070071
Austin Schuh8bd96322020-02-13 21:18:22 -080072 // Returns true if events are being handled.
73 inline bool is_running() const;
Alex Perrycb7da4b2019-08-28 19:35:56 -070074
Austin Schuh8bd96322020-02-13 21:18:22 -080075 // Returns the timestamp of the next event to trigger.
76 aos::monotonic_clock::time_point OldestEvent();
77 // Handles the next event.
78 void CallOldestEvent();
Alex Perrycb7da4b2019-08-28 19:35:56 -070079
Austin Schuh8bd96322020-02-13 21:18:22 -080080 // Converts a time to the distributed clock for scheduling and cross-node time
81 // measurement.
82 distributed_clock::time_point ToDistributedClock(
83 monotonic_clock::time_point time) const {
84 return distributed_clock::epoch() + time.time_since_epoch() +
85 monotonic_offset_;
86 }
87
88 // Takes the distributed time and converts it to the monotonic clock for this
89 // node.
90 monotonic_clock::time_point FromDistributedClock(
91 distributed_clock::time_point time) const {
92 return monotonic_clock::epoch() + time.time_since_epoch() -
93 monotonic_offset_;
94 }
95
96 // Returns the current monotonic time on this node calculated from the
97 // distributed clock.
98 inline monotonic_clock::time_point monotonic_now() const;
99
100 // Sets the offset between the distributed and monotonic clock.
101 // distributed = monotonic + offset;
102 void SetDistributedOffset(std::chrono::nanoseconds monotonic_offset) {
103 monotonic_offset_ = monotonic_offset;
104 }
105
106 // Returns the offset used to convert to and from the distributed clock.
107 std::chrono::nanoseconds monotonic_offset() const {
108 return monotonic_offset_;
109 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700110
111 private:
Austin Schuh8bd96322020-02-13 21:18:22 -0800112 friend class EventSchedulerScheduler;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700113 // Current execution time.
Austin Schuh8bd96322020-02-13 21:18:22 -0800114 monotonic_clock::time_point now_ = monotonic_clock::epoch();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700115
Austin Schuh8bd96322020-02-13 21:18:22 -0800116 // Offset to the distributed clock.
117 // distributed = monotonic + offset;
118 std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
119
120 // List of functions to run (once) when running.
Austin Schuh39788ff2019-12-01 18:22:57 -0800121 std::vector<std::function<void()>> on_run_;
122
Alex Perrycb7da4b2019-08-28 19:35:56 -0700123 // Multimap holding times to run functions. These are stored in order, and
124 // the order is the callback tree.
125 ChannelType events_list_;
Austin Schuh8bd96322020-02-13 21:18:22 -0800126
127 // Pointer to the actual scheduler.
128 EventSchedulerScheduler *scheduler_scheduler_ = nullptr;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700129};
130
Austin Schuh8bd96322020-02-13 21:18:22 -0800131// We need a heap of heaps...
132//
133// Events in a node have a very well defined progression of time. It is linear
134// and well represented by the monotonic clock.
135//
136// Events across nodes don't follow this well. Time skews between the two nodes
137// all the time. We also don't know the function ahead of time which converts
138// from each node's monotonic clock to the distributed clock (our unified base
139// time which is likely the average time between nodes).
140//
141// This pushes us towards merge sort. Sorting each node's events with a heap
142// like we used to be doing, and then sorting each of those nodes independently.
143class EventSchedulerScheduler {
144 public:
145 // Adds an event scheduler to the list.
146 void AddEventScheduler(EventScheduler *scheduler);
147
148 // Runs until there are no more events or Exit is called.
149 void Run();
150
151 // Stops running.
152 void Exit() { is_running_ = false; }
153
154 bool is_running() const { return is_running_; }
155
156 // Runs for a duration on the distributed clock. Time on the distributed
157 // clock should be very representative of time on each node, but won't be
158 // exactly the same.
159 void RunFor(distributed_clock::duration duration);
160
161 // Returns the current distributed time.
162 distributed_clock::time_point distributed_now() const { return now_; }
163
164 private:
165 // Handles running the OnRun functions.
166 void RunOnRun() {
167 CHECK(!is_running_);
168 is_running_ = true;
169 for (EventScheduler *scheduler : schedulers_) {
170 scheduler->RunOnRun();
171 }
172 }
173
174 // Returns the next event time and scheduler on which to run it.
175 std::tuple<distributed_clock::time_point, EventScheduler *> OldestEvent();
176
177 // True if we are running.
178 bool is_running_ = false;
179 // The current time.
180 distributed_clock::time_point now_ = distributed_clock::epoch();
181 // List of schedulers to run in sync.
182 std::vector<EventScheduler *> schedulers_;
183};
184
185inline monotonic_clock::time_point EventScheduler::monotonic_now() const {
186 // Make sure we stay in sync.
187 CHECK_EQ(now_, FromDistributedClock(scheduler_scheduler_->distributed_now()));
188 return now_;
189}
190
191inline bool EventScheduler::is_running() const {
192 return scheduler_scheduler_->is_running();
193}
194
Alex Perrycb7da4b2019-08-28 19:35:56 -0700195} // namespace aos
196
197#endif // AOS_EVENTS_EVENT_SCHEDULER_H_