Fix log sorting for good

Unfortunately, this is really hard to split up, so a bunch of stuff
happens at once.

When a message is sent from node A -> node B, add support for sending
the timestamps of its delivery back from node B to node A for logging.

    {
      "name": "/pi1/aos",
      "type": "aos.message_bridge.Timestamp",
      "source_node": "pi1",
      "frequency": 10,
      "max_size": 200,
      "destination_nodes": [
        {
          "name": "pi2",
          "priority": 1,
          "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
          "timestamp_logger_nodes": ["pi1"]
        }
      ]
    },

This gives us a way to log enough information on node A such that
everything is self-contained.  We know all the messages we sent to B,
and when they got there, so we can recreate the time offset and replay
the node.

This data is then published on the channel
   { "name": "/aos/remote_timestamps/pi2", "type": "aos.logger.MessageHeader"}

The logger then treats that channel specially and logs the contents
directly as though the message contents were received on the remote
node.

This (among other things) exposes log sorting problems.  Use our fancy
new infinite precision noncausal filter to estimate time precisely
enough to actually order events.  This gets us down to 2-3 ns of error
due to integer precision.

Change-Id: Ia843c5176a2c4efc227e669c07d7bb4c7cbe7c91
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 8d52292..aa0a034 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -3,6 +3,7 @@
 #include <chrono>
 #include <thread>
 
+#include "absl/strings/str_cat.h"
 #include "aos/events/ping_generated.h"
 #include "aos/events/pong_generated.h"
 #include "aos/network/message_bridge_client_lib.h"
@@ -10,11 +11,22 @@
 #include "aos/network/team_number.h"
 
 namespace aos {
+void SetShmBase(const std::string_view base);
+
 namespace message_bridge {
 namespace testing {
 
 namespace chrono = std::chrono;
 
+void DoSetShmBase(const std::string_view node) {
+  const char *tmpdir_c_str = getenv("TEST_TMPDIR");
+  if (tmpdir_c_str != nullptr) {
+    aos::SetShmBase(absl::StrCat(tmpdir_c_str, "/", node));
+  } else {
+    aos::SetShmBase(absl::StrCat("/dev/shm/", node));
+  }
+}
+
 // Test that we can send a ping message over sctp and receive it.
 TEST(MessageBridgeTest, PingPong) {
   // This is rather annoying to set up.  We need to start up a client and
@@ -44,9 +56,11 @@
       aos::configuration::ReadConfig(
           "aos/network/message_bridge_test_client_config.json");
 
+  DoSetShmBase("pi1");
   FLAGS_application_name = "pi1_message_bridge_server";
   // Force ourselves to be "raspberrypi" and allocate everything.
   FLAGS_override_hostname = "raspberrypi";
+
   aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
   MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
 
@@ -60,9 +74,22 @@
   aos::Sender<examples::Ping> ping_sender =
       ping_event_loop.MakeSender<examples::Ping>("/test");
 
+  aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+  aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
+      pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
+          "/pi1/aos/remote_timestamps/pi2");
+
+  // Fetchers for confirming the remote timestamps made it.
+  aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+      ping_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<Timestamp> pi1_on_pi1_timestamp_fetcher =
+      ping_event_loop.MakeFetcher<Timestamp>("/aos");
+
   // Now do it for "raspberrypi2", the client.
   FLAGS_application_name = "pi2_message_bridge_client";
   FLAGS_override_hostname = "raspberrypi2";
+  DoSetShmBase("pi2");
+
   aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
   MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
 
@@ -80,11 +107,24 @@
 
   aos::Fetcher<ClientStatistics> client_statistics_fetcher =
       test_event_loop.MakeFetcher<ClientStatistics>("/aos");
+  aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
+      test_event_loop.MakeFetcher<logger::MessageHeader>(
+          "/pi2/aos/remote_timestamps/pi1");
+
+  // Event loop for fetching data delivered to pi2 from pi1 to match up
+  // messages.
+  aos::ShmEventLoop delivered_messages_event_loop(&pi2_config.message());
+  aos::Fetcher<Timestamp> pi1_on_pi2_timestamp_fetcher =
+      delivered_messages_event_loop.MakeFetcher<Timestamp>("/pi1/aos");
+  aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+      delivered_messages_event_loop.MakeFetcher<examples::Ping>("/test");
+  EXPECT_FALSE(ping_on_pi2_fetcher.Fetch());
+  EXPECT_FALSE(pi1_on_pi2_timestamp_fetcher.Fetch());
 
   // Count the pongs.
   int pong_count = 0;
   pong_event_loop.MakeWatcher(
-      "/test2", [&pong_count](const examples::Ping &ping) {
+      "/test", [&pong_count](const examples::Ping &ping) {
         ++pong_count;
         LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
       });
@@ -191,11 +231,11 @@
 
   ping_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
     EXPECT_TRUE(timestamp.has_offsets());
-    LOG(INFO) << FlatbufferToJson(&timestamp);
+    LOG(INFO) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
   });
   pong_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
     EXPECT_TRUE(timestamp.has_offsets());
-    LOG(INFO) << FlatbufferToJson(&timestamp);
+    LOG(INFO) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
   });
 
   // Run for 5 seconds to make sure we have time to estimate the offset.
@@ -206,6 +246,96 @@
     quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
   });
 
