Split DetachedBufferWriter into a separate file.

logger.h is too big.  Let's pull out some of the concepts, especially
since we want to reuse them for multiple log files.  This is the first
patch of a couple.

Change-Id: Ic8065fde8284ea68c4869777b9a8237eff987866
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 80d8798..93caf31 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -12,8 +12,14 @@
 
 cc_library(
     name = "logger",
-    srcs = ["logger.cc"],
-    hdrs = ["logger.h"],
+    srcs = [
+        "logfile_utils.cc",
+        "logger.cc",
+    ],
+    hdrs = [
+        "logfile_utils.h",
+        "logger.h",
+    ],
     visibility = ["//visibility:public"],
     deps = [
         ":logger_fbs",
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
new file mode 100644
index 0000000..dc1801d
--- /dev/null
+++ b/aos/events/logging/logfile_utils.cc
@@ -0,0 +1,120 @@
+#include "aos/events/logging/logfile_utils.h"
+
+#include <fcntl.h>
+#include <limits.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+
+#include <vector>
+
+#include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+DEFINE_int32(flush_size, 1000000,
+             "Number of outstanding bytes to allow before flushing to disk.");
+
+namespace aos {
+namespace logger {
+
+DetachedBufferWriter::DetachedBufferWriter(std::string_view filename)
+    : fd_(open(std::string(filename).c_str(),
+               O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
+  PCHECK(fd_ != -1) << ": Failed to open " << filename;
+}
+
+DetachedBufferWriter::~DetachedBufferWriter() {
+  Flush();
+  PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
+}
+
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+    flatbuffers::FlatBufferBuilder *fbb) {
+  QueueSizedFlatbuffer(fbb->Release());
+}
+
+void DetachedBufferWriter::QueueSizedFlatbuffer(
+    flatbuffers::DetachedBuffer &&buffer) {
+  queued_size_ += buffer.size();
+  queue_.emplace_back(std::move(buffer));
+
+  // Flush if we are at the max number of iovs per writev, or have written
+  // enough data.  Otherwise writev will fail with an invalid argument.
+  if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
+      queue_.size() == IOV_MAX) {
+    Flush();
+  }
+}
+
+void DetachedBufferWriter::Flush() {
+  if (queue_.size() == 0u) {
+    return;
+  }
+  iovec_.clear();
+  iovec_.reserve(queue_.size());
+  size_t counted_size = 0;
+  for (size_t i = 0; i < queue_.size(); ++i) {
+    struct iovec n;
+    n.iov_base = queue_[i].data();
+    n.iov_len = queue_[i].size();
+    counted_size += n.iov_len;
+    iovec_.emplace_back(std::move(n));
+  }
+  CHECK_EQ(counted_size, queued_size_);
+  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
+
+  PCHECK(written == static_cast<ssize_t>(queued_size_))
+      << ": Wrote " << written << " expected " << queued_size_;
+
+  queued_size_ = 0;
+  queue_.clear();
+  // TODO(austin): Handle partial writes in some way other than crashing...
+}
+
+flatbuffers::Offset<MessageHeader> PackMessage(
+    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+    int channel_index, LogType log_type) {
+  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+
+  switch (log_type) {
+    case LogType::kLogMessage:
+    case LogType::kLogMessageAndDeliveryTime:
+      data_offset =
+          fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
+      break;
+
+    case LogType::kLogDeliveryTimeOnly:
+      break;
+  }
+
+  MessageHeader::Builder message_header_builder(*fbb);
+  message_header_builder.add_channel_index(channel_index);
+  message_header_builder.add_queue_index(context.queue_index);
+  message_header_builder.add_monotonic_sent_time(
+      context.monotonic_event_time.time_since_epoch().count());
+  message_header_builder.add_realtime_sent_time(
+      context.realtime_event_time.time_since_epoch().count());
+
+  switch (log_type) {
+    case LogType::kLogMessage:
+      message_header_builder.add_data(data_offset);
+      break;
+
+    case LogType::kLogMessageAndDeliveryTime:
+      message_header_builder.add_data(data_offset);
+      [[fallthrough]];
+
+    case LogType::kLogDeliveryTimeOnly:
+      message_header_builder.add_monotonic_remote_time(
+          context.monotonic_remote_time.time_since_epoch().count());
+      message_header_builder.add_realtime_remote_time(
+          context.realtime_remote_time.time_since_epoch().count());
+      message_header_builder.add_remote_queue_index(context.remote_queue_index);
+      break;
+  }
+
+  return message_header_builder.Finish();
+}
+
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
new file mode 100644
index 0000000..0892290
--- /dev/null
+++ b/aos/events/logging/logfile_utils.h
@@ -0,0 +1,78 @@
+#ifndef AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
+#define AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
+
+#include <sys/uio.h>
+
+#include <string_view>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+enum class LogType : uint8_t {
+  // The message originated on this node and should be logged here.
+  kLogMessage,
+  // The message originated on another node, but only the delivery times are
+  // logged here.
+  kLogDeliveryTimeOnly,
+  // The message originated on another node. Log it and the delivery times
+  // together.  The message_gateway is responsible for logging any messages
+  // which didn't get delivered.
+  kLogMessageAndDeliveryTime
+};
+
+
+// This class manages efficiently writing a sequence of detached buffers to a
+// file.  It queues them up and batches the write operation.
+class DetachedBufferWriter {
+ public:
+  DetachedBufferWriter(std::string_view filename);
+  ~DetachedBufferWriter();
+
+  // TODO(austin): Snappy compress the log file if it ends with .snappy!
+
+  // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
+  // buffer from it.
+  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
+  // Queues up a detached buffer directly.
+  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
+
+  // Triggers data to be provided to the kernel and written.
+  void Flush();
+
+ private:
+  int fd_ = -1;
+
+  // Size of all the data in the queue.
+  size_t queued_size_ = 0;
+
+  // List of buffers to flush.
+  std::vector<flatbuffers::DetachedBuffer> queue_;
+  // List of iovecs to use with writev.  This is a member variable to avoid
+  // churn.
+  std::vector<struct iovec> iovec_;
+};
+
+// Packes a message pointed to by the context into a MessageHeader.
+flatbuffers::Offset<MessageHeader> PackMessage(
+    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+    int channel_index, LogType log_type);
+
+// TODO(austin): 3 objects:
+// 1) log chunk reader.  Returns span.
+// 2) Sorted message header reader.  Returns sorted messages.
+// 3) LogReader, which does all the registration.
+//
+// Then, we can do a multi-node sim which forwards data nicely, try logging it, and then try replaying it.
+
+// Optimization:
+//   Allocate the 256k blocks like we do today.  But, refcount them with shared_ptr pointed to by the messageheader that is returned.  This avoids the copy.
+
+}  // namespace logger
+}  // namespace aos
+
+#endif  // AOS_EVENTS_LOGGING_LOGFILE_UTILS_H_
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index b247944..e7d0d3b 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -16,8 +16,6 @@
 #include "aos/time/time.h"
 #include "flatbuffers/flatbuffers.h"
 
