Don't put reliable timestamps in the unreliable log file header field

James and I debugged a log file sorting bug which turned out to be a
logging bug.  The SystemParameters timestamp file (which we can all
agree is a reliable message) had the delivery time in
oldest_remote_unreliable_monotonic_timestamps, which is only for
unreliable timestamps.

This was because reliability was being tracked for the contents channel,
not the source message.  Instead, we need to do the book-keeping to
track if the source is reliable or not, and update the header
accordingly.  While we don't want to see new combined timestamp logs, we
still need to support them.  We don't mind if they are a bit less
efficient to log.

Change-Id: I292f5c0a64926904e13fb5d712acd523f3849cd4
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 8eaeb73..5931200 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -36,7 +36,33 @@
   timer_handler_->set_name("channel_poll");
   VLOG(1) << "Creating logger for " << FlatbufferToJson(node_);
 
-  std::map<const Channel *, const Node *> timestamp_logger_channels;
+  // When we are logging remote timestamps, we need to be able to translate from
+  // the channel index that the event loop uses to the channel index in the
+  // config in the log file.
+  event_loop_to_logged_channel_index_.resize(
+      event_loop->configuration()->channels()->size(), -1);
+  for (size_t event_loop_channel_index = 0;
+       event_loop_channel_index <
+       event_loop->configuration()->channels()->size();
+       ++event_loop_channel_index) {
+    const Channel *event_loop_channel =
+        event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
+    const Channel *logged_channel = aos::configuration::GetChannel(
+        configuration_, event_loop_channel->name()->string_view(),
+        event_loop_channel->type()->string_view(), "", node_);
+
+    if (logged_channel != nullptr) {
+      event_loop_to_logged_channel_index_[event_loop_channel_index] =
+          configuration::ChannelIndex(configuration_, logged_channel);
+    }
+  }
+
+  // Map to match source channels with the timestamp logger, if the contents
+  // should be reliable, and a list of all channels logged on it to be treated
+  // as reliable.
+  std::map<const Channel *, std::tuple<const Node *, bool, std::vector<bool>>>
+      timestamp_logger_channels;
 
   message_bridge::ChannelTimestampFinder finder(event_loop_);
   for (const Channel *channel : *event_loop_->configuration()->channels()) {
@@ -46,6 +72,9 @@
     if (!channel->has_destination_nodes()) {
       continue;
     }
+    const size_t channel_index =
+        configuration::ChannelIndex(event_loop_->configuration(), channel);
+
     for (const Connection *connection : *channel->destination_nodes()) {
       if (configuration::ConnectionDeliveryTimeIsLoggedOnNode(
               connection, event_loop_->node())) {
@@ -54,8 +83,37 @@
 
         VLOG(1) << "Timestamps are logged from "
                 << FlatbufferToJson(other_node);
-        timestamp_logger_channels.insert(
-            std::make_pair(finder.ForChannel(channel, connection), other_node));
+        // True if each channel's remote timestamps are split into a separate
+        // RemoteMessage channel.
+        const bool is_split =
+            finder.SplitChannelForChannel(channel, connection) != nullptr;
+
+        const Channel *const timestamp_logger_channel =
+            finder.ForChannel(channel, connection);
+
+        auto it = timestamp_logger_channels.find(timestamp_logger_channel);
+        if (it != timestamp_logger_channels.end()) {
+          CHECK(!is_split);
+          CHECK_LT(channel_index, std::get<2>(it->second).size());
+          std::get<2>(it->second)[channel_index] = (connection->time_to_live() == 0);
+        } else {
+          if (is_split) {
+            timestamp_logger_channels.insert(std::make_pair(
+                timestamp_logger_channel,
+                std::make_tuple(other_node, (connection->time_to_live() == 0),
+                                std::vector<bool>())));
+          } else {
+            std::vector<bool> channel_reliable_contents(
+                event_loop->configuration()->channels()->size(), false);
+            channel_reliable_contents[channel_index] =
+                (connection->time_to_live() == 0);
+
+            timestamp_logger_channels.insert(std::make_pair(
+                timestamp_logger_channel,
+                std::make_tuple(other_node, false,
+                                std::move(channel_reliable_contents))));
+          }
+        }
       }
     }
   }
@@ -86,8 +144,8 @@
 
     const bool is_readable =
         configuration::ChannelIsReadableOnNode(config_channel, node_);
-    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
-        config_channel, node_);
+    const bool is_logged =
+        configuration::ChannelMessageIsLoggedOnNode(config_channel, node_);
     const bool log_message = is_logged && is_readable;
 
     bool log_delivery_times = false;
@@ -107,8 +165,8 @@
       }
     }
 
-    // Now, detect a RemoteMessage timestamp logger where we should just log the
-    // contents to a file directly.
+    // Now, detect a RemoteMessage timestamp logger where we should just log
+    // the contents to a file directly.
     const bool log_contents = timestamp_logger_channels.find(channel) !=
                               timestamp_logger_channels.end();
 
@@ -144,7 +202,14 @@
       if (log_contents) {
         VLOG(1) << "Timestamp logger channel "
                 << configuration::CleanedChannelToString(channel);
-        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+        auto timestamp_logger_channel_info =
+            timestamp_logger_channels.find(channel);
+        CHECK(timestamp_logger_channel_info != timestamp_logger_channels.end());
+        fs.timestamp_node = std::get<0>(timestamp_logger_channel_info->second);
+        fs.reliable_contents =
+            std::get<1>(timestamp_logger_channel_info->second);
+        fs.channel_reliable_contents =
+            std::get<2>(timestamp_logger_channel_info->second);
         fs.wants_contents_writer = true;
         fs.contents_node_index =
             configuration::GetNodeIndex(configuration_, fs.timestamp_node);
@@ -152,36 +217,14 @@
       fetchers_.emplace_back(std::move(fs));
     }
   }
-
-  // When we are logging remote timestamps, we need to be able to translate from
-  // the channel index that the event loop uses to the channel index in the
-  // config in the log file.
-  event_loop_to_logged_channel_index_.resize(
-      event_loop->configuration()->channels()->size(), -1);
-  for (size_t event_loop_channel_index = 0;
-       event_loop_channel_index <
-       event_loop->configuration()->channels()->size();
-       ++event_loop_channel_index) {
-    const Channel *event_loop_channel =
-        event_loop->configuration()->channels()->Get(event_loop_channel_index);
-
-    const Channel *logged_channel = aos::configuration::GetChannel(
-        configuration_, event_loop_channel->name()->string_view(),
-        event_loop_channel->type()->string_view(), "", node_);
-
-    if (logged_channel != nullptr) {
-      event_loop_to_logged_channel_index_[event_loop_channel_index] =
-          configuration::ChannelIndex(configuration_, logged_channel);
-    }
-  }
 }
 
 Logger::~Logger() {
   if (log_namer_) {
     // If we are replaying a log file, or in simulation, we want to force the
     // last bit of data to be logged.  The easiest way to deal with this is to
-    // poll everything as we go to destroy the class, ie, shut down the logger,
-    // and write it to disk.
+    // poll everything as we go to destroy the class, ie, shut down the
+    // logger, and write it to disk.
     StopLogging(event_loop_->monotonic_now());
   }
 }
@@ -701,13 +744,18 @@
 
         // Start with recording info about the data flowing from our node to the
         // remote.
+        const bool reliable =
+            f.channel_reliable_contents.size() != 0u
+                ? f.channel_reliable_contents[msg->channel_index()]
+                : f.reliable_contents;
+
         f.contents_writer->UpdateRemote(
             node_index_, event_loop_->boot_uuid(),
             monotonic_clock::time_point(
                 chrono::nanoseconds(msg->monotonic_remote_time())),
             monotonic_clock::time_point(
                 chrono::nanoseconds(msg->monotonic_sent_time())),
-            f.reliable_forwarding);
+            reliable);
 
         f.contents_writer->QueueMessage(
             &fbb, UUID::FromVector(msg->boot_uuid()), end);
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 5c8b0c7..6b04f5d 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -199,6 +199,17 @@
 
     // If true, this message is being sent over a reliable channel.
     bool reliable_forwarding = false;
+
+    // One of the following will be populated.  If channel_reliable_contents is
+    // non-zero size, it contains a mapping from the event loop channel (not the
+    // logged channel) to a bool telling us if that particular channel is
+    // reliable.
+    //
+    // If channel_reliable_contents is empty, reliable_contents will contain the
+    // same info for all contents logged here.  This is the predominant case for
+    // split timestamp channels (the preferred approach).
+    bool reliable_contents = false;
+    std::vector<bool> channel_reliable_contents;
   };
 
   // Vector mapping from the channel index from the event loop to the logged
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 5000ff9..30de0b5 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -836,24 +836,17 @@
               VLOG(1)
                   << "Unreliable remote time "
                   << next_boot_time.oldest_remote_unreliable_monotonic_timestamp
-                  << " remote " << boot_time_list.first << " local "
-                  << source_boot_uuid;
-              VLOG(1)
-                  << "Unreliable local time "
+                  << " remote " << boot_time_list.first << " -> local time "
                   << next_boot_time.oldest_local_unreliable_monotonic_timestamp
