Add a simulated message bridge
This gives us multi-node simulations!
Change-Id: I10fa955653766a26e4d11471a6dae5b47ea5cd1c
diff --git a/aos/events/BUILD b/aos/events/BUILD
index a183cae..fb0f356 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -243,8 +243,11 @@
cc_test(
name = "simulated_event_loop_test",
srcs = ["simulated_event_loop_test.cc"],
+ data = ["multinode_pingpong_config.json"],
deps = [
":event_loop_param_test",
+ ":ping_lib",
+ ":pong_lib",
":simulated_event_loop",
"//aos/testing:googletest",
],
@@ -267,10 +270,12 @@
srcs = [
"event_scheduler.cc",
"simulated_event_loop.cc",
+ "simulated_network_bridge.cc",
],
hdrs = [
"event_scheduler.h",
"simulated_event_loop.h",
+ "simulated_network_bridge.h",
],
visibility = ["//visibility:public"],
deps = [
diff --git a/aos/events/event_loop_param_test.h b/aos/events/event_loop_param_test.h
index 4f24d78..89e80ae 100644
--- a/aos/events/event_loop_param_test.h
+++ b/aos/events/event_loop_param_test.h
@@ -55,7 +55,7 @@
virtual void SleepFor(::std::chrono::nanoseconds duration) = 0;
void EnableNodes(std::string_view my_node) {
- std::string json = std::string(R"config({
+ std::string json = R"config({
"channels": [
{
"name": "/aos",
@@ -80,9 +80,12 @@
],
"nodes": [
{
- "name": ")config") +
- std::string(my_node) + R"config(",
+ "name": "me",
"hostname": "myhostname"
+ },
+ {
+ "name": "them",
+ "hostname": "themhostname"
}
]
})config";
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 2d9604f..c766fe6 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -65,7 +65,7 @@
// Make an eventloop for retrieving stats
std::unique_ptr<aos::EventLoop> stats_event_loop =
- log_reader_factory.MakeEventLoop("logstats");
+ log_reader_factory.MakeEventLoop("logstats", reader.node());
stats_event_loop->SkipTimingReport();
// Read channel info and store in vector
diff --git a/aos/events/shm_event_loop_test.cc b/aos/events/shm_event_loop_test.cc
index d33f496..2edc3fc 100644
--- a/aos/events/shm_event_loop_test.cc
+++ b/aos/events/shm_event_loop_test.cc
@@ -38,7 +38,8 @@
::std::unique_ptr<EventLoop> Make(std::string_view name) override {
if (configuration()->has_nodes()) {
- FLAGS_override_hostname = "myhostname";
+ FLAGS_override_hostname =
+ std::string(my_node()->hostname()->string_view());
}
::std::unique_ptr<ShmEventLoop> loop(new ShmEventLoop(configuration()));
loop->set_name(name);
@@ -47,7 +48,8 @@
::std::unique_ptr<EventLoop> MakePrimary(std::string_view name) override {
if (configuration()->has_nodes()) {
- FLAGS_override_hostname = "myhostname";
+ FLAGS_override_hostname =
+ std::string(my_node()->hostname()->string_view());
}
::std::unique_ptr<ShmEventLoop> loop =
::std::unique_ptr<ShmEventLoop>(new ShmEventLoop(configuration()));
diff --git a/aos/events/simulated_event_loop.cc b/aos/events/simulated_event_loop.cc
index 0d8a488..108a485 100644
--- a/aos/events/simulated_event_loop.cc
+++ b/aos/events/simulated_event_loop.cc
@@ -5,6 +5,7 @@
#include <string_view>
#include "absl/container/btree_map.h"
+#include "aos/events/simulated_network_bridge.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/util/phased_loop.h"
@@ -744,6 +745,10 @@
node_factories_.emplace_back(
new NodeEventLoopFactory(&scheduler_, this, node, &raw_event_loops_));
}
+
+ if (configuration::MultiNode(configuration)) {
+ bridge_ = std::make_unique<message_bridge::SimulatedMessageBridge>(this);
+ }
}
SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
diff --git a/aos/events/simulated_event_loop.h b/aos/events/simulated_event_loop.h
index 019172b..8cff0d7 100644
--- a/aos/events/simulated_event_loop.h
+++ b/aos/events/simulated_event_loop.h
@@ -24,6 +24,9 @@
class SimulatedChannel;
class NodeEventLoopFactory;
+namespace message_bridge {
+class SimulatedMessageBridge;
+}
// There are 2 concepts needed to support multi-node simulations.
// 1) The node. This is implemented with NodeEventLoopFactory.
@@ -110,6 +113,8 @@
std::vector<std::unique_ptr<NodeEventLoopFactory>> node_factories_;
std::vector<const Node *> nodes_;
+
+ std::unique_ptr<message_bridge::SimulatedMessageBridge> bridge_;
};
// This class holds all the state required to be a single node.
@@ -132,6 +137,14 @@
inline monotonic_clock::time_point monotonic_now() const;
inline realtime_clock::time_point realtime_now() const;
+ // Returns the simulated network delay for messages forwarded between nodes.
+ std::chrono::nanoseconds network_delay() const {
+ return factory_->network_delay();
+ }
+ // Returns the simulated send delay for all messages sent within a single
+ // node.
+ std::chrono::nanoseconds send_delay() const { return factory_->send_delay(); }
+
// Converts a time to the distributed clock for scheduling and cross-node time
// measurement.
inline distributed_clock::time_point ToDistributedClock(
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index b373aa6..de5cbae 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -3,6 +3,8 @@
#include <string_view>
#include "aos/events/event_loop_param_test.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/pong_lib.h"
#include "aos/events/test_message_generated.h"
#include "gtest/gtest.h"
@@ -230,5 +232,45 @@
0.0);
}
+// Tests that ping and pong work when on 2 different nodes.
+TEST(SimulatedEventLoopTest, MultinodePingPong) {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(
+ "aos/events/multinode_pingpong_config.json");
+ const Node *pi1 = configuration::GetNode(&config.message(), "pi1");
+ const Node *pi2 = configuration::GetNode(&config.message(), "pi2");
+
+ SimulatedEventLoopFactory simulated_event_loop_factory(&config.message());
+
+ std::unique_ptr<EventLoop> ping_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("ping", pi1);
+ Ping ping(ping_event_loop.get());
+
+ std::unique_ptr<EventLoop> pong_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pong", pi2);
+ Pong pong(pong_event_loop.get());
+
+ std::unique_ptr<EventLoop> pi2_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi2_pong_counter", pi2);
+
+ int pi2_pong_count = 0;
+ pi2_pong_counter_event_loop->MakeWatcher(
+ "/test",
+ [&pi2_pong_count](const examples::Pong & /*pong*/) { ++pi2_pong_count; });
+
+ std::unique_ptr<EventLoop> pi1_pong_counter_event_loop =
+ simulated_event_loop_factory.MakeEventLoop("pi1_pong_counter", pi1);
+ int pi1_pong_count = 0;
+ pi1_pong_counter_event_loop->MakeWatcher(
+ "/test",
+ [&pi1_pong_count](const examples::Pong & /*pong*/) { ++pi1_pong_count; });
+
+ simulated_event_loop_factory.RunFor(chrono::seconds(10) +
+ chrono::milliseconds(5));
+
+ EXPECT_EQ(pi1_pong_count, 1001);
+ EXPECT_EQ(pi2_pong_count, 1001);
+}
+
} // namespace testing
} // namespace aos
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
new file mode 100644
index 0000000..57f2efd
--- /dev/null
+++ b/aos/events/simulated_network_bridge.cc
@@ -0,0 +1,165 @@
+#include "aos/events/simulated_network_bridge.h"
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+// This class delays messages forwarded between two factories.
+//
+// The basic design is that we need to use the distributed_clock to convert
+// monotonic times from the source to the destination node. We also use a
+// fetcher to manage the queue of data, and a timer to schedule the sends.
+class RawMessageDelayer {
+ public:
+ RawMessageDelayer(aos::NodeEventLoopFactory *fetch_node_factory,
+ aos::NodeEventLoopFactory *send_node_factory,
+ aos::EventLoop *send_event_loop,
+ std::unique_ptr<aos::RawFetcher> fetcher,
+ std::unique_ptr<aos::RawSender> sender)
+ : fetch_node_factory_(fetch_node_factory),
+ send_node_factory_(send_node_factory),
+ send_event_loop_(send_event_loop),
+ fetcher_(std::move(fetcher)),
+ sender_(std::move(sender)) {
+ timer_ = send_event_loop_->AddTimer([this]() { Send(); });
+
+ Schedule();
+ }
+
+ // Kicks us to re-fetch and schedule the timer.
+ void Schedule() {
+ if (fetcher_->context().data == nullptr || sent_) {
+ sent_ = !fetcher_->FetchNext();
+ }
+
+ if (fetcher_->context().data == nullptr) {
+ return;
+ }
+
+ if (sent_) {
+ return;
+ }
+
+ // Compute the time to publish this message.
+ const monotonic_clock::time_point monotonic_delivered_time =
+ DeliveredTime(fetcher_->context());
+
+ CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
+ << ": Trying to deliver message in the past...";
+
+ timer_->Setup(monotonic_delivered_time);
+ }
+
+ private:
+ // Acutally sends the message, and reschedules.
+ void Send() {
+ // Compute the time to publish this message.
+ const monotonic_clock::time_point monotonic_delivered_time =
+ DeliveredTime(fetcher_->context());
+
+ CHECK_EQ(monotonic_delivered_time, send_node_factory_->monotonic_now())
+ << ": Message to be sent at the wrong time.";
+
+ // And also fill out the send times as well.
+ sender_->Send(fetcher_->context().data, fetcher_->context().size,
+ fetcher_->context().monotonic_event_time,
+ fetcher_->context().realtime_event_time,
+ fetcher_->context().queue_index);
+
+ sent_ = true;
+ Schedule();
+ }
+
+ // Converts from time on the sending node to time on the receiving node.
+ monotonic_clock::time_point DeliveredTime(const Context &context) const {
+ const distributed_clock::time_point distributed_sent_time =
+ fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
+
+ return aos::monotonic_clock::epoch() +
+ (distributed_sent_time - send_node_factory_->ToDistributedClock(
+ aos::monotonic_clock::epoch())) +
+ send_node_factory_->network_delay() +
+ send_node_factory_->send_delay();
+ }
+
+ // Factories used for time conversion.
+ aos::NodeEventLoopFactory *fetch_node_factory_;
+ aos::NodeEventLoopFactory *send_node_factory_;
+
+ // Event loop which sending is scheduled on.
+ aos::EventLoop *send_event_loop_;
+ // Timer used to send.
+ aos::TimerHandler *timer_;
+ // Fetcher used to receive messages.
+ std::unique_ptr<aos::RawFetcher> fetcher_;
+ // Sender to send them back out.
+ std::unique_ptr<aos::RawSender> sender_;
+ // True if we have sent the message in the fetcher.
+ bool sent_ = false;
+};
+
+SimulatedMessageBridge::SimulatedMessageBridge(
+ SimulatedEventLoopFactory *simulated_event_loop_factory) {
+ CHECK(
+ configuration::MultiNode(simulated_event_loop_factory->configuration()));
+
+ // Pre-build up event loops for every node. They are pretty cheap anyways.
+ for (const Node *node : simulated_event_loop_factory->nodes()) {
+ CHECK(event_loop_map_
+ .insert({node, simulated_event_loop_factory->MakeEventLoop(
+ "message_bridge", node)})
+ .second);
+ }
+
+ for (const Channel *channel :
+ *simulated_event_loop_factory->configuration()->channels()) {
+ if (!channel->has_destination_nodes()) {
+ continue;
+ }
+
+ // Find the sending node.
+ const Node *node =
+ configuration::GetNode(simulated_event_loop_factory->configuration(),
+ channel->source_node()->string_view());
+ auto source_event_loop = event_loop_map_.find(node);
+ CHECK(source_event_loop != event_loop_map_.end());
+
+ std::unique_ptr<DelayersVector> delayers =
+ std::make_unique<DelayersVector>();
+
+ // And then build up a RawMessageDelayer for each destination.
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const Node *destination_node =
+ configuration::GetNode(simulated_event_loop_factory->configuration(),
+ connection->name()->string_view());
+ auto destination_event_loop = event_loop_map_.find(destination_node);
+ CHECK(destination_event_loop != event_loop_map_.end());
+
+ delayers->emplace_back(std::make_unique<RawMessageDelayer>(
+ simulated_event_loop_factory->GetNodeEventLoopFactory(node),
+ simulated_event_loop_factory->GetNodeEventLoopFactory(
+ destination_node),
+ destination_event_loop->second.get(),
+ source_event_loop->second->MakeRawFetcher(channel),
+ destination_event_loop->second->MakeRawSender(channel)));
+ }
+
+ // And register every delayer to be poked when a new message shows up.
+ source_event_loop->second->MakeRawWatcher(
+ channel,
+ [captured_delayers = delayers.get()](const Context &, const void *) {
+ for (std::unique_ptr<RawMessageDelayer> &delayer :
+ *captured_delayers) {
+ delayer->Schedule();
+ }
+ });
+ delayers_list_.emplace_back(std::move(delayers));
+ }
+}
+
+SimulatedMessageBridge::~SimulatedMessageBridge() {}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
new file mode 100644
index 0000000..5d613ab
--- /dev/null
+++ b/aos/events/simulated_network_bridge.h
@@ -0,0 +1,36 @@
+#ifndef AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+#define AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_
+
+#include "aos/events/event_loop.h"
+#include "aos/events/simulated_event_loop.h"
+
+namespace aos {
+namespace message_bridge {
+
+class RawMessageDelayer;
+
+// This class moves messages between nodes. It is implemented as a separate
+// class because it would have been even harder to manage forwarding in the
+// SimulatedEventLoopFactory.
+class SimulatedMessageBridge {
+ public:
+ // Constructs the bridge.
+ SimulatedMessageBridge(
+ SimulatedEventLoopFactory *simulated_event_loop_factory);
+ ~SimulatedMessageBridge();
+
+ private:
+ // Map of nodes to event loops. This is a member variable so that the
+ // lifetime of the event loops matches the lifetime of the bridge.
+ std::map<const Node *, std::unique_ptr<aos::EventLoop>> event_loop_map_;
+
+
+ // List of delayers used to resend the messages.
+ using DelayersVector = std::vector<std::unique_ptr<RawMessageDelayer>>;
+ std::vector<std::unique_ptr<DelayersVector>> delayers_list_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_EVENTS_SIMULATED_NETWORK_BRIDGE_H_