Move log_namer to log_namer.{cc,h}

logger.{cc,h} is getting too big.  Also move more implementation from
the header to the .cc, and add comments.

Change-Id: Ia342bfade98c71d4d42f769571256c28e2c826d5
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 5aba387..56bef43 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -36,11 +36,13 @@
 cc_library(
     name = "logger",
     srcs = [
+        "log_namer.cc",
         "logger.cc",
         "logger_math.cc",
     ],
     hdrs = [
         "eigen_mpq.h",
+        "log_namer.h",
         "logger.h",
     ],
     visibility = ["//visibility:public"],
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
new file mode 100644
index 0000000..def0842
--- /dev/null
+++ b/aos/events/logging/log_namer.cc
@@ -0,0 +1,237 @@
+#include "aos/events/logging/log_namer.h"
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "absl/strings/str_cat.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace logger {
+
+void LogNamer::UpdateHeader(
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+    const UUID &uuid, int parts_index) const {
+  header->mutable_message()->mutate_parts_index(parts_index);
+  CHECK_EQ(uuid.string_view().size(),
+           header->mutable_message()->mutable_parts_uuid()->size());
+  std::copy(uuid.string_view().begin(), uuid.string_view().end(),
+            reinterpret_cast<char *>(
+                header->mutable_message()->mutable_parts_uuid()->Data()));
+}
+
+void LocalLogNamer::WriteHeader(
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+    const Node *node) {
+  CHECK_EQ(node, this->node());
+  UpdateHeader(header, uuid_, part_number_);
+  data_writer_->WriteSizedFlatbuffer(header->full_span());
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
+  CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
+  return data_writer_.get();
+}
+
+void LocalLogNamer::Rotate(
+    const Node *node,
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+  CHECK(node == this->node());
+  ++part_number_;
+  *data_writer_ = std::move(*OpenDataWriter());
+  UpdateHeader(header, uuid_, part_number_);
+  data_writer_->WriteSizedFlatbuffer(header->full_span());
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
+    const Channel *channel) {
+  CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
+      << ": Message is not delivered to this node.";
+  CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
+  CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
+                                                            node_))
+      << ": Delivery times aren't logged for this channel on this node.";
+  return data_writer_.get();
+}
+
+DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
+    const Channel * /*channel*/, const Node * /*node*/) {
+  LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
+  return nullptr;
+}
+
+MultiNodeLogNamer::MultiNodeLogNamer(std::string_view base_name,
+                                     const Configuration *configuration,
+                                     const Node *node)
+    : LogNamer(node),
+      base_name_(base_name),
+      configuration_(configuration),
+      uuid_(UUID::Random()),
+      data_writer_(OpenDataWriter()) {}
+
+void MultiNodeLogNamer::WriteHeader(
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+    const Node *node) {
+  if (node == this->node()) {
+    UpdateHeader(header, uuid_, part_number_);
+    data_writer_->WriteSizedFlatbuffer(header->full_span());
+  } else {
+    for (std::pair<const Channel *const, DataWriter> &data_writer :
+         data_writers_) {
+      if (node == data_writer.second.node) {
+        UpdateHeader(header, data_writer.second.uuid,
+                     data_writer.second.part_number);
+        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+      }
+    }
+  }
+}
+
+void MultiNodeLogNamer::Rotate(
+    const Node *node,
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
+  if (node == this->node()) {
+    ++part_number_;
+    *data_writer_ = std::move(*OpenDataWriter());
+    UpdateHeader(header, uuid_, part_number_);
+    data_writer_->WriteSizedFlatbuffer(header->full_span());
+  } else {
+    for (std::pair<const Channel *const, DataWriter> &data_writer :
+         data_writers_) {
+      if (node == data_writer.second.node) {
+        ++data_writer.second.part_number;
+        data_writer.second.rotate(data_writer.first, &data_writer.second);
+        UpdateHeader(header, data_writer.second.uuid,
+                     data_writer.second.part_number);
+        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
+      }
+    }
+  }
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
+  // See if we can read the data on this node at all.
+  const bool is_readable =
+      configuration::ChannelIsReadableOnNode(channel, this->node());
+  if (!is_readable) {
+    return nullptr;
+  }
+
+  // Then, see if we are supposed to log the data here.
+  const bool log_message =
+      configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
+
+  if (!log_message) {
+    return nullptr;
+  }
+
+  // Now, sort out if this is data generated on this node, or not.  It is
+  // generated if it is sendable on this node.
+  if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
+    return data_writer_.get();
+  }
+
+  // Ok, we have data that is being forwarded to us that we are supposed to
+  // log.  It needs to be logged with send timestamps, but be sorted enough
+  // to be able to be processed.
+  CHECK(data_writers_.find(channel) == data_writers_.end());
+
+  // Track that this node is being logged.
+  const Node *source_node = configuration::GetNode(
+      configuration_, channel->source_node()->string_view());
+
+  if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
+    nodes_.emplace_back(source_node);
+  }
+
+  DataWriter data_writer;
+  data_writer.node = source_node;
+  data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
+    OpenWriter(channel, data_writer);
+  };
+  data_writer.rotate(channel, &data_writer);
+
+  return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+      .first->second.writer.get();
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
+    const Channel *channel, const Node *node) {
+  // See if we can read the data on this node at all.
+  const bool is_readable =
+      configuration::ChannelIsReadableOnNode(channel, this->node());
+  CHECK(is_readable) << ": " << configuration::CleanedChannelToString(channel);
+
+  CHECK(data_writers_.find(channel) == data_writers_.end());
+
+  if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
+    nodes_.emplace_back(node);
+  }
+
+  DataWriter data_writer;
+  data_writer.node = node;
+  data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
+    OpenForwardedTimestampWriter(channel, data_writer);
+  };
+  data_writer.rotate(channel, &data_writer);
+
+  return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
+      .first->second.writer.get();
+}
+
+DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
+    const Channel *channel) {
+  const bool log_delivery_times =
+      (this->node() == nullptr)
+          ? false
+          : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+                channel, this->node(), this->node());
+  if (!log_delivery_times) {
+    return nullptr;
+  }
+
+  return data_writer_.get();
+}
+
+void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
+                                                     DataWriter *data_writer) {
+  std::string filename =
+      absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
+                   "/", channel->type()->string_view(), ".part",
+                   data_writer->part_number, ".bfbs");
+
+  if (!data_writer->writer) {
+    data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+  } else {
+    *data_writer->writer = DetachedBufferWriter(filename);
+  }
+}
+
+void MultiNodeLogNamer::OpenWriter(const Channel *channel,
+                                   DataWriter *data_writer) {
+  const std::string filename = absl::StrCat(
+      base_name_, "_", channel->source_node()->string_view(), "_data",
+      channel->name()->string_view(), "/", channel->type()->string_view(),
+      ".part", data_writer->part_number, ".bfbs");
+  if (!data_writer->writer) {
+    data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
+  } else {
+    *data_writer->writer = DetachedBufferWriter(filename);
+  }
+}
+
+std::unique_ptr<DetachedBufferWriter> MultiNodeLogNamer::OpenDataWriter() {
+  return std::make_unique<DetachedBufferWriter>(
+      absl::StrCat(base_name_, "_", node()->name()->string_view(), "_data.part",
+                   part_number_, ".bfbs"));
+}
+
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
new file mode 100644
index 0000000..631016b
--- /dev/null
+++ b/aos/events/logging/log_namer.h
@@ -0,0 +1,168 @@
+#ifndef AOS_EVENTS_LOGGING_LOG_NAMER_H_
+#define AOS_EVENTS_LOGGING_LOG_NAMER_H_
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <string_view>
+#include <vector>
+
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// Interface describing how to name, track, and add headers to log file parts.
+class LogNamer {
+ public:
+  // Constructs a LogNamer with the primary node (ie the one the logger runs on)
+  // being node.
+  LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
+  virtual ~LogNamer() {}
+
+  // Writes the header to all log files for a specific node.  This function
+  // needs to be called after all the writers are created.
+  //
+  // Modifies header to contain the uuid and part number for each writer as it
+  // writes it.  Since this is done unconditionally, it does not restore the
+  // previous value at the end.
+  virtual void WriteHeader(
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+      const Node *node) = 0;
+
+  // Returns a writer for writing data logged on this channel (on the node
+  // provided in the constructor).
+  virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
+
+  // Returns a writer for writing timestamps logged on this channel (on the node
+  // provided in the constructor).
+  virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
+
+  // Returns a writer for writing timestamps delivered over the special
+  // /aos/remote_timestamps/* channels.  node is the node that the timestamps
+  // are forwarded back from.
+  virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel *channel, const Node *node) = 0;
+
+  // Rotates all log files for the provided node.  The provided header will be
+  // modified and written per WriteHeader above.
+  virtual void Rotate(
+      const Node *node,
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
+
+  // Returns all the nodes that data is being written for.
+  const std::vector<const Node *> &nodes() const { return nodes_; }
+
+  // Returns the node the logger is running on.
+  const Node *node() const { return node_; }
+
+ protected:
+  // Modifies the header to have the provided UUID and part id.
+  void UpdateHeader(
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+      const UUID &uuid, int part_id) const;
+
+  const Node *const node_;
+  std::vector<const Node *> nodes_;
+};
+
+// Local log namer is a simple version which only names things
+// "base_name.part#.bfbs" and increments the part number.  It doesn't support
+// any other log type.
+class LocalLogNamer : public LogNamer {
+ public:
+  LocalLogNamer(std::string_view base_name, const Node *node)
+      : LogNamer(node),
+        base_name_(base_name),
+        uuid_(UUID::Random()),
+        data_writer_(OpenDataWriter()) {}
+
+  void WriteHeader(
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+      const Node *node) override;
+
+  DetachedBufferWriter *MakeWriter(const Channel *channel) override;
+
+  void Rotate(const Node *node,
+              aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+      override;
+
+  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+
+  DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel * /*channel*/, const Node * /*node*/) override;
+
+ private:
+  // Creates a new data writer with the new part number.
+  std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
+    return std::make_unique<DetachedBufferWriter>(
+        absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
+  }
+
+  const std::string base_name_;
+  const UUID uuid_;
+  size_t part_number_ = 0;
+  std::unique_ptr<DetachedBufferWriter> data_writer_;
+};
+
+// Log namer which uses a config and a base name to name a bunch of files.
+class MultiNodeLogNamer : public LogNamer {
+ public:
+  MultiNodeLogNamer(std::string_view base_name,
+                    const Configuration *configuration, const Node *node);
+
+  void WriteHeader(
+      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
+      const Node *node) override;
+
+  void Rotate(const Node *node,
+              aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
+      override;
+
+  DetachedBufferWriter *MakeWriter(const Channel *channel) override;
+
+  DetachedBufferWriter *MakeForwardedTimestampWriter(
+      const Channel *channel, const Node *node) override;
+
+  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override;
+
+ private:
+  // Files to write remote data to.  We want one per channel.  Maps the channel
+  // to the writer, Node, and part number.
+  struct DataWriter {
+    std::unique_ptr<DetachedBufferWriter> writer = nullptr;
+    const Node *node;
+    size_t part_number = 0;
+    UUID uuid = UUID::Random();
+    std::function<void(const Channel *, DataWriter *)> rotate;
+  };
+
+  // Opens up a writer for timestamps forwarded back.
+  void OpenForwardedTimestampWriter(const Channel *channel,
+                                    DataWriter *data_writer);
+
+  // Opens up a writer for remote data.
+  void OpenWriter(const Channel *channel, DataWriter *data_writer);
+
+  // Opens the main data writer file for this node responsible for data_writer_.
+  std::unique_ptr<DetachedBufferWriter> OpenDataWriter();
+
+  const std::string base_name_;
+  const Configuration *const configuration_;
+  const UUID uuid_;
+
+  size_t part_number_ = 0;
+
+  // File to write both delivery timestamps and local data to.
+  std::unique_ptr<DetachedBufferWriter> data_writer_;
+
+  std::map<const Channel *, DataWriter> data_writers_;
+};
+
+}  // namespace logger
+}  // namespace aos
+
+#endif  // AOS_EVENTS_LOGGING_LOG_NAMER_H_
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 85809b1..c94cb3a 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -35,56 +35,6 @@
 namespace logger {
 namespace chrono = std::chrono;
 
-void LogNamer::UpdateHeader(
-    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
-    const UUID &uuid, int parts_index) {
-  header->mutable_message()->mutate_parts_index(parts_index);
-  CHECK_EQ(uuid.string_view().size(),
-           header->mutable_message()->mutable_parts_uuid()->size());
-  std::copy(uuid.string_view().begin(), uuid.string_view().end(),
-            reinterpret_cast<char *>(
-                header->mutable_message()->mutable_parts_uuid()->Data()));
-}
-
-void MultiNodeLogNamer::WriteHeader(
-    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
-    const Node *node) {
-  if (node == this->node()) {
-    UpdateHeader(header, uuid_, part_number_);
-    data_writer_->WriteSizedFlatbuffer(header->full_span());
-  } else {
-    for (std::pair<const Channel *const, DataWriter> &data_writer :
-         data_writers_) {
-      if (node == data_writer.second.node) {
-        UpdateHeader(header, data_writer.second.uuid,
-                     data_writer.second.part_number);
-        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
-      }
-    }
-  }
-}
-
-void MultiNodeLogNamer::Rotate(
-    const Node *node,
-    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
-  if (node == this->node()) {
-    ++part_number_;
-    *data_writer_ = std::move(*OpenDataWriter());
-    UpdateHeader(header, uuid_, part_number_);
-    data_writer_->WriteSizedFlatbuffer(header->full_span());
-  } else {
-    for (std::pair<const Channel *const, DataWriter> &data_writer :
-         data_writers_) {
-      if (node == data_writer.second.node) {
-        ++data_writer.second.part_number;
-        data_writer.second.rotate(data_writer.first, &data_writer.second);
-        UpdateHeader(header, data_writer.second.uuid,
-                     data_writer.second.part_number);
-        data_writer.second.writer->WriteSizedFlatbuffer(header->full_span());
-      }
-    }
-  }
-}
 
 Logger::Logger(std::string_view base_name, EventLoop *event_loop,
                std::chrono::milliseconds polling_period)
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index d4ed786..6c67130 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -12,6 +12,7 @@
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/eigen_mpq.h"
+#include "aos/events/logging/log_namer.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/logging/uuid.h"
@@ -25,261 +26,6 @@
 namespace aos {
 namespace logger {
 
-class LogNamer {
- public:
-  LogNamer(const Node *node) : node_(node) { nodes_.emplace_back(node_); }
-  virtual ~LogNamer() {}
-
-  virtual void WriteHeader(
-      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
-      const Node *node) = 0;
-  virtual DetachedBufferWriter *MakeWriter(const Channel *channel) = 0;
-
-  virtual DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) = 0;
-  virtual DetachedBufferWriter *MakeForwardedTimestampWriter(
-      const Channel *channel, const Node *node) = 0;
-  virtual void Rotate(
-      const Node *node,
-      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) = 0;
-  const std::vector<const Node *> &nodes() const { return nodes_; }
-
-  const Node *node() const { return node_; }
-
- protected:
-  void UpdateHeader(
-      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
-      const UUID &uuid, int part_id);
-
-  const Node *const node_;
-  std::vector<const Node *> nodes_;
-};
-
-class LocalLogNamer : public LogNamer {
- public:
-  LocalLogNamer(std::string_view base_name, const Node *node)
-      : LogNamer(node),
-        base_name_(base_name),
-        uuid_(UUID::Random()),
-        data_writer_(OpenDataWriter()) {}
-
-  void WriteHeader(
-      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
-      const Node *node) override {
-    CHECK_EQ(node, this->node());
-    UpdateHeader(header, uuid_, part_number_);
-    data_writer_->WriteSizedFlatbuffer(header->full_span());
-  }
-
-  DetachedBufferWriter *MakeWriter(const Channel *channel) override {
-    CHECK(configuration::ChannelIsSendableOnNode(channel, node()));
-    return data_writer_.get();
-  }
-
-  void Rotate(const Node *node,
-              aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
-      override {
-    CHECK(node == this->node());
-    ++part_number_;
-    *data_writer_ = std::move(*OpenDataWriter());
-    UpdateHeader(header, uuid_, part_number_);
-    data_writer_->WriteSizedFlatbuffer(header->full_span());
-  }
-
-  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
-    CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
-        << ": Message is not delivered to this node.";
-    CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
-    CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
-                                                              node_))
-        << ": Delivery times aren't logged for this channel on this node.";
-    return data_writer_.get();
-  }
-
-  DetachedBufferWriter *MakeForwardedTimestampWriter(
-      const Channel * /*channel*/, const Node * /*node*/) override {
-    LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
-    return nullptr;
-  }
-
- private:
-  std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
-    return std::make_unique<DetachedBufferWriter>(
-        absl::StrCat(base_name_, ".part", part_number_, ".bfbs"));
-  }
-  const std::string base_name_;
-  const UUID uuid_;
-  size_t part_number_ = 0;
-  std::unique_ptr<DetachedBufferWriter> data_writer_;
-};
-
-// TODO(austin): Split naming files from making files so we can re-use the
-// naming code to predict the log file names for a provided base name.
-class MultiNodeLogNamer : public LogNamer {
- public:
-  MultiNodeLogNamer(std::string_view base_name,
-                    const Configuration *configuration, const Node *node)
-      : LogNamer(node),
-        base_name_(base_name),
-        configuration_(configuration),
-        uuid_(UUID::Random()),
-        data_writer_(OpenDataWriter()) {}
-
-  // Writes the header to all log files for a specific node.  This function
-  // needs to be called after all the writers are created.
-  void WriteHeader(
-      aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
-      const Node *node) override;
-
-  void Rotate(const Node *node,
-              aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header)
-      override;
-
-  // Makes a data logger for a specific channel.
-  DetachedBufferWriter *MakeWriter(const Channel *channel) override {
-    // See if we can read the data on this node at all.
-    const bool is_readable =
-        configuration::ChannelIsReadableOnNode(channel, this->node());
-    if (!is_readable) {
-      return nullptr;
-    }
-
-    // Then, see if we are supposed to log the data here.
-    const bool log_message =
-        configuration::ChannelMessageIsLoggedOnNode(channel, this->node());
-
-    if (!log_message) {
-      return nullptr;
-    }
-
-    // Now, sort out if this is data generated on this node, or not.  It is
-    // generated if it is sendable on this node.
-    if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
-      return data_writer_.get();
-    }
-
-    // Ok, we have data that is being forwarded to us that we are supposed to
-    // log.  It needs to be logged with send timestamps, but be sorted enough
-    // to be able to be processed.
-    CHECK(data_writers_.find(channel) == data_writers_.end());
-
-    // Track that this node is being logged.
-    const Node *source_node = configuration::GetNode(
-        configuration_, channel->source_node()->string_view());
-
-    if (std::find(nodes_.begin(), nodes_.end(), source_node) == nodes_.end()) {
-      nodes_.emplace_back(source_node);
-    }
-
-    DataWriter data_writer;
-    data_writer.node = source_node;
-    data_writer.rotate = [this](const Channel *channel,
-                                DataWriter *data_writer) {
-      OpenWriter(channel, data_writer);
-    };
-    data_writer.rotate(channel, &data_writer);
-
-    return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
-        .first->second.writer.get();
-  }
-
-  DetachedBufferWriter *MakeForwardedTimestampWriter(
-      const Channel *channel, const Node *node) override {
-    // See if we can read the data on this node at all.
-    const bool is_readable =
-        configuration::ChannelIsReadableOnNode(channel, this->node());
-    CHECK(is_readable) << ": "
-                       << configuration::CleanedChannelToString(channel);
-
-    CHECK(data_writers_.find(channel) == data_writers_.end());
-
-    if (std::find(nodes_.begin(), nodes_.end(), node) == nodes_.end()) {
-      nodes_.emplace_back(node);
-    }
-
-    DataWriter data_writer;
-    data_writer.node = node;
-    data_writer.rotate = [this](const Channel *channel,
-                                DataWriter *data_writer) {
-      OpenForwardedTimestampWriter(channel, data_writer);
-    };
-    data_writer.rotate(channel, &data_writer);
-
-    return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
-        .first->second.writer.get();
-  }
-
-  // Makes a timestamp (or timestamp and data) logger for a channel and
-  // forwarding connection.
-  DetachedBufferWriter *MakeTimestampWriter(const Channel *channel) override {
-    const bool log_delivery_times =
-        (this->node() == nullptr)
-            ? false
-            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-                  channel, this->node(), this->node());
-    if (!log_delivery_times) {
-      return nullptr;
-    }
-
-    return data_writer_.get();
-  }
-
-  const std::vector<const Node *> &nodes() const { return nodes_; }
-
- private:
-  // Files to write remote data to.  We want one per channel.  Maps the channel
-  // to the writer, Node, and part number.
-  struct DataWriter {
-    std::unique_ptr<DetachedBufferWriter> writer = nullptr;
-    const Node *node;
-    size_t part_number = 0;
-    UUID uuid = UUID::Random();
-    std::function<void(const Channel *, DataWriter *)> rotate;
-  };
-
-  void OpenForwardedTimestampWriter(const Channel *channel,
-                                    DataWriter *data_writer) {
-    std::string filename =
-        absl::StrCat(base_name_, "_timestamps", channel->name()->string_view(),
-                     "/", channel->type()->string_view(), ".part",
-                     data_writer->part_number, ".bfbs");
-
-    if (!data_writer->writer) {
-      data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
-    } else {
-      *data_writer->writer = DetachedBufferWriter(filename);
-    }
-  }
-
-  void OpenWriter(const Channel *channel, DataWriter *data_writer) {
-    const std::string filename = absl::StrCat(
-        base_name_, "_", channel->source_node()->string_view(), "_data",
-        channel->name()->string_view(), "/", channel->type()->string_view(),
-        ".part", data_writer->part_number, ".bfbs");
-    if (!data_writer->writer) {
-      data_writer->writer = std::make_unique<DetachedBufferWriter>(filename);
-    } else {
-      *data_writer->writer = DetachedBufferWriter(filename);
-    }
-  }
-
-  std::unique_ptr<DetachedBufferWriter> OpenDataWriter() {
-    return std::make_unique<DetachedBufferWriter>(
-        absl::StrCat(base_name_, "_", node()->name()->string_view(),
-                     "_data.part", part_number_, ".bfbs"));
-  }
-
-  const std::string base_name_;
-  const Configuration *const configuration_;
-  const UUID uuid_;
-
-  size_t part_number_ = 0;
-
-  // File to write both delivery timestamps and local data to.
-  std::unique_ptr<DetachedBufferWriter> data_writer_;
-
-  std::map<const Channel *, DataWriter> data_writers_;
-};
-
 // Logs all channels available in the event loop to disk every 100 ms.
 // Start by logging one message per channel to capture any state and
 // configuration that is sent rately on a channel and would affect execution.