Resend any reliable messages received before startup
We want to be able to publish (and receive) low frequency configuration
messages. This lets us make those *not* periodic when we move them
across nodes, simplifying the system.
This takes some special tracking at startup. We very much don't want to
re-send messages already sent. That would result in 2 receive
timestamps for a single node for 1 send packet, probably breaking log
sorting. I'm not interested in learning what would break...
Change-Id: I489460cd4919907516e504e6694d7cef544b0da6
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index a3e7f63..36bdde9 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -4,6 +4,7 @@
#include <thread>
#include "absl/strings/str_cat.h"
+#include "aos/event.h"
#include "aos/events/ping_generated.h"
#include "aos/events/pong_generated.h"
#include "aos/network/message_bridge_client_lib.h"
@@ -817,11 +818,323 @@
pi2_test_thread.join();
}
-// TODO(austin): This test confirms that the external state does the right
+// TODO(austin): The above test confirms that the external state does the right
// thing, but doesn't confirm that the internal state does. We either need to
// expose a way to check the state in a thread-safe way, or need a way to jump
// time for one node to do that.
+// Builds one examples::Ping message whose value field is `value` and sends it
+// through `sender`.
+void SendPing(aos::Sender<examples::Ping> *sender, int value) {
+  auto message = sender->MakeBuilder();
+  auto ping = message.MakeBuilder<examples::Ping>();
+  ping.add_value(value);
+  message.Send(ping.Finish());
+}
+
+// Tests that when a message is sent before the bridge starts up, but is
+// configured as reliable, we forward it. Confirm this survives a client reset.
+TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
+ DoSetShmBase("pi1");
+ // Force ourselves to be "raspberrypi" and allocate everything.
+ FLAGS_override_hostname = "raspberrypi";
+
+ FLAGS_application_name = "sender";
+ aos::ShmEventLoop send_event_loop(&pi1_config.message());
+ aos::Sender<examples::Ping> ping_sender =
+ send_event_loop.MakeSender<examples::Ping>("/test");
+ SendPing(&ping_sender, 1);  // Sent before any bridge object is constructed.
+ aos::Sender<examples::Ping> unreliable_ping_sender =
+ send_event_loop.MakeSender<examples::Ping>("/unreliable");
+ SendPing(&unreliable_ping_sender, 1);  // Negative control; checked below.
+
+ FLAGS_application_name = "pi1_message_bridge_server";
+ aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+ MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+ FLAGS_application_name = "pi1_message_bridge_client";
+ aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
+ MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+ FLAGS_application_name = "pi1_timestamp";
+ aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
+
+ // Now do it for "raspberrypi2", the client.
+ DoSetShmBase("pi2");
+ FLAGS_override_hostname = "raspberrypi2";
+
+ FLAGS_application_name = "pi2_message_bridge_server";
+ aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+ MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+ aos::ShmEventLoop receive_event_loop(&pi2_config.message());
+ aos::Fetcher<examples::Ping> ping_fetcher =
+ receive_event_loop.MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
+ receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
+ aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+ receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+ const size_t ping_channel_index = configuration::ChannelIndex(
+ receive_event_loop.configuration(), ping_fetcher.channel());
+
+ std::atomic<int> ping_timestamp_count{0};  // Written by the watcher below.
+ pi1_remote_timestamp_event_loop.MakeWatcher(
+ "/pi1/aos/remote_timestamps/pi2",
+ [ping_channel_index,
+ &ping_timestamp_count](const logger::MessageHeader &header) {
+ VLOG(1) << aos::FlatbufferToJson(&header);
+ if (header.channel_index() == ping_channel_index) {
+ ++ping_timestamp_count;
+ }
+ });
+
+ // Before everything starts up, confirm there is no message.
+ EXPECT_FALSE(ping_fetcher.Fetch());
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+ // Spin up the persistent pieces.
+ std::thread pi1_server_thread(
+ [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+ std::thread pi1_client_thread(
+ [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+ std::thread pi2_server_thread(
+ [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+
+ // Event used to wait for the timestamp counting thread to start.
+ aos::Event event;
+ std::thread pi1_remote_timestamp_thread(
+ [&pi1_remote_timestamp_event_loop, &event]() {
+ pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
+ pi1_remote_timestamp_event_loop.Run();
+ });
+
+ event.Wait();
+
+ {
+ // Now, spin up a client for 2 seconds.
+ LOG(INFO) << "Starting first pi2 MessageBridgeClient";
+ FLAGS_application_name = "pi2_message_bridge_client";
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+ pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi2_client_event_loop.monotonic_now() +
+ chrono::milliseconds(2050));
+ });
+
+ // And go!
+ pi2_client_event_loop.Run();
+
+ // Confirm there is no detected duplicate packet.
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->duplicate_packets(),
+ 0u);
+
+ EXPECT_TRUE(ping_fetcher.Fetch());
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+ EXPECT_EQ(ping_timestamp_count, 1);
+ LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
+ }
+
+ {
+ // Now, spin up a second client for 5 seconds.
+ LOG(INFO) << "Starting second pi2 MessageBridgeClient";
+ FLAGS_application_name = "pi2_message_bridge_client";
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+ pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi2_client_event_loop.monotonic_now() +
+ chrono::milliseconds(5050));
+ });
+
+ // And go!
+ pi2_client_event_loop.Run();
+
+ // Confirm we detect the duplicate packet correctly.
+ EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->duplicate_packets(),
+ 1u);
+
+ EXPECT_EQ(ping_timestamp_count, 1);  // Still one: no second timestamp.
+ EXPECT_FALSE(ping_fetcher.Fetch());  // Nothing new since the first client.
+ EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+ }
+
+ // Shut everyone else down.
+ pi1_server_event_loop.Exit();
+ pi1_client_event_loop.Exit();
+ pi2_server_event_loop.Exit();
+ pi1_remote_timestamp_event_loop.Exit();
+ pi1_remote_timestamp_thread.join();
+ pi1_server_thread.join();
+ pi1_client_thread.join();
+ pi2_server_thread.join();
+}
+
+// Tests that when a message is sent before the bridge starts up, but is
+// configured as reliable, we forward it. Confirm this works across server
+// resets.
+//
+// pi2's server and client, pi1's client, and a pi1 event loop counting remote
+// timestamps stay up for the whole test. pi1's server is run twice in a row.
+// The reliable ping must reach pi2 and be timestamped exactly once; what the
+// second server resends must only show up in pi2's duplicate packet count.
+TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
+  // Set up "raspberrypi2", the receiving side, first.
+  DoSetShmBase("pi2");
+  FLAGS_override_hostname = "raspberrypi2";
+
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+  FLAGS_application_name = "pi2_message_bridge_client";
+  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+  aos::ShmEventLoop receive_event_loop(&pi2_config.message());
+  aos::Fetcher<examples::Ping> ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
+  aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+      receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+  DoSetShmBase("pi1");
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+
+  // Send on both channels before any pi1 server exists. The message on
+  // "/unreliable" is the negative control: without it the
+  // EXPECT_FALSE(unreliable_ping_fetcher.Fetch()) checks below could never
+  // fail.
+  FLAGS_application_name = "sender";
+  aos::ShmEventLoop send_event_loop(&pi1_config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/test");
+  SendPing(&ping_sender, 1);
+  aos::Sender<examples::Ping> unreliable_ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/unreliable");
+  SendPing(&unreliable_ping_sender, 1);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  FLAGS_application_name = "pi1_timestamp";
+  aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
+
+  const size_t ping_channel_index = configuration::ChannelIndex(
+      receive_event_loop.configuration(), ping_fetcher.channel());
+
+  // Counts remote timestamps for the ping channel only. It is incremented on
+  // the timestamp thread and read on this one, hence the atomic.
+  std::atomic<int> ping_timestamp_count{0};
+  pi1_remote_timestamp_event_loop.MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [ping_channel_index,
+       &ping_timestamp_count](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+        if (header.channel_index() == ping_channel_index) {
+          ++ping_timestamp_count;
+        }
+      });
+
+  // Before everything starts up, confirm there is no message.
+  EXPECT_FALSE(ping_fetcher.Fetch());
+  EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+  // Spin up the persistent pieces.
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+  std::thread pi2_client_thread(
+      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+
+  // Event used to wait for the timestamp counting thread to start.
+  aos::Event event;
+  std::thread pi1_remote_timestamp_thread(
+      [&pi1_remote_timestamp_event_loop, &event]() {
+        pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
+        pi1_remote_timestamp_event_loop.Run();
+      });
+
+  event.Wait();
+
+  {
+    // Now, spin up a server for 2 seconds.
+    LOG(INFO) << "Starting first pi1 MessageBridgeServer";
+    FLAGS_application_name = "pi1_message_bridge_server";
+    aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+    MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+    aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
+        [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
+    pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi1_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(2050));
+    });
+
+    // And go!
+    pi1_server_event_loop.Run();
+
+    // Confirm there is no detected duplicate packet.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              0u);
+
+    EXPECT_TRUE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+    EXPECT_EQ(ping_timestamp_count, 1);
+    LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
+  }
+
+  {
+    // Now, spin up a second server for 2 seconds.
+    LOG(INFO) << "Starting second pi1 MessageBridgeServer";
+    FLAGS_application_name = "pi1_message_bridge_server";
+    aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+    MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+    aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
+        [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
+    pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi1_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(2050));
+    });
+
+    // And go!
+    pi1_server_event_loop.Run();
+
+    // Confirm we detect the duplicate packet correctly.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              1u);
+
+    // The duplicate must not produce a second timestamp or a new message.
+    EXPECT_EQ(ping_timestamp_count, 1);
+    EXPECT_FALSE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+    LOG(INFO) << "Shutting down second pi1 MessageBridgeServer";
+  }
+
+  // Shut everyone else down.
+  pi1_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
+  pi2_client_event_loop.Exit();
+  pi1_remote_timestamp_event_loop.Exit();
+  pi1_remote_timestamp_thread.join();
+  pi1_client_thread.join();
+  pi2_server_thread.join();
+  pi2_client_thread.join();
+}
+
} // namespace testing
} // namespace message_bridge
} // namespace aos