Reduce the number of remote data writers

Data coming in from other nodes is logged per channel, i.e. there is
one data writer and one file for each channel being logged from remote
nodes. This takes up significant memory at the compression level we aim
for. Reduce the number of writers by consolidating all channel data
from a given source node into a single data writer.

Change-Id: Id44d8ab51d0d076ae4ad6917bc22bd9e70bd3aae
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index bdcb454..16bd609 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -727,15 +727,29 @@
     nodes_.emplace_back(source_node);
   }
 
+  // If we already have a data writer for the node, then use the same writer for
+  // all channels of that node.
+  if (node_data_writers_.find(source_node) != node_data_writers_.end()) {
+    node_data_writers_.find(source_node)
+        ->second->UpdateMaxMessageSize(
+            PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
+    return node_data_writers_.find(source_node)->second;
+  }
+
+  // If we don't have a data writer for the node, create one.
   NewDataWriter data_writer(
       this, source_node, node_,
       [this, channel](NewDataWriter *data_writer) {
         OpenWriter(channel, data_writer);
       },
       [this](NewDataWriter *data_writer) { CloseWriter(&data_writer->writer); },
+      0);
+  data_writer.UpdateMaxMessageSize(
       PackMessageSize(LogType::kLogRemoteMessage, channel->max_size()));
-  return &(
-      data_writers_.emplace(channel, std::move(data_writer)).first->second);
+
+  data_writers_.emplace(channel, std::move(data_writer));
+  node_data_writers_.emplace(source_node, &data_writers_.find(channel)->second);
+  return &(data_writers_.find(channel)->second);
 }
 
 NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
@@ -782,6 +796,7 @@
 
 WriteCode MultiNodeLogNamer::Close() {
   data_writers_.clear();
+  node_data_writers_.clear();
   data_writer_.reset();
   if (ran_out_of_space_) {
     return WriteCode::kOutOfSpace;
@@ -820,9 +835,9 @@
 void MultiNodeLogNamer::OpenWriter(const Channel *channel,
                                    NewDataWriter *data_writer) {
   const std::string filename = absl::StrCat(
-      CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
-      channel->name()->string_view(), "/", channel->type()->string_view(),
-      ".part", data_writer->parts_index(), ".bfbs", extension_);
+      CHECK_NOTNULL(channel->source_node())->string_view(), "_data/",
+      channel->source_node()->string_view(), "_data.part",
+      data_writer->parts_index(), ".bfbs", extension_);
   CreateBufferWriter(filename, data_writer->max_message_size(),
                      &data_writer->writer);
 }