Add multi-node local logging to the logger

This is not yet able to forward messages, but it is able to log
messages that have been forwarded.  Create a log file and test that
the timestamps are being recorded correctly.

Change-Id: Ica891dbc560543546f6ee594438cebb03672190e
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e35fe5b..6eae9e9 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -18,6 +18,9 @@
 
 DEFINE_int32(flush_size, 1000000,
              "Number of outstanding bytes to allow before flushing to disk.");
+DEFINE_bool(skip_missing_forwarding_entries, false,
+            "If true, drop any forwarding entries with missing data.  If "
+            "false, CHECK.");
 
 namespace aos {
 namespace logger {
@@ -86,7 +89,44 @@
       polling_period_(polling_period) {
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
     FetcherStruct fs;
-    fs.fetcher = event_loop->MakeRawFetcher(channel);
+    const bool is_readable =
+        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
+    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
+                                 channel, event_loop_->node()) &&
+                             is_readable;
+
+    const bool log_delivery_times =
+        (event_loop_->node() == nullptr)
+            ? false
+            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+                  channel, event_loop_->node(), event_loop_->node());
+
+    if (log_message || log_delivery_times) {
+      fs.fetcher = event_loop->MakeRawFetcher(channel);
+      VLOG(1) << "Logging channel "
+              << configuration::CleanedChannelToString(channel);
+
+      if (log_delivery_times) {
+        if (log_message) {
+          VLOG(1) << "  Logging message and delivery times";
+          fs.log_type = LogType::kLogMessageAndDeliveryTime;
+        } else {
+          VLOG(1) << "  Logging delivery times only";
+          fs.log_type = LogType::kLogDeliveryTimeOnly;
+        }
+      } else {
+        // We don't have a particularly great use case right now for logging a
+        // forwarded message without also logging the delivery times, or while
+        // logging them on another node.  Fail rather than produce bad results.
+        CHECK(configuration::ChannelIsSendableOnNode(channel,
+                                                     event_loop_->node()))
+            << ": Logger only knows how to log remote messages with "
+               "forwarding timestamps.";
+        VLOG(1) << "  Logging message only";
+        fs.log_type = LogType::kLogMessage;
+      }
+    }
+
     fs.written = false;
     fetchers_.emplace_back(std::move(fs));
   }
@@ -99,7 +139,9 @@
     // so we can capture the latest message on each channel.  This lets us have
     // non periodic messages with configuration that now get logged.
     for (FetcherStruct &f : fetchers_) {
-      f.written = !f.fetcher->Fetch();
+      if (f.fetcher.get() != nullptr) {
+        f.written = !f.fetcher->Fetch();
+      }
     }
 
     // We need to pick a point in time to declare the log file "started".  This
@@ -122,10 +164,16 @@
       flatbuffers::Offset<flatbuffers::String> string_offset =
           fbb.CreateString(network::GetHostname());
 
+      flatbuffers::Offset<Node> node_offset =
+          CopyFlatBuffer(event_loop_->node(), &fbb);
+      LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
+
       aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
 
       log_file_header_builder.add_name(string_offset);
 
+      log_file_header_builder.add_node(node_offset);
+
       log_file_header_builder.add_configuration(configuration_offset);
       // The worst case theoretical out of order is the polling period times 2.
       // One message could get logged right after the boundary, but be for right
@@ -157,20 +205,46 @@
 
 flatbuffers::Offset<MessageHeader> PackMessage(
     flatbuffers::FlatBufferBuilder *fbb, const Context &context,
-    int channel_index) {
-  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
-      fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+    int channel_index, LogType log_type) {
+  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+
+  switch(log_type) {
+    case LogType::kLogMessage:
+    case LogType::kLogMessageAndDeliveryTime:
+      data_offset =
+          fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+      break;
+
+    case LogType::kLogDeliveryTimeOnly:
+      break;
+  }
 
   MessageHeader::Builder message_header_builder(*fbb);
   message_header_builder.add_channel_index(channel_index);
+  message_header_builder.add_queue_index(context.queue_index);
   message_header_builder.add_monotonic_sent_time(
       context.monotonic_event_time.time_since_epoch().count());
   message_header_builder.add_realtime_sent_time(
       context.realtime_event_time.time_since_epoch().count());
 
-  message_header_builder.add_queue_index(context.queue_index);
+  switch (log_type) {
+    case LogType::kLogMessage:
+      message_header_builder.add_data(data_offset);
+      break;
 
-  message_header_builder.add_data(data_offset);
+    case LogType::kLogMessageAndDeliveryTime:
+      message_header_builder.add_data(data_offset);
+      [[fallthrough]];
+
+    case LogType::kLogDeliveryTimeOnly:
+      message_header_builder.add_monotonic_remote_time(
+          context.monotonic_remote_time.time_since_epoch().count());
+      message_header_builder.add_realtime_remote_time(
+          context.realtime_remote_time.time_since_epoch().count());
+      message_header_builder.add_remote_queue_index(context.remote_queue_index);
+      break;
+  }
+
   return message_header_builder.Finish();
 }
 