-DEFINE_int32(flush_size, 1000000,
-             "Number of outstanding bytes to allow before flushing to disk.");
 DEFINE_bool(skip_missing_forwarding_entries, false,
             "If true, drop any forwarding entries with missing data.  If "
             "false, CHECK.");
@@ -27,60 +25,6 @@
 
 namespace chrono = std::chrono;
 
-DetachedBufferWriter::DetachedBufferWriter(absl::string_view filename)
-    : fd_(open(std::string(filename).c_str(),
-               O_RDWR | O_CLOEXEC | O_CREAT | O_EXCL, 0774)) {
-  PCHECK(fd_ != -1) << ": Failed to open " << filename;
-}
-
-DetachedBufferWriter::~DetachedBufferWriter() {
-  Flush();
-  PLOG_IF(ERROR, close(fd_) == -1) << " Failed to close logfile";
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
-    flatbuffers::FlatBufferBuilder *fbb) {
-  QueueSizedFlatbuffer(fbb->Release());
-}
-
-void DetachedBufferWriter::QueueSizedFlatbuffer(
-    flatbuffers::DetachedBuffer &&buffer) {
-  queued_size_ += buffer.size();
-  queue_.emplace_back(std::move(buffer));
-
-  // Flush if we are at the max number of iovs per writev, or have written
-  // enough data.  Otherwise writev will fail with an invalid argument.
-  if (queued_size_ > static_cast<size_t>(FLAGS_flush_size) ||
-      queue_.size() == IOV_MAX) {
-    Flush();
-  }
-}
-
-void DetachedBufferWriter::Flush() {
-  if (queue_.size() == 0u) {
-    return;
-  }
-  iovec_.clear();
-  iovec_.reserve(queue_.size());
-  size_t counted_size = 0;
-  for (size_t i = 0; i < queue_.size(); ++i) {
-    struct iovec n;
-    n.iov_base = queue_[i].data();
-    n.iov_len = queue_[i].size();
-    counted_size += n.iov_len;
-    iovec_.emplace_back(std::move(n));
-  }
-  CHECK_EQ(counted_size, queued_size_);
-  const ssize_t written = writev(fd_, iovec_.data(), iovec_.size());
-
-  PCHECK(written == static_cast<ssize_t>(queued_size_))
-      << ": Wrote " << written << " expected " << queued_size_;
-
-  queued_size_ = 0;
-  queue_.clear();
-  // TODO(austin): Handle partial writes in some way other than crashing...
-}
-
 Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
                std::chrono::milliseconds polling_period)
     : event_loop_(event_loop),
