Move reboot and header responsibility into DataWriter

Now that we have a class wrapped around the raw flatbuffer writing to
disk, we can start to make it responsible for tracking and writing the
header.  It can also track reboots.  Each message that is to be written
now needs to be passed in with the corresponding boot UUID.

This removes a ton of code in LogWriter since it is simpler to do it in
DataWriter.  We don't need to detect state and then trigger the right
behavior nearly as much because the bottom layers track it very easily
now and just work.

A side effect of delaying these writes is that we should no longer
write headers with no body, because we delay writing the header until
we know the body.  This would potentially be a performance problem with
inline configuration, but with the separate configuration files, this
isn't a problem.

Change-Id: Iec91284fcceeb73e3b6c181aebd2fb7edc7bc1ac
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 7ff3bc4..ca7fc61 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -17,6 +17,76 @@
 namespace aos {
 namespace logger {
 
+NewDataWriter::NewDataWriter(LogNamer *log_namer, const Node *node,
+                             std::function<void(NewDataWriter *)> reopen,
+                             std::function<void(NewDataWriter *)> close)
+    : node_(node),
+      node_index_(configuration::GetNodeIndex(log_namer->configuration_, node)),
+      log_namer_(log_namer),
+      reopen_(std::move(reopen)),
+      close_(std::move(close)) {
+  reopen_(this);  // Invoke reopen up front; no header is queued here.
+}
+
+NewDataWriter::~NewDataWriter() {
+  if (writer) {  // Close() CHECKs writer, so skip it if already closed.
+    Close();
+  }
+}
+
+void NewDataWriter::Rotate() {
+  ++parts_index_;  // Same parts UUID and boot UUID, next part number.
+  reopen_(this);
+  header_written_ = false;  // Lets QueueHeader's duplicate-header CHECK pass.
+  QueueHeader(log_namer_->MakeHeader(node_index_, source_node_boot_uuid_,
+                                     parts_uuid(), parts_index_));
+}
+
+void NewDataWriter::Reboot() {
+  parts_uuid_ = UUID::Random();  // A reboot gets a fresh parts UUID.
+  ++parts_index_;
+  reopen_(this);
+  header_written_ = false;  // No header queued here; the caller must queue it.
+}
+
+void NewDataWriter::QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb,
+                                         const UUID &source_node_boot_uuid,
+                                         aos::monotonic_clock::time_point now) {
+  // TODO(austin): Handle remote nodes changing too, not just the source node.
+  if (source_node_boot_uuid_ != source_node_boot_uuid) {
+    if (header_written_) {  // This part already has a header: start a new one.
+      Reboot();
+    }
+
+    QueueHeader(log_namer_->MakeHeader(node_index_, source_node_boot_uuid,
+                                       parts_uuid(), parts_index_));
+  }
+  CHECK_EQ(source_node_boot_uuid_, source_node_boot_uuid);  // Set by QueueHeader.
+  CHECK(header_written_) << ": Attempting to write message before header to "
+                         << writer->filename();
+  writer->QueueSizedFlatbuffer(fbb, now);
+}
+
+void NewDataWriter::QueueHeader(
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> &&header) {
+  CHECK(!header_written_) << ": Attempting to write duplicate header to "
+                          << writer->filename();
+  CHECK(header.message().has_source_node_boot_uuid());
+  source_node_boot_uuid_ =
+      UUID::FromString(header.message().source_node_boot_uuid());
+  // TODO(austin): This triggers a dummy allocation that we don't need as part
+  // of releasing.  Can we skip it?
+  writer->QueueSizedFlatbuffer(header.Release());
+  header_written_ = true;  // QueueSizedFlatbuffer CHECKs this before writing.
+}
+
+void NewDataWriter::Close() {
+  CHECK(writer);
+  close_(this);  // The callback runs while writer is still set.
+  writer.reset();
+  header_written_ = false;
+}
+
 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
     size_t node_index, const UUID &source_node_boot_uuid,
     const UUID &parts_uuid, int parts_index) const {
@@ -146,14 +216,6 @@
   return result;
 }
 
-void LocalLogNamer::WriteHeader(const Node *node) {
-  CHECK_EQ(node, this->node());
-  const size_t node_index = configuration::GetNodeIndex(configuration_, node);
-  data_writer_.QueueHeader(
-      MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
-                 data_writer_.uuid(), data_writer_.part_number));
-}
-
 NewDataWriter *LocalLogNamer::MakeWriter(const Channel *channel) {
   CHECK(configuration::ChannelIsSendableOnNode(channel, node()))
       << ": " << configuration::CleanedChannelToString(channel);
@@ -162,12 +224,7 @@
 
 void LocalLogNamer::Rotate(const Node *node) {
   CHECK(node == this->node());
-  const size_t node_index = configuration::GetNodeIndex(configuration_, node);
   data_writer_.Rotate();
-
-  data_writer_.QueueHeader(
-      MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
-                 data_writer_.uuid(), data_writer_.part_number));
 }
 
 void LocalLogNamer::WriteConfiguration(
@@ -181,11 +238,6 @@
   writer->QueueSizedFlatbuffer(header->Release());
 }
 
-void LocalLogNamer::Reboot(const Node * /*node*/
-) {
-  LOG(FATAL) << "Can't reboot a single node.";
-}
-
 NewDataWriter *LocalLogNamer::MakeTimestampWriter(const Channel *channel) {
   CHECK(configuration::ChannelIsReadableOnNode(channel, node_))
       << ": Message is not delivered to this node.";
@@ -214,63 +266,16 @@
   }
 }
 
