Reduce the number of remote data writers

Data coming in from other nodes is logged per channel, i.e. there is
one data writer and one file for each channel being logged from remote
nodes. This takes up significant memory at the compression level we aim
for. Reduce the number of writers by consolidating all channel data per
node.

Change-Id: Id44d8ab51d0d076ae4ad6917bc22bd9e70bd3aae
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 01f4d2f..b1f35f9 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -80,9 +80,9 @@
 
     EXPECT_EQ(logfile_uuids.size(), 2u);
     if (shared()) {
-      EXPECT_EQ(parts_uuids.size(), 7u);
+      EXPECT_EQ(parts_uuids.size(), 6u);
     } else {
-      EXPECT_EQ(parts_uuids.size(), 8u);
+      EXPECT_EQ(parts_uuids.size(), 7u);
     }
 
     // And confirm everything is on the correct node.
@@ -92,33 +92,30 @@
 
     EXPECT_EQ(log_header[5].message().node()->name()->string_view(), "pi2");
     EXPECT_EQ(log_header[6].message().node()->name()->string_view(), "pi2");
-
     EXPECT_EQ(log_header[7].message().node()->name()->string_view(), "pi2");
-    EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi2");
-    EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi2");
 
-    EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi1");
-    EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[8].message().node()->name()->string_view(), "pi1");
+    EXPECT_EQ(log_header[9].message().node()->name()->string_view(), "pi1");
 
-    EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
-    EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[10].message().node()->name()->string_view(), "pi2");
+    EXPECT_EQ(log_header[11].message().node()->name()->string_view(), "pi2");
 
     if (shared()) {
+      EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
+      EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
       EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
-      EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
-      EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
 
-      EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
-      EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi1");
-    } else {
-      EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi2");
-      EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi2");
-
+      EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
       EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi1");
-      EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi1");
+    } else {
+      EXPECT_EQ(log_header[12].message().node()->name()->string_view(), "pi2");
+      EXPECT_EQ(log_header[13].message().node()->name()->string_view(), "pi2");
 
-      EXPECT_EQ(log_header[18].message().node()->name()->string_view(), "pi2");
-      EXPECT_EQ(log_header[19].message().node()->name()->string_view(), "pi2");
+      EXPECT_EQ(log_header[14].message().node()->name()->string_view(), "pi1");
+      EXPECT_EQ(log_header[15].message().node()->name()->string_view(), "pi1");
+
+      EXPECT_EQ(log_header[16].message().node()->name()->string_view(), "pi2");
+      EXPECT_EQ(log_header[17].message().node()->name()->string_view(), "pi2");
     }
 
     // And the parts index matches.
@@ -128,33 +125,30 @@
 
     EXPECT_EQ(log_header[5].message().parts_index(), 0);
     EXPECT_EQ(log_header[6].message().parts_index(), 1);
+    EXPECT_EQ(log_header[7].message().parts_index(), 2);
 
-    EXPECT_EQ(log_header[7].message().parts_index(), 0);
-    EXPECT_EQ(log_header[8].message().parts_index(), 1);
-    EXPECT_EQ(log_header[9].message().parts_index(), 2);
+    EXPECT_EQ(log_header[8].message().parts_index(), 0);
+    EXPECT_EQ(log_header[9].message().parts_index(), 1);
 
     EXPECT_EQ(log_header[10].message().parts_index(), 0);
     EXPECT_EQ(log_header[11].message().parts_index(), 1);
 
-    EXPECT_EQ(log_header[12].message().parts_index(), 0);
-    EXPECT_EQ(log_header[13].message().parts_index(), 1);
-
     if (shared()) {
-      EXPECT_EQ(log_header[14].message().parts_index(), 0);
-      EXPECT_EQ(log_header[15].message().parts_index(), 1);
-      EXPECT_EQ(log_header[16].message().parts_index(), 2);
+      EXPECT_EQ(log_header[12].message().parts_index(), 0);
+      EXPECT_EQ(log_header[13].message().parts_index(), 1);
+      EXPECT_EQ(log_header[14].message().parts_index(), 2);
 
-      EXPECT_EQ(log_header[17].message().parts_index(), 0);
-      EXPECT_EQ(log_header[18].message().parts_index(), 1);
+      EXPECT_EQ(log_header[15].message().parts_index(), 0);
+      EXPECT_EQ(log_header[16].message().parts_index(), 1);
     } else {
+      EXPECT_EQ(log_header[12].message().parts_index(), 0);
+      EXPECT_EQ(log_header[13].message().parts_index(), 1);
+
       EXPECT_EQ(log_header[14].message().parts_index(), 0);
       EXPECT_EQ(log_header[15].message().parts_index(), 1);
 
       EXPECT_EQ(log_header[16].message().parts_index(), 0);
       EXPECT_EQ(log_header[17].message().parts_index(), 1);
-
-      EXPECT_EQ(log_header[18].message().parts_index(), 0);
-      EXPECT_EQ(log_header[19].message().parts_index(), 1);
     }
   }
 