@@ -208,51 +152,6 @@
   });
 }
 
-flatbuffers::Offset<MessageHeader> PackMessage(
-    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
-    int channel_index, LogType log_type) {
-  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
-
-  switch(log_type) {
-    case LogType::kLogMessage:
-    case LogType::kLogMessageAndDeliveryTime:
-      data_offset =
-          fbb->CreateVector(static_cast<uint8_t *>(context.data), context.size);
-      break;
-
-    case LogType::kLogDeliveryTimeOnly:
-      break;
-  }
-
-  MessageHeader::Builder message_header_builder(*fbb);
-  message_header_builder.add_channel_index(channel_index);
-  message_header_builder.add_queue_index(context.queue_index);
-  message_header_builder.add_monotonic_sent_time(
-      context.monotonic_event_time.time_since_epoch().count());
-  message_header_builder.add_realtime_sent_time(
-      context.realtime_event_time.time_since_epoch().count());
-
-  switch (log_type) {
-    case LogType::kLogMessage:
-      message_header_builder.add_data(data_offset);
-      break;
-
-    case LogType::kLogMessageAndDeliveryTime:
-      message_header_builder.add_data(data_offset);
-      [[fallthrough]];
-
-    case LogType::kLogDeliveryTimeOnly:
-      message_header_builder.add_monotonic_remote_time(
-          context.monotonic_remote_time.time_since_epoch().count());
-      message_header_builder.add_realtime_remote_time(
-          context.realtime_remote_time.time_since_epoch().count());
-      message_header_builder.add_remote_queue_index(context.remote_queue_index);
-      break;
-  }
-
-  return message_header_builder.Finish();
-}
-
 void Logger::DoLogData() {
   // We want to guarentee that messages aren't out of order by more than
   // max_out_of_order_duration.  To do this, we need sync points.  Every write
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index d44a885..e44f65f 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -7,6 +7,7 @@
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/time/time.h"
@@ -15,49 +16,6 @@
 namespace aos {
 namespace logger {
 
-// This class manages efficiently writing a sequence of detached buffers to a
-// file.  It queues them up and batches the write operation.
-class DetachedBufferWriter {
- public:
-  DetachedBufferWriter(absl::string_view filename);
-  ~DetachedBufferWriter();
-
-  // TODO(austin): Snappy compress the log file if it ends with .snappy!
-
-  // Queues up a finished FlatBufferBuilder to be written.  Steals the detached
-  // buffer from it.
-  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb);
-  // Queues up a detached buffer directly.
-  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer);
-
-  // Triggers data to be provided to the kernel and written.
-  void Flush();
-
- private:
-  int fd_ = -1;
-
-  // Size of all the data in the queue.
-  size_t queued_size_ = 0;
-
-  // List of buffers to flush.
-  std::vector<flatbuffers::DetachedBuffer> queue_;
-  // List of iovecs to use with writev.  This is a member variable to avoid
-  // churn.
-  std::vector<struct iovec> iovec_;
-};
-
-enum class LogType : uint8_t {
-  // The message originated on this node and should be logged here.
-  kLogMessage,
-  // The message originated on another node, but only the delivery times are
-  // logged here.
-  kLogDeliveryTimeOnly,
-  // The message originated on another node. Log it and the delivery times
-  // together.  The message_gateway is responsible for logging any messages
-  // which didn't get delivered.
-  kLogMessageAndDeliveryTime
-};
-
 // Logs all channels available in the event loop to disk every 100 ms.
 // Start by logging one message per channel to capture any state and
 // configuration that is sent rately on a channel and would affect execution.
@@ -98,11 +56,6 @@
   size_t max_header_size_ = 0;
 };
 
-// Packes a message pointed to by the context into a MessageHeader.
-flatbuffers::Offset<MessageHeader> PackMessage(
-    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
-    int channel_index, LogType log_type);
-
 // Replays all the channels in the logfile to the event loop.
 class LogReader {
  public: