Split logger.h into log_reader and log_writer

Much simpler!

Change-Id: I6c4ee363b56b67dac40c456261bbed79d01b8eb6
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
new file mode 100644
index 0000000..cb0f7c6
--- /dev/null
+++ b/aos/events/logging/log_writer.cc
@@ -0,0 +1,837 @@
+#include "aos/events/logging/log_writer.h"
+
+#include <chrono>
+#include <functional>
+#include <map>
+#include <vector>
+
+#include "absl/strings/str_cat.h"
+#include "aos/configuration.h"
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/team_number.h"
+
+namespace aos {
+namespace logger {
+namespace {
+using message_bridge::RemoteMessage;
+}  // namespace
+
+Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
+               std::function<bool(const Channel *)> should_log)
+    : event_loop_(event_loop),
+      configuration_(configuration),
+      name_(network::GetHostname()),
+      timer_handler_(event_loop_->AddTimer(
+          [this]() { DoLogData(event_loop_->monotonic_now()); })),
+      server_statistics_fetcher_(
+          configuration::MultiNode(event_loop_->configuration())
+              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
+                    "/aos")
+              : aos::Fetcher<message_bridge::ServerStatistics>()) {
+  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
+
+  // Find all the nodes which are logging timestamps on our node.  This may
+  // over-estimate if should_log is specified.
+  std::vector<const Node *> timestamp_logger_nodes =
+      configuration::TimestampNodes(configuration_, event_loop_->node());
+
+  std::map<const Channel *, const Node *> timestamp_logger_channels;
+
+  // Now that we have all the nodes accumulated, make remote timestamp loggers
+  // for them.
+  for (const Node *node : timestamp_logger_nodes) {
+    // Note: since we are doing a find using the event loop channel, we need to
+    // make sure this channel pointer is part of the event loop configuration,
+    // not configuration_.  This only matters when configuration_ !=
+    // event_loop->configuration().
+    const Channel *channel = configuration::GetChannel(
+        event_loop->configuration(),
+        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+        RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
+        event_loop_->node());
+
+    CHECK(channel != nullptr)
+        << ": Remote timestamps are logged on "
+        << event_loop_->node()->name()->string_view()
+        << " but can't find channel /aos/remote_timestamps/"
+        << node->name()->string_view();
+    if (!should_log(channel)) {
+      continue;
+    }
+    timestamp_logger_channels.insert(std::make_pair(channel, node));
+  }
+
+  const size_t our_node_index =
+      configuration::GetNodeIndex(configuration_, event_loop_->node());
+
+  for (size_t channel_index = 0;
+       channel_index < configuration_->channels()->size(); ++channel_index) {
+    const Channel *const config_channel =
+        configuration_->channels()->Get(channel_index);
+    // The MakeRawFetcher method needs a channel which is in the event loop
+    // configuration() object, not the configuration_ object.  Go look that up
+    // from the config.
+    const Channel *channel = aos::configuration::GetChannel(
+        event_loop_->configuration(), config_channel->name()->string_view(),
+        config_channel->type()->string_view(), "", event_loop_->node());
+    CHECK(channel != nullptr)
+        << ": Failed to look up channel "
+        << aos::configuration::CleanedChannelToString(config_channel);
+    if (!should_log(channel)) {
+      continue;
+    }
+
+    FetcherStruct fs;
+    fs.channel_index = channel_index;
+    fs.channel = channel;
+
+    const bool is_local =
+        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
+
+    const bool is_readable =
+        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
+    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
+        channel, event_loop_->node());
+    const bool log_message = is_logged && is_readable;
+
+    bool log_delivery_times = false;
+    if (event_loop_->node() != nullptr) {
+      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+          channel, event_loop_->node(), event_loop_->node());
+    }
+
+    // Now, detect a RemoteMessage timestamp logger where we should just log
+    // the contents to a file directly.
+    const bool log_contents = timestamp_logger_channels.find(channel) !=
+                              timestamp_logger_channels.end();
+
+    if (log_message || log_delivery_times || log_contents) {
+      fs.fetcher = event_loop->MakeRawFetcher(channel);
+      VLOG(1) << "Logging channel "
+              << configuration::CleanedChannelToString(channel);
+
+      if (log_delivery_times) {
+        VLOG(1) << "  Delivery times";
+        fs.wants_timestamp_writer = true;
+        fs.timestamp_node_index = our_node_index;
+      }
+      if (log_message) {
+        VLOG(1) << "  Data";
+        fs.wants_writer = true;
+        if (!is_local) {
+          const Node *source_node = configuration::GetNode(
+              configuration_, channel->source_node()->string_view());
+          fs.data_node_index =
+              configuration::GetNodeIndex(configuration_, source_node);
+          fs.log_type = LogType::kLogRemoteMessage;
+        } else {
+          fs.data_node_index = our_node_index;
+        }
+      }
+      if (log_contents) {
+        VLOG(1) << "Timestamp logger channel "
+                << configuration::CleanedChannelToString(channel);
+        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+        fs.wants_contents_writer = true;
+        fs.contents_node_index =
+            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
+      }
+      fetchers_.emplace_back(std::move(fs));
+    }
+  }
+
+  // When we are logging remote timestamps, we need to be able to translate
+  // from the channel index that the event loop uses to the channel index in
+  // the configuration written to the log file.
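+  //
+  // For example, if the event loop configuration has channels [/a, /b, /c]
+  // but configuration_ only contains /a and /c, the resulting mapping is
+  // {0 -> 0, 1 -> -1, 2 -> 1}, where -1 marks a channel which is not logged.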
+  event_loop_to_logged_channel_index_.resize(
+      event_loop->configuration()->channels()->size(), -1);
+  for (size_t event_loop_channel_index = 0;
+       event_loop_channel_index <
+       event_loop->configuration()->channels()->size();
+       ++event_loop_channel_index) {
+    const Channel *event_loop_channel =
+        event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
+    const Channel *logged_channel = aos::configuration::GetChannel(
+        configuration_, event_loop_channel->name()->string_view(),
+        event_loop_channel->type()->string_view(), "",
+        configuration::GetNode(configuration_, event_loop_->node()));
+
+    if (logged_channel != nullptr) {
+      event_loop_to_logged_channel_index_[event_loop_channel_index] =
+          configuration::ChannelIndex(configuration_, logged_channel);
+    }
+  }
+}
+
+Logger::~Logger() {
+  if (log_namer_) {
+    // If we are replaying a log file, or in simulation, we want to force the
+    // last bit of data to be logged.  The easiest way to deal with this is to
+    // poll everything as we go to destroy the class, i.e., shut down the
+    // logger, and write it to disk.
+    StopLogging(event_loop_->monotonic_now());
+  }
+}
+
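+// A minimal usage sketch, assuming the single-argument Logger constructor and
+// LocalLogNamer declared in the corresponding headers (not shown here):
+//
+//   Logger logger(event_loop);
+//   logger.StartLogging(std::make_unique<LocalLogNamer>(
+//       "/tmp/log", event_loop->node()));
+//   // ... run the event loop; the polling timer does the rest ...
+//   logger.StopLogging(event_loop->monotonic_now());
+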
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
+                          std::string_view log_start_uuid) {
+  CHECK(!log_namer_) << ": Already logging";
+  log_namer_ = std::move(log_namer);
+
+  std::string config_sha256;
+  if (separate_config_) {
+    flatbuffers::FlatBufferBuilder fbb;
+    flatbuffers::Offset<aos::Configuration> configuration_offset =
+        CopyFlatBuffer(configuration_, &fbb);
+    LogFileHeader::Builder log_file_header_builder(fbb);
+    log_file_header_builder.add_configuration(configuration_offset);
+    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
+        fbb.Release());
+    config_sha256 = Sha256(config_header.span());
+    LOG(INFO) << "Config sha256 of " << config_sha256;
+    log_namer_->WriteConfiguration(&config_header, config_sha256);
+  }
+
+  log_event_uuid_ = UUID::Random();
+  log_start_uuid_ = log_start_uuid;
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+
+  // We want to do as much work as possible before the initial Fetch.  Any
+  // time between that and actually starting to log opens up the possibility
+  // of falling off the end of the queue in the meantime.
+
+  for (FetcherStruct &f : fetchers_) {
+    if (f.wants_writer) {
+      f.writer = log_namer_->MakeWriter(f.channel);
+    }
+    if (f.wants_timestamp_writer) {
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+    }
+    if (f.wants_contents_writer) {
+      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
+          f.channel, CHECK_NOTNULL(f.timestamp_node));
+    }
+  }
+
+  CHECK(node_state_.empty());
+  node_state_.resize(configuration::MultiNode(configuration_)
+                         ? configuration_->nodes()->size()
+                         : 1u);
+
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
+
+    node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
+  }
+
+  // Grab data from each channel right before we declare the log file started
+  // so we can capture the latest message on each channel.  This lets
+  // non-periodic messages, such as those carrying configuration, get logged.
+  for (FetcherStruct &f : fetchers_) {
+    const auto start = event_loop_->monotonic_now();
+    const bool got_new = f.fetcher->Fetch();
+    const auto end = event_loop_->monotonic_now();
+    RecordFetchResult(start, end, got_new, &f);
+
+    // If there is no message, there is nothing to write, so mark the channel
+    // as already written.
+    f.written = f.fetcher->context().data == nullptr;
+  }
+
+  // Clear out any old timestamps in case we are re-starting logging.
+  for (size_t i = 0; i < node_state_.size(); ++i) {
+    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
+                 monotonic_clock::min_time, realtime_clock::min_time);
+  }
+
+  WriteHeader();
+
+  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+            << " start_time " << last_synchronized_time_;
+
+  // Force logging up until the start of the log file now, so the messages at
+  // the start are always ordered before the rest of the messages.
+  // Note: this ship may have already sailed, but we don't have to make it
+  // worse.
+  // TODO(austin): Test...
+  LogUntil(last_synchronized_time_);
+
+  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
+                        polling_period_);
+}
+
+std::unique_ptr<LogNamer> Logger::StopLogging(
+    aos::monotonic_clock::time_point end_time) {
+  CHECK(log_namer_) << ": Not logging right now";
+
+  if (end_time != aos::monotonic_clock::min_time) {
+    LogUntil(end_time);
+  }
+  timer_handler_->Disable();
+
+  for (FetcherStruct &f : fetchers_) {
+    f.writer = nullptr;
+    f.timestamp_writer = nullptr;
+    f.contents_writer = nullptr;
+  }
+  node_state_.clear();
+
+  log_event_uuid_ = UUID::Zero();
+  log_start_uuid_ = std::string();
+
+  return std::move(log_namer_);
+}
+
+void Logger::WriteHeader() {
+  if (configuration::MultiNode(configuration_)) {
+    server_statistics_fetcher_.Fetch();
+  }
+
+  aos::monotonic_clock::time_point monotonic_start_time =
+      event_loop_->monotonic_now();
+  aos::realtime_clock::time_point realtime_start_time =
+      event_loop_->realtime_now();
+
+  // We need to pick a point in time to declare the log file "started".  That
+  // point is now.  It needs to be after everything is fetched so that the
+  // fetchers are all pointed at the most recent message before the start
+  // time.
+  last_synchronized_time_ = monotonic_start_time;
+
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
+    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
+                         realtime_start_time);
+    MaybeWriteHeader(node_index, node);
+  }
+}
+
+void Logger::MaybeWriteHeader(int node_index) {
+  if (configuration::MultiNode(configuration_)) {
+    return MaybeWriteHeader(node_index,
+                            configuration_->nodes()->Get(node_index));
+  } else {
+    return MaybeWriteHeader(node_index, nullptr);
+  }
+}
+
+void Logger::MaybeWriteHeader(int node_index, const Node *node) {
+  // This function is responsible for writing the header when it both has
+  // valid data and still needs to be written.
+  if (node_state_[node_index].header_written &&
+      node_state_[node_index].header_valid) {
+    // The header has been written and is valid, nothing to do.
+    return;
+  }
+  if (!node_state_[node_index].has_source_node_boot_uuid) {
+    // Can't write a header if we don't have the boot UUID.
+    return;
+  }
+
+  // WriteHeader writes the first header in a log file.  We want to do this
+  // only once.
+  //
+  // Rotate rewrites the same header with a new part ID, but keeps the same part
+  // UUID.  We don't want that when things reboot, because that implies that
+  // parts go together across a reboot.
+  //
+  // Reboot resets the parts UUID.  So, once we've written a header the first
+  // time, we want to use Reboot to rotate the log and reset the parts UUID.
+  //
+  // header_valid is cleared whenever the remote reboots.
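+  //
+  // In summary:
+  //   !header_written                 -> WriteHeader(), mark written.
+  //   header_written && !header_valid -> Reboot() with a new parts UUID.
+  //   header_written && header_valid  -> nothing to do (handled above).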
+  if (node_state_[node_index].header_written) {
+    log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
+  } else {
+    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+
+    node_state_[node_index].header_written = true;
+  }
+  node_state_[node_index].header_valid = true;
+}
+
+void Logger::WriteMissingTimestamps() {
+  if (configuration::MultiNode(configuration_)) {
+    server_statistics_fetcher_.Fetch();
+  } else {
+    return;
+  }
+
+  if (server_statistics_fetcher_.get() == nullptr) {
+    return;
+  }
+
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
+    if (MaybeUpdateTimestamp(
+            node, node_index,
+            server_statistics_fetcher_.context().monotonic_event_time,
+            server_statistics_fetcher_.context().realtime_event_time)) {
+      CHECK(node_state_[node_index].header_written);
+      CHECK(node_state_[node_index].header_valid);
+      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+    } else {
+      MaybeWriteHeader(node_index, node);
+    }
+  }
+}
+
+void Logger::SetStartTime(
+    size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+    aos::realtime_clock::time_point realtime_start_time,
+    aos::monotonic_clock::time_point logger_monotonic_start_time,
+    aos::realtime_clock::time_point logger_realtime_start_time) {
+  node_state_[node_index].monotonic_start_time = monotonic_start_time;
+  node_state_[node_index].realtime_start_time = realtime_start_time;
+  node_state_[node_index]
+      .log_file_header.mutable_message()
+      ->mutate_monotonic_start_time(
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              monotonic_start_time.time_since_epoch())
+              .count());
+
+  // Add logger start times if they are available in the log file header.
+  if (node_state_[node_index]
+          .log_file_header.mutable_message()
+          ->has_logger_monotonic_start_time()) {
+    node_state_[node_index]
+        .log_file_header.mutable_message()
+        ->mutate_logger_monotonic_start_time(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                logger_monotonic_start_time.time_since_epoch())
+                .count());
+  }
+
+  if (node_state_[node_index]
+          .log_file_header.mutable_message()
+          ->has_logger_realtime_start_time()) {
+    node_state_[node_index]
+        .log_file_header.mutable_message()
+        ->mutate_logger_realtime_start_time(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                logger_realtime_start_time.time_since_epoch())
+                .count());
+  }
+
+  if (node_state_[node_index]
+          .log_file_header.mutable_message()
+          ->has_realtime_start_time()) {
+    node_state_[node_index]
+        .log_file_header.mutable_message()
+        ->mutate_realtime_start_time(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                realtime_start_time.time_since_epoch())
+                .count());
+  }
+}
+
+bool Logger::MaybeUpdateTimestamp(
+    const Node *node, int node_index,
+    aos::monotonic_clock::time_point monotonic_start_time,
+    aos::realtime_clock::time_point realtime_start_time) {
+  // Bail early if the start times are already set.
+  if (node_state_[node_index].monotonic_start_time !=
+      monotonic_clock::min_time) {
+    return false;
+  }
+  if (event_loop_->node() == node ||
+      !configuration::MultiNode(configuration_)) {
+    // There are no offsets to compute for ourselves, so always succeed.
+    SetStartTime(node_index, monotonic_start_time, realtime_start_time,
+                 monotonic_start_time, realtime_start_time);
+    node_state_[node_index].SetBootUUID(
+        event_loop_->boot_uuid().string_view());
+    return true;
+  } else if (server_statistics_fetcher_.get() != nullptr) {
+    // We must be a remote node now.  Look for the connection and see if it is
+    // connected.
+
+    for (const message_bridge::ServerConnection *connection :
+         *server_statistics_fetcher_->connections()) {
+      if (connection->node()->name()->string_view() !=
+          node->name()->string_view()) {
+        continue;
+      }
+
+      if (connection->state() != message_bridge::State::CONNECTED) {
+        VLOG(1) << node->name()->string_view()
+                << " is not connected, can't start it yet.";
+        break;
+      }
+
+      // Update the boot UUID as soon as we know we are connected.
+      if (!connection->has_boot_uuid()) {
+        VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
+        break;
+      }
+
+      if (!node_state_[node_index].has_source_node_boot_uuid ||
+          node_state_[node_index].source_node_boot_uuid !=
+              connection->boot_uuid()->string_view()) {
+        node_state_[node_index].SetBootUUID(
+            connection->boot_uuid()->string_view());
+      }
+
+      if (!connection->has_monotonic_offset()) {
+        VLOG(1) << "Missing monotonic offset for setting start time for node "
+                << aos::FlatbufferToJson(node);
+        break;
+      }
+
+      // Found it and it is connected.  Compensate and go.
+      SetStartTime(node_index,
+                   monotonic_start_time +
+                       std::chrono::nanoseconds(connection->monotonic_offset()),
+                   realtime_start_time, monotonic_start_time,
+                   realtime_start_time);
+      return true;
+    }
+  }
+  return false;
+}
+
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
+    const Node *node, std::string_view config_sha256) {
+  // Build the header.  Start times are filled in with min_time placeholders
+  // here and mutated by SetStartTime once the actual times are known.
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+
+  flatbuffers::Offset<aos::Configuration> configuration_offset;
+  if (!separate_config_) {
+    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
+  } else {
+    CHECK(!config_sha256.empty());
+  }
+
+  const flatbuffers::Offset<flatbuffers::String> name_offset =
+      fbb.CreateString(name_);
+
+  CHECK(log_event_uuid_ != UUID::Zero());
+  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
+      fbb.CreateString(log_event_uuid_.string_view());
+
+  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
+      fbb.CreateString(logger_instance_uuid_.string_view());
+
+  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
+  if (!log_start_uuid_.empty()) {
+    log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
+  }
+
+  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
+  if (!config_sha256.empty()) {
+    config_sha256_offset = fbb.CreateString(config_sha256);
+  }
+
+  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
+      fbb.CreateString(event_loop_->boot_uuid().string_view());
+
+  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
+      fbb.CreateString(event_loop_->boot_uuid().string_view());
+
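+  // Placeholder parts UUID.  The LogNamer is expected to replace this with a
+  // real random UUID for each set of log parts when the header gets written.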
+  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
+      fbb.CreateString("00000000-0000-4000-8000-000000000000");
+
+  flatbuffers::Offset<Node> node_offset;
+  flatbuffers::Offset<Node> logger_node_offset;
+
+  if (configuration::MultiNode(configuration_)) {
+    node_offset = RecursiveCopyFlatBuffer(node, &fbb);
+    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
+  }
+
+  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
+
+  log_file_header_builder.add_name(name_offset);
+
+  // Only add the node if we are running in a multinode configuration.
+  if (node != nullptr) {
+    log_file_header_builder.add_node(node_offset);
+    log_file_header_builder.add_logger_node(logger_node_offset);
+  }
+
+  if (!configuration_offset.IsNull()) {
+    log_file_header_builder.add_configuration(configuration_offset);
+  }
+  // The worst-case theoretical out-of-order duration is twice the polling
+  // period.  One message could get logged right after the boundary, but be
+  // from right before the next boundary.  And the reverse could happen for
+  // another message.  Report back 3x to be extra safe, and because the cost
+  // isn't huge on the read side.
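+  // For example, a 100 ms polling period results in a reported
+  // max_out_of_order_duration of 300 ms.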
+  log_file_header_builder.add_max_out_of_order_duration(
+      std::chrono::nanoseconds(3 * polling_period_).count());
+
+  log_file_header_builder.add_monotonic_start_time(
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          monotonic_clock::min_time.time_since_epoch())
+          .count());
+  if (node == event_loop_->node()) {
+    log_file_header_builder.add_realtime_start_time(
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            realtime_clock::min_time.time_since_epoch())
+            .count());
+  } else {
+    log_file_header_builder.add_logger_monotonic_start_time(
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            monotonic_clock::min_time.time_since_epoch())
+            .count());
+    log_file_header_builder.add_logger_realtime_start_time(
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            realtime_clock::min_time.time_since_epoch())
+            .count());
+  }
+
+  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
+  log_file_header_builder.add_logger_instance_uuid(
+      logger_instance_uuid_offset);
+  if (!log_start_uuid_offset.IsNull()) {
+    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
+  }
+  log_file_header_builder.add_logger_node_boot_uuid(
+      logger_node_boot_uuid_offset);
+  log_file_header_builder.add_source_node_boot_uuid(
+      source_node_boot_uuid_offset);
+
+  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
+  log_file_header_builder.add_parts_index(0);
+
+  if (!config_sha256_offset.IsNull()) {
+    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
+  }
+
+  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
+      fbb.Release());
+
+  CHECK(result.Verify()) << ": Built a corrupted header.";
+
+  return result;
+}
+
+void Logger::ResetStatisics() {
+  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
+  max_message_fetch_time_channel_ = -1;
+  max_message_fetch_time_size_ = -1;
+  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
+  total_message_fetch_count_ = 0;
+  total_message_fetch_bytes_ = 0;
+  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
+  total_nop_fetch_count_ = 0;
+  max_copy_time_ = std::chrono::nanoseconds::zero();
+  max_copy_time_channel_ = -1;
+  max_copy_time_size_ = -1;
+  total_copy_time_ = std::chrono::nanoseconds::zero();
+  total_copy_count_ = 0;
+  total_copy_bytes_ = 0;
+}
+
+void Logger::Rotate() {
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
+    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+  }
+}
+
+void Logger::LogUntil(monotonic_clock::time_point t) {
+  // Grab the latest ServerStatistics message.  This will always have the
+  // opportunity to be >= the current time, so it will always represent any
+  // reboots which may have happened.
+  WriteMissingTimestamps();
+
+  // Write each channel to disk, one at a time.
+  for (FetcherStruct &f : fetchers_) {
+    while (true) {
+      if (f.written) {
+        const auto start = event_loop_->monotonic_now();
+        const bool got_new = f.fetcher->FetchNext();
+        const auto end = event_loop_->monotonic_now();
+        RecordFetchResult(start, end, got_new, &f);
+        if (!got_new) {
+          VLOG(2) << "No new data on "
+                  << configuration::CleanedChannelToString(
+                         f.fetcher->channel());
+          break;
+        }
+        f.written = false;
+      }
+
+      // TODO(james): Write tests to exercise this logic.
+      if (f.fetcher->context().monotonic_event_time >= t) {
+        break;
+      }
+      if (f.writer != nullptr) {
+        // Write!
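+        // Pre-size the builder for the message payload plus the largest
+        // header overhead seen so far to avoid reallocating while packing.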
+        const auto start = event_loop_->monotonic_now();
+        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+                                           max_header_size_);
+        fbb.ForceDefaults(true);
+
+        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                           f.channel_index, f.log_type));
+        const auto end = event_loop_->monotonic_now();
+        RecordCreateMessageTime(start, end, &f);
+
+        VLOG(2) << "Writing data as node "
+                << FlatbufferToJson(event_loop_->node()) << " for channel "
+                << configuration::CleanedChannelToString(f.fetcher->channel())
+                << " to " << f.writer->filename() << " data "
+                << FlatbufferToJson(
+                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                           fbb.GetBufferPointer()));
+
+        max_header_size_ = std::max(max_header_size_,
+                                    fbb.GetSize() - f.fetcher->context().size);
+        CHECK(node_state_[f.data_node_index].header_valid)
+            << ": Can't write data before the header on channel "
+            << configuration::CleanedChannelToString(f.fetcher->channel());
+        f.writer->QueueSizedFlatbuffer(&fbb);
+      }
+
+      if (f.timestamp_writer != nullptr) {
+        // And now handle timestamps.
+        const auto start = event_loop_->monotonic_now();
+        flatbuffers::FlatBufferBuilder fbb;
+        fbb.ForceDefaults(true);
+
+        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                           f.channel_index,
+                                           LogType::kLogDeliveryTimeOnly));
+        const auto end = event_loop_->monotonic_now();
+        RecordCreateMessageTime(start, end, &f);
+
+        VLOG(2) << "Writing timestamps as node "
+                << FlatbufferToJson(event_loop_->node()) << " for channel "
+                << configuration::CleanedChannelToString(f.fetcher->channel())
+                << " to " << f.timestamp_writer->filename() << " timestamp "
+                << FlatbufferToJson(
+                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                           fbb.GetBufferPointer()));
+
+        CHECK(node_state_[f.timestamp_node_index].header_valid)
+            << ": Can't write data before the header on channel "
+            << configuration::CleanedChannelToString(f.fetcher->channel());
+        f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+      }
+
+      if (f.contents_writer != nullptr) {
+        const auto start = event_loop_->monotonic_now();
+        // And now handle the special message contents channel.  Copy the
+        // message into a FlatBufferBuilder and save it to disk.
+        // TODO(austin): We can be more efficient here when we start to
+        // care...
+        flatbuffers::FlatBufferBuilder fbb;
+        fbb.ForceDefaults(true);
+
+        const RemoteMessage *msg =
+            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
+
+        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
+        if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
+            node_state_[f.contents_node_index].source_node_boot_uuid !=
+                msg->boot_uuid()->string_view()) {
+          node_state_[f.contents_node_index].SetBootUUID(
+              msg->boot_uuid()->string_view());
+
+          MaybeWriteHeader(f.contents_node_index);
+        }
+
+        logger::MessageHeader::Builder message_header_builder(fbb);
+
+        // TODO(austin): This needs to check the channel_index and confirm
+        // that it should be logged before squirreling away the timestamp to
+        // disk.  We don't want to log irrelevant timestamps.
+
+        // Note: this must match the same order as MessageBridgeServer and
+        // PackMessage.  We want identical headers to have identical
+        // on-the-wire formats to make comparing them easier.
+
+        // Translate from the channel index that the event loop uses to the
+        // channel index in the log file.
+        message_header_builder.add_channel_index(
+            event_loop_to_logged_channel_index_[msg->channel_index()]);
+
+        message_header_builder.add_queue_index(msg->queue_index());
+        message_header_builder.add_monotonic_sent_time(
+            msg->monotonic_sent_time());
+        message_header_builder.add_realtime_sent_time(
+            msg->realtime_sent_time());
+
+        message_header_builder.add_monotonic_remote_time(
+            msg->monotonic_remote_time());
+        message_header_builder.add_realtime_remote_time(
+            msg->realtime_remote_time());
+        message_header_builder.add_remote_queue_index(
+            msg->remote_queue_index());
+
+        message_header_builder.add_monotonic_timestamp_time(
+            f.fetcher->context()
+                .monotonic_event_time.time_since_epoch()
+                .count());
+
+        fbb.FinishSizePrefixed(message_header_builder.Finish());
+        const auto end = event_loop_->monotonic_now();
+        RecordCreateMessageTime(start, end, &f);
+
+        CHECK(node_state_[f.contents_node_index].header_valid)
+            << ": Can't write data before the header on channel "
+            << configuration::CleanedChannelToString(f.fetcher->channel());
+        f.contents_writer->QueueSizedFlatbuffer(&fbb);
+      }
+
+      f.written = true;
+    }
+  }
+  last_synchronized_time_ = t;
+}
+
+void Logger::DoLogData(const monotonic_clock::time_point end_time) {
+  // We want to guarantee that messages aren't out of order by more than
+  // max_out_of_order_duration.  To do this, we need sync points.  Every write
+  // cycle should be a sync point.
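+  //
+  // For example, with a 100 ms polling period and an end_time 250 ms past
+  // last_synchronized_time_, this loop syncs at +100 ms and +200 ms; the
+  // remaining 50 ms gets picked up on the next timer wakeup.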
+
+  do {
+    // Move the sync point up by at most polling_period.  This forces one sync
+    // per iteration, even if it is small.
+    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
+
+    on_logged_period_();
+
+    // If we missed cycles, we could be pretty far behind.  Spin until we are
+    // caught up.
+  } while (last_synchronized_time_ + polling_period_ < end_time);
+}
+
+void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
+                               aos::monotonic_clock::time_point end,
+                               bool got_new, FetcherStruct *fetcher) {
+  const auto duration = end - start;
+  if (!got_new) {
+    ++total_nop_fetch_count_;
+    total_nop_fetch_time_ += duration;
+    return;
+  }
+  ++total_message_fetch_count_;
+  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
+  total_message_fetch_time_ += duration;
+  if (duration > max_message_fetch_time_) {
+    max_message_fetch_time_ = duration;
+    max_message_fetch_time_channel_ = fetcher->channel_index;
+    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
+  }
+}
+
+void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
+                                     aos::monotonic_clock::time_point end,
+                                     FetcherStruct *fetcher) {
+  const auto duration = end - start;
+  total_copy_time_ += duration;
+  ++total_copy_count_;
+  total_copy_bytes_ += fetcher->fetcher->context().size;
+  if (duration > max_copy_time_) {
+    max_copy_time_ = duration;
+    max_copy_time_channel_ = fetcher->channel_index;
+    max_copy_time_size_ = fetcher->fetcher->context().size;
+  }
+}
+
+}  // namespace logger
+}  // namespace aos