Split local data and timestamps into separate files

This sets us up to read all the timestamps before the data, which will
enable sorting.  This change doesn't do that yet; it only gets the files
and formats ready.

Change-Id: I1213a70c90867a082ca37b21eafd99f9b95b600d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index f6e5447..648f430 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -79,7 +79,7 @@
     }
 
     EXPECT_EQ(logfile_uuids.size(), 2u);
-    EXPECT_EQ(parts_uuids.size(), 6u);
+    EXPECT_EQ(parts_uuids.size(), 8u);
 
     // And confirm everything is on the correct node.
     EXPECT_EQ(log_header[2].message().node()->name()->string_view(), "pi1");
@@ -105,12 +105,14 @@
 
     // And the parts index matches.
     EXPECT_EQ(log_header[2].message().parts_index(), 0);
-    EXPECT_EQ(log_header[3].message().parts_index(), 1);
-    EXPECT_EQ(log_header[4].message().parts_index(), 2);
+
+    EXPECT_EQ(log_header[3].message().parts_index(), 0);
+    EXPECT_EQ(log_header[4].message().parts_index(), 1);
 
     EXPECT_EQ(log_header[5].message().parts_index(), 0);
-    EXPECT_EQ(log_header[6].message().parts_index(), 1);
-    EXPECT_EQ(log_header[7].message().parts_index(), 2);
+
+    EXPECT_EQ(log_header[6].message().parts_index(), 0);
+    EXPECT_EQ(log_header[7].message().parts_index(), 1);
 
     EXPECT_EQ(log_header[8].message().parts_index(), 0);
     EXPECT_EQ(log_header[9].message().parts_index(), 1);
@@ -127,24 +129,18 @@
 
     // And that the data_stored field is right.
     EXPECT_THAT(*log_header[2].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::DATA));
     EXPECT_THAT(*log_header[3].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
     EXPECT_THAT(*log_header[4].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
 
     EXPECT_THAT(*log_header[5].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::DATA));
     EXPECT_THAT(*log_header[6].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
     EXPECT_THAT(*log_header[7].message().data_stored(),
-                ::testing::ElementsAre(StoredDataType::DATA,
-                                       StoredDataType::TIMESTAMPS));
+                ::testing::ElementsAre(StoredDataType::TIMESTAMPS));
 
     EXPECT_THAT(*log_header[8].message().data_stored(),
                 ::testing::ElementsAre(StoredDataType::DATA));
@@ -176,49 +172,44 @@
     std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
 
     // Timing reports, pings
-    EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
-                UnorderedElementsAre(
-                    std::make_tuple("/pi1/aos",
-                                    "aos.message_bridge.ServerStatistics", 1),
-                    std::make_tuple("/test", "aos.examples.Ping", 1),
-                    std::make_tuple("/pi1/aos", "aos.examples.Ping", 1)))
-        << " : " << logfiles_[2];
-    {
-      std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
-          std::make_tuple("/pi1/aos", "aos.examples.Ping", 10),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 1),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
-                          1)};
-      if (!std::get<0>(GetParam()).shared) {
-        channel_counts.push_back(
-            std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
-                            "aos-message_bridge-Timestamp",
-                            "aos.message_bridge.RemoteMessage", 1));
-      }
-      EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
-                  ::testing::UnorderedElementsAreArray(channel_counts))
-          << " : " << logfiles_[3];
+    if (shared()) {
+      EXPECT_THAT(
+          CountChannelsData(config, logfiles_[2]),
+          UnorderedElementsAre(
+              std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+                              200),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+                              21),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+              std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
+              std::make_tuple("/test", "aos.examples.Ping", 2001)))
+          << " : " << logfiles_[2];
+    } else {
+      EXPECT_THAT(
+          CountChannelsData(config, logfiles_[2]),
+          UnorderedElementsAre(
+              std::make_tuple("/pi1/aos", "aos.examples.Ping", 2001),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
+                              200),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
+                              21),
+              std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+              std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
+              std::make_tuple("/test", "aos.examples.Ping", 2001),
+              std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
+                              "aos-message_bridge-Timestamp",
+                              "aos.message_bridge.RemoteMessage", 200)))
+          << " : " << logfiles_[2];
     }
