Switch magic timestamp channel to RemoteMessage

In order to track reboots and such, we really need to communicate more
information from the message_bridge to the logger.  Extending
MessageHeader to carry it would be convenient, but it would be a shame
to have to modify it for this.  Switch the remote timestamp channels
over to a new RemoteMessage type instead, and add code to rename the
channel when replaying.

There are no known log files with a MessageHeader actually logged, so
most of this should be pretty safe.  I've tested this on an old log file
by hand.

Change-Id: If81b31869b95040151d833d20ec3eb8623ab1cd4
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 4818aa2..13ba42e 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -1,10 +1,23 @@
 load("//aos/seasocks:gen_embedded.bzl", "gen_embedded")
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
 load("//aos:config.bzl", "aos_config")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
 
 package(default_visibility = ["//visibility:public"])
 
 flatbuffer_cc_library(
+    name = "remote_message_fbs",
+    srcs = ["remote_message.fbs"],
+    gen_reflections = 1,
+)
+
+cc_static_flatbuffer(
+    name = "remote_message_schema",
+    function = "aos::message_bridge::RemoteMessageSchema",
+    target = ":remote_message_fbs_reflection_out",
+)
+
+flatbuffer_cc_library(
     name = "connect_fbs",
     srcs = ["connect.fbs"],
     gen_reflections = 1,
@@ -178,6 +191,7 @@
         ":message_bridge_protocol",
         ":message_bridge_server_fbs",
         ":message_bridge_server_status",
+        ":remote_message_fbs",
         ":sctp_lib",
         ":sctp_server",
         ":timestamp_fbs",
@@ -257,6 +271,7 @@
         ":message_bridge_client_status",
         ":message_bridge_protocol",
         ":message_bridge_server_fbs",
+        ":remote_message_fbs",
         ":sctp_client",
         ":timestamp_fbs",
         "//aos/events:shm_event_loop",
@@ -285,7 +300,7 @@
     name = "message_bridge_test_common_config",
     src = "message_bridge_test_common.json",
     flatbuffers = [
-        "//aos/events/logging:logger_fbs",
+        ":remote_message_fbs",
         "//aos/events:ping_fbs",
         "//aos/events:pong_fbs",
         "//aos/network:message_bridge_client_fbs",
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index b6c94e5..d8a632e 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -8,6 +8,7 @@
 #include "aos/network/connect_generated.h"
 #include "aos/network/message_bridge_protocol.h"
 #include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
 #include "aos/network/sctp_server.h"
 #include "glog/logging.h"
 
@@ -106,11 +107,11 @@
         // This needs to be munged and cleaned up to match the timestamp
         // standard.
 
-        aos::Sender<logger::MessageHeader>::Builder builder =
+        aos::Sender<RemoteMessage>::Builder builder =
             peer.timestamp_logger->MakeBuilder();
 
-        logger::MessageHeader::Builder message_header_builder =
-            builder.MakeBuilder<logger::MessageHeader>();
+        RemoteMessage::Builder message_header_builder =
+            builder.MakeBuilder<RemoteMessage>();
 
         message_header_builder.add_channel_index(
             message_header->channel_index());
@@ -173,10 +174,10 @@
   // time out eventually.  Need to sort that out.
 }
 
-void ChannelState::AddPeer(
-    const Connection *connection, int node_index,
-    ServerConnection *server_connection_statistics, bool logged_remotely,
-    aos::Sender<logger::MessageHeader> *timestamp_logger) {
+void ChannelState::AddPeer(const Connection *connection, int node_index,
+                           ServerConnection *server_connection_statistics,
+                           bool logged_remotely,
+                           aos::Sender<RemoteMessage> *timestamp_logger) {
   peers_.emplace_back(connection, node_index, server_connection_statistics,
                       logged_remotely, timestamp_logger);
 }
@@ -330,7 +331,7 @@
         // timestamps from it.
         if (delivery_time_is_logged && !timestamp_loggers_[other_node_index]) {
           timestamp_loggers_[other_node_index] =
-              event_loop_->MakeSender<logger::MessageHeader>(
+              event_loop_->MakeSender<RemoteMessage>(
                   absl::StrCat("/aos/remote_timestamps/",
                                connection->name()->string_view()));
         }
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index c0c1d56..04b0e16 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -11,6 +11,7 @@
 #include "aos/network/message_bridge_client_generated.h"
 #include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/message_bridge_server_status.h"
+#include "aos/network/remote_message_generated.h"
 #include "aos/network/sctp_server.h"
 #include "aos/network/timestamp_generated.h"
 #include "glog/logging.h"
@@ -36,7 +37,7 @@
     Peer(const Connection *new_connection, int new_node_index,
          ServerConnection *new_server_connection_statistics,
          bool new_logged_remotely,
-         aos::Sender<logger::MessageHeader> *new_timestamp_logger)
+         aos::Sender<RemoteMessage> *new_timestamp_logger)
         : connection(new_connection),
           node_index(new_node_index),
           server_connection_statistics(new_server_connection_statistics),
@@ -50,7 +51,7 @@
     const aos::Connection *connection;
     const int node_index;
     ServerConnection *server_connection_statistics;
-    aos::Sender<logger::MessageHeader> *timestamp_logger = nullptr;
+    aos::Sender<RemoteMessage> *timestamp_logger = nullptr;
 
     // If true, this message will be logged on a receiving node.  We need to
     // keep it around to log it locally if that fails.
@@ -67,7 +68,7 @@
   void AddPeer(const Connection *connection, int node_index,
                ServerConnection *server_connection_statistics,
                bool logged_remotely,
-               aos::Sender<logger::MessageHeader> *timestamp_logger);
+               aos::Sender<RemoteMessage> *timestamp_logger);
 
   // Returns true if this channel has the same name and type as the other
   // channel.
@@ -125,7 +126,7 @@
   // Event loop to schedule everything on.
   aos::ShmEventLoop *event_loop_;
 
-  std::vector<aos::Sender<logger::MessageHeader>> timestamp_loggers_;
+  std::vector<aos::Sender<RemoteMessage>> timestamp_loggers_;
   SctpServer server_;
 
   MessageBridgeServerStatus server_status_;
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 36bdde9..ed192b7 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -34,18 +34,18 @@
 }
 
 class MessageBridgeTest : public ::testing::Test {
-  public:
-   MessageBridgeTest()
-       : pi1_config(aos::configuration::ReadConfig(
-             "aos/network/message_bridge_test_server_config.json")),
-         pi2_config(aos::configuration::ReadConfig(
-             "aos/network/message_bridge_test_client_config.json")) {
-     util::UnlinkRecursive(ShmBase("pi1"));
-     util::UnlinkRecursive(ShmBase("pi2"));
-   }
+ public:
+  MessageBridgeTest()
+      : pi1_config(aos::configuration::ReadConfig(
+            "aos/network/message_bridge_test_server_config.json")),
+        pi2_config(aos::configuration::ReadConfig(
+            "aos/network/message_bridge_test_client_config.json")) {
+    util::UnlinkRecursive(ShmBase("pi1"));
+    util::UnlinkRecursive(ShmBase("pi2"));
+  }
 
-   aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
-   aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi1_config;
+  aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config;
 };
 
 // Test that we can send a ping message over sctp and receive it.
