Add support for both split and combined timestamp channels
In order to handle simulation sending messages as fast as possible, and
messages sent infrequently with timestamps delivered, we want to split
each channel's RemoteMessage timestamps into a separate channel.
Start by creating a class to reliably sort out which one is which, and
begin using and testing it in message_bridge.
Change-Id: I5d279836146a01f5d403f597e59d330440fe702c
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 15af2b3..1d4b634 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -174,6 +174,20 @@
)
cc_library(
+ name = "timestamp_channel",
+ srcs = ["timestamp_channel.cc"],
+ hdrs = ["timestamp_channel.h"],
+ deps = [
+ ":remote_message_fbs",
+ "//aos:configuration",
+ "//aos/events:event_loop",
+ "@com_github_google_glog//:glog",
+ "@com_google_absl//absl/container:btree",
+ "@com_google_absl//absl/strings",
+ ],
+)
+
+cc_library(
name = "message_bridge_server_lib",
srcs = [
"message_bridge_server_lib.cc",
@@ -194,6 +208,7 @@
":remote_message_fbs",
":sctp_lib",
":sctp_server",
+ ":timestamp_channel",
":timestamp_fbs",
"//aos:unique_malloc_ptr",
"//aos/events:shm_event_loop",
@@ -297,6 +312,21 @@
)
aos_config(
+ name = "message_bridge_test_combined_timestamps_common_config",
+ src = "message_bridge_test_combined_timestamps_common.json",
+ flatbuffers = [
+ ":remote_message_fbs",
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ "//aos/network:timestamp_fbs",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = ["//aos/events:config"],
+)
+
+aos_config(
name = "message_bridge_test_common_config",
src = "message_bridge_test_common.json",
flatbuffers = [
@@ -317,10 +347,11 @@
"message_bridge_test.cc",
],
data = [
+ ":message_bridge_test_combined_timestamps_common_config",
":message_bridge_test_common_config",
],
flaky = True,
- shard_count = 5,
+ shard_count = 10,
target_compatible_with = ["@platforms//os:linux"],
deps = [
":message_bridge_client_lib",
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index aaeecff..a96d08e 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -10,6 +10,7 @@
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
+#include "aos/network/timestamp_channel.h"
#include "glog/logging.h"
namespace aos {
@@ -254,6 +255,7 @@
MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
: event_loop_(event_loop),
+ timestamp_loggers_(event_loop_),
server_("::", event_loop->node()->port()),
server_status_(event_loop, [this](const Context &context) {
timestamp_state_->SendData(&server_, context);
@@ -261,7 +263,6 @@
CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
int32_t max_size = 0;
- timestamp_loggers_.resize(event_loop->configuration()->nodes()->size());
// Seed up all the per-node connection state.
// We are making the assumption here that every connection is bidirectional
@@ -287,10 +288,6 @@
}
// TODO(austin): Logging synchronization.
- //
- // TODO(austin): How do we handle parameter channels? The oldest value
- // needs to be sent regardless on connection (though probably only if it has
- // changed).
event_loop_->epoll()->OnReadable(server_.fd(),
[this]() { MessageReceived(); });
@@ -327,21 +324,11 @@
for (const Connection *connection : *channel->destination_nodes()) {
const Node *other_node = configuration::GetNode(
event_loop_->configuration(), connection->name()->string_view());
- const size_t other_node_index = configuration::GetNodeIndex(
- event_loop_->configuration(), other_node);
const bool delivery_time_is_logged =
configuration::ConnectionDeliveryTimeIsLoggedOnNode(
connection, event_loop_->node());
- // Conditionally create the timestamp logger if we are supposed to log
- // timestamps from it.
- if (delivery_time_is_logged && !timestamp_loggers_[other_node_index]) {
- timestamp_loggers_[other_node_index] =
- event_loop_->MakeSender<RemoteMessage>(
- absl::StrCat("/aos/remote_timestamps/",
- connection->name()->string_view()));
- }
state->AddPeer(
connection,
configuration::GetNodeIndex(event_loop_->configuration(),
@@ -349,8 +336,9 @@
server_status_.FindServerConnection(
connection->name()->string_view()),
configuration::ChannelMessageIsLoggedOnNode(channel, other_node),
- delivery_time_is_logged ? &timestamp_loggers_[other_node_index]
- : nullptr);
+ delivery_time_is_logged
+ ? timestamp_loggers_.SenderForChannel(channel, connection)
+ : nullptr);
}
// Don't subscribe to timestamps on the timestamp channel. Those get
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index fea2d06..88433d1 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -13,6 +13,7 @@
#include "aos/network/message_bridge_server_status.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/sctp_server.h"
+#include "aos/network/timestamp_channel.h"
#include "aos/network/timestamp_generated.h"
#include "glog/logging.h"
@@ -127,7 +128,7 @@
// Event loop to schedule everything on.
aos::ShmEventLoop *event_loop_;
- std::vector<aos::Sender<RemoteMessage>> timestamp_loggers_;
+ ChannelTimestampSender timestamp_loggers_;
SctpServer server_;
MessageBridgeServerStatus server_status_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index c8c24cf..1f926c8 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -32,15 +32,28 @@
aos::SetShmBase(ShmBase(node));
}
-class MessageBridgeTest : public ::testing::Test {
+// Parameters to run all the tests with.
+struct Param {
+ // File name of the config to load; the fixture prepends "aos/network/".
+ std::string config;
+ // If true, the RemoteMessage channel should be shared between all the remote
+ // channels. If false, there will be 1 RemoteMessage channel per remote
+ // channel.
+ bool shared;
+};
+
+class MessageBridgeParameterizedTest
+ : public ::testing::TestWithParam<struct Param> {
public:
- MessageBridgeTest(std::string_view path =
- "aos/network/message_bridge_test_common_config.json")
- : config(aos::configuration::ReadConfig(path)) {
+ MessageBridgeParameterizedTest()
+ : config(aos::configuration::ReadConfig(
+ absl::StrCat("aos/network/", GetParam().config))) {
util::UnlinkRecursive(ShmBase("pi1"));
util::UnlinkRecursive(ShmBase("pi2"));
}
+ bool shared() const { return GetParam().shared; }
+
void OnPi1() {
DoSetShmBase("pi1");
FLAGS_override_hostname = "raspberrypi";
@@ -288,7 +301,7 @@
};
// Test that we can send a ping message over sctp and receive it.
-TEST_F(MessageBridgeTest, PingPong) {
+TEST_P(MessageBridgeParameterizedTest, PingPong) {
// This is rather annoying to set up. We need to start up a client and
// server, on the same node, but get them to think that they are on different
// nodes.
@@ -324,7 +337,8 @@
aos::ShmEventLoop pi1_test_event_loop(&config.message());
aos::Fetcher<RemoteMessage> message_header_fetcher1 =
pi1_test_event_loop.MakeFetcher<RemoteMessage>(
- "/pi1/aos/remote_timestamps/pi2");
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping");
// Fetchers for confirming the remote timestamps made it.
aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
@@ -350,7 +364,9 @@
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
aos::Fetcher<RemoteMessage> message_header_fetcher2 =
test_event_loop.MakeFetcher<RemoteMessage>(
- "/pi2/aos/remote_timestamps/pi1");
+ shared() ? "/pi2/aos/remote_timestamps/pi1"
+ : "/pi2/aos/remote_timestamps/pi1/pi2/aos/"
+ "aos-message_bridge-Timestamp");
// Event loop for fetching data delivered to pi2 from pi1 to match up
// messages.
@@ -511,83 +527,99 @@
// For each remote timestamp we get back, confirm that it is either a ping
// message, or a timestamp we sent out. Also confirm that the timestamps are
// correct.
- ping_event_loop.MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
- [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
- &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
- &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
- VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
- << aos::FlatbufferToJson(&header);
+ for (std::pair<int, std::string> channel :
+ shared()
+ ? std::vector<std::pair<
+ int, std::string>>{{-1, "/pi1/aos/remote_timestamps/pi2"}}
+ : std::vector<std::pair<int, std::string>>{
+ {pi1_timestamp_channel,
+ "/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+ "aos-message_bridge-Timestamp"},
+ {ping_timestamp_channel,
+ "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping"}}) {
+ ping_event_loop.MakeWatcher(
+ channel.second,
+ [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+ &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+ &pi1_on_pi1_timestamp_fetcher,
+ channel_index = channel.first](const RemoteMessage &header) {
+ VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ << aos::FlatbufferToJson(&header);
- EXPECT_TRUE(header.has_boot_uuid());
-
- const aos::monotonic_clock::time_point header_monotonic_sent_time(
- chrono::nanoseconds(header.monotonic_sent_time()));
- const aos::realtime_clock::time_point header_realtime_sent_time(
- chrono::nanoseconds(header.realtime_sent_time()));
- const aos::monotonic_clock::time_point header_monotonic_remote_time(
- chrono::nanoseconds(header.monotonic_remote_time()));
- const aos::realtime_clock::time_point header_realtime_remote_time(
- chrono::nanoseconds(header.realtime_remote_time()));
-
- const Context *pi1_context = nullptr;
- const Context *pi2_context = nullptr;
-
- if (header.channel_index() == pi1_timestamp_channel) {
- // Find the forwarded message.
- while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
- header_monotonic_sent_time) {
- ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ EXPECT_TRUE(header.has_boot_uuid());
+ if (channel_index != -1) {
+ ASSERT_EQ(channel_index, header.channel_index());
}
- // And the source message.
- while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
- header_monotonic_remote_time) {
- ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi1_context = nullptr;
+ const Context *pi2_context = nullptr;
+
+ if (header.channel_index() == pi1_timestamp_channel) {
+ // Find the forwarded message.
+ while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+ }
+
+ pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+ pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ // Find the forwarded message.
+ while (ping_on_pi2_fetcher.context().monotonic_event_time <
+ header_monotonic_sent_time) {
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ }
+
+ // And the source message.
+ while (ping_on_pi1_fetcher.context().monotonic_event_time <
+ header_monotonic_remote_time) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ }
+
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {
+ LOG(FATAL) << "Unknown channel";
}
- pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
- pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
- } else if (header.channel_index() == ping_timestamp_channel) {
- // Find the forwarded message.
- while (ping_on_pi2_fetcher.context().monotonic_event_time <
- header_monotonic_sent_time) {
- ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
- }
+ // Confirm the forwarded message has matching timestamps to the
+ // timestamps we got back.
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time,
+ header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
- // And the source message.
- while (ping_on_pi1_fetcher.context().monotonic_event_time <
- header_monotonic_remote_time) {
- ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
- }
+ // Confirm the forwarded message also matches the source message.
+ EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ });
+ }
- pi1_context = &ping_on_pi1_fetcher.context();
- pi2_context = &ping_on_pi2_fetcher.context();
- } else {
- LOG(FATAL) << "Unknown channel";
- }
-
- // Confirm the forwarded message has matching timestamps to the
- // timestamps we got back.
- EXPECT_EQ(pi2_context->queue_index, header.queue_index());
- EXPECT_EQ(pi2_context->monotonic_event_time,
- header_monotonic_sent_time);
- EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
- EXPECT_EQ(pi2_context->realtime_remote_time,
- header_realtime_remote_time);
- EXPECT_EQ(pi2_context->monotonic_remote_time,
- header_monotonic_remote_time);
-
- // Confirm the forwarded message also matches the source message.
- EXPECT_EQ(pi1_context->queue_index, header.queue_index());
- EXPECT_EQ(pi1_context->monotonic_event_time,
- header_monotonic_remote_time);
- EXPECT_EQ(pi1_context->realtime_event_time,
- header_realtime_remote_time);
- });
-
- // Start everything up. Pong is the only thing we don't know how to wait on,
- // so start it first.
+ // Start everything up. Pong is the only thing we don't know how to wait
+ // on, so start it first.
std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
StartPi1Server();
@@ -641,7 +673,7 @@
// Test that the client disconnecting triggers the server offsets on both sides
// to clear.
-TEST_F(MessageBridgeTest, ClientRestart) {
+TEST_P(MessageBridgeParameterizedTest, ClientRestart) {
// This is rather annoying to set up. We need to start up a client and
// server, on the same node, but get them to think that they are on different
// nodes.
@@ -778,7 +810,7 @@
// Test that the server disconnecting triggers the server offsets on the other
// side to clear, along with the other client.
-TEST_F(MessageBridgeTest, ServerRestart) {
+TEST_P(MessageBridgeParameterizedTest, ServerRestart) {
// This is rather annoying to set up. We need to start up a client and
// server, on the same node, but get them to think that they are on different
// nodes.
@@ -932,7 +964,7 @@
// Tests that when a message is sent before the bridge starts up, but is
// configured as reliable, we forward it. Confirm this survives a client reset.
-TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
+TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeClientStartup) {
OnPi1();
FLAGS_application_name = "sender";
@@ -967,15 +999,20 @@
receive_event_loop.configuration(), ping_fetcher.channel());
std::atomic<int> ping_timestamp_count{0};
+ const std::string channel_name =
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
pi1_remote_timestamp_event_loop.MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
- VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ channel_name, [this, channel_name, ping_channel_index,
+ &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) <<channel_name << " RemoteMessage "
<< aos::FlatbufferToJson(&header);
EXPECT_TRUE(header.has_boot_uuid());
- if (header.channel_index() == ping_channel_index) {
- ++ping_timestamp_count;
+ if (shared() && header.channel_index() != ping_channel_index) {
+ return;
}
+ CHECK_EQ(header.channel_index(), ping_channel_index);
+ ++ping_timestamp_count;
});
// Before everything starts up, confirm there is no message.
@@ -1048,7 +1085,7 @@
// Tests that when a message is sent before the bridge starts up, but is
// configured as reliable, we forward it. Confirm this works across server
// resets.
-TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
+TEST_P(MessageBridgeParameterizedTest, ReliableSentBeforeServerStartup) {
// Now do it for "raspberrypi2", the client.
OnPi2();
@@ -1087,15 +1124,20 @@
receive_event_loop.configuration(), ping_fetcher.channel());
std::atomic<int> ping_timestamp_count{0};
+ const std::string channel_name =
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
pi1_remote_timestamp_event_loop.MakeWatcher(
- "/pi1/aos/remote_timestamps/pi2",
- [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
- VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+ channel_name, [this, channel_name, ping_channel_index,
+ &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << channel_name << " RemoteMessage "
<< aos::FlatbufferToJson(&header);
EXPECT_TRUE(header.has_boot_uuid());
- if (header.channel_index() == ping_channel_index) {
- ++ping_timestamp_count;
+ if (shared() && header.channel_index() != ping_channel_index) {
+ return;
}
+ CHECK_EQ(header.channel_index(), ping_channel_index);
+ ++ping_timestamp_count;
});
// Before everything starts up, confirm there is no message.
@@ -1166,6 +1208,13 @@
pi1_remote_timestamp_thread.join();
}
+INSTANTIATE_TEST_CASE_P(
+ MessageBridgeTests, MessageBridgeParameterizedTest,
+ ::testing::Values(
+ Param{"message_bridge_test_combined_timestamps_common_config.json",
+ true},
+ Param{"message_bridge_test_common_config.json", false}));
+
} // namespace testing
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_test_combined_timestamps_common.json b/aos/network/message_bridge_test_combined_timestamps_common.json
new file mode 100644
index 0000000..74c932d
--- /dev/null
+++ b/aos/network/message_bridge_test_combined_timestamps_common.json
@@ -0,0 +1,180 @@
+{
+ "channels": [
+ {
+ "name": "/pi1/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi1",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.logging.LogMessageFbs",
+ "source_node": "pi2",
+ "frequency": 200,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi1",
+ "frequency": 10,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"],
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.Timestamp",
+ "source_node": "pi2",
+ "frequency": 10,
+ "max_size": 200,
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi2"],
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2",
+ "type": "aos.message_bridge.RemoteMessage",
+ "source_node": "pi1",
+ "frequency": 10
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1",
+ "type": "aos.message_bridge.RemoteMessage",
+ "source_node": "pi2",
+ "frequency": 10
+ },
+ {
+ "name": "/pi1/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/pi2/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ }
+ ]
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["pi1"],
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"]
+ }
+ ]
+ },
+ {
+ "name": "/unreliable",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["pi1"],
+ "time_to_live": 5000000
+ }
+ ]
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/pi1/aos"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos*",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/pi2/aos"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "localhost",
+ "hostnames": ["raspberrypi"],
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "localhost",
+ "hostnames": ["raspberrypi2"],
+ "port": 9972
+ }
+ ]
+}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index 74c932d..99c80a9 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -73,18 +73,36 @@
"frequency": 2
},
{
- "name": "/pi1/aos/remote_timestamps/pi2",
+ "name": "/pi1/aos/remote_timestamps/pi2/pi1/aos/aos-message_bridge-Timestamp",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi1",
"frequency": 10
},
{
- "name": "/pi2/aos/remote_timestamps/pi1",
+ "name": "/pi2/aos/remote_timestamps/pi1/pi2/aos/aos-message_bridge-Timestamp",
"type": "aos.message_bridge.RemoteMessage",
"source_node": "pi2",
"frequency": 10
},
{
+ "name": "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "source_node": "pi1",
+ "frequency": 10
+ },
+ {
+ "name": "/pi2/aos/remote_timestamps/pi1/test/aos-examples-Pong",
+ "type": "aos.message_bridge.RemoteMessage",
+ "source_node": "pi2",
+ "frequency": 10
+ },
+ {
+ "name": "/pi1/aos/remote_timestamps/pi2/unreliable/aos-examples-Ping",
+ "type": "aos.message_bridge.RemoteMessage",
+ "source_node": "pi1",
+ "frequency": 10
+ },
+ {
"name": "/pi1/aos",
"type": "aos.timing.Report",
"source_node": "pi1",
@@ -124,7 +142,7 @@
"name": "pi1",
"priority": 1,
"timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_nodes": ["pi1"]
+ "timestamp_logger_nodes": ["pi2"]
}
]
},
diff --git a/aos/network/timestamp_channel.cc b/aos/network/timestamp_channel.cc
new file mode 100644
index 0000000..12c6713
--- /dev/null
+++ b/aos/network/timestamp_channel.cc
@@ -0,0 +1,97 @@
+#include "aos/network/timestamp_channel.h"
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/strings/str_cat.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Sizes the per-node sender list to one slot per node in the configuration.
+// Dies if the configuration is not multi-node.
+ChannelTimestampSender::ChannelTimestampSender(aos::EventLoop *event_loop)
+    : event_loop_(event_loop) {
+  CHECK(configuration::MultiNode(event_loop_->configuration()));
+  timestamp_loggers_.resize(event_loop_->configuration()->nodes()->size());
+}
+
+// Returns the sender to use for the RemoteMessage timestamps of messages on
+// channel which are forwarded over connection, creating it on first use.
+//
+// Lookup order:
+//   1) A sender already created for this exact (channel, connection) pair.
+//   2) The split channel "/aos/remote_timestamps/<node><name>/<type>", where
+//      <type> is the channel type with '.' replaced by '-', e.g.
+//      "/aos/remote_timestamps/pi2/test/aos-examples-Ping".
+//   3) The shared per-node channel "/aos/remote_timestamps/<node>", reusing
+//      the sender already created for that node when there is one.
+//
+// If neither channel exists, a sender is requested on the split channel
+// anyway so that the resulting error names the channel users should add.
+aos::Sender<RemoteMessage> *ChannelTimestampSender::SenderForChannel(
+    const Channel *channel, const Connection *connection) {
+  // Look at any pre-created channel/connection pairs.
+  const auto key = std::make_pair(channel, connection);
+  const auto existing = channel_timestamp_loggers_.find(key);
+  if (existing != channel_timestamp_loggers_.end()) {
+    return existing->second.get();
+  }
+
+  const auto remote_node_name = connection->name()->string_view();
+  const Node *other_node =
+      configuration::GetNode(event_loop_->configuration(), remote_node_name);
+  const size_t other_node_index =
+      configuration::GetNodeIndex(event_loop_->configuration(), other_node);
+
+  // Split channel names embed the forwarded channel's type with '.' replaced
+  // by '-'.
+  std::string type(channel->type()->string_view());
+  std::replace(type.begin(), type.end(), '.', '-');
+  const std::string single_timestamp_channel =
+      absl::StrCat("/aos/remote_timestamps/", remote_node_name,
+                   channel->name()->string_view(), "/", type);
+
+  // Prefer the per-channel timestamp channel whenever it is configured.
+  if (event_loop_->HasChannel<RemoteMessage>(single_timestamp_channel)) {
+    LOG(INFO) << "Making RemoteMessage channel " << single_timestamp_channel;
+    const auto result = channel_timestamp_loggers_.try_emplace(
+        key, std::make_unique<aos::Sender<RemoteMessage>>(
+                 event_loop_->MakeSender<RemoteMessage>(
+                     single_timestamp_channel)));
+    return result.first->second.get();
+  }
+
+  // Then look for any per-remote-node channels.  timestamp_loggers_ is only
+  // resized in the constructor, so pointers to its elements stay valid.
+  aos::Sender<RemoteMessage> &shared_sender =
+      timestamp_loggers_[other_node_index];
+  if (shared_sender) {
+    return &shared_sender;
+  }
+
+  const std::string shared_timestamp_channel =
+      absl::StrCat("/aos/remote_timestamps/", remote_node_name);
+  LOG(INFO) << "Looking for " << shared_timestamp_channel;
+  if (event_loop_->HasChannel<RemoteMessage>(shared_timestamp_channel)) {
+    LOG(WARNING) << "Failed to find timestamp channel {\"name\": \""
+                 << single_timestamp_channel << "\", \"type\": \""
+                 << RemoteMessage::GetFullyQualifiedName()
+                 << "\"}, falling back to old version.";
+    shared_sender =
+        event_loop_->MakeSender<RemoteMessage>(shared_timestamp_channel);
+    return &shared_sender;
+  }
+
+  LOG(ERROR) << "Failed to find either " << single_timestamp_channel << " or "
+             << shared_timestamp_channel;
+
+  // Explode with an error about the new channel.
+  event_loop_->MakeSender<RemoteMessage>(single_timestamp_channel);
+  return nullptr;
+}
+
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
new file mode 100644
index 0000000..b62c5e9
--- /dev/null
+++ b/aos/network/timestamp_channel.h
@@ -0,0 +1,60 @@
+#ifndef AOS_NETWORK_TIMESTAMP_CHANNEL_
+#define AOS_NETWORK_TIMESTAMP_CHANNEL_
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "absl/container/btree_map.h"
+#include "aos/configuration.h"
+#include "aos/events/event_loop.h"
+#include "aos/network/remote_message_generated.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Class to manage lifetime, and creating senders for channels and connections.
+class ChannelTimestampSender {
+ public:
+  // event_loop is stored and later used to look up channels and make senders,
+  // so it must stay valid for as long as SenderForChannel may be called.
+  explicit ChannelTimestampSender(aos::EventLoop *event_loop);
+
+  // Returns the sender for the RemoteMessage timestamps of channel when
+  // forwarded over connection.  Prefers a dedicated per-channel timestamp
+  // channel and falls back to the per-remote-node one.  The returned sender is
+  // owned by this object.
+  aos::Sender<RemoteMessage> *SenderForChannel(const Channel *channel,
+                                               const Connection *connection);
+
+ private:
+  aos::EventLoop *event_loop_;
+
+  // We've got 3 cases to consider.
+  // 1) The old single channel per connection exists.
+  // 2) A new channel per channel exists.
+  // 3) Both exist.
+  //
+  // I want the default to be such that if no channel is found, we explode
+  // looking for the single channel per channel. This means users will add the
+  // new channel when blindly fixing errors, which is what we want.
+  //
+  // I'd prefer 3) to be an error, but don't have strong opinions. We will
+  // still be correct if it gets used, as long as everything is consistent.
+
+  // Senders for the shared per-node channels, indexed by the remote node's
+  // index in the configuration.
+  std::vector<aos::Sender<RemoteMessage>> timestamp_loggers_;
+
+  // Senders for the per-channel timestamp channels, keyed by the forwarded
+  // channel and connection.
+  absl::btree_map<std::pair<const Channel *, const Connection *>,
+                  std::unique_ptr<aos::Sender<RemoteMessage>>>
+      channel_timestamp_loggers_;
+};
+
+}  // namespace message_bridge
+}  // namespace aos
+
+#endif  // AOS_NETWORK_TIMESTAMP_CHANNEL_