@@ -188,51 +262,46 @@
     size_t channel_index = 0;
     // Write each channel to disk, one at a time.
     for (FetcherStruct &f : fetchers_) {
-      while (true) {
-        if (f.fetcher.get() == nullptr) {
-          if (!f.fetcher->FetchNext()) {
-            VLOG(1) << "No new data on "
-                    << FlatbufferToJson(f.fetcher->channel());
-            break;
-          } else {
-            f.written = false;
+      // Skip any channels which we aren't supposed to log.
+      if (f.fetcher.get() != nullptr) {
+        while (true) {
+          if (f.written) {
+            if (!f.fetcher->FetchNext()) {
+              VLOG(2) << "No new data on "
+                      << configuration::CleanedChannelToString(
+                             f.fetcher->channel());
+              break;
+            } else {
+              f.written = false;
+            }
           }
-        }
 
-        if (f.written) {
-          if (!f.fetcher->FetchNext()) {
-            VLOG(1) << "No new data on "
-                    << FlatbufferToJson(f.fetcher->channel());
-            break;
+          CHECK(!f.written);
+
+          // TODO(james): Write tests to exercise this logic.
+          if (f.fetcher->context().monotonic_event_time <
+              last_synchronized_time_) {
+            // Write!
+            flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+                                               max_header_size_);
+            fbb.ForceDefaults(1);
+
+            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                               channel_index, f.log_type));
+
+            VLOG(2) << "Writing data for channel "
+                    << configuration::CleanedChannelToString(
+                           f.fetcher->channel());
+
+            max_header_size_ = std::max(
+                max_header_size_, fbb.GetSize() - f.fetcher->context().size);
+            writer_->QueueSizedFlatbuffer(&fbb);
+
+            f.written = true;
           } else {
-            f.written = false;
+            break;
           }
         }
-
-        CHECK(!f.written);
-
-        // TODO(james): Write tests to exercise this logic.
-        if (f.fetcher->context().monotonic_event_time <
-            last_synchronized_time_) {
-          // Write!
-          flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
-                                             max_header_size_);
-          fbb.ForceDefaults(1);
-
-          fbb.FinishSizePrefixed(
-              PackMessage(&fbb, f.fetcher->context(), channel_index));
-
-          VLOG(1) << "Writing data for channel "
-                  << FlatbufferToJson(f.fetcher->channel());
-
-          max_header_size_ = std::max(
-              max_header_size_, fbb.GetSize() - f.fetcher->context().size);
-          writer_->QueueSizedFlatbuffer(&fbb);
-
-          f.written = true;
-        } else {
-          break;
-        }
       }
 
       ++channel_index;
@@ -373,11 +442,20 @@
   queue_data_time_ = newest_timestamp_ - max_out_of_order_duration_;
 }
 
-const Configuration *LogReader::configuration() {
+const Configuration *LogReader::configuration() const {
   return flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
       ->configuration();
 }
 
+const Node *LogReader::node() const {
+  return configuration::GetNode(
+      configuration(),
+      flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
+          ->node()
+          ->name()
+          ->string_view());
+}
+
 monotonic_clock::time_point LogReader::monotonic_start_time() {
   return monotonic_clock::time_point(std::chrono::nanoseconds(
       flatbuffers::GetSizePrefixedRoot<LogFileHeader>(configuration_.data())
@@ -434,20 +512,32 @@
 
     FlatbufferVector<MessageHeader> front = std::move(channel.front());
 
-    CHECK(front.message().data() != nullptr);
+    if (oldest_channel_index.first > monotonic_start_time() ||
+        event_loop_factory_ != nullptr) {
+      if (!FLAGS_skip_missing_forwarding_entries ||
+          front.message().data() != nullptr) {
+        CHECK(front.message().data() != nullptr)
+            << ": Got a message without data.  Forwarding entry which was not "
+               "matched?  Use --skip_missing_forwarding_entries to ignore "
+               "this.";
 
-    if (oldest_channel_index.first > monotonic_start_time()) {
-      // If we have access to the factory, use it to fix the realtime time.
-      if (event_loop_factory_ != nullptr) {
-        event_loop_factory_->SetRealtimeOffset(
+        // If we have access to the factory, use it to fix the realtime time.
+        if (event_loop_factory_ != nullptr) {
+          event_loop_factory_->SetRealtimeOffset(
+              monotonic_clock::time_point(
+                  chrono::nanoseconds(front.message().monotonic_sent_time())),
+              realtime_clock::time_point(
+                  chrono::nanoseconds(front.message().realtime_sent_time())));
+        }
+
+        channel.raw_sender->Send(
+            front.message().data()->Data(), front.message().data()->size(),
             monotonic_clock::time_point(
-                chrono::nanoseconds(front.message().monotonic_sent_time())),
+                chrono::nanoseconds(front.message().monotonic_remote_time())),
             realtime_clock::time_point(
-                chrono::nanoseconds(front.message().realtime_sent_time())));
+                chrono::nanoseconds(front.message().realtime_remote_time())),
+            front.message().remote_queue_index());
       }
-
-      channel.raw_sender->Send(front.message().data()->Data(),
-                               front.message().data()->size());
     } else {
       LOG(WARNING) << "Not sending data from before the start of the log file. "
                    << oldest_channel_index.first.time_since_epoch().count()