Switch message_bridge over to RemoteData from MessageHeader

Just like timestamps, we want to add more data here.  Instead of having
to add that to MessageHeader, switch over to a new message.

Change-Id: I171fde6db0212683dde5bfc9ba7a3700e858dbf0
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 789b093..42e115b 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -37,6 +37,13 @@
 )
 
 flatbuffer_cc_library(
+    name = "remote_data_fbs",
+    srcs = ["remote_data.fbs"],
+    gen_reflections = 1,
+    target_compatible_with = ["@platforms//os:linux"],
+)
+
+flatbuffer_cc_library(
     name = "timestamp_fbs",
     srcs = ["timestamp.fbs"],
     gen_reflections = 1,
@@ -206,6 +213,7 @@
         ":message_bridge_protocol",
         ":message_bridge_server_fbs",
         ":message_bridge_server_status",
+        ":remote_data_fbs",
         ":remote_message_fbs",
         ":sctp_lib",
         ":sctp_server",
@@ -287,6 +295,7 @@
         ":message_bridge_client_status",
         ":message_bridge_protocol",
         ":message_bridge_server_fbs",
+        ":remote_data_fbs",
         ":remote_message_fbs",
         ":sctp_client",
         ":timestamp_fbs",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 97058f1..bde8480 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -8,6 +8,7 @@
 #include "aos/network/connect_generated.h"
 #include "aos/network/message_bridge_client_generated.h"
 #include "aos/network/message_bridge_protocol.h"
+#include "aos/network/remote_data_generated.h"
 #include "aos/network/sctp_client.h"
 #include "aos/network/timestamp_generated.h"
 #include "aos/unique_malloc_ptr.h"
@@ -218,8 +219,8 @@
 }
 
 void SctpClientConnection::HandleData(const Message *message) {
-  const logger::MessageHeader *message_header =
-      flatbuffers::GetSizePrefixedRoot<logger::MessageHeader>(message->data());
+  const RemoteData *remote_data =
+      flatbuffers::GetSizePrefixedRoot<RemoteData>(message->data());
 
   VLOG(1) << "Got a message of size " << message->size;
   CHECK_EQ(message->size, flatbuffers::GetPrefixedSize(message->data()) +
@@ -228,9 +229,9 @@
   const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
   SctpClientChannelState *channel_state = &((*channels_)[stream_to_channel_[stream]]);
 
-  if (message_header->queue_index() == channel_state->last_queue_index &&
-      aos::monotonic_clock::time_point(
-          chrono::nanoseconds(message_header->monotonic_sent_time())) ==
+  if (remote_data->queue_index() == channel_state->last_queue_index &&
+      monotonic_clock::time_point(
+          chrono::nanoseconds(remote_data->monotonic_sent_time())) ==
           channel_state->last_timestamp) {
     LOG(INFO) << "Duplicate message from " << message->PeerAddress();
     connection_->mutate_duplicate_packets(connection_->duplicate_packets() + 1);
@@ -238,23 +239,23 @@
   } else {
     connection_->mutate_received_packets(connection_->received_packets() + 1);
 
-    channel_state->last_queue_index = message_header->queue_index();
-    channel_state->last_timestamp = aos::monotonic_clock::time_point(
-        chrono::nanoseconds(message_header->monotonic_sent_time()));
+    channel_state->last_queue_index = remote_data->queue_index();
+    channel_state->last_timestamp = monotonic_clock::time_point(
+        chrono::nanoseconds(remote_data->monotonic_sent_time()));
 
     // Publish the message.
     RawSender *sender = channel_state->sender.get();
-    sender->Send(message_header->data()->data(), message_header->data()->size(),
-                 aos::monotonic_clock::time_point(chrono::nanoseconds(
-                     message_header->monotonic_sent_time())),
-                 aos::realtime_clock::time_point(
-                     chrono::nanoseconds(message_header->realtime_sent_time())),
-                 message_header->queue_index());
+    sender->Send(remote_data->data()->data(), remote_data->data()->size(),
+                 monotonic_clock::time_point(
+                     chrono::nanoseconds(remote_data->monotonic_sent_time())),
+                 realtime_clock::time_point(
+                     chrono::nanoseconds(remote_data->realtime_sent_time())),
+                 remote_data->queue_index());
 
     client_status_->SampleFilter(
         client_index_,
-        aos::monotonic_clock::time_point(
-            chrono::nanoseconds(message_header->monotonic_sent_time())),
+        monotonic_clock::time_point(
+            chrono::nanoseconds(remote_data->monotonic_sent_time())),
         sender->monotonic_sent_time());
 
     if (stream_reply_with_timestamp_[stream]) {
@@ -264,13 +265,13 @@
       // Now fill out the message received reply.  This uses a MessageHeader
       // container so it can be directly logged.
       message_reception_reply_.mutable_message()->mutate_channel_index(
-          message_header->channel_index());
+          remote_data->channel_index());
       message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
-          message_header->monotonic_sent_time());
+          remote_data->monotonic_sent_time());
       message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
-          message_header->realtime_sent_time());
+          remote_data->realtime_sent_time());
       message_reception_reply_.mutable_message()->mutate_queue_index(
-          message_header->queue_index());
+          remote_data->queue_index());
 
       // And capture the relevant data needed to generate the forwarding
       // MessageHeader.
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 7328f6f..7347607 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -8,6 +8,7 @@
 #include "aos/network/connect_generated.h"
 #include "aos/network/message_bridge_protocol.h"
 #include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_data_generated.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/sctp_server.h"
 #include "aos/network/timestamp_channel.h"
@@ -34,10 +35,22 @@
           << channel_->name()->string_view() << " "
           << channel_->type()->string_view() << " size " << context.size;
 
+  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+      fbb.CreateVector(static_cast<const uint8_t *>(context.data),
+                       context.size);
+
+  RemoteData::Builder remote_data_builder(fbb);
+  remote_data_builder.add_channel_index(channel_index_);
+  remote_data_builder.add_queue_index(context.queue_index);
+  remote_data_builder.add_monotonic_sent_time(
+      context.monotonic_event_time.time_since_epoch().count());
+  remote_data_builder.add_realtime_sent_time(
+      context.realtime_event_time.time_since_epoch().count());
+  remote_data_builder.add_data(data_offset);
+
   // TODO(austin): Use an iovec to build it up in 3 parts to avoid the copy?
   // Only useful when not logging.
-  fbb.FinishSizePrefixed(logger::PackMessage(&fbb, context, channel_index_,
-                                             logger::LogType::kLogMessage));
+  fbb.FinishSizePrefixed(remote_data_builder.Finish());
 
   return fbb;
 }
@@ -115,30 +128,30 @@
         flatbuffers::Offset<flatbuffers::String> boot_uuid_offset =
             server_status.BootUUID(peer.node_index).PackString(builder.fbb());
 
-        RemoteMessage::Builder message_header_builder =
+        RemoteMessage::Builder remote_message_builder =
             builder.MakeBuilder<RemoteMessage>();
 
-        message_header_builder.add_channel_index(
+        remote_message_builder.add_channel_index(
             message_header->channel_index());
 
-        message_header_builder.add_queue_index(
+        remote_message_builder.add_queue_index(
             message_header->remote_queue_index());
-        message_header_builder.add_monotonic_sent_time(
+        remote_message_builder.add_monotonic_sent_time(
             message_header->monotonic_remote_time());
-        message_header_builder.add_realtime_sent_time(
+        remote_message_builder.add_realtime_sent_time(
             message_header->realtime_remote_time());
 
         // Swap the remote and sent metrics.  They are from the sender's
         // perspective, not the receiver's perspective.
-        message_header_builder.add_monotonic_remote_time(
+        remote_message_builder.add_monotonic_remote_time(
             message_header->monotonic_sent_time());
-        message_header_builder.add_realtime_remote_time(
+        remote_message_builder.add_realtime_remote_time(
             message_header->realtime_sent_time());
-        message_header_builder.add_remote_queue_index(
+        remote_message_builder.add_remote_queue_index(
             message_header->queue_index());
-        message_header_builder.add_boot_uuid(boot_uuid_offset);
+        remote_message_builder.add_boot_uuid(boot_uuid_offset);
 
-        builder.Send(message_header_builder.Finish());
+        builder.Send(remote_message_builder.Finish());
       }
       break;
     }
@@ -166,7 +179,7 @@
 }
 
 void ChannelState::HandleFailure(
-    SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message) {
+    SizePrefixedFlatbufferDetachedBuffer<RemoteData> &&message) {
   // TODO(austin): Put it in the log queue.
   if (VLOG_IS_ON(2)) {
     LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index 88433d1..be1801c 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -11,6 +11,7 @@
 #include "aos/network/message_bridge_client_generated.h"
 #include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/message_bridge_server_status.h"
+#include "aos/network/remote_data_generated.h"
 #include "aos/network/remote_message_generated.h"
 #include "aos/network/sctp_server.h"
 #include "aos/network/timestamp_channel.h"
@@ -88,7 +89,7 @@
 
   // Handles (by consuming) failure to deliver a message.
   void HandleFailure(
-      SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message);
+      SizePrefixedFlatbufferDetachedBuffer<RemoteData> &&message);
 
  private:
   const int channel_index_;
@@ -96,8 +97,7 @@
 
   std::vector<Peer> peers_;
 
-  std::deque<SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader>>
-      sent_messages_;
+  std::deque<SizePrefixedFlatbufferDetachedBuffer<RemoteData>> sent_messages_;
 
   // A fetcher to use to send the last message when a node connects and is
   // reliable.
diff --git a/aos/network/remote_data.fbs b/aos/network/remote_data.fbs
new file mode 100644
index 0000000..0813e57
--- /dev/null
+++ b/aos/network/remote_data.fbs
@@ -0,0 +1,25 @@
+namespace aos.message_bridge;
+
+table RemoteData {
+  // Index into the channel datastructure in the log file header.  This
+  // provides the data type.
+  channel_index:uint (id: 0);
+  // Time this message was sent on the monotonic clock in nanoseconds on this
+  // node.
+  monotonic_sent_time:long (id: 1);
+  // Time this message was sent on the realtime clock in nanoseconds on this
+  // node.
+  realtime_sent_time:long (id: 2);
+  // Index into the ipc queue of this message.  This should start with 0 and
+  // always monotonically increment if no messages were ever lost.  It will
+  // wrap at a multiple of the queue size.
+  queue_index:uint (id: 3);
+
+  // The nested flatbuffer.
+  data:[ubyte] (id: 4);
+
+  // UUID for this boot.  This is 16 bytes long.
+  boot_uuid:[uint8] (id: 5);
+}
+
+root_type RemoteData;