Wrap DetachedBufferWriter in an object for logging

We have 2 use cases for more careful control of the
DetachedBufferWriter.  We are seeing headers written twice, and need to
start tracking boot UUID per node per log file.  Both would benifit from
an object which can control and track how the log files are being
written.

This should be purely a refactor.  Follow-up commits will change how
headers are written and track UUIDs.

Change-Id: Ib5c1ed8f6cdda5e9ac4ec2cd4034d7cc61d61478
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 6cf91db..166e505 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -30,24 +30,23 @@
     aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
     const Node *node) {
   CHECK_EQ(node, this->node());
-  UpdateHeader(header, uuid_, part_number_);
-  data_writer_->QueueSpan(header->span());
+  UpdateHeader(header, data_writer_.uuid(), data_writer_.part_number);
+  data_writer_.writer->QueueSpan(header->span());
 }
 
-DetachedBufferWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
+NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
   CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
       << ": " << configuration::CleanedChannelToString(channel);
-  return data_writer_.get();
+  return &data_writer_;
 }
 
 void LocalLogNamer::Rotate(
     const Node *node,
     aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header) {
   CHECK(node == this->node());
-  ++part_number_;
-  *data_writer_ = std::move(*OpenDataWriter());
-  UpdateHeader(header, uuid_, part_number_);
-  data_writer_->QueueSpan(header->span());
+  data_writer_.Rotate();
+  UpdateHeader(header, data_writer_.uuid(), data_writer_.part_number);
+  data_writer_.writer->QueueSpan(header->span());
 }
 
 void LocalLogNamer::WriteConfiguration(
@@ -67,18 +66,17 @@
   LOG(FATAL) << "Can't reboot a single node.";
 }
 
-DetachedBufferWriter *LocalLogNamer::MakeTimestampWriter(
-    const Channel *channel) {
+NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
   CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
       << ": Message is not delivered to this node.";
   CHECK(node_ != nullptr) << ": Can't log timestamps in a single node world";
   CHECK(configuration::ConnectionDeliveryTimeIsLoggedOnNode(channel, node_,
                                                             node_))
       << ": Delivery times aren't logged for this channel on this node.";
-  return data_writer_.get();
+  return &data_writer_;
 }
 