-void MultiNodeLogNamer::WriteHeader(const Node *node) {
-  if (node == this->node()) {
-    if (!data_writer_) {
-      OpenDataWriter();
-    }
-
-    const size_t node_index = configuration::GetNodeIndex(configuration_, node);
-    data_writer_->QueueHeader(
-        MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
-                   data_writer_->uuid(), data_writer_->part_number));
-  } else {
-    const size_t node_index = configuration::GetNodeIndex(configuration_, node);
-    for (std::pair<const Channel *const, NewDataWriter> &data_writer :
-         data_writers_) {
-      if (node == data_writer.second.node) {
-        data_writer.second.QueueHeader(MakeHeader(
-            node_index, node_states_[node_index].source_node_boot_uuid,
-            data_writer.second.uuid(), data_writer.second.part_number));
-      }
-    }
-  }
-}
-
-void MultiNodeLogNamer::Rotate(const Node *node) { DoRotate(node, false); }
-
-void MultiNodeLogNamer::Reboot(const Node *node) { DoRotate(node, true); }
-
-void MultiNodeLogNamer::DoRotate(const Node *node, bool reboot) {
+void MultiNodeLogNamer::Rotate(const Node *node) {
   if (node == this->node()) {
     if (data_writer_) {
-      if (reboot) {
-        data_writer_->Reboot();
-      } else {
-        data_writer_->Rotate();
-      }
-      // TODO(austin): Move this logic down once we have a better ownership
-      // model for the header.
-
-      const size_t node_index =
-          configuration::GetNodeIndex(configuration_, node);
-      data_writer_->QueueHeader(
-          MakeHeader(node_index, node_states_[node_index].source_node_boot_uuid,
-                     data_writer_->uuid(), data_writer_->part_number));
+      data_writer_->Rotate();
     }
   } else {
-    const size_t node_index = configuration::GetNodeIndex(configuration_, node);
     for (std::pair<const Channel *const, NewDataWriter> &data_writer :
          data_writers_) {
-      if (node == data_writer.second.node) {
-        if (reboot) {
-          data_writer.second.Reboot();
-        } else {
-          data_writer.second.Rotate();
-        }
-        data_writer.second.QueueHeader(MakeHeader(
-            node_index, node_states_[node_index].source_node_boot_uuid,
-            data_writer.second.uuid(), data_writer.second.part_number));
+      if (node == data_writer.second.node()) {
+        data_writer.second.Rotate();
       }
     }
   }
@@ -336,15 +341,13 @@
     nodes_.emplace_back(source_node);
   }
 
-  NewDataWriter data_writer(
-      [this, channel](NewDataWriter *data_writer) {
-        OpenWriter(channel, data_writer);
-      },
-      [this](NewDataWriter *data_writer) {
-        CloseWriter(&data_writer->writer);
-      });
-  data_writer.node = source_node;
-
+  NewDataWriter data_writer(this, source_node,
+                            [this, channel](NewDataWriter *data_writer) {
+                              OpenWriter(channel, data_writer);
+                            },
+                            [this](NewDataWriter *data_writer) {
+                              CloseWriter(&data_writer->writer);
+                            });
   return &(
       data_writers_.emplace(channel, std::move(data_writer)).first->second);
 }
@@ -362,15 +365,14 @@
     nodes_.emplace_back(node);
   }
 
-  NewDataWriter data_writer(
-      [this, channel](NewDataWriter *data_writer) {
-        OpenForwardedTimestampWriter(channel, data_writer);
-      },
-      [this](NewDataWriter *data_writer) {
-        CloseWriter(&data_writer->writer);
-      });
-  data_writer.node = node;
-
+  NewDataWriter data_writer(this, node,
+                            [this, channel](NewDataWriter *data_writer) {
+                              OpenForwardedTimestampWriter(channel,
+                                                           data_writer);
+                            },
+                            [this](NewDataWriter *data_writer) {
+                              CloseWriter(&data_writer->writer);
+                            });
   return &(
       data_writers_.emplace(channel, std::move(data_writer)).first->second);
 }
@@ -419,7 +421,7 @@
   std::string filename =
       absl::StrCat("timestamps", channel->name()->string_view(), "/",
                    channel->type()->string_view(), ".part",
-                   data_writer->part_number, ".bfbs", extension_);
+                   data_writer->parts_index(), ".bfbs", extension_);
   CreateBufferWriter(filename, &data_writer->writer);
 }
 
@@ -428,19 +430,20 @@
   const std::string filename = absl::StrCat(
       CHECK_NOTNULL(channel->source_node())->string_view(), "_data",
       channel->name()->string_view(), "/", channel->type()->string_view(),
-      ".part", data_writer->part_number, ".bfbs", extension_);
+      ".part", data_writer->parts_index(), ".bfbs", extension_);
   CreateBufferWriter(filename, &data_writer->writer);
 }
 
 void MultiNodeLogNamer::OpenDataWriter() {
   if (!data_writer_) {
     data_writer_ = std::make_unique<NewDataWriter>(
+        this, node_,
         [this](NewDataWriter *writer) {
           std::string name;
           if (node() != nullptr) {
             name = absl::StrCat(name, node()->name()->string_view(), "_");
           }
-          absl::StrAppend(&name, "data.part", writer->part_number, ".bfbs",
+          absl::StrAppend(&name, "data.part", writer->parts_index(), ".bfbs",
                           extension_);
           CreateBufferWriter(name, &writer->writer);
         },