Don't put reliable timestamps in the unreliable log file header field

James and I debugged a log file sorting bug which turned out to be a
logging bug.  The SystemParameters timestamp file (which we can all
agree is a reliable message) had the delivery time in
oldest_unreliable_remote_monotonic_timestamps, which is only for
unreliable timestamps.

This was because reliability was being tracked for the contents channel,
not the source message.  Instead, we need to do the book-keeping to
track if the source is realiable or not, and update the header
accordingly.  While we don't want to see new combined timestamp logs, we
still need to support them.  We don't mind if they are a bit less
efficient to log.

Change-Id: I292f5c0a64926904e13fb5d712acd523f3849cd4
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 8eaeb73..5931200 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -36,7 +36,33 @@
   timer_handler_->set_name("channel_poll");
   VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
 
-  std::map<const Channel *, const Node *> timestamp_logger_channels;
+  // When we are logging remote timestamps, we need to be able to translate from
+  // the channel index that the event loop uses to the channel index in the
+  // config in the log file.
+  event_loop_to_logged_channel_index_.resize(
+      event_loop->configuration()->channels()->size(), -1);
+  for (size_t event_loop_channel_index = 0;
+       event_loop_channel_index <
+       event_loop->configuration()->channels()->size();
+       ++event_loop_channel_index) {
+    const Channel *event_loop_channel =
+        event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
+    const Channel *logged_channel = aos::configuration::GetChannel(
+        configuration_, event_loop_channel->name()->string_view(),
+        event_loop_channel->type()->string_view(), "", node_);
+
+    if (logged_channel != nullptr) {
+      event_loop_to_logged_channel_index_[event_loop_channel_index] =
+          configuration::ChannelIndex(configuration_, logged_channel);
+    }
+  }
+
+  // Map to match source channels with the timestamp logger, if the contents
+  // should be reliable, and a list of all channels logged on it to be treated
+  // as reliable.
+  std::map<const Channel *, std::tuple<const Node *, bool, std::vector<bool>>>
+      timestamp_logger_channels;
 
   message_bridge::ChannelTimestampFinder finder(event_loop_);
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
@@ -46,6 +72,9 @@
     if (!channel->has_destination_nodes()) {
       continue;
     }
+    const size_t channel_index =
+        configuration::ChannelIndex(event_loop_->configuration(), channel);
+
     for (const Connection *connection : *channel->destination_nodes()) {
       if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
               connection, event_loop_->node())) {
@@ -54,8 +83,37 @@
 
         VLOG(1) << "Timestamps are logged from "
                 << FlatbufferToJson(other_node);
-        timestamp_logger_channels.insert(
-            std::make_pair(finder.ForChannel(channel, connection), other_node));
+        // True if each channel's remote timestamps are split into a separate
+        // RemoteMessage channel.
+        const bool is_split =
+            finder.SplitChannelForChannel(channel, connection) != nullptr;
+
+        const Channel *const timestamp_logger_channel =
+            finder.ForChannel(channel, connection);
+
+        auto it = timestamp_logger_channels.find(timestamp_logger_channel);
+        if (it != timestamp_logger_channels.end()) {
+          CHECK(!is_split);
+          CHECK_LT(channel_index, std::get<2>(it->second).size());
+          std::get<2>(it->second)[channel_index] = (connection->time_to_live() == 0);
+        } else {
+          if (is_split) {
+            timestamp_logger_channels.insert(std::make_pair(
+                timestamp_logger_channel,
+                std::make_tuple(other_node, (connection->time_to_live() == 0),
+                                std::vector<bool>())));
+          } else {
+            std::vector<bool> channel_reliable_contents(
+                event_loop->configuration()->channels()->size(), false);
+            channel_reliable_contents[channel_index] =
+                (connection->time_to_live() == 0);
+
+            timestamp_logger_channels.insert(std::make_pair(
+                timestamp_logger_channel,
+                std::make_tuple(other_node, false,
+                                std::move(channel_reliable_contents))));
+          }
+        }
       }
     }
   }