-    {
-      std::vector<std::tuple<std::string, std::string, int>> channel_counts = {
-          std::make_tuple("/pi1/aos", "aos.examples.Ping", 1990),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 199),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.ServerStatistics",
-                          20),
-          std::make_tuple("/pi1/aos", "aos.message_bridge.ClientStatistics",
-                          199),
-          std::make_tuple("/pi1/aos", "aos.timing.Report", 60),
-          std::make_tuple("/test", "aos.examples.Ping", 2000)};
-      if (!std::get<0>(GetParam()).shared) {
-        channel_counts.push_back(
-            std::make_tuple("/pi1/aos/remote_timestamps/pi2/pi1/aos/"
-                            "aos-message_bridge-Timestamp",
-                            "aos.message_bridge.RemoteMessage", 199));
-      }
-      EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
-                  ::testing::UnorderedElementsAreArray(channel_counts))
-          << " : " << logfiles_[4];
-    }
+
+    EXPECT_THAT(CountChannelsData(config, logfiles_[3]),
+                ::testing::UnorderedElementsAre())
+        << " : " << logfiles_[3];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[4]),
+                ::testing::UnorderedElementsAre())
+        << " : " << logfiles_[4];
+
     // Timestamps for pong
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[2]),
                 UnorderedElementsAre())
@@ -235,27 +226,21 @@
         << " : " << logfiles_[4];
 
     // Timing reports and pongs.
-    EXPECT_THAT(CountChannelsData(config, logfiles_[5]),
-                UnorderedElementsAre(
-                    std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
-                    std::make_tuple("/pi2/aos",
-                                    "aos.message_bridge.ServerStatistics", 1)))
-        << " : " << logfiles_[5];
     EXPECT_THAT(
-        CountChannelsData(config, logfiles_[6]),
-        UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
-        << " : " << logfiles_[6];
-    EXPECT_THAT(
-        CountChannelsData(config, logfiles_[7]),
+        CountChannelsData(config, logfiles_[5]),
         UnorderedElementsAre(
-            std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
-            std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
-            std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
-                            20),
+            std::make_tuple("/pi2/aos", "aos.examples.Ping", 2001),
             std::make_tuple("/pi2/aos", "aos.message_bridge.ClientStatistics",
                             200),
+            std::make_tuple("/pi2/aos", "aos.message_bridge.ServerStatistics",
+                            21),
+            std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
             std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
-            std::make_tuple("/test", "aos.examples.Pong", 2000)))
+            std::make_tuple("/test", "aos.examples.Pong", 2001)))
+        << " : " << logfiles_[5];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[6]), UnorderedElementsAre())
+        << " : " << logfiles_[6];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[7]), UnorderedElementsAre())
         << " : " << logfiles_[7];
     // And ping timestamps.
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
@@ -977,7 +962,7 @@
   }
 
   const std::vector<LogFile> sorted_parts =
-      SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
+      SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 1, 1, 2, 2));
   EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
   LogReader reader(sorted_parts);
 
@@ -1155,10 +1140,10 @@
   // For snappy, needs to have enough data to be >1 chunk of compressed data so
   // that we don't corrupt the entire log part.
   ::std::string compressed_contents =
-      aos::util::ReadFileToStringOrDie(logfiles_[4]);
+      aos::util::ReadFileToStringOrDie(logfiles_[2]);
 
   aos::util::WriteStringToFileOrDie(
-      logfiles_[4],
+      logfiles_[2],
       compressed_contents.substr(0, compressed_contents.size() - 100));
 
   const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
@@ -1954,7 +1939,7 @@
   // hitting any fatal errors.
   {
     LogReader relogged_reader(SortParts(MakeLogFiles(
-        tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 3, 3, true)));
+        tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2", 1, 1, 2, 2, true)));
     relogged_reader.Register();
 
     relogged_reader.event_loop_factory()->Run();
@@ -1964,7 +1949,7 @@
   {
     LogReader relogged_reader(
         SortParts(MakeLogFiles(tmp_dir_ + "/relogged1", tmp_dir_ + "/relogged2",
-                               3, 3, true)),
+                               1, 1, 2, 2, true)),
         reader.configuration());
     relogged_reader.Register();
 
@@ -2150,7 +2135,7 @@
       //
       // TODO(austin): I'm not the most thrilled with this test pattern...  It
       // feels brittle in a different way.
