Sort messages between nodes properly
We used to assume the realtime clocks were in sync. This isn't
realistic. Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.
Since time is no longer exactly linear once all the adjustments are
applied, we need to redo how events are scheduled. Events can no longer
be converted to the distributed_clock just once; they now need to be
converted every time they are compared between nodes.
Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index faa9fe9..cf58b46 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -36,7 +36,6 @@
public:
SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
- NodeEventLoopFactory *node_event_loop_factory,
const Channel *channel,
std::function<void(const Context &context, const void *message)> fn);
@@ -60,7 +59,6 @@
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedWatcher> event_;
EventScheduler *scheduler_;
- NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
SimulatedChannel *simulated_channel_ = nullptr;
};
@@ -272,7 +270,6 @@
class SimulatedTimerHandler : public TimerHandler {
public:
explicit SimulatedTimerHandler(EventScheduler *scheduler,
- NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn);
~SimulatedTimerHandler() { Disable(); }
@@ -288,7 +285,6 @@
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedTimerHandler> event_;
EventScheduler *scheduler_;
- NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
monotonic_clock::time_point base_;
@@ -298,7 +294,6 @@
class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
public:
SimulatedPhasedLoopHandler(EventScheduler *scheduler,
- NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoop *simulated_event_loop,
::std::function<void(int)> fn,
const monotonic_clock::duration interval,
@@ -314,7 +309,6 @@
EventHandler<SimulatedPhasedLoopHandler> event_;
EventScheduler *scheduler_;
- NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
};
@@ -387,17 +381,17 @@
TimerHandler *AddTimer(::std::function<void()> callback) override {
CHECK(!is_running());
- return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
- scheduler_, node_event_loop_factory_, this, callback)));
+ return NewTimer(::std::unique_ptr<TimerHandler>(
+ new SimulatedTimerHandler(scheduler_, this, callback)));
}
PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset =
::std::chrono::seconds(0)) override {
- return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
- new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
- this, callback, interval, offset)));
+ return NewPhasedLoop(
+ ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
+ scheduler_, this, callback, interval, offset)));
}
void OnRun(::std::function<void()> on_run) override {
@@ -483,8 +477,8 @@
std::function<void(const Context &channel, const void *message)> watcher) {
TakeWatcher(channel);
- std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
- this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
+ std::unique_ptr<SimulatedWatcher> shm_watcher(
+ new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
NewWatcher(std::move(shm_watcher));
@@ -526,13 +520,12 @@
SimulatedWatcher::SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
- NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
+ const Channel *channel,
std::function<void(const Context &context, const void *message)> fn)
: WatcherState(simulated_event_loop, channel, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
- node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
SimulatedWatcher::~SimulatedWatcher() {
@@ -594,10 +587,9 @@
}
void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
- token_ = scheduler_->Schedule(
- node_event_loop_factory_->ToDistributedClock(
- event_time + simulated_event_loop_->send_delay()),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ token_ =
+ scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
+ [this]() { simulated_event_loop_->HandleEvent(); });
}
void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -643,13 +635,12 @@
}
SimulatedTimerHandler::SimulatedTimerHandler(
- EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
- SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
+ EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
+ ::std::function<void()> fn)
: TimerHandler(simulated_event_loop, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
- node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -661,12 +652,10 @@
repeat_offset_ = repeat_offset;
if (base < monotonic_now) {
token_ = scheduler_->Schedule(
- node_event_loop_factory_->ToDistributedClock(monotonic_now),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
} else {
token_ = scheduler_->Schedule(
- node_event_loop_factory_->ToDistributedClock(base),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ base, [this]() { simulated_event_loop_->HandleEvent(); });
}
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
@@ -683,8 +672,7 @@
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
token_ = scheduler_->Schedule(
- node_event_loop_factory_->ToDistributedClock(base_),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ base_, [this]() { simulated_event_loop_->HandleEvent(); });
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
} else {
@@ -703,15 +691,13 @@
}
SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
- EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
- SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
- const monotonic_clock::duration interval,
+ EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
+ ::std::function<void(int)> fn, const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
- node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -737,29 +723,27 @@
void SimulatedPhasedLoopHandler::Schedule(
monotonic_clock::time_point sleep_time) {
token_ = scheduler_->Schedule(
- node_event_loop_factory_->ToDistributedClock(sleep_time),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
event_.set_event_time(sleep_time);
simulated_event_loop_->AddEvent(&event_);
}
NodeEventLoopFactory::NodeEventLoopFactory(
- EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
- const Node *node,
+ EventSchedulerScheduler *scheduler_scheduler,
+ SimulatedEventLoopFactory *factory, const Node *node,
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
*raw_event_loops)
- : scheduler_(scheduler),
- factory_(factory),
- node_(node),
- raw_event_loops_(raw_event_loops) {}
+ : factory_(factory), node_(node), raw_event_loops_(raw_event_loops) {
+ scheduler_scheduler->AddEventScheduler(&scheduler_);
+}
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
: configuration_(CHECK_NOTNULL(configuration)),
nodes_(configuration::GetNodes(configuration_)) {
for (const Node *node : nodes_) {
- node_factories_.emplace_back(
- new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
+ node_factories_.emplace_back(new NodeEventLoopFactory(
+ &scheduler_scheduler_, this, node, &raw_event_loops_));
}
if (configuration::MultiNode(configuration)) {
@@ -797,11 +781,14 @@
::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
std::string_view name) {
+ CHECK(!scheduler_.is_running())
+ << ": Can't create an event loop while running";
+
pid_t tid = tid_;
++tid_;
::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
- scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
- node_, tid));
+ &scheduler_, this, &channels_, factory_->configuration(),
+ raw_event_loops_, node_, tid));
result->set_name(name);
result->set_send_delay(factory_->send_delay());
return std::move(result);
@@ -812,7 +799,7 @@
raw_event_loops_) {
event_loop.second(true);
}
- scheduler_.RunFor(duration);
+ scheduler_scheduler_.RunFor(duration);
for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
raw_event_loops_) {
event_loop.second(false);
@@ -824,7 +811,7 @@
raw_event_loops_) {
event_loop.second(true);
}
- scheduler_.Run();
+ scheduler_scheduler_.Run();
for (const std::pair<EventLoop *, std::function<void(bool)>> &event_loop :
raw_event_loops_) {
event_loop.second(false);