@@ -86,8 +144,8 @@
 
     const bool is_readable =
         configuration::ChannelIsReadableOnNode(config_channel, node_);
-    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
-        config_channel, node_);
+    const bool is_logged =
+        configuration::ChannelMessageIsLoggedOnNode(config_channel, node_);
     const bool log_message = is_logged && is_readable;
 
     bool log_delivery_times = false;
@@ -107,8 +165,8 @@
       }
     }
 
-    // Now, detect a RemoteMessage timestamp logger where we should just log the
-    // contents to a file directly.
+    // Now, detect a RemoteMessage timestamp logger where we should just log
+    // the contents to a file directly.
     const bool log_contents = timestamp_logger_channels.find(channel) !=
                               timestamp_logger_channels.end();
 
@@ -144,7 +202,14 @@
       if (log_contents) {
         VLOG(1) << "Timestamp logger channel "
                 << configuration::CleanedChannelToString(channel);
-        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+        auto timestamp_logger_channel_info =
+            timestamp_logger_channels.find(channel);
+        CHECK(timestamp_logger_channel_info != timestamp_logger_channels.end());
+        fs.timestamp_node = std::get<0>(timestamp_logger_channel_info->second);
+        fs.reliable_contents =
+            std::get<1>(timestamp_logger_channel_info->second);
+        fs.channel_reliable_contents =
+            std::get<2>(timestamp_logger_channel_info->second);
         fs.wants_contents_writer = true;
         fs.contents_node_index =
             configuration::GetNodeIndex(configuration_, fs.timestamp_node);
@@ -152,36 +217,14 @@
       fetchers_.emplace_back(std::move(fs));
     }
   }
-
-  // When we are logging remote timestamps, we need to be able to translate from
-  // the channel index that the event loop uses to the channel index in the
-  // config in the log file.
-  event_loop_to_logged_channel_index_.resize(
-      event_loop->configuration()->channels()->size(), -1);
-  for (size_t event_loop_channel_index = 0;
-       event_loop_channel_index <
-       event_loop->configuration()->channels()->size();
-       ++event_loop_channel_index) {
-    const Channel *event_loop_channel =
-        event_loop->configuration()->channels()->Get(event_loop_channel_index);
-
-    const Channel *logged_channel = aos::configuration::GetChannel(
-        configuration_, event_loop_channel->name()->string_view(),
-        event_loop_channel->type()->string_view(), "", node_);
-
-    if (logged_channel != nullptr) {
-      event_loop_to_logged_channel_index_[event_loop_channel_index] =
-          configuration::ChannelIndex(configuration_, logged_channel);
-    }
-  }
 }
 
 Logger::~Logger() {
   if (log_namer_) {
     // If we are replaying a log file, or in simulation, we want to force the
     // last bit of data to be logged.  The easiest way to deal with this is to
-    // poll everything as we go to destroy the class, ie, shut down the logger,
-    // and write it to disk.
+    // poll everything as we go to destroy the class, ie, shut down the
+    // logger, and write it to disk.
     StopLogging(event_loop_->monotonic_now());
   }
 }
@@ -701,13 +744,18 @@
 
         // Start with recording info about the data flowing from our node to the
         // remote.
+        const bool reliable =
+            f.channel_reliable_contents.size() != 0u
+                ? f.channel_reliable_contents[msg->channel_index()]
+                : f.reliable_contents;
+
         f.contents_writer->UpdateRemote(
             node_index_, event_loop_->boot_uuid(),
             monotonic_clock::time_point(
                 chrono::nanoseconds(msg->monotonic_remote_time())),
             monotonic_clock::time_point(
                 chrono::nanoseconds(msg->monotonic_sent_time())),
-            f.reliable_forwarding);
+            reliable);
 
         f.contents_writer->QueueMessage(
             &fbb, UUID::FromVector(msg->boot_uuid()), end);