-DetachedBufferWriter *LocalLogNamer::MakeForwardedTimestampWriter(
+NewDataWriter *LocalLogNamer::MakeForwardedTimestampWriter(
     const Channel * /*channel*/, const Node * /*node*/) {
   LOG(FATAL) << "Can't log forwarded timestamps in a singe log file.";
   return nullptr;
@@ -103,16 +101,16 @@
     aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
     const Node *node) {
   if (node == this->node()) {
-    if (!data_writer_.writer) {
+    if (!data_writer_) {
       OpenDataWriter();
     }
-    UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
-    data_writer_.writer->QueueSpan(header->span());
+    UpdateHeader(header, data_writer_->uuid(), data_writer_->part_number);
+    data_writer_->writer->QueueSpan(header->span());
   } else {
-    for (std::pair<const Channel *const, DataWriter> &data_writer :
+    for (std::pair<const Channel *const, NewDataWriter> &data_writer :
          data_writers_) {
       if (node == data_writer.second.node) {
-        UpdateHeader(header, data_writer.second.uuid,
+        UpdateHeader(header, data_writer.second.uuid(),
                      data_writer.second.part_number);
         data_writer.second.writer->QueueSpan(header->span());
       }
@@ -137,25 +135,27 @@
     aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
     bool reboot) {
   if (node == this->node()) {
-    if (data_writer_.writer) {
+    if (data_writer_) {
       if (reboot) {
-        data_writer_.uuid = UUID::Random();
+        data_writer_->Reboot();
+      } else {
+        data_writer_->Rotate();
       }
-      ++data_writer_.part_number;
+      // TODO(austin): Move this logic down once we have a better ownership
+      // model for the header.
+      UpdateHeader(header, data_writer_->uuid(), data_writer_->part_number);
+      data_writer_->writer->QueueSpan(header->span());
     }
-    OpenDataWriter();
-    UpdateHeader(header, data_writer_.uuid, data_writer_.part_number);
-    data_writer_.writer->QueueSpan(header->span());
   } else {
-    for (std::pair<const Channel *const, DataWriter> &data_writer :
+    for (std::pair<const Channel *const, NewDataWriter> &data_writer :
          data_writers_) {
       if (node == data_writer.second.node) {
         if (reboot) {
-          data_writer.second.uuid = UUID::Random();
+          data_writer.second.Reboot();
+        } else {
+          data_writer.second.Rotate();
         }
-        ++data_writer.second.part_number;
-        data_writer.second.rotate(data_writer.first, &data_writer.second);
-        UpdateHeader(header, data_writer.second.uuid,
+        UpdateHeader(header, data_writer.second.uuid(),
                      data_writer.second.part_number);
         data_writer.second.writer->QueueSpan(header->span());
       }
@@ -185,7 +185,7 @@
   CloseWriter(&writer);
 }
 
-DetachedBufferWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
+NewDataWriter *MultiNodeLogNamer::MakeWriter(const Channel *channel) {
   // See if we can read the data on this node at all.
   const bool is_readable =
       configuration::ChannelIsReadableOnNode(channel, this->node());
@@ -204,10 +204,10 @@
   // Now, sort out if this is data generated on this node, or not.  It is
   // generated if it is sendable on this node.
   if (configuration::ChannelIsSendableOnNode(channel, this->node())) {
-    if (!data_writer_.writer) {
+    if (!data_writer_) {
       OpenDataWriter();
     }
-    return data_writer_.writer.get();
+    return data_writer_.get();
   }
 
   // Ok, we have data that is being forwarded to us that we are supposed to
@@ -223,18 +223,20 @@
     nodes_.emplace_back(source_node);
   }
 
-  DataWriter data_writer;
+  NewDataWriter data_writer(
+      [this, channel](NewDataWriter *data_writer) {
+        OpenWriter(channel, data_writer);
+      },
+      [this](NewDataWriter *data_writer) {
+        CloseWriter(&data_writer->writer);
+      });
   data_writer.node = source_node;
-  data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
-    OpenWriter(channel, data_writer);
-  };
-  data_writer.rotate(channel, &data_writer);
 
-  return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
-      .first->second.writer.get();
+  return &(
+      data_writers_.emplace(channel, std::move(data_writer)).first->second);
 }
 
-DetachedBufferWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
+NewDataWriter *MultiNodeLogNamer::MakeForwardedTimestampWriter(
     const Channel *channel, const Node *node) {
   // See if we can read the data on this node at all.
   const bool is_readable =
@@ -247,19 +249,20 @@
     nodes_.emplace_back(node);
   }
 
-  DataWriter data_writer;
+  NewDataWriter data_writer(
+      [this, channel](NewDataWriter *data_writer) {
+        OpenForwardedTimestampWriter(channel, data_writer);
+      },
+      [this](NewDataWriter *data_writer) {
+        CloseWriter(&data_writer->writer);
+      });
   data_writer.node = node;
-  data_writer.rotate = [this](const Channel *channel, DataWriter *data_writer) {
-    OpenForwardedTimestampWriter(channel, data_writer);
-  };
-  data_writer.rotate(channel, &data_writer);
 
-  return data_writers_.insert(std::make_pair(channel, std::move(data_writer)))
-      .first->second.writer.get();
+  return &(
+      data_writers_.emplace(channel, std::move(data_writer)).first->second);
 }
 
-DetachedBufferWriter *MultiNodeLogNamer::MakeTimestampWriter(
-    const Channel *channel) {
+NewDataWriter *MultiNodeLogNamer::MakeTimestampWriter(const Channel *channel) {
   bool log_delivery_times = false;
   if (this->node() != nullptr) {
     log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
@@ -269,30 +272,25 @@
     return nullptr;
   }
 
-  if (!data_writer_.writer) {
+  if (!data_writer_) {
     OpenDataWriter();
   }
-  return data_writer_.writer.get();
+  return data_writer_.get();
 }
 
 void MultiNodeLogNamer::Close() {
-  for (std::pair<const Channel *const, DataWriter> &data_writer :
-       data_writers_) {
-    CloseWriter(&data_writer.second.writer);
-    data_writer.second.writer.reset();
-  }
-  CloseWriter(&data_writer_.writer);
-  data_writer_.writer.reset();
+  data_writers_.clear();
+  data_writer_.reset();
 }
 
 void MultiNodeLogNamer::ResetStatistics() {
-  for (std::pair<const Channel *const, DataWriter> &data_writer :
+  for (std::pair<const Channel *const, NewDataWriter> &data_writer :
        data_writers_) {
     if (!data_writer.second.writer) continue;
     data_writer.second.writer->ResetStatistics();
   }
-  if (data_writer_.writer) {
-    data_writer_.writer->ResetStatistics();
+  if (data_writer_) {
+    data_writer_->writer->ResetStatistics();
   }
   max_write_time_ = std::chrono::nanoseconds::zero();
   max_write_time_bytes_ = -1;
@@ -303,8 +301,8 @@
   total_write_bytes_ = 0;
 }
 
-void MultiNodeLogNamer::OpenForwardedTimestampWriter(const Channel *channel,
-                                                     DataWriter *data_writer) {
+void MultiNodeLogNamer::OpenForwardedTimestampWriter(
+    const Channel *channel, NewDataWriter *data_writer) {
   std::string filename =
       absl::StrCat("timestamps", channel->name()->string_view(), "/",
                    channel->type()->string_view(), ".part",
@@ -313,7 +311,7 @@
 }
 
 void MultiNodeLogNamer::OpenWriter(const Channel *channel,
-                                   DataWriter *data_writer) {
+                                   NewDataWriter *data_writer) {
   const std::string filename = absl::StrCat(
       CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
       channel->name()->string_view(), "/", channel->type()->string_view(),
@@ -322,13 +320,21 @@
 }
 
 void MultiNodeLogNamer::OpenDataWriter() {
-  std::string name;
-  if (node() != nullptr) {
-    name = absl::StrCat(name, node()->name()->string_view(), "_");
+  if (!data_writer_) {
+    data_writer_ = std::make_unique<NewDataWriter>(
+        [this](NewDataWriter *writer) {
+          std::string name;
+          if (node() != nullptr) {
+            name = absl::StrCat(name, node()->name()->string_view(), "_");
+          }
+          absl::StrAppend(&name, "data.part", writer->part_number, ".bfbs",
+                          extension_);
+          CreateBufferWriter(name, &writer->writer);
+        },
+        [this](NewDataWriter *data_writer) {
+          CloseWriter(&data_writer->writer);
+        });
   }
-  absl::StrAppend(&name, "data.part", data_writer_.part_number, ".bfbs",
-                  extension_);
-  CreateBufferWriter(name, &data_writer_.writer);
 }
 
 void MultiNodeLogNamer::CreateBufferWriter(