-      if (file.find("source_pi1_timestamp_pi2") == std::string::npos) {
+      if (file.find("timestamps/remote_pi2") == std::string::npos) {
         switch (log_header->message().parts_index()) {
           case 0:
             EXPECT_EQ(source_node_boot_uuid, pi2_boot0);
@@ -2287,93 +2272,107 @@
               monotonic_clock::max_time);
     EXPECT_EQ(oldest_logger_local_unreliable_monotonic_timestamps,
               monotonic_clock::max_time);
-    switch (log_header->message().parts_index()) {
-      case 0:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_monotonic_timestamps, monotonic_clock::max_time);
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        break;
-      case 1:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90200)));
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90350)));
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90200)));
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90350)));
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time);
-        break;
-      case 2:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90200)))
-            << file;
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90350)))
-            << file;
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90200)))
-            << file;
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(90350)))
-            << file;
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(100000)))
-            << file;
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(100150)))
-            << file;
-        break;
-      case 3:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::milliseconds(1323) +
-                                              chrono::microseconds(200)));
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100350)));
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::milliseconds(1323) +
-                                              chrono::microseconds(200)));
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100350)));
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time)
-            << file;
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::max_time)
-            << file;
-        break;
-      case 4:
-        EXPECT_EQ(oldest_remote_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::milliseconds(1323) +
-                                              chrono::microseconds(200)));
-        EXPECT_EQ(oldest_local_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100350)));
-        EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::milliseconds(1323) +
-                                              chrono::microseconds(200)));
-        EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10100350)));
-        EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(1423000)))
-            << file;
-        EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                  monotonic_clock::time_point(chrono::microseconds(10200150)))
-            << file;
-        break;
-      default:
-        FAIL();
-        break;
+    if (log_header->message().data_stored()->Get(0) == StoredDataType::DATA) {
+      switch (log_header->message().parts_index()) {
+        case 0:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_local_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          break;
+        default:
+          FAIL();
+          break;
+      }
+    } else if (log_header->message().data_stored()->Get(0) ==
+               StoredDataType::TIMESTAMPS) {
+      switch (log_header->message().parts_index()) {
+        case 0:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90200)));
+          EXPECT_EQ(oldest_local_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90350)));
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90200)));
+          EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90350)));
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time);
+          break;
+        case 1:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90200)))
+              << file;
+          EXPECT_EQ(oldest_local_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90350)))
+              << file;
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90200)))
+              << file;
+          EXPECT_EQ(oldest_local_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(90350)))
+              << file;
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(100000)))
+              << file;
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(100150)))
+              << file;
+          break;
+        case 2:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                                chrono::microseconds(200)));
+          EXPECT_EQ(
+              oldest_local_monotonic_timestamps,
+              monotonic_clock::time_point(chrono::microseconds(10100350)));
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                                chrono::microseconds(200)));
+          EXPECT_EQ(
+              oldest_local_unreliable_monotonic_timestamps,
+              monotonic_clock::time_point(chrono::microseconds(10100350)));
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time)
+              << file;
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::max_time)
+              << file;
+          break;
+        case 3:
+          ASSERT_EQ(oldest_remote_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                                chrono::microseconds(200)));
+          EXPECT_EQ(
+              oldest_local_monotonic_timestamps,
+              monotonic_clock::time_point(chrono::microseconds(10100350)));
+          EXPECT_EQ(oldest_remote_unreliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::milliseconds(1323) +
+                                                chrono::microseconds(200)));
+          EXPECT_EQ(
+              oldest_local_unreliable_monotonic_timestamps,
+              monotonic_clock::time_point(chrono::microseconds(10100350)));
+          EXPECT_EQ(oldest_remote_reliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(1423000)))
+              << file;
+          EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
+                    monotonic_clock::time_point(chrono::microseconds(10200150)))
+              << file;
+          break;
+        default:
+          FAIL();
+          break;
+      }
     }
   }
 
@@ -2513,7 +2512,7 @@
               2u);
 
     if (log_header->message().node()->name()->string_view() != "pi1") {
-      ASSERT_TRUE(file.find("source_pi1_timestamp_pi2") != std::string::npos);
+      ASSERT_TRUE(file.find("timestamps/remote_pi2") != std::string::npos);
 
       const std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
           ReadNthMessage(file, 0);
@@ -2915,8 +2914,10 @@
 
   std::vector<std::string> missing_parts;
 
-  missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part0" + Extension());
-  missing_parts.emplace_back(logfile_base1_ + "_pi1_data.part2" + Extension());
+  missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part0" +
+                             Extension());
+  missing_parts.emplace_back(logfile_base1_ + "_pi1_timestamps.part2" +
+                             Extension());
   missing_parts.emplace_back(absl::StrCat(
       logfile_base1_, "_", std::get<0>(GetParam()).sha256, Extension()));