Add NodeEventLoopFactory
This lets us create event loops on separate nodes which can't
communicate with each other. Next step is to add a message proxy
between them, then teach the logger to replay onto multiple nodes.
Change-Id: I06b2836365aea13d696535c52a78ca0c862a7b1e
diff --git a/aos/configuration.cc b/aos/configuration.cc
index a40c47c..095add5 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -591,6 +591,7 @@
}
return nullptr;
}
+
const Node *GetMyNode(const Configuration *config) {
const std::string hostname = (FLAGS_override_hostname.size() > 0)
? FLAGS_override_hostname
@@ -616,6 +617,8 @@
return nullptr;
}
+bool MultiNode(const Configuration *config) { return config->has_nodes(); }
+
bool ChannelIsSendableOnNode(const Channel *channel, const Node *node) {
if (node == nullptr) {
return true;
diff --git a/aos/configuration.h b/aos/configuration.h
index ef30fce..88687ad 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -69,6 +69,9 @@
const Node *GetNodeFromHostname(const Configuration *config,
std::string_view name);
+// Returns true if we are running in a multinode configuration.
+bool MultiNode(const Configuration *config);
+
// Returns true if the provided channel is sendable on the provided node.
bool ChannelIsSendableOnNode(const Channel *channel, const Node *node);
// Returns true if the provided channel is able to be watched or fetched on the
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index ea5fd3d..4f24d78 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -90,17 +90,17 @@
flatbuffer_ = FlatbufferDetachedBuffer<Configuration>(
JsonToFlatbuffer(json, Configuration::MiniReflectTypeTable()));
- my_node_ = my_node;
+ my_node_ = configuration::GetNode(&flatbuffer_.message(), my_node);
}
- std::string_view my_node() const { return my_node_; }
+ const Node *my_node() const { return my_node_; }
const Configuration *configuration() { return &flatbuffer_.message(); }
private:
FlatbufferDetachedBuffer<Configuration> flatbuffer_;
- std::string my_node_;
+ const Node *my_node_ = nullptr;
};
class AbstractEventLoopTestBase
@@ -134,7 +134,7 @@
const Configuration *configuration() { return factory_->configuration(); }
- std::string_view my_node() const { return factory_->my_node(); }
+ const Node *my_node() const { return factory_->my_node(); }
// Ends the given event loop at the given time from now.
void EndEventLoop(EventLoop *loop, ::std::chrono::milliseconds duration) {
diff --git a/aos/events/event_scheduler.cc b/aos/events/event_scheduler.cc
index b5a530e..57f20ae 100644
--- a/aos/events/event_scheduler.cc
+++ b/aos/events/event_scheduler.cc
@@ -8,7 +8,7 @@
namespace aos {
EventScheduler::Token EventScheduler::Schedule(
- ::aos::monotonic_clock::time_point time, ::std::function<void()> callback) {
+ distributed_clock::time_point time, ::std::function<void()> callback) {
return events_list_.emplace(time, callback);
}
@@ -16,9 +16,9 @@
events_list_.erase(token);
}
-void EventScheduler::RunFor(monotonic_clock::duration duration) {
- const ::aos::monotonic_clock::time_point end_time =
- monotonic_now() + duration;
+void EventScheduler::RunFor(distributed_clock::duration duration) {
+ const distributed_clock::time_point end_time =
+ distributed_now() + duration;
is_running_ = true;
for (std::function<void()> &on_run : on_run_) {
on_run();
@@ -26,7 +26,7 @@
on_run_.clear();
while (!events_list_.empty() && is_running_) {
auto iter = events_list_.begin();
- ::aos::monotonic_clock::time_point next_time = iter->first;
+ distributed_clock::time_point next_time = iter->first;
if (next_time > end_time) {
break;
}
@@ -53,4 +53,11 @@
}
}
+std::ostream &operator<<(std::ostream &stream,
+ const aos::distributed_clock::time_point &now) {
+ // Print it the same way we print a monotonic time. Literally.
+ stream << monotonic_clock::time_point(now.time_since_epoch());
+ return stream;
+}
+
} // namespace aos
diff --git a/aos/events/event_scheduler.h b/aos/events/event_scheduler.h
index 432f4ad..400a307 100644
--- a/aos/events/event_scheduler.h
+++ b/aos/events/event_scheduler.h
@@ -14,15 +14,43 @@
namespace aos {
+// This clock is the basis for distributed time. It is used to synchronize time
+// between multiple nodes. This is a new type so conversions to and from the
+// monotonic and realtime clocks aren't implicit.
+class distributed_clock {
+ public:
+ typedef ::std::chrono::nanoseconds::rep rep;
+ typedef ::std::chrono::nanoseconds::period period;
+ typedef ::std::chrono::nanoseconds duration;
+ typedef ::std::chrono::time_point<distributed_clock> time_point;
+
+ // This clock is the base clock for the simulation and everything is synced to
+ // it. It never jumps.
+ static constexpr bool is_steady = true;
+
+ // Returns the epoch (0).
+ static constexpr time_point epoch() { return time_point(zero()); }
+
+ static constexpr duration zero() { return duration(0); }
+
+ static constexpr time_point min_time{
+ time_point(duration(::std::numeric_limits<duration::rep>::min()))};
+ static constexpr time_point max_time{
+ time_point(duration(::std::numeric_limits<duration::rep>::max()))};
+};
+
+std::ostream &operator<<(std::ostream &stream,
+ const aos::distributed_clock::time_point &now);
+
class EventScheduler {
public:
using ChannelType =
- std::multimap<monotonic_clock::time_point, std::function<void()>>;
+ std::multimap<distributed_clock::time_point, std::function<void()>>;
using Token = ChannelType::iterator;
// Schedule an event with a callback function
// Returns an iterator to the event
- Token Schedule(monotonic_clock::time_point time,
+ Token Schedule(distributed_clock::time_point time,
std::function<void()> callback);
// Schedules a callback when the event scheduler starts.
@@ -38,30 +66,17 @@
// Runs until exited.
void Run();
// Runs for a duration.
- void RunFor(monotonic_clock::duration duration);
+ void RunFor(distributed_clock::duration duration);
void Exit() { is_running_ = false; }
bool is_running() const { return is_running_; }
- monotonic_clock::time_point monotonic_now() const { return now_; }
-
- realtime_clock::time_point realtime_now() const {
- return realtime_clock::time_point(monotonic_now().time_since_epoch() +
- realtime_offset_);
- }
-
- // Sets realtime clock to realtime_now for a given monotonic clock.
- void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
- realtime_clock::time_point realtime_now) {
- realtime_offset_ =
- realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
- }
+ distributed_clock::time_point distributed_now() const { return now_; }
private:
// Current execution time.
- monotonic_clock::time_point now_ = monotonic_clock::epoch();
- std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+ distributed_clock::time_point now_ = distributed_clock::epoch();
std::vector<std::function<void()>> on_run_;
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 1d1dd8b..80ec8e7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -31,7 +31,7 @@
reader.Register();
std::unique_ptr<aos::EventLoop> printer_event_loop =
- reader.event_loop_factory()->MakeEventLoop("printer");
+ reader.event_loop_factory()->MakeEventLoop("printer", reader.node());
printer_event_loop->SkipTimingReport();
bool found_channel = false;
@@ -66,7 +66,7 @@
<< aos::FlatbufferToJson(
channel->schema(),
static_cast<const uint8_t *>(message))
- << '\n';
+ << std::endl;
} else {
std::cout << context.realtime_event_time << " ("
<< context.monotonic_event_time << ") "
@@ -75,7 +75,7 @@
<< aos::FlatbufferToJson(
channel->schema(),
static_cast<const uint8_t *>(message))
- << '\n';
+ << std::endl;
}
});
found_channel = true;
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 51dc10c..2de17d7 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -276,13 +276,16 @@
void LogReader::Register() {
event_loop_factory_unique_ptr_ =
- std::make_unique<SimulatedEventLoopFactory>(configuration(), node());
+ std::make_unique<SimulatedEventLoopFactory>(configuration());
Register(event_loop_factory_unique_ptr_.get());
}
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
- event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
+ node_event_loop_factory_ =
+ event_loop_factory_->GetNodeEventLoopFactory(node());
+ event_loop_unique_ptr_ =
+ event_loop_factory->MakeEventLoop("log_reader", node());
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
@@ -355,8 +358,8 @@
"this.";
// If we have access to the factory, use it to fix the realtime time.
- if (event_loop_factory_ != nullptr) {
- event_loop_factory_->SetRealtimeOffset(
+ if (node_event_loop_factory_ != nullptr) {
+ node_event_loop_factory_->SetRealtimeOffset(
monotonic_clock::time_point(chrono::nanoseconds(
channel_data.message().monotonic_sent_time())),
realtime_clock::time_point(chrono::nanoseconds(
@@ -410,6 +413,7 @@
event_loop_ = nullptr;
event_loop_factory_unique_ptr_.reset();
event_loop_factory_ = nullptr;
+ node_event_loop_factory_ = nullptr;
}
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -434,6 +438,9 @@
}
void LogReader::MakeRemappedConfig() {
+ CHECK(!event_loop_)
+ << ": Can't change the mapping after the events are scheduled.";
+
// If no remapping occurred and we are using the original config, then there
// is nothing interesting to do here.
if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 337109b..54b55d8 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -152,6 +152,7 @@
std::vector<std::unique_ptr<RawSender>> channels_;
std::unique_ptr<EventLoop> event_loop_unique_ptr_;
+ NodeEventLoopFactory *node_event_loop_factory_ = nullptr;
EventLoop *event_loop_ = nullptr;
TimerHandler *timer_handler_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5323493..55d0ecc 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -208,8 +208,10 @@
MultinodeLoggerTest()
: config_(aos::configuration::ReadConfig(
"aos/events/logging/multinode_pingpong_config.json")),
- event_loop_factory_(&config_.message(), "pi1"),
- ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+ event_loop_factory_(&config_.message()),
+ ping_event_loop_(event_loop_factory_.MakeEventLoop(
+ "ping", configuration::GetNode(event_loop_factory_.configuration(),
+ "pi1"))),
ping_(ping_event_loop_.get()) {}
// Config and factory.
@@ -233,8 +235,10 @@
LOG(INFO) << "Logging data to " << logfile;
{
+ const Node *pi1 =
+ configuration::GetNode(event_loop_factory_.configuration(), "pi1");
std::unique_ptr<EventLoop> pong_event_loop =
- event_loop_factory_.MakeEventLoop("pong");
+ event_loop_factory_.MakeEventLoop("pong", pi1);
std::unique_ptr<aos::RawSender> pong_sender(
pong_event_loop->MakeRawSender(aos::configuration::GetChannel(
@@ -262,7 +266,7 @@
DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
- event_loop_factory_.MakeEventLoop("logger");
+ event_loop_factory_.MakeEventLoop("logger", pi1);
event_loop_factory_.RunFor(chrono::milliseconds(95));
@@ -276,20 +280,23 @@
// TODO(austin): Also replay as pi2 or pi3 and make sure we see the pong
// messages. This won't work today yet until the log reading code gets
// significantly better.
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration(), reader.node());
+ SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
// This sends out the fetched messages and advances time to the start of the
// log file.
reader.Register(&log_reader_factory);
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+
ASSERT_NE(reader.node(), nullptr);
EXPECT_EQ(reader.node()->name()->string_view(), "pi1");
reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
std::unique_ptr<EventLoop> test_event_loop =
- log_reader_factory.MakeEventLoop("test");
+ log_reader_factory.MakeEventLoop("test", pi1);
int ping_count = 10;
int pong_count = 10;
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e34a23f..0d8a488 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -36,6 +36,7 @@
public:
SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
const Channel *channel,
std::function<void(const Context &context, const void *message)> fn);
@@ -59,6 +60,7 @@
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedWatcher> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
SimulatedChannel *simulated_channel_ = nullptr;
};
@@ -102,10 +104,6 @@
const Channel *channel() const { return channel_; }
- ::aos::monotonic_clock::time_point monotonic_now() const {
- return scheduler_->monotonic_now();
- }
-
private:
const Channel *channel_;
@@ -186,12 +184,12 @@
message_ = MakeSimulatedMessage(simulated_channel_->max_size());
// Now fill in the message. size is already populated above, and
- // queue_index will be populated in queue_. Put this at the back of the
- // data segment.
+ // queue_index will be populated in simulated_channel_. Put this at the
+ // back of the data segment.
memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
- return Send(size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index);
+ return DoSend(size, monotonic_remote_time, realtime_remote_time,
+ remote_queue_index);
}
private:
@@ -204,9 +202,11 @@
class SimulatedFetcher : public RawFetcher {
public:
- explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
- : RawFetcher(event_loop, queue->channel()), queue_(queue) {}
- ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+ explicit SimulatedFetcher(EventLoop *event_loop,
+ SimulatedChannel *simulated_channel)
+ : RawFetcher(event_loop, simulated_channel->channel()),
+ simulated_channel_(simulated_channel) {}
+ ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
if (msgs_.size() == 0) {
@@ -222,8 +222,8 @@
if (msgs_.size() == 0) {
// TODO(austin): Can we just do this logic unconditionally? It is a lot
// simpler. And call clear, obviously.
- if (!msg_ && queue_->latest_message()) {
- SetMsg(queue_->latest_message());
+ if (!msg_ && simulated_channel_->latest_message()) {
+ SetMsg(simulated_channel_->latest_message());
return std::make_pair(true, event_loop()->monotonic_now());
} else {
return std::make_pair(false, monotonic_clock::min_time);
@@ -260,7 +260,7 @@
msgs_.emplace_back(buffer);
}
- SimulatedChannel *queue_;
+ SimulatedChannel *simulated_channel_;
std::shared_ptr<SimulatedMessage> msg_;
// Messages queued up but not in use.
@@ -270,6 +270,7 @@
class SimulatedTimerHandler : public TimerHandler {
public:
explicit SimulatedTimerHandler(EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn);
~SimulatedTimerHandler() { Disable(); }
@@ -285,6 +286,7 @@
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedTimerHandler> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
monotonic_clock::time_point base_;
@@ -294,6 +296,7 @@
class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
public:
SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoop *simulated_event_loop,
::std::function<void(int)> fn,
const monotonic_clock::duration interval,
@@ -309,6 +312,7 @@
EventHandler<SimulatedPhasedLoopHandler> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
};
@@ -316,6 +320,7 @@
public:
explicit SimulatedEventLoop(
EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
*channels,
const Configuration *configuration,
@@ -324,6 +329,7 @@
const Node *node, pid_t tid)
: EventLoop(CHECK_NOTNULL(configuration)),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
channels_(channels),
raw_event_loops_(raw_event_loops),
node_(node),
@@ -361,11 +367,11 @@
}
::aos::monotonic_clock::time_point monotonic_now() override {
- return scheduler_->monotonic_now();
+ return node_event_loop_factory_->monotonic_now();
}
::aos::realtime_clock::time_point realtime_now() override {
- return scheduler_->realtime_now();
+ return node_event_loop_factory_->realtime_now();
}
::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
@@ -379,17 +385,17 @@
TimerHandler *AddTimer(::std::function<void()> callback) override {
CHECK(!is_running());
- return NewTimer(::std::unique_ptr<TimerHandler>(
- new SimulatedTimerHandler(scheduler_, this, callback)));
+ return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
+ scheduler_, node_event_loop_factory_, this, callback)));
}
PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset =
::std::chrono::seconds(0)) override {
- return NewPhasedLoop(
- ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
- scheduler_, this, callback, interval, offset)));
+ return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
+ new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
+ this, callback, interval, offset)));
}
void OnRun(::std::function<void()> on_run) override {
@@ -433,6 +439,7 @@
pid_t GetTid() override { return tid_; }
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
*raw_event_loops_;
@@ -459,17 +466,13 @@
}
}
-std::chrono::nanoseconds SimulatedEventLoopFactory::send_delay() const {
- return send_delay_;
-}
-
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
TakeWatcher(channel);
- std::unique_ptr<SimulatedWatcher> shm_watcher(
- new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+ std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
+ this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
NewWatcher(std::move(shm_watcher));
@@ -511,12 +514,13 @@
SimulatedWatcher::SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
- const Channel *channel,
+ NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
std::function<void(const Context &context, const void *message)> fn)
: WatcherState(simulated_event_loop, channel, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
SimulatedWatcher::~SimulatedWatcher() {
@@ -574,9 +578,10 @@
}
void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
- token_ =
- scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ node_event_loop_factory_->ToDistributedClock(
+ event_time + simulated_event_loop_->send_delay()),
+ [this]() { simulated_event_loop_->HandleEvent(); });
}
void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -622,12 +627,13 @@
}
SimulatedTimerHandler::SimulatedTimerHandler(
- EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
- ::std::function<void()> fn)
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+ SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
: TimerHandler(simulated_event_loop, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -639,10 +645,12 @@
repeat_offset_ = repeat_offset;
if (base < monotonic_now) {
token_ = scheduler_->Schedule(
- monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(monotonic_now),
+ [this]() { simulated_event_loop_->HandleEvent(); });
} else {
token_ = scheduler_->Schedule(
- base, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(base),
+ [this]() { simulated_event_loop_->HandleEvent(); });
}
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
@@ -655,7 +663,8 @@
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
token_ = scheduler_->Schedule(
- base_, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(base_),
+ [this]() { simulated_event_loop_->HandleEvent(); });
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
} else {
@@ -674,13 +683,15 @@
}
SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
- EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
- ::std::function<void(int)> fn, const monotonic_clock::duration interval,
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+ SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -702,51 +713,76 @@
void SimulatedPhasedLoopHandler::Schedule(
monotonic_clock::time_point sleep_time) {
token_ = scheduler_->Schedule(
- sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(sleep_time),
+ [this]() { simulated_event_loop_->HandleEvent(); });
event_.set_event_time(sleep_time);
simulated_event_loop_->AddEvent(&event_);
}
+NodeEventLoopFactory::NodeEventLoopFactory(
+ EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+ const Node *node,
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *raw_event_loops)
+ : scheduler_(scheduler),
+ factory_(factory),
+ node_(node),
+ raw_event_loops_(raw_event_loops) {}
+
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
- : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {
- CHECK(!configuration_->has_nodes())
- << ": Got a configuration with multiple nodes and no node was selected.";
-}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
- const Configuration *configuration, std::string_view node_name)
- : SimulatedEventLoopFactory(
- configuration, configuration::GetNode(configuration, node_name)) {}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
- const Configuration *configuration, const Node *node)
- : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
- if (node != nullptr) {
- CHECK(configuration_->has_nodes())
- << ": Got a configuration with no nodes and node \""
- << node->name()->string_view() << "\" was selected.";
- bool found = false;
- for (const Node *node : *configuration_->nodes()) {
- if (node == node_) {
- found = true;
- break;
- }
+ : configuration_(CHECK_NOTNULL(configuration)) {
+ if (configuration::MultiNode(configuration_)) {
+ for (const Node *node : *configuration->nodes()) {
+ nodes_.emplace_back(node);
}
- CHECK(found) << ": node must be a pointer in the configuration.";
+ } else {
+ nodes_.emplace_back(nullptr);
+ }
+
+ for (const Node *node : nodes_) {
+ node_factories_.emplace_back(
+ new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
}
}
SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+ const Node *node) {
+ auto result = std::find_if(
+ node_factories_.begin(), node_factories_.end(),
+ [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
+ return node_factory->node() == node;
+ });
+
+ CHECK(result != node_factories_.end())
+ << ": Failed to find node " << FlatbufferToJson(node);
+
+ return result->get();
+}
+
::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
+ std::string_view name, const Node *node) {
+ if (node == nullptr) {
+ CHECK(!configuration::MultiNode(configuration()))
+ << ": Can't make a single node event loop in a multi-node world.";
+ } else {
+ CHECK(configuration::MultiNode(configuration()))
+ << ": Can't make a multi-node event loop in a single-node world.";
+ }
+ return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
+}
+
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
std::string_view name) {
pid_t tid = tid_;
++tid_;
::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
- &scheduler_, &channels_, configuration_, &raw_event_loops_, node_, tid));
+ scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
+ node_, tid));
result->set_name(name);
- result->set_send_delay(send_delay_);
+ result->set_send_delay(factory_->send_delay());
return std::move(result);
}
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index de37c03..019172b 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -23,66 +23,162 @@
// Class for simulated fetchers.
class SimulatedChannel;
+class NodeEventLoopFactory;
+
+// There are 2 concepts needed to support multi-node simulations.
+// 1) The node. This is implemented with NodeEventLoopFactory.
+// 2) The "robot" which runs multiple nodes. This is implemented with
+// SimulatedEventLoopFactory.
+//
+// To make things easier, SimulatedEventLoopFactory takes an optional Node
+// argument if you want to make event loops without interacting with the
+// NodeEventLoopFactory object.
+//
+// The basic flow goes something like as follows:
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// std::unique_ptr<EventLoop> event_loop = factory.MakeEventLoop("ping", pi1);
+//
+// Or
+//
+// SimulatedEventLoopFactory factory(config);
+// const Node *pi1 = configuration::GetNode(factory.configuration(), "pi1");
+// NodeEventLoopFactory *pi1_factory = factory.GetNodeEventLoopFactory(pi1);
+// std::unique_ptr<EventLoop> event_loop = pi1_factory.MakeEventLoop("ping");
+//
+// The distributed_clock is used to be the base time. NodeEventLoopFactory has
+// all the information needed to adjust both the realtime and monotonic clocks
+// relative to the distributed_clock.
class SimulatedEventLoopFactory {
public:
// Constructs a SimulatedEventLoopFactory with the provided configuration.
// This configuration must remain in scope for the lifetime of the factory and
// all sub-objects.
SimulatedEventLoopFactory(const Configuration *configuration);
- SimulatedEventLoopFactory(const Configuration *configuration,
- std::string_view node_name);
- SimulatedEventLoopFactory(const Configuration *configuration,
- const Node *node);
~SimulatedEventLoopFactory();
- ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
+ // Creates an event loop. If running in a multi-node environment, node needs
+ // to point to the node to create this event loop on.
+ ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name,
+ const Node *node = nullptr);
+
+ // Returns the NodeEventLoopFactory for the provided node. The returned
+ // NodeEventLoopFactory is owned by the SimulatedEventLoopFactory and has a
+ // lifetime identical to the factory.
+ NodeEventLoopFactory *GetNodeEventLoopFactory(const Node *node);
// Starts executing the event loops unconditionally.
void Run();
// Executes the event loops for a duration.
- void RunFor(monotonic_clock::duration duration);
+ void RunFor(distributed_clock::duration duration);
// Stops executing all event loops. Meant to be called from within an event
// loop handler.
void Exit() { scheduler_.Exit(); }
- // Sets the simulated send delay for the factory.
+ const std::vector<const Node *> &nodes() const { return nodes_; }
+
+ // Sets the simulated send delay for all messages sent within a single node.
void set_send_delay(std::chrono::nanoseconds send_delay);
- std::chrono::nanoseconds send_delay() const;
+ std::chrono::nanoseconds send_delay() const { return send_delay_; }
+
+ // Sets the simulated network delay for messages forwarded between nodes.
+ void set_network_delay(std::chrono::nanoseconds network_delay);
+ std::chrono::nanoseconds network_delay() const { return network_delay_; }
+
+ // Returns the clock used to synchronize the nodes.
+ distributed_clock::time_point distributed_now() const {
+ return scheduler_.distributed_now();
+ }
+
+ // Returns the configuration used for everything.
+ const Configuration *configuration() const { return configuration_; }
+
+ private:
+ const Configuration *const configuration_;
+ EventScheduler scheduler_;
+ // List of event loops to manage running and not running for.
+ // The function is a callback used to set and clear the running bool on each
+ // event loop.
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ raw_event_loops_;
+
+ std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+ std::chrono::nanoseconds network_delay_ = std::chrono::microseconds(100);
+
+ std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
+
+ std::vector<const Node *> nodes_;
+};
+
+// This class holds all the state required to be a single node.
+class NodeEventLoopFactory {
+ public:
+ ::std::unique_ptr<EventLoop> MakeEventLoop(std::string_view name);
// Returns the node that this factory is running as, or nullptr if this is a
// single node setup.
const Node *node() const { return node_; }
- monotonic_clock::time_point monotonic_now() const {
- return scheduler_.monotonic_now();
- }
- realtime_clock::time_point realtime_now() const {
- return scheduler_.realtime_now();
- }
-
// Sets realtime clock to realtime_now for a given monotonic clock.
void SetRealtimeOffset(monotonic_clock::time_point monotonic_now,
realtime_clock::time_point realtime_now) {
- scheduler_.SetRealtimeOffset(monotonic_now, realtime_now);
+ realtime_offset_ =
+ realtime_now.time_since_epoch() - monotonic_now.time_since_epoch();
}
- private:
- const Configuration *const configuration_;
- EventScheduler scheduler_;
- // Map from name, type to queue.
- absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
- // List of event loops to manage running and not running for.
- std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
- raw_event_loops_;
+ // Returns the current time on both clocks.
+ inline monotonic_clock::time_point monotonic_now() const;
+ inline realtime_clock::time_point realtime_now() const;
- std::chrono::nanoseconds send_delay_ = std::chrono::microseconds(50);
+ // Converts a time to the distributed clock for scheduling and cross-node time
+ // measurement.
+ inline distributed_clock::time_point ToDistributedClock(
+ monotonic_clock::time_point time) const;
+
+ private:
+ friend class SimulatedEventLoopFactory;
+ NodeEventLoopFactory(
+ EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+ const Node *node,
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *raw_event_loops);
+
+ EventScheduler *const scheduler_;
+ SimulatedEventLoopFactory *const factory_;
const Node *const node_;
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *const raw_event_loops_;
+
+ std::chrono::nanoseconds monotonic_offset_ = std::chrono::seconds(0);
+ std::chrono::nanoseconds realtime_offset_ = std::chrono::seconds(0);
+
+ // Map from name, type to queue.
+ absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> channels_;
+
+ // pid so we get unique timing reports.
pid_t tid_ = 0;
};
+inline monotonic_clock::time_point NodeEventLoopFactory::monotonic_now() const {
+ return monotonic_clock::time_point(
+ factory_->distributed_now().time_since_epoch() + monotonic_offset_);
+}
+
+inline realtime_clock::time_point NodeEventLoopFactory::realtime_now() const {
+ return realtime_clock::time_point(monotonic_now().time_since_epoch() +
+ realtime_offset_);
+}
+
+inline distributed_clock::time_point NodeEventLoopFactory::ToDistributedClock(
+ monotonic_clock::time_point time) const {
+ return distributed_clock::time_point(time.time_since_epoch() -
+ monotonic_offset_);
+}
+
} // namespace aos
#endif // AOS_EVENTS_SIMULATED_EVENT_LOOP_H_
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 4328d6f..b373aa6 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -15,11 +15,11 @@
public:
::std::unique_ptr<EventLoop> Make(std::string_view name) override {
MaybeMake();
- return event_loop_factory_->MakeEventLoop(name);
+ return event_loop_factory_->MakeEventLoop(name, my_node());
}
::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
MaybeMake();
- return event_loop_factory_->MakeEventLoop(name);
+ return event_loop_factory_->MakeEventLoop(name, my_node());
}
void Run() override { event_loop_factory_->Run(); }
@@ -38,8 +38,8 @@
void MaybeMake() {
if (!event_loop_factory_) {
if (configuration()->has_nodes()) {
- event_loop_factory_ = std::make_unique<SimulatedEventLoopFactory>(
- configuration(), my_node());
+ event_loop_factory_ =
+ std::make_unique<SimulatedEventLoopFactory>(configuration());
} else {
event_loop_factory_ =
std::make_unique<SimulatedEventLoopFactory>(configuration());
@@ -64,12 +64,13 @@
int counter = 0;
EventScheduler scheduler;
- scheduler.Schedule(::aos::monotonic_clock::now(),
- [&counter]() { counter += 1; });
+ scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+ [&counter]() { counter += 1; });
scheduler.Run();
EXPECT_EQ(counter, 1);
- auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
- [&counter]() { counter += 1; });
+ auto token =
+ scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(2),
+ [&counter]() { counter += 1; });
scheduler.Deschedule(token);
scheduler.Run();
EXPECT_EQ(counter, 1);
@@ -80,8 +81,9 @@
int counter = 0;
EventScheduler scheduler;
- auto token = scheduler.Schedule(::aos::monotonic_clock::now(),
- [&counter]() { counter += 1; });
+ auto token =
+ scheduler.Schedule(distributed_clock::epoch() + chrono::seconds(1),
+ [&counter]() { counter += 1; });
scheduler.Deschedule(token);
scheduler.Run();
EXPECT_EQ(counter, 0);
@@ -100,8 +102,6 @@
simulated_event_loop_factory.RunFor(chrono::seconds(1));
EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
- simulated_event_loop_factory.monotonic_now());
- EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
event_loop->monotonic_now());
}
@@ -125,8 +125,6 @@
simulated_event_loop_factory.RunFor(chrono::seconds(1));
EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
- simulated_event_loop_factory.monotonic_now());
- EXPECT_EQ(::aos::monotonic_clock::epoch() + chrono::seconds(1),
event_loop->monotonic_now());
EXPECT_EQ(counter, 10);
}