blob: c06638c4285a76cc4c1459227253043a6559678b [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/event_scheduler.h"
2
3#include <algorithm>
4#include <deque>
5
6#include "aos/events/event_loop.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08007#include "aos/logging/implementations.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -07008
9namespace aos {
10
Austin Schuhef8f1ae2021-12-11 12:35:05 -080011EventScheduler::Token EventScheduler::Schedule(monotonic_clock::time_point time,
12 Event *callback) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070013 return events_list_.emplace(time, callback);
14}
15
16void EventScheduler::Deschedule(EventScheduler::Token token) {
Brian Silverman7026e2d2021-11-11 16:15:35 -080017 // We basically want to DCHECK some nontrivial logic. Guard it with NDEBUG to
18 // ensure the compiler realizes it's all unnecessary when not doing debug
19 // checks.
Brian Silvermanbd405c02020-06-23 16:25:23 -070020#ifndef NDEBUG
21 {
22 bool found = false;
23 auto i = events_list_.begin();
24 while (i != events_list_.end()) {
25 if (i == token) {
26 CHECK(!found) << ": The same iterator is in the multimap twice??";
27 found = true;
28 }
29 ++i;
30 }
31 CHECK(found) << ": Trying to deschedule an event which is not scheduled";
32 }
33#endif
Alex Perrycb7da4b2019-08-28 19:35:56 -070034 events_list_.erase(token);
35}
36
Austin Schuh8bd96322020-02-13 21:18:22 -080037aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
38 if (events_list_.empty()) {
39 return monotonic_clock::max_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080040 }
Austin Schuh8bd96322020-02-13 21:18:22 -080041
42 return events_list_.begin()->first;
Alex Perrycb7da4b2019-08-28 19:35:56 -070043}
44
Brian Silverman7026e2d2021-11-11 16:15:35 -080045void EventScheduler::Shutdown() { on_shutdown_(); }
Austin Schuh58646e22021-08-23 23:51:46 -070046
47void EventScheduler::Startup() {
48 ++boot_count_;
49 RunOnStartup();
50}
51
Austin Schuh8bd96322020-02-13 21:18:22 -080052void EventScheduler::CallOldestEvent() {
53 CHECK_GT(events_list_.size(), 0u);
54 auto iter = events_list_.begin();
Austin Schuh58646e22021-08-23 23:51:46 -070055 const logger::BootTimestamp t =
56 FromDistributedClock(scheduler_scheduler_->distributed_now());
57 VLOG(1) << "Got time back " << t;
58 CHECK_EQ(t.boot, boot_count_);
59 CHECK_EQ(t.time, iter->first) << ": Time is wrong on node " << node_index_;
Austin Schuh8bd96322020-02-13 21:18:22 -080060
Austin Schuhef8f1ae2021-12-11 12:35:05 -080061 Event *callback = iter->second;
Austin Schuh8bd96322020-02-13 21:18:22 -080062 events_list_.erase(iter);
Austin Schuhef8f1ae2021-12-11 12:35:05 -080063 callback->Handle();
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070064
65 converter_->ObserveTimePassed(scheduler_scheduler_->distributed_now());
Austin Schuh8bd96322020-02-13 21:18:22 -080066}
67
68void EventScheduler::RunOnRun() {
Austin Schuh39788ff2019-12-01 18:22:57 -080069 for (std::function<void()> &on_run : on_run_) {
70 on_run();
71 }
72 on_run_.clear();
Alex Perrycb7da4b2019-08-28 19:35:56 -070073}
74
Austin Schuh057d29f2021-08-21 23:05:15 -070075void EventScheduler::RunOnStartup() {
76 for (size_t i = 0; i < on_startup_.size(); ++i) {
77 on_startup_[i]();
78 }
79 on_startup_.clear();
80}
81
Austin Schuh58646e22021-08-23 23:51:46 -070082void EventScheduler::RunStarted() { started_(); }
83
Austin Schuhac0771c2020-01-07 18:36:30 -080084std::ostream &operator<<(std::ostream &stream,
85 const aos::distributed_clock::time_point &now) {
86 // Print it the same way we print a monotonic time. Literally.
87 stream << monotonic_clock::time_point(now.time_since_epoch());
88 return stream;
89}
90
Austin Schuh8bd96322020-02-13 21:18:22 -080091void EventSchedulerScheduler::AddEventScheduler(EventScheduler *scheduler) {
92 CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
93 schedulers_.end());
94 CHECK(scheduler->scheduler_scheduler_ == nullptr);
Austin Schuh58646e22021-08-23 23:51:46 -070095 CHECK_EQ(scheduler->node_index(), schedulers_.size());
Austin Schuh8bd96322020-02-13 21:18:22 -080096
97 schedulers_.emplace_back(scheduler);
98 scheduler->scheduler_scheduler_ = this;
99}
100
Austin Schuh58646e22021-08-23 23:51:46 -0700101void EventSchedulerScheduler::Reboot() {
102 const std::vector<logger::BootTimestamp> &times =
103 std::get<1>(reboots_.front());
104 CHECK_EQ(times.size(), schedulers_.size());
105
106 VLOG(1) << "Rebooting at " << now_;
107 for (const auto &time : times) {
108 VLOG(1) << " " << time;
109 }
110
111 is_running_ = false;
112
113 // Shut everything down.
114 std::vector<size_t> rebooted;
115 for (size_t node_index = 0; node_index < schedulers_.size(); ++node_index) {
116 if (schedulers_[node_index]->boot_count() == times[node_index].boot) {
117 continue;
118 } else {
119 rebooted.emplace_back(node_index);
120 CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
121 times[node_index].boot);
122 schedulers_[node_index]->Shutdown();
123 }
124 }
125
126 // And start it back up again to reboot. When something starts back up
127 // (especially message_bridge), it could try to send stuff out. We want
128 // to move everything over to the new boot before doing that.
129 for (const size_t node_index : rebooted) {
130 CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
131 schedulers_[node_index]->Startup();
132 }
133
134 for (const size_t node_index : rebooted) {
135 schedulers_[node_index]->RunStarted();
136 }
137
138 for (const size_t node_index : rebooted) {
139 schedulers_[node_index]->RunOnRun();
140 }
141 is_running_ = true;
142}
143
Austin Schuh8bd96322020-02-13 21:18:22 -0800144void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
145 distributed_clock::time_point end_time = now_ + duration;
146 logging::ScopedLogRestorer prev_logger;
147 RunOnRun();
148
149 // Run all the sub-event-schedulers.
150 while (is_running_) {
151 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
152 OldestEvent();
Austin Schuh58646e22021-08-23 23:51:46 -0700153 if (!reboots_.empty() &&
154 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
155 // Reboot is next.
156 if (std::get<0>(reboots_.front()) > end_time) {
157 // Reboot is after our end time, give up.
158 is_running_ = false;
159 break;
160 }
161
162 CHECK_LE(now_,
163 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
164 << ": Simulated time went backwards by too much. Please "
165 "investigate.";
166 now_ = std::get<0>(reboots_.front());
167 Reboot();
168 reboots_.erase(reboots_.begin());
169 continue;
170 }
171
Austin Schuh8bd96322020-02-13 21:18:22 -0800172 // No events left, bail.
173 if (std::get<0>(oldest_event) == distributed_clock::max_time ||
174 std::get<0>(oldest_event) > end_time) {
175 is_running_ = false;
176 break;
177 }
178
179 // We get to pick our tradeoffs here. Either we assume that there are no
180 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700181 // let time go backwards. We currently only really see this happen when 2
182 // events are scheduled for "now", time changes, and there is a nanosecond
183 // or two of rounding due to integer math.
184 //
185 // //aos/events/logging:logger_test triggers this.
186 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800187 << ": Simulated time went backwards by too much. Please investigate.";
188 now_ = std::get<0>(oldest_event);
189
190 std::get<1>(oldest_event)->CallOldestEvent();
191 }
192
193 now_ = end_time;
194}
195
196void EventSchedulerScheduler::Run() {
197 logging::ScopedLogRestorer prev_logger;
198 RunOnRun();
199 // Run all the sub-event-schedulers.
200 while (is_running_) {
201 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
202 OldestEvent();
Austin Schuh58646e22021-08-23 23:51:46 -0700203 if (!reboots_.empty() &&
204 std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
205 // Reboot is next.
206 CHECK_LE(now_,
207 std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
208 << ": Simulated time went backwards by too much. Please "
209 "investigate.";
210 now_ = std::get<0>(reboots_.front());
211 Reboot();
212 reboots_.erase(reboots_.begin());
213 continue;
214 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800215 // No events left, bail.
216 if (std::get<0>(oldest_event) == distributed_clock::max_time) {
217 break;
218 }
219
220 // We get to pick our tradeoffs here. Either we assume that there are no
221 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700222 // let time go backwards. We currently only really see this happen when 2
223 // events are scheduled for "now", time changes, and there is a nanosecond
224 // or two of rounding due to integer math.
225 //
226 // //aos/events/logging:logger_test triggers this.
227 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800228 << ": Simulated time went backwards by too much. Please investigate.";
229 now_ = std::get<0>(oldest_event);
230
231 std::get<1>(oldest_event)->CallOldestEvent();
232 }
233
234 is_running_ = false;
235}
236
237std::tuple<distributed_clock::time_point, EventScheduler *>
238EventSchedulerScheduler::OldestEvent() {
239 distributed_clock::time_point min_event_time = distributed_clock::max_time;
240 EventScheduler *min_scheduler = nullptr;
241
242 // TODO(austin): Don't linearly search... But for N=3, it is probably the
243 // fastest way to do this.
244 for (EventScheduler *scheduler : schedulers_) {
245 const monotonic_clock::time_point monotonic_event_time =
246 scheduler->OldestEvent();
247 if (monotonic_event_time != monotonic_clock::max_time) {
248 const distributed_clock::time_point event_time =
249 scheduler->ToDistributedClock(monotonic_event_time);
250 if (event_time < min_event_time) {
251 min_event_time = event_time;
252 min_scheduler = scheduler;
253 }
254 }
255 }
256
Austin Schuh87dd3832021-01-01 23:07:31 -0800257 if (min_scheduler) {
258 VLOG(1) << "Oldest event " << min_event_time << " on scheduler "
259 << min_scheduler->node_index_;
260 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800261 return std::make_tuple(min_event_time, min_scheduler);
262}
263
Alex Perrycb7da4b2019-08-28 19:35:56 -0700264} // namespace aos