Resend any reliable messages received before startup

We want to be able to publish (and receive) low-frequency configuration
messages.  This lets us make those *not* periodic when we move them
across nodes, simplifying the system.

This takes some special tracking at startup.  We very much don't want to
re-send messages already sent.  That would result in 2 receive
timestamps for a single node for 1 send packet, probably breaking log
sorting.  I'm not interested in learning what would break...

Change-Id: I489460cd4919907516e504e6694d7cef544b0da6
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index a3e7f63..36bdde9 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -4,6 +4,7 @@
 #include <thread>
 
 #include "absl/strings/str_cat.h"
+#include "aos/event.h"
 #include "aos/events/ping_generated.h"
 #include "aos/events/pong_generated.h"
 #include "aos/network/message_bridge_client_lib.h"
@@ -817,11 +818,323 @@
   pi2_test_thread.join();
 }
 
-// TODO(austin): This test confirms that the external state does the right
+// TODO(austin): The above test confirms that the external state does the right
 // thing, but doesn't confirm that the internal state does.  We either need to
 // expose a way to check the state in a thread-safe way, or need a way to jump
 // time for one node to do that.
 
+void SendPing(aos::Sender<examples::Ping> *sender, int value) {
+  auto message = sender->MakeBuilder();
+  auto ping = message.MakeBuilder<examples::Ping>();
+  ping.add_value(value);  // Only the value field is populated.
+  message.Send(ping.Finish());
+}
+
+// Tests that when a message is sent before the bridge starts up, but is
+// configured as reliable, we forward it.  Confirm this survives a client reset.
+TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
+  DoSetShmBase("pi1");
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+
+  FLAGS_application_name = "sender";
+  aos::ShmEventLoop send_event_loop(&pi1_config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/test");
+  SendPing(&ping_sender, 1);
+  aos::Sender<examples::Ping> unreliable_ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/unreliable");
+  SendPing(&unreliable_ping_sender, 1);
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  FLAGS_application_name = "pi1_timestamp";
+  aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
+
+  // Now do it for "raspberrypi2", the client.
+  DoSetShmBase("pi2");
+  FLAGS_override_hostname = "raspberrypi2";
+
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+  aos::ShmEventLoop receive_event_loop(&pi2_config.message());
+  aos::Fetcher<examples::Ping> ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
+  aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+      receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+  const size_t ping_channel_index = configuration::ChannelIndex(
+      receive_event_loop.configuration(), ping_fetcher.channel());
+
+  std::atomic<int> ping_timestamp_count{0};
+  pi1_remote_timestamp_event_loop.MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [ping_channel_index,
+       &ping_timestamp_count](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+        if (header.channel_index() == ping_channel_index) {
+          ++ping_timestamp_count;
+        }
+      });
+
+  // Before any event loop is running, confirm pi2 has received no message.
+  EXPECT_FALSE(ping_fetcher.Fetch());
+  EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+  // Spin up the persistent pieces.
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+
+  // Event used to wait for the timestamp counting thread to start.
+  aos::Event event;
+  std::thread pi1_remote_timestamp_thread(
+      [&pi1_remote_timestamp_event_loop, &event]() {
+        pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
+        pi1_remote_timestamp_event_loop.Run();
+      });
+
+  event.Wait();
+
+  {
+    // Now, spin up a client for 2 seconds.
+    LOG(INFO) << "Starting first pi2 MessageBridgeClient";
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(2050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    // Confirm there is no detected duplicate packet.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              0u);
+
+    EXPECT_TRUE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+    EXPECT_EQ(ping_timestamp_count, 1);
+    LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
+  }
+
+  {
+    // Now, spin up a second client for 5 seconds.
+    LOG(INFO) << "Starting second pi2 MessageBridgeClient";
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(5050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    // Confirm we detect the duplicate packet correctly.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              1u);
+
+    EXPECT_EQ(ping_timestamp_count, 1);  // Still only one timestamp.
+    EXPECT_FALSE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+  }
+
+  // Shut everyone else down.
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
+  pi1_remote_timestamp_event_loop.Exit();
+  pi1_remote_timestamp_thread.join();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_server_thread.join();
+}
+
+// Tests that when a message is sent before the bridge starts up, but is
+// configured as reliable, we forward it.  Confirm this works across server
+// resets.
+TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
+  // Set up "raspberrypi2", the receiving side, first.
+  DoSetShmBase("pi2");
+  FLAGS_override_hostname = "raspberrypi2";
+
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+  FLAGS_application_name = "pi2_message_bridge_client";
+  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+  aos::ShmEventLoop receive_event_loop(&pi2_config.message());
+  aos::Fetcher<examples::Ping> ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
+  aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+      receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+  DoSetShmBase("pi1");
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+
+  FLAGS_application_name = "sender";
+  aos::ShmEventLoop send_event_loop(&pi1_config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/test");
+  SendPing(&ping_sender, 1);
+  // Also publish on /unreliable before the server starts so the
+  // EXPECT_FALSE(unreliable_ping_fetcher.Fetch()) checks below are meaningful;
+  // without a send they would pass even if everything were forwarded.
+  aos::Sender<examples::Ping> unreliable_ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/unreliable");
+  SendPing(&unreliable_ping_sender, 1);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  FLAGS_application_name = "pi1_timestamp";
+  aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
+
+  const size_t ping_channel_index = configuration::ChannelIndex(
+      receive_event_loop.configuration(), ping_fetcher.channel());
+
+  std::atomic<int> ping_timestamp_count{0};
+  pi1_remote_timestamp_event_loop.MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [ping_channel_index,
+       &ping_timestamp_count](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+        if (header.channel_index() == ping_channel_index) {
+          ++ping_timestamp_count;
+        }
+      });
+
+  // Before any event loop is running, confirm pi2 has received no message.
+  EXPECT_FALSE(ping_fetcher.Fetch());
+  EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+  // Spin up the persistent pieces.
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+  std::thread pi2_client_thread(
+      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+
+  // Event used to wait for the timestamp counting thread to start.
+  aos::Event event;
+  std::thread pi1_remote_timestamp_thread(
+      [&pi1_remote_timestamp_event_loop, &event]() {
+        pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
+        pi1_remote_timestamp_event_loop.Run();
+      });
+
+  event.Wait();
+
+  {
+    // Now, spin up a server for 2 seconds.
+    FLAGS_application_name = "pi1_message_bridge_server";
+    aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+    MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+    aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
+        [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
+    pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi1_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(2050));
+    });
+
+    // And go!
+    pi1_server_event_loop.Run();
+
+    // Confirm there is no detected duplicate packet.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              0u);
+
+    EXPECT_TRUE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+    EXPECT_EQ(ping_timestamp_count, 1);
+    LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
+  }
+
+  {
+    // Now, spin up a second server for 2 seconds.
+    FLAGS_application_name = "pi1_message_bridge_server";
+    aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+    MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+    aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
+        [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
+    pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi1_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(2050));
+    });
+
+    // And go!
+    pi1_server_event_loop.Run();
+
+    // Confirm we detect the duplicate packet correctly.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              1u);
+
+    EXPECT_EQ(ping_timestamp_count, 1);  // Still only one timestamp.
+    EXPECT_FALSE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+    LOG(INFO) << "Shutting down second pi1 MessageBridgeServer";
+  }
+
+  // Shut everyone else down.
+  pi1_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
+  pi2_client_event_loop.Exit();
+  pi1_remote_timestamp_event_loop.Exit();
+  pi1_remote_timestamp_thread.join();
+  pi1_client_thread.join();
+  pi2_server_thread.join();
+  pi2_client_thread.join();
+}
+
 }  // namespace testing
 }  // namespace message_bridge
 }  // namespace aos