-                  << " remote " << boot_time_list.first << " local "
-                  << source_boot_uuid;
+                  << " local " << source_boot_uuid;
             }
             if (next_boot_time.oldest_local_monotonic_timestamp !=
                 aos::monotonic_clock::max_time) {
               VLOG(1) << "Reliable remote time "
                       << next_boot_time.oldest_remote_monotonic_timestamp
-                      << " remote " << boot_time_list.first << " local "
-                      << source_boot_uuid;
-              VLOG(1) << "Reliable local time "
+                      << " remote " << boot_time_list.first << " -> local time "
                       << next_boot_time.oldest_local_monotonic_timestamp
-                      << " remote " << boot_time_list.first << " local "
-                      << source_boot_uuid;
+                      << " local " << source_boot_uuid;
             }
             // If we found an existing entry, update the min to be the min of
             // all records.  This lets us combine info from multiple part files.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 55bf687..081d3c4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -510,8 +510,14 @@
 
   ~LoggerState() {
     if (logger) {
-      for (const std::string &file : log_namer->all_filenames()) {
+      std::vector<std::string> filenames;
+      AppendAllFilenames(&filenames);
+      for (const std::string &file : filenames) {
         LOG(INFO) << "Wrote to " << file;
+        auto x = ReadHeader(file);
+        if (x) {
+          VLOG(1) << aos::FlatbufferToJson(x.value());
+        }
       }
     }
   }
@@ -2384,28 +2390,62 @@
         log_header->message().source_node_boot_uuid()->string_view());
 
     if (log_header->message().node()->name()->string_view() != "pi1") {
-      switch (log_header->message().parts_index()) {
-        case 0:
-          EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
-          EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
-          break;
-        case 1:
-          EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
-          ASSERT_EQ(monotonic_start_time,
-                    monotonic_clock::epoch() + chrono::seconds(1));
-          break;
-        case 2:
-          EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
-          EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
-          break;
-        case 3:
-          EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
-          ASSERT_EQ(monotonic_start_time,
-                    monotonic_clock::epoch() + chrono::nanoseconds(2322999462));
-          break;
-        default:
-          FAIL();
-          break;
+      // The remote message channel should rotate later and have more parts.
+      // This only is true on the log files with shared remote messages.
+      //
+      // TODO(austin): I'm not the most thrilled with this test pattern...  It
+      // feels brittle in a different way.
+      if (file.find("aos.message_bridge.RemoteMessage") == std::string::npos || !shared()) {
+        switch (log_header->message().parts_index()) {
+          case 0:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+            EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+            break;
+          case 1:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+            ASSERT_EQ(monotonic_start_time,
+                      monotonic_clock::epoch() + chrono::seconds(1));
+            break;
+          case 2:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+            EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
+            break;
+          case 3:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+            ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
+                                                chrono::nanoseconds(2322999462))
+                << " on " << file;
+            break;
+          default:
+            FAIL();
+            break;
+        }
+      } else {
+        switch (log_header->message().parts_index()) {
+          case 0:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+            EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+            break;
+          case 1:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+            ASSERT_EQ(monotonic_start_time,
+                      monotonic_clock::epoch() + chrono::seconds(1));
+            break;
+          case 2:
+          case 3:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+            EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time) << file;
+            break;
+          case 4:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+            ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
+                                                chrono::nanoseconds(2322999462))
+                << " on " << file;
+            break;
+          default:
+            FAIL();
+            break;
+        }
       }
       continue;
     }
@@ -2651,39 +2691,116 @@
                   log_header->message()
                       .oldest_local_unreliable_monotonic_timestamps()
                       ->Get(0)));
-      // Confirm that the oldest timestamps match what we expect.  Based on what
-      // we are doing, we know that the oldest time is the first message's time.
-      //
-      // This makes the test robust to both the split and combined config tests.
-      switch (log_header->message().parts_index()) {
-        case 0:
-        case 1:
-          EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                    expected_oldest_remote_monotonic_timestamps);
-          EXPECT_EQ(oldest_local_monotonic_timestamps,
-                    expected_oldest_local_monotonic_timestamps);
-          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                    expected_oldest_remote_monotonic_timestamps);
-          EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                    expected_oldest_local_monotonic_timestamps);
-          break;
-        default:
-          FAIL();
-          break;
-      }
 
