blob: 668f2b6c706e467a8eac200977aa61771d53581a [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/event_scheduler.h"
2
3#include <algorithm>
4#include <deque>
5
6#include "aos/events/event_loop.h"
Tyler Chatow67ddb032020-01-12 14:30:04 -08007#include "aos/logging/implementations.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -07008
9namespace aos {
10
11EventScheduler::Token EventScheduler::Schedule(
Austin Schuh8bd96322020-02-13 21:18:22 -080012 monotonic_clock::time_point time, ::std::function<void()> callback) {
Alex Perrycb7da4b2019-08-28 19:35:56 -070013 return events_list_.emplace(time, callback);
14}
15
16void EventScheduler::Deschedule(EventScheduler::Token token) {
Brian Silvermanbd405c02020-06-23 16:25:23 -070017 // We basically want to DCHECK some nontrivial logic. Guard it with NDEBUG to ensure the compiler
18 // realizes it's all unnecessary when not doing debug checks.
19#ifndef NDEBUG
20 {
21 bool found = false;
22 auto i = events_list_.begin();
23 while (i != events_list_.end()) {
24 if (i == token) {
25 CHECK(!found) << ": The same iterator is in the multimap twice??";
26 found = true;
27 }
28 ++i;
29 }
30 CHECK(found) << ": Trying to deschedule an event which is not scheduled";
31 }
32#endif
Alex Perrycb7da4b2019-08-28 19:35:56 -070033 events_list_.erase(token);
34}
35
Austin Schuh8bd96322020-02-13 21:18:22 -080036aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
37 if (events_list_.empty()) {
38 return monotonic_clock::max_time;
Austin Schuh39788ff2019-12-01 18:22:57 -080039 }
Austin Schuh8bd96322020-02-13 21:18:22 -080040
41 return events_list_.begin()->first;
Alex Perrycb7da4b2019-08-28 19:35:56 -070042}
43
Austin Schuh8bd96322020-02-13 21:18:22 -080044void EventScheduler::CallOldestEvent() {
45 CHECK_GT(events_list_.size(), 0u);
46 auto iter = events_list_.begin();
Austin Schuh87dd3832021-01-01 23:07:31 -080047 CHECK_EQ(monotonic_now(), iter->first)
48 << ": Time is wrong on node " << node_index_;
Austin Schuh8bd96322020-02-13 21:18:22 -080049
50 ::std::function<void()> callback = ::std::move(iter->second);
51 events_list_.erase(iter);
52 callback();
Austin Schuhb7c8d2a2021-07-19 19:22:12 -070053
54 converter_->ObserveTimePassed(scheduler_scheduler_->distributed_now());
Austin Schuh8bd96322020-02-13 21:18:22 -080055}
56
57void EventScheduler::RunOnRun() {
Austin Schuh39788ff2019-12-01 18:22:57 -080058 for (std::function<void()> &on_run : on_run_) {
59 on_run();
60 }
61 on_run_.clear();
Alex Perrycb7da4b2019-08-28 19:35:56 -070062}
63
Austin Schuh057d29f2021-08-21 23:05:15 -070064void EventScheduler::RunOnStartup() {
65 for (size_t i = 0; i < on_startup_.size(); ++i) {
66 on_startup_[i]();
67 }
68 on_startup_.clear();
69}
70
Austin Schuhac0771c2020-01-07 18:36:30 -080071std::ostream &operator<<(std::ostream &stream,
72 const aos::distributed_clock::time_point &now) {
73 // Print it the same way we print a monotonic time. Literally.
74 stream << monotonic_clock::time_point(now.time_since_epoch());
75 return stream;
76}
77
Austin Schuh8bd96322020-02-13 21:18:22 -080078void EventSchedulerScheduler::AddEventScheduler(EventScheduler *scheduler) {
79 CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
80 schedulers_.end());
81 CHECK(scheduler->scheduler_scheduler_ == nullptr);
82
83 schedulers_.emplace_back(scheduler);
84 scheduler->scheduler_scheduler_ = this;
85}
86
87void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
88 distributed_clock::time_point end_time = now_ + duration;
89 logging::ScopedLogRestorer prev_logger;
90 RunOnRun();
91
92 // Run all the sub-event-schedulers.
93 while (is_running_) {
94 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
95 OldestEvent();
96 // No events left, bail.
97 if (std::get<0>(oldest_event) == distributed_clock::max_time ||
98 std::get<0>(oldest_event) > end_time) {
99 is_running_ = false;
100 break;
101 }
102
103 // We get to pick our tradeoffs here. Either we assume that there are no
104 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700105 // let time go backwards. We currently only really see this happen when 2
106 // events are scheduled for "now", time changes, and there is a nanosecond
107 // or two of rounding due to integer math.
108 //
109 // //aos/events/logging:logger_test triggers this.
110 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800111 << ": Simulated time went backwards by too much. Please investigate.";
112 now_ = std::get<0>(oldest_event);
113
114 std::get<1>(oldest_event)->CallOldestEvent();
115 }
116
117 now_ = end_time;
118}
119
120void EventSchedulerScheduler::Run() {
121 logging::ScopedLogRestorer prev_logger;
122 RunOnRun();
123 // Run all the sub-event-schedulers.
124 while (is_running_) {
125 std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
126 OldestEvent();
127 // No events left, bail.
128 if (std::get<0>(oldest_event) == distributed_clock::max_time) {
129 break;
130 }
131
132 // We get to pick our tradeoffs here. Either we assume that there are no
133 // backward step changes in our time function for each node, or we have to
Austin Schuh2f8fd752020-09-01 22:38:28 -0700134 // let time go backwards. We currently only really see this happen when 2
135 // events are scheduled for "now", time changes, and there is a nanosecond
136 // or two of rounding due to integer math.
137 //
138 // //aos/events/logging:logger_test triggers this.
139 CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
Austin Schuh8bd96322020-02-13 21:18:22 -0800140 << ": Simulated time went backwards by too much. Please investigate.";
141 now_ = std::get<0>(oldest_event);
142
143 std::get<1>(oldest_event)->CallOldestEvent();
144 }
145
146 is_running_ = false;
147}
148
149std::tuple<distributed_clock::time_point, EventScheduler *>
150EventSchedulerScheduler::OldestEvent() {
151 distributed_clock::time_point min_event_time = distributed_clock::max_time;
152 EventScheduler *min_scheduler = nullptr;
153
154 // TODO(austin): Don't linearly search... But for N=3, it is probably the
155 // fastest way to do this.
156 for (EventScheduler *scheduler : schedulers_) {
157 const monotonic_clock::time_point monotonic_event_time =
158 scheduler->OldestEvent();
159 if (monotonic_event_time != monotonic_clock::max_time) {
160 const distributed_clock::time_point event_time =
161 scheduler->ToDistributedClock(monotonic_event_time);
162 if (event_time < min_event_time) {
163 min_event_time = event_time;
164 min_scheduler = scheduler;
165 }
166 }
167 }
168
Austin Schuh87dd3832021-01-01 23:07:31 -0800169 if (min_scheduler) {
170 VLOG(1) << "Oldest event " << min_event_time << " on scheduler "
171 << min_scheduler->node_index_;
172 }
Austin Schuh8bd96322020-02-13 21:18:22 -0800173 return std::make_tuple(min_event_time, min_scheduler);
174}
175
Alex Perrycb7da4b2019-08-28 19:35:56 -0700176} // namespace aos