+  // Find the channel index for both the /pi1/aos Timestamp channel and Ping
+  // channel.
+  const size_t pi1_timestamp_channel = configuration::ChannelIndex(
+      pong_event_loop.configuration(), pi1_on_pi2_timestamp_fetcher.channel());
+  const size_t ping_timestamp_channel =
+      configuration::ChannelIndex(delivered_messages_event_loop.configuration(),
+                                  ping_on_pi2_fetcher.channel());
+
+  for (const Channel *channel : *ping_event_loop.configuration()->channels()) {
+    VLOG(1) << "Channel "
+            << configuration::ChannelIndex(ping_event_loop.configuration(),
+                                           channel)
+            << " " << configuration::CleanedChannelToString(channel);
+  }
+
+  // For each remote timestamp we get back, confirm that it is either a ping
+  // message, or a timestamp we sent out.  Also confirm that the timestamps are
+  // correct.
+  ping_event_loop.MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
+       &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
+       &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+
+        const aos::monotonic_clock::time_point header_monotonic_sent_time(
+            chrono::nanoseconds(header.monotonic_sent_time()));
+        const aos::realtime_clock::time_point header_realtime_sent_time(
+            chrono::nanoseconds(header.realtime_sent_time()));
+        const aos::monotonic_clock::time_point header_monotonic_remote_time(
+            chrono::nanoseconds(header.monotonic_remote_time()));
+        const aos::realtime_clock::time_point header_realtime_remote_time(
+            chrono::nanoseconds(header.realtime_remote_time()));
+
+        const Context *pi1_context = nullptr;
+        const Context *pi2_context = nullptr;
+
+        if (header.channel_index() == pi1_timestamp_channel) {
+          // Find the forwarded message.
+          while (pi1_on_pi2_timestamp_fetcher.context().monotonic_event_time <
+                 header_monotonic_sent_time) {
+            ASSERT_TRUE(pi1_on_pi2_timestamp_fetcher.FetchNext());
+          }
+
+          // And the source message.
+          while (pi1_on_pi1_timestamp_fetcher.context().monotonic_event_time <
+                 header_monotonic_remote_time) {
+            ASSERT_TRUE(pi1_on_pi1_timestamp_fetcher.FetchNext());
+          }
+
+          pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
+          pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+        } else if (header.channel_index() == ping_timestamp_channel) {
+          // Find the forwarded message.
+          while (ping_on_pi2_fetcher.context().monotonic_event_time <
+                 header_monotonic_sent_time) {
+            ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+          }
+
+          // And the source message.
+          while (ping_on_pi1_fetcher.context().monotonic_event_time <
+                 header_monotonic_remote_time) {
+            ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+          }
+
+          pi1_context = &ping_on_pi1_fetcher.context();
+          pi2_context = &ping_on_pi2_fetcher.context();
+        } else {
+          LOG(FATAL) << "Unknown channel";
+        }
+
+        // Confirm the forwarded message has matching timestamps to the
+        // timestamps we got back.
+        EXPECT_EQ(pi2_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi2_context->monotonic_event_time,
+                  header_monotonic_sent_time);
+        EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+        EXPECT_EQ(pi2_context->realtime_remote_time,
+                  header_realtime_remote_time);
+        EXPECT_EQ(pi2_context->monotonic_remote_time,
+                  header_monotonic_remote_time);
+
+        // Confirm the forwarded message also matches the source message.
+        EXPECT_EQ(pi1_context->queue_index, header.queue_index());
+        EXPECT_EQ(pi1_context->monotonic_event_time,
+                  header_monotonic_remote_time);
+        EXPECT_EQ(pi1_context->realtime_event_time,
+                  header_realtime_remote_time);
+      });
+
   // Start everything up.  Pong is the only thing we don't know how to wait on,
   // so start it first.
   std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
@@ -261,6 +391,10 @@
   EXPECT_GE(pi2_server_statistics_count, 2);
   EXPECT_GE(pi1_client_statistics_count, 2);
   EXPECT_GE(pi2_client_statistics_count, 2);
+
+  // Confirm we got timestamps back!
+  EXPECT_TRUE(message_header_fetcher1.Fetch());
+  EXPECT_TRUE(message_header_fetcher2.Fetch());
 }
 
 // Test that the client disconnecting triggers the server offsets on both sides
@@ -290,6 +424,7 @@
   FLAGS_application_name = "pi1_message_bridge_server";
   // Force ourselves to be "raspberrypi" and allocate everything.
   FLAGS_override_hostname = "raspberrypi";
+  DoSetShmBase("pi1");
   aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
   MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
 
@@ -305,6 +440,7 @@
 
   // Now do it for "raspberrypi2", the client.
   FLAGS_override_hostname = "raspberrypi2";
+  DoSetShmBase("pi2");
   FLAGS_application_name = "pi2_message_bridge_server";
   aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
   MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
@@ -496,6 +632,7 @@
   FLAGS_application_name = "pi1_message_bridge_server";
   // Force ourselves to be "raspberrypi" and allocate everything.
   FLAGS_override_hostname = "raspberrypi";
+  DoSetShmBase("pi1");
   aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
   MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
 
@@ -513,6 +650,7 @@
 
   // Now do it for "raspberrypi2", the client.
   FLAGS_override_hostname = "raspberrypi2";
+  DoSetShmBase("pi2");
   FLAGS_application_name = "pi2_message_bridge_client";
   aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
   MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);