-      switch (log_header->message().parts_index()) {
-        case 0:
-          EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
-          EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
-          break;
-        case 1:
-          EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
-          EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
-          break;
-        default:
-          FAIL();
-          break;
+      const Channel *channel =
+          event_loop_factory_.configuration()->channels()->Get(
+              msg->message().channel_index());
+      const Connection *connection = configuration::ConnectionToNode(
+          channel, configuration::GetNode(
+                       event_loop_factory_.configuration(),
+                       log_header->message().node()->name()->string_view()));
+
+      const bool reliable = connection->time_to_live() == 0;
+
+      if (shared()) {
+        // Confirm that the oldest timestamps match what we expect.  Based on
+        // what we are doing, we know that the oldest time is the first
+        // message's time.
+        //
+        // This makes the test robust to both the split and combined config
+        // tests.
+        switch (log_header->message().parts_index()) {
+          case 0:
+          case 1:
+            EXPECT_EQ(oldest_remote_monotonic_timestamps,
+                      expected_oldest_remote_monotonic_timestamps);
+            EXPECT_EQ(oldest_local_monotonic_timestamps,
+                      expected_oldest_local_monotonic_timestamps);
+            if (reliable) {
+              EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                        monotonic_clock::max_time);
+              EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                        monotonic_clock::max_time);
+            } else {
+              EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                        expected_oldest_remote_monotonic_timestamps);
+              EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                        expected_oldest_local_monotonic_timestamps);
+            }
+            break;
+          case 2:
+            LOG(INFO) << "Shared";
+            EXPECT_EQ(
+                oldest_remote_monotonic_timestamps,
+                monotonic_clock::epoch() + chrono::nanoseconds(10000000000));
+            EXPECT_EQ(
+                oldest_local_monotonic_timestamps,
+                monotonic_clock::epoch() + chrono::nanoseconds(1323100000));
+            EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                      expected_oldest_remote_monotonic_timestamps);
+            EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                      expected_oldest_local_monotonic_timestamps);
+            break;
+          default:
+            FAIL();
+            break;
+        }
+
+        switch (log_header->message().parts_index()) {
+          case 0:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+            EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+            break;
+          case 1:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+            EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+            break;
+          case 2:
+            if (shared()) {
+              EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+              EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+              LOG(INFO) << "Shared";
+              break;
+            }
+            [[fallthrough]];
+          default:
+            FAIL();
+            break;
+        }
+      } else {
+        switch (log_header->message().parts_index()) {
+          case 0:
+          case 1:
+            if (reliable) {
+              EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                        monotonic_clock::max_time);
+              EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                        monotonic_clock::max_time);
+            } else {
+              EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                        expected_oldest_remote_monotonic_timestamps);
+              EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                        expected_oldest_local_monotonic_timestamps);
+            }
+            break;
+          default:
+            FAIL();
+            break;
+        }
+
+        switch (log_header->message().parts_index()) {
+          case 0:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
+            EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+            break;
+          case 1:
+            EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
+            EXPECT_EQ(monotonic_start_time, monotonic_clock::min_time);
+            break;
+          default:
+            FAIL();
+            break;
+        }
       }
 
       continue;
@@ -2737,7 +2854,11 @@
     }
   }
 
-  EXPECT_TRUE(timestamp_file_count == 2u || timestamp_file_count == 4u);
+  if (shared()) {
+    EXPECT_EQ(timestamp_file_count, 3u);
+  } else {
+    EXPECT_EQ(timestamp_file_count, 4u);
+  }
 
   // Confirm that we can actually sort the resulting log and read it.
   {
diff --git a/aos/network/timestamp_channel.cc b/aos/network/timestamp_channel.cc
index 88b6ed0..ab61051 100644
--- a/aos/network/timestamp_channel.cc
+++ b/aos/network/timestamp_channel.cc
@@ -27,6 +27,15 @@
   return absl::StrCat("/aos/remote_timestamps/", remote_node);
 }
 
+const Channel *ChannelTimestampFinder::SplitChannelForChannel(
+    const Channel *channel, const Connection *connection) {
+  const std::string split_timestamp_channel_name =
+      SplitChannelName(channel, connection);
+  return configuration::GetChannel(configuration_, split_timestamp_channel_name,
+                                   RemoteMessage::GetFullyQualifiedName(),
+                                   name_, node_, true);
+}
+
 const Channel *ChannelTimestampFinder::ForChannel(
     const Channel *channel, const Connection *connection) {
   const std::string split_timestamp_channel_name =
diff --git a/aos/network/timestamp_channel.h b/aos/network/timestamp_channel.h
index b78c83a..e35a170 100644
--- a/aos/network/timestamp_channel.h
+++ b/aos/network/timestamp_channel.h
@@ -15,6 +15,9 @@
 
 // Class to find the corresponding channel where timestamps for a specified data
 // channel and connection will be logged.
+//
+// This abstracts (and detects) when we have combined or split remote timestamp
+// logging channels.
 class ChannelTimestampFinder {
  public:
   ChannelTimestampFinder(aos::EventLoop *event_loop)
@@ -23,6 +26,11 @@
   ChannelTimestampFinder(const Configuration *configuration,
                          const std::string_view name, const Node *node);
 
+  // Returns the split timestamp logging channel for the provided channel and
+  // connection if one exists, or nullptr otherwise.
+  const Channel *SplitChannelForChannel(const Channel *channel,
+                                        const Connection *connection);
+
   // Finds the timestamp logging channel for the provided data channel and
   // connection.
   const Channel *ForChannel(const Channel *channel,