Add NodeEventLoopFactory
This lets us create event loops on separate nodes which can't
communicate with each other. Next step is to add a message proxy
between them, then teach the logger to replay onto multiple nodes.
Change-Id: I06b2836365aea13d696535c52a78ca0c862a7b1e
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index e34a23f..0d8a488 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -36,6 +36,7 @@
public:
SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
const Channel *channel,
std::function<void(const Context &context, const void *message)> fn);
@@ -59,6 +60,7 @@
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedWatcher> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
SimulatedChannel *simulated_channel_ = nullptr;
};
@@ -102,10 +104,6 @@
const Channel *channel() const { return channel_; }
- ::aos::monotonic_clock::time_point monotonic_now() const {
- return scheduler_->monotonic_now();
- }
-
private:
const Channel *channel_;
@@ -186,12 +184,12 @@
message_ = MakeSimulatedMessage(simulated_channel_->max_size());
// Now fill in the message. size is already populated above, and
- // queue_index will be populated in queue_. Put this at the back of the
- // data segment.
+ // queue_index will be populated in simulated_channel_. Put this at the
+ // back of the data segment.
memcpy(message_->data() + simulated_channel_->max_size() - size, msg, size);
- return Send(size, monotonic_remote_time, realtime_remote_time,
- remote_queue_index);
+ return DoSend(size, monotonic_remote_time, realtime_remote_time,
+ remote_queue_index);
}
private:
@@ -204,9 +202,11 @@
class SimulatedFetcher : public RawFetcher {
public:
- explicit SimulatedFetcher(EventLoop *event_loop, SimulatedChannel *queue)
- : RawFetcher(event_loop, queue->channel()), queue_(queue) {}
- ~SimulatedFetcher() { queue_->UnregisterFetcher(this); }
+ explicit SimulatedFetcher(EventLoop *event_loop,
+ SimulatedChannel *simulated_channel)
+ : RawFetcher(event_loop, simulated_channel->channel()),
+ simulated_channel_(simulated_channel) {}
+ ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
if (msgs_.size() == 0) {
@@ -222,8 +222,8 @@
if (msgs_.size() == 0) {
// TODO(austin): Can we just do this logic unconditionally? It is a lot
// simpler. And call clear, obviously.
- if (!msg_ && queue_->latest_message()) {
- SetMsg(queue_->latest_message());
+ if (!msg_ && simulated_channel_->latest_message()) {
+ SetMsg(simulated_channel_->latest_message());
return std::make_pair(true, event_loop()->monotonic_now());
} else {
return std::make_pair(false, monotonic_clock::min_time);
@@ -260,7 +260,7 @@
msgs_.emplace_back(buffer);
}
- SimulatedChannel *queue_;
+ SimulatedChannel *simulated_channel_;
std::shared_ptr<SimulatedMessage> msg_;
// Messages queued up but not in use.
@@ -270,6 +270,7 @@
class SimulatedTimerHandler : public TimerHandler {
public:
explicit SimulatedTimerHandler(EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoop *simulated_event_loop,
::std::function<void()> fn);
~SimulatedTimerHandler() { Disable(); }
@@ -285,6 +286,7 @@
SimulatedEventLoop *simulated_event_loop_;
EventHandler<SimulatedTimerHandler> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
monotonic_clock::time_point base_;
@@ -294,6 +296,7 @@
class SimulatedPhasedLoopHandler : public PhasedLoopHandler {
public:
SimulatedPhasedLoopHandler(EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoop *simulated_event_loop,
::std::function<void(int)> fn,
const monotonic_clock::duration interval,
@@ -309,6 +312,7 @@
EventHandler<SimulatedPhasedLoopHandler> event_;
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
EventScheduler::Token token_;
};
@@ -316,6 +320,7 @@
public:
explicit SimulatedEventLoop(
EventScheduler *scheduler,
+ NodeEventLoopFactory *node_event_loop_factory,
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
*channels,
const Configuration *configuration,
@@ -324,6 +329,7 @@
const Node *node, pid_t tid)
: EventLoop(CHECK_NOTNULL(configuration)),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
channels_(channels),
raw_event_loops_(raw_event_loops),
node_(node),
@@ -361,11 +367,11 @@
}
::aos::monotonic_clock::time_point monotonic_now() override {
- return scheduler_->monotonic_now();
+ return node_event_loop_factory_->monotonic_now();
}
::aos::realtime_clock::time_point realtime_now() override {
- return scheduler_->realtime_now();
+ return node_event_loop_factory_->realtime_now();
}
::std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
@@ -379,17 +385,17 @@
TimerHandler *AddTimer(::std::function<void()> callback) override {
CHECK(!is_running());
- return NewTimer(::std::unique_ptr<TimerHandler>(
- new SimulatedTimerHandler(scheduler_, this, callback)));
+ return NewTimer(::std::unique_ptr<TimerHandler>(new SimulatedTimerHandler(
+ scheduler_, node_event_loop_factory_, this, callback)));
}
PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
const monotonic_clock::duration interval,
const monotonic_clock::duration offset =
::std::chrono::seconds(0)) override {
- return NewPhasedLoop(
- ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
- scheduler_, this, callback, interval, offset)));
+ return NewPhasedLoop(::std::unique_ptr<PhasedLoopHandler>(
+ new SimulatedPhasedLoopHandler(scheduler_, node_event_loop_factory_,
+ this, callback, interval, offset)));
}
void OnRun(::std::function<void()> on_run) override {
@@ -433,6 +439,7 @@
pid_t GetTid() override { return tid_; }
EventScheduler *scheduler_;
+ NodeEventLoopFactory *node_event_loop_factory_;
absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
*raw_event_loops_;
@@ -459,17 +466,13 @@
}
}
-std::chrono::nanoseconds SimulatedEventLoopFactory::send_delay() const {
- return send_delay_;
-}
-
void SimulatedEventLoop::MakeRawWatcher(
const Channel *channel,
std::function<void(const Context &channel, const void *message)> watcher) {
TakeWatcher(channel);
- std::unique_ptr<SimulatedWatcher> shm_watcher(
- new SimulatedWatcher(this, scheduler_, channel, std::move(watcher)));
+ std::unique_ptr<SimulatedWatcher> shm_watcher(new SimulatedWatcher(
+ this, scheduler_, node_event_loop_factory_, channel, std::move(watcher)));
GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
NewWatcher(std::move(shm_watcher));
@@ -511,12 +514,13 @@
SimulatedWatcher::SimulatedWatcher(
SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
- const Channel *channel,
+ NodeEventLoopFactory *node_event_loop_factory, const Channel *channel,
std::function<void(const Context &context, const void *message)> fn)
: WatcherState(simulated_event_loop, channel, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
SimulatedWatcher::~SimulatedWatcher() {
@@ -574,9 +578,10 @@
}
void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
- token_ =
- scheduler_->Schedule(event_time + simulated_event_loop_->send_delay(),
- [this]() { simulated_event_loop_->HandleEvent(); });
+ token_ = scheduler_->Schedule(
+ node_event_loop_factory_->ToDistributedClock(
+ event_time + simulated_event_loop_->send_delay()),
+ [this]() { simulated_event_loop_->HandleEvent(); });
}
void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
@@ -622,12 +627,13 @@
}
SimulatedTimerHandler::SimulatedTimerHandler(
- EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
- ::std::function<void()> fn)
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+ SimulatedEventLoop *simulated_event_loop, ::std::function<void()> fn)
: TimerHandler(simulated_event_loop, std::move(fn)),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
@@ -639,10 +645,12 @@
repeat_offset_ = repeat_offset;
if (base < monotonic_now) {
token_ = scheduler_->Schedule(
- monotonic_now, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(monotonic_now),
+ [this]() { simulated_event_loop_->HandleEvent(); });
} else {
token_ = scheduler_->Schedule(
- base, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(base),
+ [this]() { simulated_event_loop_->HandleEvent(); });
}
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
@@ -655,7 +663,8 @@
// Reschedule.
while (base_ <= monotonic_now) base_ += repeat_offset_;
token_ = scheduler_->Schedule(
- base_, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(base_),
+ [this]() { simulated_event_loop_->HandleEvent(); });
event_.set_event_time(base_);
simulated_event_loop_->AddEvent(&event_);
} else {
@@ -674,13 +683,15 @@
}
SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
- EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
- ::std::function<void(int)> fn, const monotonic_clock::duration interval,
+ EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
+ SimulatedEventLoop *simulated_event_loop, ::std::function<void(int)> fn,
+ const monotonic_clock::duration interval,
const monotonic_clock::duration offset)
: PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
simulated_event_loop_(simulated_event_loop),
event_(this),
scheduler_(scheduler),
+ node_event_loop_factory_(node_event_loop_factory),
token_(scheduler_->InvalidToken()) {}
SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
@@ -702,51 +713,76 @@
void SimulatedPhasedLoopHandler::Schedule(
monotonic_clock::time_point sleep_time) {
token_ = scheduler_->Schedule(
- sleep_time, [this]() { simulated_event_loop_->HandleEvent(); });
+ node_event_loop_factory_->ToDistributedClock(sleep_time),
+ [this]() { simulated_event_loop_->HandleEvent(); });
event_.set_event_time(sleep_time);
simulated_event_loop_->AddEvent(&event_);
}
+NodeEventLoopFactory::NodeEventLoopFactory(
+ EventScheduler *scheduler, SimulatedEventLoopFactory *factory,
+ const Node *node,
+ std::vector<std::pair<EventLoop *, std::function<void(bool)>>>
+ *raw_event_loops)
+ : scheduler_(scheduler),
+ factory_(factory),
+ node_(node),
+ raw_event_loops_(raw_event_loops) {}
+
SimulatedEventLoopFactory::SimulatedEventLoopFactory(
const Configuration *configuration)
- : configuration_(CHECK_NOTNULL(configuration)), node_(nullptr) {
- CHECK(!configuration_->has_nodes())
- << ": Got a configuration with multiple nodes and no node was selected.";
-}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
- const Configuration *configuration, std::string_view node_name)
- : SimulatedEventLoopFactory(
- configuration, configuration::GetNode(configuration, node_name)) {}
-
-SimulatedEventLoopFactory::SimulatedEventLoopFactory(
- const Configuration *configuration, const Node *node)
- : configuration_(CHECK_NOTNULL(configuration)), node_(node) {
- if (node != nullptr) {
- CHECK(configuration_->has_nodes())
- << ": Got a configuration with no nodes and node \""
- << node->name()->string_view() << "\" was selected.";
- bool found = false;
- for (const Node *node : *configuration_->nodes()) {
- if (node == node_) {
- found = true;
- break;
- }
+ : configuration_(CHECK_NOTNULL(configuration)) {
+ if (configuration::MultiNode(configuration_)) {
+ for (const Node *node : *configuration->nodes()) {
+ nodes_.emplace_back(node);
}
- CHECK(found) << ": node must be a pointer in the configuration.";
+ } else {
+ nodes_.emplace_back(nullptr);
+ }
+
+ for (const Node *node : nodes_) {
+ node_factories_.emplace_back(
+ new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
}
}
SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
+NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
+ const Node *node) {
+ auto result = std::find_if(
+ node_factories_.begin(), node_factories_.end(),
+ [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
+ return node_factory->node() == node;
+ });
+
+ CHECK(result != node_factories_.end())
+ << ": Failed to find node " << FlatbufferToJson(node);
+
+ return result->get();
+}
+
::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
+ std::string_view name, const Node *node) {
+ if (node == nullptr) {
+ CHECK(!configuration::MultiNode(configuration()))
+ << ": Can't make a single node event loop in a multi-node world.";
+ } else {
+ CHECK(configuration::MultiNode(configuration()))
+ << ": Can't make a multi-node event loop in a single-node world.";
+ }
+ return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
+}
+
+::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
std::string_view name) {
pid_t tid = tid_;
++tid_;
::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
- &scheduler_, &channels_, configuration_, &raw_event_loops_, node_, tid));
+ scheduler_, this, &channels_, factory_->configuration(), raw_event_loops_,
+ node_, tid));
result->set_name(name);
- result->set_send_delay(send_delay_);
+ result->set_send_delay(factory_->send_delay());
return std::move(result);
}