@@ -89,8 +89,8 @@
       ping_event_loop.MakeSender<examples::Ping>("/test");
 
   aos::ShmEventLoop pi1_test_event_loop(&pi1_config.message());
-  aos::Fetcher<logger::MessageHeader> message_header_fetcher1 =
-      pi1_test_event_loop.MakeFetcher<logger::MessageHeader>(
+  aos::Fetcher<RemoteMessage> message_header_fetcher1 =
+      pi1_test_event_loop.MakeFetcher<RemoteMessage>(
           "/pi1/aos/remote_timestamps/pi2");
 
   // Fetchers for confirming the remote timestamps made it.
@@ -121,8 +121,8 @@
 
   aos::Fetcher<ClientStatistics> client_statistics_fetcher =
       test_event_loop.MakeFetcher<ClientStatistics>("/aos");
-  aos::Fetcher<logger::MessageHeader> message_header_fetcher2 =
-      test_event_loop.MakeFetcher<logger::MessageHeader>(
+  aos::Fetcher<RemoteMessage> message_header_fetcher2 =
+      test_event_loop.MakeFetcher<RemoteMessage>(
           "/pi2/aos/remote_timestamps/pi1");
 
   // Event loop for fetching data delivered to pi2 from pi1 to match up
@@ -282,8 +282,9 @@
       "/pi1/aos/remote_timestamps/pi2",
       [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
        &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
-       &pi1_on_pi1_timestamp_fetcher](const logger::MessageHeader &header) {
-        VLOG(1) << aos::FlatbufferToJson(&header);
+       &pi1_on_pi1_timestamp_fetcher](const RemoteMessage &header) {
+        VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+                << aos::FlatbufferToJson(&header);
 
         const aos::monotonic_clock::time_point header_monotonic_sent_time(
             chrono::nanoseconds(header.monotonic_sent_time()));
@@ -879,9 +880,9 @@
   std::atomic<int> ping_timestamp_count{0};
   pi1_remote_timestamp_event_loop.MakeWatcher(
       "/pi1/aos/remote_timestamps/pi2",
-      [ping_channel_index,
-       &ping_timestamp_count](const logger::MessageHeader &header) {
-        VLOG(1) << aos::FlatbufferToJson(&header);
+      [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
+        VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+                << aos::FlatbufferToJson(&header);
         if (header.channel_index() == ping_channel_index) {
           ++ping_timestamp_count;
         }
@@ -1034,9 +1035,9 @@
   std::atomic<int> ping_timestamp_count{0};
   pi1_remote_timestamp_event_loop.MakeWatcher(
       "/pi1/aos/remote_timestamps/pi2",
-      [ping_channel_index,
-       &ping_timestamp_count](const logger::MessageHeader &header) {
-        VLOG(1) << aos::FlatbufferToJson(&header);
+      [ping_channel_index, &ping_timestamp_count](const RemoteMessage &header) {
+        VLOG(1) << "/pi1/aos/remote_timestamps/pi2 RemoteMessage "
+                << aos::FlatbufferToJson(&header);
         if (header.channel_index() == ping_channel_index) {
           ++ping_timestamp_count;
         }
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index a7c0d1b..2b89080 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -74,13 +74,13 @@
     },
     {
       "name": "/pi1/aos/remote_timestamps/pi2",
-      "type": "aos.logger.MessageHeader",
+      "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi1",
       "frequency": 10
     },
     {
       "name": "/pi2/aos/remote_timestamps/pi1",
-      "type": "aos.logger.MessageHeader",
+      "type": "aos.message_bridge.RemoteMessage",
       "source_node": "pi2",
       "frequency": 10
     },
diff --git a/aos/network/remote_message.fbs b/aos/network/remote_message.fbs
new file mode 100644
index 0000000..5659704
--- /dev/null
+++ b/aos/network/remote_message.fbs
@@ -0,0 +1,31 @@
+namespace aos.message_bridge;
+
+table RemoteMessage {
+  // Index into the channel data structure in the log file header.  This
+  // identifies the data type of the message.
+  channel_index:uint (id: 0);
+  // Monotonic clock time at which this message was sent on this node, in
+  // nanoseconds.
+  monotonic_sent_time:long (id: 1);
+  // Realtime clock time at which this message was sent on this node, in
+  // nanoseconds.
+  realtime_sent_time:long (id: 2);
+  // Index into the ipc queue of this message.  This should start at 0 and
+  // always monotonically increment if no messages were ever lost.  It will
+  // wrap at a multiple of the queue size.
+  queue_index:uint (id: 3);
+
+  // The nested flatbuffer, stored as raw bytes.
+  data:[ubyte] (id: 4);
+
+  // Monotonic clock time at which this message was sent on the remote node,
+  // in nanoseconds.  Defaults to the minimum int64 value.
+  monotonic_remote_time:int64 = -9223372036854775808 (id: 5);
+  // Realtime clock time at which this message was sent on the remote node,
+  // in nanoseconds.  Defaults to the minimum int64 value.
+  realtime_remote_time:int64 = -9223372036854775808 (id: 6);
+  // Queue index of this message on the remote node (default: uint32 max).
+  remote_queue_index:uint32 = 4294967295 (id: 7);
+}
+
+root_type RemoteMessage;