@@ -223,37 +217,19 @@
             std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)))
         << " : " << logfiles_[4];
 
-    // Pong data.
-    EXPECT_THAT(
-        CountChannelsData(config, logfiles_[5]),
-        UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 91)))
-        << " : " << logfiles_[5];
-    EXPECT_THAT(CountChannelsData(config, logfiles_[6]),
-                UnorderedElementsAre(
-                    std::make_tuple("/test", "aos.examples.Pong", 1910)))
-        << " : " << logfiles_[6];
-
-    // No timestamps
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
-                UnorderedElementsAre())
-        << " : " << logfiles_[5];
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[6]),
-                UnorderedElementsAre())
-        << " : " << logfiles_[6];
-
     // Timing reports and pongs.
-    EXPECT_THAT(CountChannelsData(config, logfiles_[7]),
+    EXPECT_THAT(CountChannelsData(config, logfiles_[5]),
                 UnorderedElementsAre(
                     std::make_tuple("/pi2/aos", "aos.examples.Ping", 1),
                     std::make_tuple("/pi2/aos",
                                     "aos.message_bridge.ServerStatistics", 1)))
-        << " : " << logfiles_[7];
+        << " : " << logfiles_[5];
     EXPECT_THAT(
-        CountChannelsData(config, logfiles_[8]),
+        CountChannelsData(config, logfiles_[6]),
         UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Pong", 1)))
-        << " : " << logfiles_[8];
+        << " : " << logfiles_[6];
     EXPECT_THAT(
-        CountChannelsData(config, logfiles_[9]),
+        CountChannelsData(config, logfiles_[7]),
         UnorderedElementsAre(
             std::make_tuple("/pi2/aos", "aos.examples.Ping", 2000),
             std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
@@ -263,57 +239,68 @@
                             200),
             std::make_tuple("/pi2/aos", "aos.timing.Report", 60),
             std::make_tuple("/test", "aos.examples.Pong", 2000)))
-        << " : " << logfiles_[9];
-    // And ping timestamps.
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[7]),
-                UnorderedElementsAre())
         << " : " << logfiles_[7];
+    // And ping timestamps.
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[5]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[5];
     EXPECT_THAT(
-        CountChannelsTimestamp(config, logfiles_[8]),
+        CountChannelsTimestamp(config, logfiles_[6]),
         UnorderedElementsAre(std::make_tuple("/test", "aos.examples.Ping", 1)))
-        << " : " << logfiles_[8];
+        << " : " << logfiles_[6];
     EXPECT_THAT(
-        CountChannelsTimestamp(config, logfiles_[9]),
+        CountChannelsTimestamp(config, logfiles_[7]),
         UnorderedElementsAre(
             std::make_tuple("/test", "aos.examples.Ping", 2000),
             std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)))
-        << " : " << logfiles_[9];
+        << " : " << logfiles_[7];
 
     // And then test that the remotely logged timestamp data files only have
     // timestamps in them.
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[8]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[8];
+    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[9]),
+                UnorderedElementsAre())
+        << " : " << logfiles_[9];
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[10]),
                 UnorderedElementsAre())
         << " : " << logfiles_[10];
     EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[11]),
                 UnorderedElementsAre())
         << " : " << logfiles_[11];
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
-                UnorderedElementsAre())
-        << " : " << logfiles_[12];
-    EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
-                UnorderedElementsAre())
-        << " : " << logfiles_[13];
 
