Allow simulated nodes to start up after global startup
This lets us defer booting a node until its monotonic clock has reached
zero.
This also changes the semantics of OnStartup slightly: even if all the
nodes start up at the start of the simulation, each node will complete
its own startup sequence before the next node begins. This doesn't
appear to have had any negative consequences (and is similar to what
would happen if the nodes had tiny monotonic clock offsets that forced
several of them to start late), but it is a behavior change.
Change-Id: I25d343b9509a3cdae6db9747f60a212f1cb21187
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index e7b9641..97c1e83 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -10,6 +10,7 @@
EventScheduler::Token EventScheduler::Schedule(monotonic_clock::time_point time,
Event *callback) {
+ CHECK_LE(monotonic_clock::epoch(), time);
return events_list_.emplace(time, callback);
}
@@ -35,6 +36,12 @@
}
aos::monotonic_clock::time_point EventScheduler::OldestEvent() {
+ // If we haven't started yet, schedule a special event for the epoch to allow
+ // ourselves to boot.
+ if (!called_started_) {
+ return aos::monotonic_clock::epoch();
+ }
+
if (events_list_.empty()) {
return monotonic_clock::max_time;
}
@@ -42,14 +49,27 @@
return events_list_.begin()->first;
}
-void EventScheduler::Shutdown() { on_shutdown_(); }
+void EventScheduler::Shutdown() {
+ CHECK(!is_running_);
+ on_shutdown_();
+}
void EventScheduler::Startup() {
++boot_count_;
- RunOnStartup();
+ CHECK(!is_running_);
+ MaybeRunOnStartup();
+ CHECK(called_started_);
}
void EventScheduler::CallOldestEvent() {
+ if (!called_started_) {
+ // If we haven't started, start.
+ MaybeRunOnStartup();
+ MaybeRunOnRun();
+ CHECK(called_started_);
+ return;
+ }
+ CHECK(is_running_);
CHECK_GT(events_list_.size(), 0u);
auto iter = events_list_.begin();
const logger::BootTimestamp t =
@@ -66,6 +86,7 @@
}
void EventScheduler::RunOnRun() {
+ CHECK(is_running_);
while (!on_run_.empty()) {
std::function<void()> fn = std::move(*on_run_.begin());
on_run_.erase(on_run_.begin());
@@ -75,6 +96,7 @@
void EventScheduler::RunOnStartup() noexcept {
while (!on_startup_.empty()) {
+ CHECK(!is_running_);
std::function<void()> fn = std::move(*on_startup_.begin());
on_startup_.erase(on_startup_.begin());
fn();
@@ -82,14 +104,39 @@
}
void EventScheduler::RunStarted() {
+ CHECK(!is_running_);
if (started_) {
started_();
}
+ is_running_ = true;
}
-void EventScheduler::RunStopped() {
- if (stopped_) {
- stopped_();
+void EventScheduler::MaybeRunStopped() {
+ CHECK(is_running_);
+ is_running_ = false;
+ if (called_started_) {
+ called_started_ = false;
+ if (stopped_) {
+ stopped_();
+ }
+ }
+}
+
+void EventScheduler::MaybeRunOnStartup() {
+ CHECK(!called_started_);
+ CHECK(!is_running_);
+ const logger::BootTimestamp t =
+ FromDistributedClock(scheduler_scheduler_->distributed_now());
+ if (t.boot == boot_count_ && t.time >= monotonic_clock::epoch()) {
+ called_started_ = true;
+ RunOnStartup();
+ }
+}
+
+void EventScheduler::MaybeRunOnRun() {
+ if (called_started_) {
+ RunStarted();
+ RunOnRun();
}
}
@@ -110,6 +157,88 @@
scheduler->scheduler_scheduler_ = this;
}
+void EventSchedulerScheduler::MaybeRunStopped() {
+ CHECK(!is_running_);
+ for (EventScheduler *scheduler : schedulers_) {
+ if (scheduler->is_running()) {
+ scheduler->MaybeRunStopped();
+ }
+ }
+}
+
+bool EventSchedulerScheduler::RunUntil(
+ realtime_clock::time_point end_time, EventScheduler *scheduler,
+ std::function<std::chrono::nanoseconds()> fn_realtime_offset) {
+ logging::ScopedLogRestorer prev_logger;
+ MaybeRunOnStartup();
+
+ bool reached_end_time = false;
+
+ RunMaybeRealtimeLoop([this, scheduler, end_time, fn_realtime_offset,
+ &reached_end_time]() {
+ std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
+ OldestEvent();
+ aos::distributed_clock::time_point oldest_event_time_distributed =
+ std::get<0>(oldest_event);
+ logger::BootTimestamp test_time_monotonic =
+ scheduler->FromDistributedClock(oldest_event_time_distributed);
+ realtime_clock::time_point oldest_event_realtime(
+ test_time_monotonic.time_since_epoch() + fn_realtime_offset());
+
+ if ((std::get<0>(oldest_event) == distributed_clock::max_time) ||
+ (oldest_event_realtime > end_time &&
+ (reboots_.empty() ||
+ std::get<0>(reboots_.front()) > oldest_event_time_distributed))) {
+ is_running_ = false;
+ reached_end_time = true;
+
+ // We have to nudge our time back to the distributed time
+ // corresponding to our desired realtime time.
+ const aos::monotonic_clock::time_point end_monotonic =
+ aos::monotonic_clock::epoch() + end_time.time_since_epoch() -
+ fn_realtime_offset();
+ const aos::distributed_clock::time_point end_time_distributed =
+ scheduler->ToDistributedClock(end_monotonic);
+
+ now_ = end_time_distributed;
+
+ return;
+ }
+
+ if (!reboots_.empty() &&
+ std::get<0>(reboots_.front()) <= std::get<0>(oldest_event)) {
+ // Reboot is next.
+ CHECK_LE(now_,
+ std::get<0>(reboots_.front()) + std::chrono::nanoseconds(1))
+ << ": Simulated time went backwards by too much. Please "
+ "investigate.";
+ now_ = std::get<0>(reboots_.front());
+ Reboot();
+ reboots_.erase(reboots_.begin());
+ return;
+ }
+
+ // We get to pick our tradeoffs here. Either we assume that there are
+ // no backward step changes in our time function for each node, or we
+ // have to let time go backwards. We currently only really see this
+ // happen when 2 events are scheduled for "now", time changes, and
+ // there is a nanosecond or two of rounding due to integer math.
+ //
+ // //aos/events/logging:logger_test triggers this.
+ CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
+ << ": Simulated time went backwards by too much. Please "
+ "investigate.";
+
+ now_ = std::get<0>(oldest_event);
+
+ std::get<1>(oldest_event)->CallOldestEvent();
+ });
+
+ MaybeRunStopped();
+
+ return reached_end_time;
+}
+
void EventSchedulerScheduler::Reboot() {
const std::vector<logger::BootTimestamp> &times =
std::get<1>(reboots_.front());
@@ -131,7 +260,7 @@
rebooted.emplace_back(node_index);
CHECK_EQ(schedulers_[node_index]->boot_count() + 1,
times[node_index].boot);
- schedulers_[node_index]->RunStopped();
+ schedulers_[node_index]->MaybeRunStopped();
schedulers_[node_index]->Shutdown();
}
}
@@ -140,16 +269,10 @@
// (especially message_bridge), it could try to send stuff out. We want
// to move everything over to the new boot before doing that.
for (const size_t node_index : rebooted) {
- CHECK_EQ(schedulers_[node_index]->boot_count() + 1, times[node_index].boot);
schedulers_[node_index]->Startup();
}
-
for (const size_t node_index : rebooted) {
- schedulers_[node_index]->RunStarted();
- }
-
- for (const size_t node_index : rebooted) {
- schedulers_[node_index]->RunOnRun();
+ schedulers_[node_index]->MaybeRunOnRun();
}
is_running_ = true;
}
@@ -157,8 +280,7 @@
void EventSchedulerScheduler::RunFor(distributed_clock::duration duration) {
distributed_clock::time_point end_time = now_ + duration;
logging::ScopedLogRestorer prev_logger;
- RunOnStartup();
- RunOnRun();
+ MaybeRunOnStartup();
// Run all the sub-event-schedulers.
RunMaybeRealtimeLoop([this, end_time]() {
@@ -199,6 +321,7 @@
// //aos/events/logging:logger_test triggers this.
CHECK_LE(now_, std::get<0>(oldest_event) + std::chrono::nanoseconds(1))
<< ": Simulated time went backwards by too much. Please investigate.";
+ // push time forwards
now_ = std::get<0>(oldest_event);
std::get<1>(oldest_event)->CallOldestEvent();
@@ -206,15 +329,15 @@
now_ = end_time;
- RunStopped();
+ MaybeRunStopped();
}
void EventSchedulerScheduler::Run() {
logging::ScopedLogRestorer prev_logger;
- RunOnStartup();
- RunOnRun();
+ MaybeRunOnStartup();
+
+ // Run all the sub-event-schedulers.
RunMaybeRealtimeLoop([this]() {
- // Run all the sub-event-schedulers.
std::tuple<distributed_clock::time_point, EventScheduler *> oldest_event =
OldestEvent();
if (!reboots_.empty() &&
@@ -249,7 +372,7 @@
std::get<1>(oldest_event)->CallOldestEvent();
});
- RunStopped();
+ MaybeRunStopped();
}
template <typename F>
@@ -329,12 +452,23 @@
const bool was_running = is_running_;
if (is_running_) {
is_running_ = false;
- RunStopped();
+ MaybeRunStopped();
}
fn();
if (was_running) {
- RunOnStartup();
- RunOnRun();
+ MaybeRunOnStartup();
+ }
+}
+
+void EventSchedulerScheduler::MaybeRunOnStartup() {
+ is_running_ = true;
+ for (EventScheduler *scheduler : schedulers_) {
+ scheduler->MaybeRunOnStartup();
+ }
+ // We must trigger all the OnRun's *after* all the OnStartup callbacks are
+ // triggered because that is the contract that we have stated.
+ for (EventScheduler *scheduler : schedulers_) {
+ scheduler->MaybeRunOnRun();
}
}