Sort messages between nodes properly
We used to assume the realtime clocks were in sync. This isn't
realistic. Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.
Since time is no longer exactly linear once these adjustments are applied,
we need to redo how events are scheduled. Event times can no longer be
converted to the distributed_clock once, up front. Instead, they must be
converted every time they are compared between nodes.
Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index 26752b1..a9ae7c0 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -9,7 +9,7 @@
namespace aos {
EventScheduler::Token EventScheduler::Schedule(
- distributed_clock::time_point time, ::std::function<void()> callback) {
+ monotonic_clock::time_point time, ::std::function<void()> callback) {  // time is now a monotonic_clock time_point rather than a distributed_clock one.
return events_list_.emplace(time, callback);
}
@@ -17,43 +17,29 @@
events_list_.erase(token);
}
-void EventScheduler::RunFor(distributed_clock::duration duration) {
- const distributed_clock::time_point end_time =
- distributed_now() + duration;
- logging::ScopedLogRestorer prev_logger;
- is_running_ = true;
- for (std::function<void()> &on_run : on_run_) {
- on_run();
+aos::monotonic_clock::time_point EventScheduler::OldestEvent() {  // Peeks at the first entry of events_list_; nothing is removed.
+ if (events_list_.empty()) {
+ return monotonic_clock::max_time;  // Sentinel: no events are pending.
}
- on_run_.clear();
- while (!events_list_.empty() && is_running_) {
- auto iter = events_list_.begin();
- distributed_clock::time_point next_time = iter->first;
- if (next_time > end_time) {
- break;
- }
- now_ = iter->first;
- ::std::function<void()> callback = ::std::move(iter->second);
- events_list_.erase(iter);
- callback();
- }
- now_ = end_time;
+
+ return events_list_.begin()->first;  // The key of each entry is the time passed to Schedule().
}
-void EventScheduler::Run() {
- logging::ScopedLogRestorer prev_logger;
- is_running_ = true;
+void EventScheduler::CallOldestEvent() {  // Removes the first entry of events_list_ and runs its callback.
+ CHECK_GT(events_list_.size(), 0u);  // Callers must make sure an event is pending first.
+ auto iter = events_list_.begin();
+ now_ = iter->first;  // now_ is set to the event's time before the callback runs.
+
+ ::std::function<void()> callback = ::std::move(iter->second);
+ events_list_.erase(iter);  // Erased before the call, so the entry is gone by the time the callback runs.
+ callback();
+}
+
+void EventScheduler::RunOnRun() {  // Calls every function in on_run_ once, then empties the list.
for (std::function<void()> &on_run : on_run_) {
on_run();
}
on_run_.clear();
- while (!events_list_.empty() && is_running_) {
- auto iter = events_list_.begin();
- now_ = iter->first;
- ::std::function<void()> callback = ::std::move(iter->second);
- events_list_.erase(iter);
- callback();
- }
}
std::ostream &operator<<(std::ostream &stream,
@@ -63,4 +49,92 @@
return stream;
}
+void EventSchedulerScheduler::AddEventScheduler(EventScheduler *scheduler) {  // Registers scheduler here; it must not already be registered.
+ CHECK(std::find(schedulers_.begin(), schedulers_.end(), scheduler) ==
+ schedulers_.end());
+ CHECK(scheduler->scheduler_scheduler_ == nullptr);  // Must not already have a scheduler_scheduler_ set.
+
+ schedulers_.emplace_back(scheduler);
+ scheduler->scheduler_scheduler_ = this;  // Back-pointer to this EventSchedulerScheduler.
+}
+
+void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {  // Runs events whose distributed time is <= now_ + duration.
+ distributed_clock::time_point end_time = now_ + duration;
+ logging::ScopedLogRestorer prev_logger;
+ RunOnRun();
+
+ // Repeatedly run the earliest pending event across all sub-schedulers.
+ while (is_running_) {
+ std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
+ OldestEvent();
+ // No events left, or the next one falls after end_time: stop running.
+ if (std::get<0>(oldest_event) == distributed_clock::max_time ||
+ std::get<0>(oldest_event) > end_time) {
+ is_running_ = false;
+ break;
+ }
+
+ // We get to pick our tradeoffs here. Either we assume that there are no
+ // backward step changes in our time function for each node, or we have to
+ // let time go backwards. This backwards time jump should be small, so we
+ // can check for it and bound it.
+ CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+ << ": Simulated time went backwards by too much. Please investigate.";  // Allows a backwards step of at most 100 ms.
+ now_ = std::get<0>(oldest_event);
+
+ std::get<1>(oldest_event)->CallOldestEvent();
+ }
+
+ now_ = end_time;  // Reached on every exit from the loop, so now_ always ends at end_time.
+}
+
+void EventSchedulerScheduler::Run() {  // Runs until no events remain or is_running_ becomes false.
+ logging::ScopedLogRestorer prev_logger;
+ RunOnRun();
+ // Repeatedly run the earliest pending event across all sub-schedulers.
+ while (is_running_) {
+ std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
+ OldestEvent();
+ // Every sub-scheduler is empty, so there is nothing left to run.
+ if (std::get<0>(oldest_event) == distributed_clock::max_time) {
+ break;
+ }
+
+ // We get to pick our tradeoffs here. Either we assume that there are no
+ // backward step changes in our time function for each node, or we have to
+ // let time go backwards. This backwards time jump should be small, so we
+ // can check for it and bound it.
+ CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::milliseconds(100))
+ << ": Simulated time went backwards by too much. Please investigate.";  // Allows a backwards step of at most 100 ms.
+ now_ = std::get<0>(oldest_event);
+
+ std::get<1>(oldest_event)->CallOldestEvent();
+ }
+
+ is_running_ = false;  // Cleared on every exit from the loop.
+}
+
+std::tuple<distributed_clock::time_point, EventScheduler *>
+EventSchedulerScheduler::OldestEvent() {  // Returns {max_time, nullptr} when no sub-scheduler has a pending event.
+ distributed_clock::time_point min_event_time = distributed_clock::max_time;
+ EventScheduler *min_scheduler = nullptr;
+
+ // TODO(austin): Don't linearly search... But for N=3, it is probably the
+ // fastest way to do this.
+ for (EventScheduler *scheduler : schedulers_) {
+ const monotonic_clock::time_point monotonic_event_time =
+ scheduler->OldestEvent();
+ if (monotonic_event_time != monotonic_clock::max_time) {  // max_time means this scheduler has no events; skip it.
+ const distributed_clock::time_point event_time =
+ scheduler->ToDistributedClock(monotonic_event_time);  // Converted on every call so schedulers are compared on the distributed clock.
+ if (event_time < min_event_time) {  // Strict <: on a tie the scheduler earlier in schedulers_ wins.
+ min_event_time = event_time;
+ min_scheduler = scheduler;
+ }
+ }
+ }
+
+ return std::make_tuple(min_event_time, min_scheduler);
+}
+
} // namespace aos