-    EXPECT_THAT(CountChannelsData(config, logfiles_[10]),
+    EXPECT_THAT(CountChannelsData(config, logfiles_[8]),
                 UnorderedElementsAre(std::make_tuple(
                     "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
-        << " : " << logfiles_[10];
-    EXPECT_THAT(CountChannelsData(config, logfiles_[11]),
+        << " : " << logfiles_[8];
+    EXPECT_THAT(CountChannelsData(config, logfiles_[9]),
                 UnorderedElementsAre(std::make_tuple(
                     "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
-        << " : " << logfiles_[11];
+        << " : " << logfiles_[9];
 
-    EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
-                UnorderedElementsAre(std::make_tuple(
-                    "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
-        << " : " << logfiles_[12];
-    EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
-                UnorderedElementsAre(std::make_tuple(
-                    "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
-        << " : " << logfiles_[13];
+    // Pong and timestamp data.
+    EXPECT_THAT(
+        CountChannelsData(config, logfiles_[10]),
+        UnorderedElementsAre(
+            std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 9),
+            std::make_tuple("/test", "aos.examples.Pong", 91)))
+        << " : " << logfiles_[10];
+    EXPECT_THAT(
+        CountChannelsData(config, logfiles_[11]),
+        UnorderedElementsAre(
+            std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 191),
+            std::make_tuple("/test", "aos.examples.Pong", 1910)))
+        << " : " << logfiles_[11];
 
     // Timestamps from pi2 on pi1, and the other way.
     if (shared()) {
+      EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
+                  UnorderedElementsAre())
+          << " : " << logfiles_[12];
+      EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
+                  UnorderedElementsAre())
+          << " : " << logfiles_[13];
       EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
                   UnorderedElementsAre())
           << " : " << logfiles_[14];
@@ -323,38 +310,38 @@
       EXPECT_THAT(CountChannelsData(config, logfiles_[16]),
                   UnorderedElementsAre())
           << " : " << logfiles_[16];
-      EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
-                  UnorderedElementsAre())
-          << " : " << logfiles_[17];
-      EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
-                  UnorderedElementsAre())
-          << " : " << logfiles_[18];
 
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
                   UnorderedElementsAre(
                       std::make_tuple("/test", "aos.examples.Ping", 1)))
-          << " : " << logfiles_[14];
+          << " : " << logfiles_[12];
       EXPECT_THAT(
-          CountChannelsTimestamp(config, logfiles_[15]),
+          CountChannelsTimestamp(config, logfiles_[13]),
           UnorderedElementsAre(
               std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 9),
               std::make_tuple("/test", "aos.examples.Ping", 90)))
-          << " : " << logfiles_[15];
+          << " : " << logfiles_[13];
       EXPECT_THAT(
-          CountChannelsTimestamp(config, logfiles_[16]),
+          CountChannelsTimestamp(config, logfiles_[14]),
           UnorderedElementsAre(
               std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 191),
               std::make_tuple("/test", "aos.examples.Ping", 1910)))
-          << " : " << logfiles_[16];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
+          << " : " << logfiles_[14];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
                   UnorderedElementsAre(std::make_tuple(
                       "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
-          << " : " << logfiles_[17];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
+          << " : " << logfiles_[15];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
                   UnorderedElementsAre(std::make_tuple(
                       "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
-          << " : " << logfiles_[18];
+          << " : " << logfiles_[16];
     } else {
+      EXPECT_THAT(CountChannelsData(config, logfiles_[12]),
+                  UnorderedElementsAre())
+          << " : " << logfiles_[12];
+      EXPECT_THAT(CountChannelsData(config, logfiles_[13]),
+                  UnorderedElementsAre())
+          << " : " << logfiles_[13];
       EXPECT_THAT(CountChannelsData(config, logfiles_[14]),
                   UnorderedElementsAre())
           << " : " << logfiles_[14];
@@ -367,37 +354,31 @@
       EXPECT_THAT(CountChannelsData(config, logfiles_[17]),
                   UnorderedElementsAre())
           << " : " << logfiles_[17];
-      EXPECT_THAT(CountChannelsData(config, logfiles_[18]),
-                  UnorderedElementsAre())
-          << " : " << logfiles_[18];
-      EXPECT_THAT(CountChannelsData(config, logfiles_[19]),
-                  UnorderedElementsAre())
-          << " : " << logfiles_[19];
 
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[12]),
                   UnorderedElementsAre(std::make_tuple(
                       "/pi1/aos", "aos.message_bridge.Timestamp", 9)))
+          << " : " << logfiles_[12];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[13]),
+                  UnorderedElementsAre(std::make_tuple(
+                      "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
+          << " : " << logfiles_[13];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[14]),
+                  UnorderedElementsAre(std::make_tuple(
+                      "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
           << " : " << logfiles_[14];
       EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[15]),
                   UnorderedElementsAre(std::make_tuple(
-                      "/pi1/aos", "aos.message_bridge.Timestamp", 191)))
+                      "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
           << " : " << logfiles_[15];
       EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[16]),
-                  UnorderedElementsAre(std::make_tuple(
-                      "/pi2/aos", "aos.message_bridge.Timestamp", 9)))
-          << " : " << logfiles_[16];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
-                  UnorderedElementsAre(std::make_tuple(
-                      "/pi2/aos", "aos.message_bridge.Timestamp", 191)))
-          << " : " << logfiles_[17];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[18]),
                   UnorderedElementsAre(
                       std::make_tuple("/test", "aos.examples.Ping", 91)))
-          << " : " << logfiles_[18];
-      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[19]),
+          << " : " << logfiles_[16];
+      EXPECT_THAT(CountChannelsTimestamp(config, logfiles_[17]),
                   UnorderedElementsAre(
                       std::make_tuple("/test", "aos.examples.Ping", 1910)))
-          << " : " << logfiles_[19];
+          << " : " << logfiles_[17];
     }
   }