Split logger.h into log_reader and log_writer

Much simpler!

Change-Id: I6c4ee363b56b67dac40c456261bbed79d01b8eb6
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index b88478d..fcf7683 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -166,18 +166,52 @@
 )
 
 cc_library(
-    name = "logger",
+    name = "log_namer",
     srcs = [
         "log_namer.cc",
-        "logger.cc",
     ],
     hdrs = [
         "log_namer.h",
-        "logger.h",
+    ],
+    deps = [
+        ":logfile_utils",
+        ":logger_fbs",
+        ":uuid",
+        "@com_github_google_flatbuffers//:flatbuffers",
+    ],
+)
+
+cc_library(
+    name = "log_writer",
+    srcs = [
+        "log_writer.cc",
+    ],
+    hdrs = [
+        "log_writer.h",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        ":log_namer",
+        "//aos:configuration",
+        "//aos/events:event_loop",
+        "//aos/events:simulated_event_loop",
+        "//aos/network:message_bridge_server_fbs",
+    ],
+)
+
+cc_library(
+    name = "log_reader",
+    srcs = [
+        "log_reader.cc",
+    ],
+    hdrs = [
+        "log_reader.h",
     ],
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
+        ":log_namer",
+        ":log_writer",
         ":logfile_utils",
         ":logger_fbs",
         ":uuid",
@@ -204,7 +238,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
-        ":logger",
+        ":log_reader",
         "//aos:configuration",
         "//aos:init",
         "//aos:json_to_flatbuffer",
@@ -239,7 +273,7 @@
     ],
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
-        ":logger",
+        ":log_reader",
         "//aos:configuration",
         "//aos:init",
         "//aos:json_to_flatbuffer",
@@ -256,7 +290,7 @@
     ],
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
-        ":logger",
+        ":log_reader",
         "//aos:configuration",
         "//aos:init",
         "//aos:json_to_flatbuffer",
@@ -275,7 +309,7 @@
     target_compatible_with = ["@platforms//os:linux"],
     visibility = ["//visibility:public"],
     deps = [
-        ":logger",
+        ":log_writer",
         "//aos:configuration",
         "//aos:init",
         "//aos/events:shm_event_loop",
@@ -315,7 +349,8 @@
     shard_count = 5,
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
-        ":logger",
+        ":log_reader",
+        ":log_writer",
         "//aos/events:message_counter",
         "//aos/events:ping_lib",
         "//aos/events:pong_lib",
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 2e23e0a..2032f9f 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -7,7 +7,7 @@
 #include <vector>
 
 #include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/init.h"
 #include "aos/json_to_flatbuffer.h"
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index 02a1402..7eff9e7 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -1,7 +1,7 @@
 #include <iostream>
 
 #include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/init.h"
 #include "aos/json_to_flatbuffer.h"
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/log_reader.cc
similarity index 62%
rename from aos/events/logging/logger.cc
rename to aos/events/logging/log_reader.cc
index ff45143..c111a73 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/log_reader.cc
@@ -1,4 +1,4 @@
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 
 #include <fcntl.h>
 #include <limits.h>
@@ -113,839 +113,6 @@
 using message_bridge::RemoteMessage;
 }  // namespace
 
-Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
-               std::function<bool(const Channel *)> should_log)
-    : event_loop_(event_loop),
-      configuration_(configuration),
-      name_(network::GetHostname()),
-      timer_handler_(event_loop_->AddTimer(
-          [this]() { DoLogData(event_loop_->monotonic_now()); })),
-      server_statistics_fetcher_(
-          configuration::MultiNode(event_loop_->configuration())
-              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
-                    "/aos")
-              : aos::Fetcher<message_bridge::ServerStatistics>()) {
-  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
-
-  // Find all the nodes which are logging timestamps on our node.  This may
-  // over-estimate if should_log is specified.
-  std::vector<const Node *> timestamp_logger_nodes =
-      configuration::TimestampNodes(configuration_, event_loop_->node());
-
-  std::map<const Channel *, const Node *> timestamp_logger_channels;
-
-  // Now that we have all the nodes accumulated, make remote timestamp loggers
-  // for them.
-  for (const Node *node : timestamp_logger_nodes) {
-    // Note: since we are doing a find using the event loop channel, we need to
-    // make sure this channel pointer is part of the event loop configuration,
-    // not configuration_.  This only matters when configuration_ !=
-    // event_loop->configuration();
-    const Channel *channel = configuration::GetChannel(
-        event_loop->configuration(),
-        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
-        RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
-        event_loop_->node());
-
-    CHECK(channel != nullptr)
-        << ": Remote timestamps are logged on "
-        << event_loop_->node()->name()->string_view()
-        << " but can't find channel /aos/remote_timestamps/"
-        << node->name()->string_view();
-    if (!should_log(channel)) {
-      continue;
-    }
-    timestamp_logger_channels.insert(std::make_pair(channel, node));
-  }
-
-  const size_t our_node_index =
-      configuration::GetNodeIndex(configuration_, event_loop_->node());
-
-  for (size_t channel_index = 0;
-       channel_index < configuration_->channels()->size(); ++channel_index) {
-    const Channel *const config_channel =
-        configuration_->channels()->Get(channel_index);
-    // The MakeRawFetcher method needs a channel which is in the event loop
-    // configuration() object, not the configuration_ object.  Go look that up
-    // from the config.
-    const Channel *channel = aos::configuration::GetChannel(
-        event_loop_->configuration(), config_channel->name()->string_view(),
-        config_channel->type()->string_view(), "", event_loop_->node());
-    CHECK(channel != nullptr)
-        << ": Failed to look up channel "
-        << aos::configuration::CleanedChannelToString(config_channel);
-    if (!should_log(channel)) {
-      continue;
-    }
-
-    FetcherStruct fs;
-    fs.channel_index = channel_index;
-    fs.channel = channel;
-
-    const bool is_local =
-        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
-
-    const bool is_readable =
-        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
-    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
-        channel, event_loop_->node());
-    const bool log_message = is_logged && is_readable;
-
-    bool log_delivery_times = false;
-    if (event_loop_->node() != nullptr) {
-      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-          channel, event_loop_->node(), event_loop_->node());
-    }
-
-    // Now, detect a RemoteMessage timestamp logger where we should just log the
-    // contents to a file directly.
-    const bool log_contents = timestamp_logger_channels.find(channel) !=
-                              timestamp_logger_channels.end();
-
-    if (log_message || log_delivery_times || log_contents) {
-      fs.fetcher = event_loop->MakeRawFetcher(channel);
-      VLOG(1) << "Logging channel "
-              << configuration::CleanedChannelToString(channel);
-
-      if (log_delivery_times) {
-        VLOG(1) << "  Delivery times";
-        fs.wants_timestamp_writer = true;
-        fs.timestamp_node_index = our_node_index;
-      }
-      if (log_message) {
-        VLOG(1) << "  Data";
-        fs.wants_writer = true;
-        if (!is_local) {
-          const Node *source_node = configuration::GetNode(
-              configuration_, channel->source_node()->string_view());
-          fs.data_node_index =
-              configuration::GetNodeIndex(configuration_, source_node);
-          fs.log_type = LogType::kLogRemoteMessage;
-        } else {
-          fs.data_node_index = our_node_index;
-        }
-      }
-      if (log_contents) {
-        VLOG(1) << "Timestamp logger channel "
-                << configuration::CleanedChannelToString(channel);
-        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
-        fs.wants_contents_writer = true;
-        fs.contents_node_index =
-            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
-      }
-      fetchers_.emplace_back(std::move(fs));
-    }
-  }
-
-  // When we are logging remote timestamps, we need to be able to translate from
-  // the channel index that the event loop uses to the channel index in the
-  // config in the log file.
-  event_loop_to_logged_channel_index_.resize(
-      event_loop->configuration()->channels()->size(), -1);
-  for (size_t event_loop_channel_index = 0;
-       event_loop_channel_index <
-       event_loop->configuration()->channels()->size();
-       ++event_loop_channel_index) {
-    const Channel *event_loop_channel =
-        event_loop->configuration()->channels()->Get(event_loop_channel_index);
-
-    const Channel *logged_channel = aos::configuration::GetChannel(
-        configuration_, event_loop_channel->name()->string_view(),
-        event_loop_channel->type()->string_view(), "",
-        configuration::GetNode(configuration_, event_loop_->node()));
-
-    if (logged_channel != nullptr) {
-      event_loop_to_logged_channel_index_[event_loop_channel_index] =
-          configuration::ChannelIndex(configuration_, logged_channel);
-    }
-  }
-}
-
-Logger::~Logger() {
-  if (log_namer_) {
-    // If we are replaying a log file, or in simulation, we want to force the
-    // last bit of data to be logged.  The easiest way to deal with this is to
-    // poll everything as we go to destroy the class, ie, shut down the logger,
-    // and write it to disk.
-    StopLogging(event_loop_->monotonic_now());
-  }
-}
-
-void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
-                          std::string_view log_start_uuid) {
-  CHECK(!log_namer_) << ": Already logging";
-  log_namer_ = std::move(log_namer);
-
-  std::string config_sha256;
-  if (separate_config_) {
-    flatbuffers::FlatBufferBuilder fbb;
-    flatbuffers::Offset<aos::Configuration> configuration_offset =
-        CopyFlatBuffer(configuration_, &fbb);
-    LogFileHeader::Builder log_file_header_builder(fbb);
-    log_file_header_builder.add_configuration(configuration_offset);
-    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
-    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
-        fbb.Release());
-    config_sha256 = Sha256(config_header.span());
-    LOG(INFO) << "Config sha256 of " << config_sha256;
-    log_namer_->WriteConfiguration(&config_header, config_sha256);
-  }
-
-  log_event_uuid_ = UUID::Random();
-  log_start_uuid_ = log_start_uuid;
-  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
-
-  // We want to do as much work as possible before the initial Fetch. Time
-  // between that and actually starting to log opens up the possibility of
-  // falling off the end of the queue during that time.
-
-  for (FetcherStruct &f : fetchers_) {
-    if (f.wants_writer) {
-      f.writer = log_namer_->MakeWriter(f.channel);
-    }
-    if (f.wants_timestamp_writer) {
-      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
-    }
-    if (f.wants_contents_writer) {
-      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
-          f.channel, CHECK_NOTNULL(f.timestamp_node));
-    }
-  }
-
-  CHECK(node_state_.empty());
-  node_state_.resize(configuration::MultiNode(configuration_)
-                         ? configuration_->nodes()->size()
-                         : 1u);
-
-  for (const Node *node : log_namer_->nodes()) {
-    const int node_index = configuration::GetNodeIndex(configuration_, node);
-
-    node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
-  }
-
-  // Grab data from each channel right before we declare the log file started
-  // so we can capture the latest message on each channel.  This lets us have
-  // non periodic messages with configuration that now get logged.
-  for (FetcherStruct &f : fetchers_) {
-    const auto start = event_loop_->monotonic_now();
-    const bool got_new = f.fetcher->Fetch();
-    const auto end = event_loop_->monotonic_now();
-    RecordFetchResult(start, end, got_new, &f);
-
-    // If there is a message, we want to write it.
-    f.written = f.fetcher->context().data == nullptr;
-  }
-
-  // Clear out any old timestamps in case we are re-starting logging.
-  for (size_t i = 0; i < node_state_.size(); ++i) {
-    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
-                 monotonic_clock::min_time, realtime_clock::min_time);
-  }
-
-  WriteHeader();
-
-  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
-            << " start_time " << last_synchronized_time_;
-
-  // Force logging up until the start of the log file now, so the messages at
-  // the start are always ordered before the rest of the messages.
-  // Note: this ship may have already sailed, but we don't have to make it
-  // worse.
-  // TODO(austin): Test...
-  LogUntil(last_synchronized_time_);
-
-  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
-                        polling_period_);
-}
-
-std::unique_ptr<LogNamer> Logger::StopLogging(
-    aos::monotonic_clock::time_point end_time) {
-  CHECK(log_namer_) << ": Not logging right now";
-
-  if (end_time != aos::monotonic_clock::min_time) {
-    LogUntil(end_time);
-  }
-  timer_handler_->Disable();
-
-  for (FetcherStruct &f : fetchers_) {
-    f.writer = nullptr;
-    f.timestamp_writer = nullptr;
-    f.contents_writer = nullptr;
-  }
-  node_state_.clear();
-
-  log_event_uuid_ = UUID::Zero();
-  log_start_uuid_ = std::string();
-
-  return std::move(log_namer_);
-}
-
-void Logger::WriteHeader() {
-  if (configuration::MultiNode(configuration_)) {
-    server_statistics_fetcher_.Fetch();
-  }
-
-  aos::monotonic_clock::time_point monotonic_start_time =
-      event_loop_->monotonic_now();
-  aos::realtime_clock::time_point realtime_start_time =
-      event_loop_->realtime_now();
-
-  // We need to pick a point in time to declare the log file "started".  This
-  // starts here.  It needs to be after everything is fetched so that the
-  // fetchers are all pointed at the most recent message before the start
-  // time.
-  last_synchronized_time_ = monotonic_start_time;
-
-  for (const Node *node : log_namer_->nodes()) {
-    const int node_index = configuration::GetNodeIndex(configuration_, node);
-    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
-                         realtime_start_time);
-    MaybeWriteHeader(node_index, node);
-  }
-}
-
-void Logger::MaybeWriteHeader(int node_index) {
-  if (configuration::MultiNode(configuration_)) {
-    return MaybeWriteHeader(node_index,
-                            configuration_->nodes()->Get(node_index));
-  } else {
-    return MaybeWriteHeader(node_index, nullptr);
-  }
-}
-
-void Logger::MaybeWriteHeader(int node_index, const Node *node) {
-  // This function is responsible for writing the header when the header both
-  // has valid data, and when it needs to be written.
-  if (node_state_[node_index].header_written &&
-      node_state_[node_index].header_valid) {
-    // The header has been written and is valid, nothing to do.
-    return;
-  }
-  if (!node_state_[node_index].has_source_node_boot_uuid) {
-    // Can't write a header if we don't have the boot UUID.
-    return;
-  }
-
-  // WriteHeader writes the first header in a log file.  We want to do this only
-  // once.
-  //
-  // Rotate rewrites the same header with a new part ID, but keeps the same part
-  // UUID.  We don't want that when things reboot, because that implies that
-  // parts go together across a reboot.
-  //
-  // Reboot resets the parts UUID.  So, once we've written a header the first
-  // time, we want to use Reboot to rotate the log and reset the parts UUID.
-  //
-  // header_valid is cleared whenever the remote reboots.
-  if (node_state_[node_index].header_written) {
-    log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
-  } else {
-    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
-
-    node_state_[node_index].header_written = true;
-  }
-  node_state_[node_index].header_valid = true;
-}
-
-void Logger::WriteMissingTimestamps() {
-  if (configuration::MultiNode(configuration_)) {
-    server_statistics_fetcher_.Fetch();
-  } else {
-    return;
-  }
-
-  if (server_statistics_fetcher_.get() == nullptr) {
-    return;
-  }
-
-  for (const Node *node : log_namer_->nodes()) {
-    const int node_index = configuration::GetNodeIndex(configuration_, node);
-    if (MaybeUpdateTimestamp(
-            node, node_index,
-            server_statistics_fetcher_.context().monotonic_event_time,
-            server_statistics_fetcher_.context().realtime_event_time)) {
-      CHECK(node_state_[node_index].header_written);
-      CHECK(node_state_[node_index].header_valid);
-      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
-    } else {
-      MaybeWriteHeader(node_index, node);
-    }
-  }
-}
-
-void Logger::SetStartTime(
-    size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
-    aos::realtime_clock::time_point realtime_start_time,
-    aos::monotonic_clock::time_point logger_monotonic_start_time,
-    aos::realtime_clock::time_point logger_realtime_start_time) {
-  node_state_[node_index].monotonic_start_time = monotonic_start_time;
-  node_state_[node_index].realtime_start_time = realtime_start_time;
-  node_state_[node_index]
-      .log_file_header.mutable_message()
-      ->mutate_monotonic_start_time(
-          std::chrono::duration_cast<std::chrono::nanoseconds>(
-              monotonic_start_time.time_since_epoch())
-              .count());
-
-  // Add logger start times if they are available in the log file header.
-  if (node_state_[node_index]
-          .log_file_header.mutable_message()
-          ->has_logger_monotonic_start_time()) {
-    node_state_[node_index]
-        .log_file_header.mutable_message()
-        ->mutate_logger_monotonic_start_time(
-            std::chrono::duration_cast<std::chrono::nanoseconds>(
-                logger_monotonic_start_time.time_since_epoch())
-                .count());
-  }
-
-  if (node_state_[node_index]
-          .log_file_header.mutable_message()
-          ->has_logger_realtime_start_time()) {
-    node_state_[node_index]
-        .log_file_header.mutable_message()
-        ->mutate_logger_realtime_start_time(
-            std::chrono::duration_cast<std::chrono::nanoseconds>(
-                logger_realtime_start_time.time_since_epoch())
-                .count());
-  }
-
-  if (node_state_[node_index]
-          .log_file_header.mutable_message()
-          ->has_realtime_start_time()) {
-    node_state_[node_index]
-        .log_file_header.mutable_message()
-        ->mutate_realtime_start_time(
-            std::chrono::duration_cast<std::chrono::nanoseconds>(
-                realtime_start_time.time_since_epoch())
-                .count());
-  }
-}
-
-bool Logger::MaybeUpdateTimestamp(
-    const Node *node, int node_index,
-    aos::monotonic_clock::time_point monotonic_start_time,
-    aos::realtime_clock::time_point realtime_start_time) {
-  // Bail early if the start times are already set.
-  if (node_state_[node_index].monotonic_start_time !=
-      monotonic_clock::min_time) {
-    return false;
-  }
-  if (event_loop_->node() == node ||
-      !configuration::MultiNode(configuration_)) {
-    // There are no offsets to compute for ourself, so always succeed.
-    SetStartTime(node_index, monotonic_start_time, realtime_start_time,
-                 monotonic_start_time, realtime_start_time);
-    node_state_[node_index].SetBootUUID(event_loop_->boot_uuid().string_view());
-    return true;
-  } else if (server_statistics_fetcher_.get() != nullptr) {
-    // We must be a remote node now.  Look for the connection and see if it is
-    // connected.
-
-    for (const message_bridge::ServerConnection *connection :
-         *server_statistics_fetcher_->connections()) {
-      if (connection->node()->name()->string_view() !=
-          node->name()->string_view()) {
-        continue;
-      }
-
-      if (connection->state() != message_bridge::State::CONNECTED) {
-        VLOG(1) << node->name()->string_view()
-                << " is not connected, can't start it yet.";
-        break;
-      }
-
-      // Update the boot UUID as soon as we know we are connected.
-      if (!connection->has_boot_uuid()) {
-        VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
-        break;
-      }
-
-      if (!node_state_[node_index].has_source_node_boot_uuid ||
-          node_state_[node_index].source_node_boot_uuid !=
-              connection->boot_uuid()->string_view()) {
-        node_state_[node_index].SetBootUUID(
-            connection->boot_uuid()->string_view());
-      }
-
-      if (!connection->has_monotonic_offset()) {
-        VLOG(1) << "Missing monotonic offset for setting start time for node "
-                << aos::FlatbufferToJson(node);
-        break;
-      }
-
-      // Found it and it is connected.  Compensate and go.
-      SetStartTime(node_index,
-                   monotonic_start_time +
-                       std::chrono::nanoseconds(connection->monotonic_offset()),
-                   realtime_start_time, monotonic_start_time,
-                   realtime_start_time);
-      return true;
-    }
-  }
-  return false;
-}
-
-aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
-    const Node *node, std::string_view config_sha256) {
-  // Now write the header with this timestamp in it.
-  flatbuffers::FlatBufferBuilder fbb;
-  fbb.ForceDefaults(true);
-
-  flatbuffers::Offset<aos::Configuration> configuration_offset;
-  if (!separate_config_) {
-    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
-  } else {
-    CHECK(!config_sha256.empty());
-  }
-
-  const flatbuffers::Offset<flatbuffers::String> name_offset =
-      fbb.CreateString(name_);
-
-  CHECK(log_event_uuid_ != UUID::Zero());
-  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
-      fbb.CreateString(log_event_uuid_.string_view());
-
-  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
-      fbb.CreateString(logger_instance_uuid_.string_view());
-
-  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
-  if (!log_start_uuid_.empty()) {
-    log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
-  }
-
-  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
-  if (!config_sha256.empty()) {
-    config_sha256_offset = fbb.CreateString(config_sha256);
-  }
-
-  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
-      fbb.CreateString(event_loop_->boot_uuid().string_view());
-
-  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
-      fbb.CreateString(event_loop_->boot_uuid().string_view());
-
-  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
-      fbb.CreateString("00000000-0000-4000-8000-000000000000");
-
-  flatbuffers::Offset<Node> node_offset;
-  flatbuffers::Offset<Node> logger_node_offset;
-
-  if (configuration::MultiNode(configuration_)) {
-    node_offset = RecursiveCopyFlatBuffer(node, &fbb);
-    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
-  }
-
-  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
-
-  log_file_header_builder.add_name(name_offset);
-
-  // Only add the node if we are running in a multinode configuration.
-  if (node != nullptr) {
-    log_file_header_builder.add_node(node_offset);
-    log_file_header_builder.add_logger_node(logger_node_offset);
-  }
-
-  if (!configuration_offset.IsNull()) {
-    log_file_header_builder.add_configuration(configuration_offset);
-  }
-  // The worst case theoretical out of order is the polling period times 2.
-  // One message could get logged right after the boundary, but be for right
-  // before the next boundary.  And the reverse could happen for another
-  // message.  Report back 3x to be extra safe, and because the cost isn't
-  // huge on the read side.
-  log_file_header_builder.add_max_out_of_order_duration(
-      std::chrono::nanoseconds(3 * polling_period_).count());
-
-  log_file_header_builder.add_monotonic_start_time(
-      std::chrono::duration_cast<std::chrono::nanoseconds>(
-          monotonic_clock::min_time.time_since_epoch())
-          .count());
-  if (node == event_loop_->node()) {
-    log_file_header_builder.add_realtime_start_time(
-        std::chrono::duration_cast<std::chrono::nanoseconds>(
-            realtime_clock::min_time.time_since_epoch())
-            .count());
-  } else {
-    log_file_header_builder.add_logger_monotonic_start_time(
-        std::chrono::duration_cast<std::chrono::nanoseconds>(
-            monotonic_clock::min_time.time_since_epoch())
-            .count());
-    log_file_header_builder.add_logger_realtime_start_time(
-        std::chrono::duration_cast<std::chrono::nanoseconds>(
-            realtime_clock::min_time.time_since_epoch())
-            .count());
-  }
-
-  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
-  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
-  if (!log_start_uuid_offset.IsNull()) {
-    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
-  }
-  log_file_header_builder.add_logger_node_boot_uuid(
-      logger_node_boot_uuid_offset);
-  log_file_header_builder.add_source_node_boot_uuid(
-      source_node_boot_uuid_offset);
-
-  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
-  log_file_header_builder.add_parts_index(0);
-
-  log_file_header_builder.add_configuration_sha256(0);
-
-  if (!config_sha256_offset.IsNull()) {
-    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
-  }
-
-  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
-  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
-      fbb.Release());
-
-  CHECK(result.Verify()) << ": Built a corrupted header.";
-
-  return result;
-}
-
-void Logger::ResetStatisics() {
-  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
-  max_message_fetch_time_channel_ = -1;
-  max_message_fetch_time_size_ = -1;
-  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
-  total_message_fetch_count_ = 0;
-  total_message_fetch_bytes_ = 0;
-  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
-  total_nop_fetch_count_ = 0;
-  max_copy_time_ = std::chrono::nanoseconds::zero();
-  max_copy_time_channel_ = -1;
-  max_copy_time_size_ = -1;
-  total_copy_time_ = std::chrono::nanoseconds::zero();
-  total_copy_count_ = 0;
-  total_copy_bytes_ = 0;
-}
-
-void Logger::Rotate() {
-  for (const Node *node : log_namer_->nodes()) {
-    const int node_index = configuration::GetNodeIndex(configuration_, node);
-    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
-  }
-}
-
-void Logger::LogUntil(monotonic_clock::time_point t) {
-  // Grab the latest ServerStatistics message.  This will always have the
-  // oppertunity to be >= to the current time, so it will always represent any
-  // reboots which may have happened.
-  WriteMissingTimestamps();
-
-  // Write each channel to disk, one at a time.
-  for (FetcherStruct &f : fetchers_) {
-    while (true) {
-      if (f.written) {
-        const auto start = event_loop_->monotonic_now();
-        const bool got_new = f.fetcher->FetchNext();
-        const auto end = event_loop_->monotonic_now();
-        RecordFetchResult(start, end, got_new, &f);
-        if (!got_new) {
-          VLOG(2) << "No new data on "
-                  << configuration::CleanedChannelToString(
-                         f.fetcher->channel());
-          break;
-        }
-        f.written = false;
-      }
-
-      // TODO(james): Write tests to exercise this logic.
-      if (f.fetcher->context().monotonic_event_time >= t) {
-        break;
-      }
-      if (f.writer != nullptr) {
-        // Write!
-        const auto start = event_loop_->monotonic_now();
-        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
-                                           max_header_size_);
-        fbb.ForceDefaults(true);
-
-        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                           f.channel_index, f.log_type));
-        const auto end = event_loop_->monotonic_now();
-        RecordCreateMessageTime(start, end, &f);
-
-        VLOG(2) << "Writing data as node "
-                << FlatbufferToJson(event_loop_->node()) << " for channel "
-                << configuration::CleanedChannelToString(f.fetcher->channel())
-                << " to " << f.writer->filename() << " data "
-                << FlatbufferToJson(
-                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                           fbb.GetBufferPointer()));
-
-        max_header_size_ = std::max(max_header_size_,
-                                    fbb.GetSize() - f.fetcher->context().size);
-        CHECK(node_state_[f.data_node_index].header_valid)
-            << ": Can't write data before the header on channel "
-            << configuration::CleanedChannelToString(f.fetcher->channel());
-        f.writer->QueueSizedFlatbuffer(&fbb);
-      }
-
-      if (f.timestamp_writer != nullptr) {
-        // And now handle timestamps.
-        const auto start = event_loop_->monotonic_now();
-        flatbuffers::FlatBufferBuilder fbb;
-        fbb.ForceDefaults(true);
-
-        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
-                                           f.channel_index,
-                                           LogType::kLogDeliveryTimeOnly));
-        const auto end = event_loop_->monotonic_now();
-        RecordCreateMessageTime(start, end, &f);
-
-        VLOG(2) << "Writing timestamps as node "
-                << FlatbufferToJson(event_loop_->node()) << " for channel "
-                << configuration::CleanedChannelToString(f.fetcher->channel())
-                << " to " << f.timestamp_writer->filename() << " timestamp "
-                << FlatbufferToJson(
-                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
-                           fbb.GetBufferPointer()));
-
-        CHECK(node_state_[f.timestamp_node_index].header_valid)
-            << ": Can't write data before the header on channel "
-            << configuration::CleanedChannelToString(f.fetcher->channel());
-        f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
-      }
-
-      if (f.contents_writer != nullptr) {
-        const auto start = event_loop_->monotonic_now();
-        // And now handle the special message contents channel.  Copy the
-        // message into a FlatBufferBuilder and save it to disk.
-        // TODO(austin): We can be more efficient here when we start to
-        // care...
-        flatbuffers::FlatBufferBuilder fbb;
-        fbb.ForceDefaults(true);
-
-        const RemoteMessage *msg =
-            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
-
-        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
-        if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
-            node_state_[f.contents_node_index].source_node_boot_uuid !=
-                msg->boot_uuid()->string_view()) {
-          node_state_[f.contents_node_index].SetBootUUID(
-              msg->boot_uuid()->string_view());
-
-          MaybeWriteHeader(f.contents_node_index);
-        }
-
-        logger::MessageHeader::Builder message_header_builder(fbb);
-
-        // TODO(austin): This needs to check the channel_index and confirm
-        // that it should be logged before squirreling away the timestamp to
-        // disk.  We don't want to log irrelevant timestamps.
-
-        // Note: this must match the same order as MessageBridgeServer and
-        // PackMessage.  We want identical headers to have identical
-        // on-the-wire formats to make comparing them easier.
-
-        // Translate from the channel index that the event loop uses to the
-        // channel index in the log file.
-        message_header_builder.add_channel_index(
-            event_loop_to_logged_channel_index_[msg->channel_index()]);
-
-        message_header_builder.add_queue_index(msg->queue_index());
-        message_header_builder.add_monotonic_sent_time(
-            msg->monotonic_sent_time());
-        message_header_builder.add_realtime_sent_time(
-            msg->realtime_sent_time());
-
-        message_header_builder.add_monotonic_remote_time(
-            msg->monotonic_remote_time());
-        message_header_builder.add_realtime_remote_time(
-            msg->realtime_remote_time());
-        message_header_builder.add_remote_queue_index(
-            msg->remote_queue_index());
-
-        message_header_builder.add_monotonic_timestamp_time(
-            f.fetcher->context()
-                .monotonic_event_time.time_since_epoch()
-                .count());
-
-        fbb.FinishSizePrefixed(message_header_builder.Finish());
-        const auto end = event_loop_->monotonic_now();
-        RecordCreateMessageTime(start, end, &f);
-
-        CHECK(node_state_[f.contents_node_index].header_valid)
-            << ": Can't write data before the header on channel "
-            << configuration::CleanedChannelToString(f.fetcher->channel());
-        f.contents_writer->QueueSizedFlatbuffer(&fbb);
-      }
-
-      f.written = true;
-    }
-  }
-  last_synchronized_time_ = t;
-}
-
-void Logger::DoLogData(const monotonic_clock::time_point end_time) {
-  // We want to guarantee that messages aren't out of order by more than
-  // max_out_of_order_duration.  To do this, we need sync points.  Every write
-  // cycle should be a sync point.
-
-  do {
-    // Move the sync point up by at most polling_period.  This forces one sync
-    // per iteration, even if it is small.
-    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
-
-    on_logged_period_();
-
-    // If we missed cycles, we could be pretty far behind.  Spin until we are
-    // caught up.
-  } while (last_synchronized_time_ + polling_period_ < end_time);
-}
-
-void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
-                               aos::monotonic_clock::time_point end,
-                               bool got_new, FetcherStruct *fetcher) {
-  const auto duration = end - start;
-  if (!got_new) {
-    ++total_nop_fetch_count_;
-    total_nop_fetch_time_ += duration;
-    return;
-  }
-  ++total_message_fetch_count_;
-  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
-  total_message_fetch_time_ += duration;
-  if (duration > max_message_fetch_time_) {
-    max_message_fetch_time_ = duration;
-    max_message_fetch_time_channel_ = fetcher->channel_index;
-    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
-  }
-}
-
-void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
-                                     aos::monotonic_clock::time_point end,
-                                     FetcherStruct *fetcher) {
-  const auto duration = end - start;
-  total_copy_time_ += duration;
-  ++total_copy_count_;
-  total_copy_bytes_ += fetcher->fetcher->context().size;
-  if (duration > max_copy_time_) {
-    max_copy_time_ = duration;
-    max_copy_time_channel_ = fetcher->channel_index;
-    max_copy_time_size_ = fetcher->fetcher->context().size;
-  }
-}
-
-std::vector<std::vector<std::string>> ToLogReaderVector(
-    const std::vector<LogFile> &log_files) {
-  std::vector<std::vector<std::string>> result;
-  for (const LogFile &log_file : log_files) {
-    for (const LogParts &log_parts : log_file.parts) {
-      std::vector<std::string> parts;
-      for (const std::string &part : log_parts.parts) {
-        parts.emplace_back(part);
-      }
-      result.emplace_back(std::move(parts));
-    }
-  }
-  return result;
-}
-
 LogReader::LogReader(std::string_view filename,
                      const Configuration *replay_configuration)
     : LogReader(SortParts({std::string(filename)}), replay_configuration) {}
diff --git a/aos/events/logging/logger.h b/aos/events/logging/log_reader.h
similarity index 61%
rename from aos/events/logging/logger.h
rename to aos/events/logging/log_reader.h
index d142bd8..875c90d 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/log_reader.h
@@ -1,5 +1,5 @@
-#ifndef AOS_EVENTS_LOGGER_H_
-#define AOS_EVENTS_LOGGER_H_
+#ifndef AOS_EVENTS_LOGGING_LOG_READER_H_
+#define AOS_EVENTS_LOGGING_LOG_READER_H_
 
 #include <chrono>
 #include <deque>
@@ -8,7 +8,6 @@
 #include <vector>
 
 #include "aos/events/event_loop.h"
-#include "aos/events/logging/log_namer.h"
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
@@ -24,307 +23,6 @@
 namespace aos {
 namespace logger {
 
-// Logs all channels available in the event loop to disk every 100 ms.
-// Start by logging one message per channel to capture any state and
-// configuration that is sent rately on a channel and would affect execution.
-class Logger {
- public:
-  // Constructs a logger.
-  //   event_loop: The event loop used to read the messages.
-  //   configuration: When provided, this is the configuration to log, and the
-  //     configuration to use for the channel list to log.  If not provided,
-  //     this becomes the configuration from the event loop.
-  //   should_log: When provided, a filter for channels to log. If not provided,
-  //     all available channels are logged.
-  Logger(EventLoop *event_loop)
-      : Logger(event_loop, event_loop->configuration()) {}
-  Logger(EventLoop *event_loop, const Configuration *configuration)
-      : Logger(event_loop, configuration,
-               [](const Channel *) { return true; }) {}
-  Logger(EventLoop *event_loop, const Configuration *configuration,
-         std::function<bool(const Channel *)> should_log);
-  ~Logger();
-
-  // Overrides the name in the log file header.
-  void set_name(std::string_view name) { name_ = name; }
-
-  // Sets the callback to run after each period of data is logged. Defaults to
-  // doing nothing.
-  //
-  // This callback may safely do things like call Rotate().
-  void set_on_logged_period(std::function<void()> on_logged_period) {
-    on_logged_period_ = std::move(on_logged_period);
-  }
-
-  void set_separate_config(bool separate_config) {
-    separate_config_ = separate_config;
-  }
-
-  // Sets the period between polling the data. Defaults to 100ms.
-  //
-  // Changing this while a set of files is being written may result in
-  // unreadable files.
-  void set_polling_period(std::chrono::nanoseconds polling_period) {
-    polling_period_ = polling_period;
-  }
-
-  std::string_view log_start_uuid() const { return log_start_uuid_; }
-  UUID logger_instance_uuid() const { return logger_instance_uuid_; }
-
-  // The maximum time for a single fetch which returned a message, or 0 if none
-  // of those have happened.
-  std::chrono::nanoseconds max_message_fetch_time() const {
-    return max_message_fetch_time_;
-  }
-  // The channel for that longest fetch which returned a message, or -1 if none
-  // of those have happened.
-  int max_message_fetch_time_channel() const {
-    return max_message_fetch_time_channel_;
-  }
-  // The size of the message returned by that longest fetch, or -1 if none of
-  // those have happened.
-  int max_message_fetch_time_size() const {
-    return max_message_fetch_time_size_;
-  }
-  // The total time spent fetching messages.
-  std::chrono::nanoseconds total_message_fetch_time() const {
-    return total_message_fetch_time_;
-  }
-  // The total number of fetch calls which returned messages.
-  int total_message_fetch_count() const { return total_message_fetch_count_; }
-  // The total number of bytes fetched.
-  int64_t total_message_fetch_bytes() const {
-    return total_message_fetch_bytes_;
-  }
-
-  // The total time spent in fetches which did not return a message.
-  std::chrono::nanoseconds total_nop_fetch_time() const {
-    return total_nop_fetch_time_;
-  }
-  // The total number of fetches which did not return a message.
-  int total_nop_fetch_count() const { return total_nop_fetch_count_; }
-
-  // The maximum time for a single copy, or 0 if none of those have happened.
-  std::chrono::nanoseconds max_copy_time() const { return max_copy_time_; }
-  // The channel for that longest copy, or -1 if none of those have happened.
-  int max_copy_time_channel() const { return max_copy_time_channel_; }
-  // The size of the message for that longest copy, or -1 if none of those have
-  // happened.
-  int max_copy_time_size() const { return max_copy_time_size_; }
-  // The total time spent copying messages.
-  std::chrono::nanoseconds total_copy_time() const { return total_copy_time_; }
-  // The total number of messages copied.
-  int total_copy_count() const { return total_copy_count_; }
-  // The total number of bytes copied.
-  int64_t total_copy_bytes() const { return total_copy_bytes_; }
-
-  void ResetStatisics();
-
-  // Rotates the log file(s), triggering new part files to be written for each
-  // log file.
-  void Rotate();
-
-  // Starts logging to files with the given naming scheme.
-  //
-  // log_start_uuid may be used to tie this log event to other log events across
-  // multiple nodes. The default (empty string) indicates there isn't one
-  // available.
-  void StartLogging(std::unique_ptr<LogNamer> log_namer,
-                    std::string_view log_start_uuid = "");
-
-  // Stops logging. Ensures any messages through end_time make it into the log.
-  //
-  // If you want to stop ASAP, pass min_time to avoid reading any more messages.
-  //
-  // Returns the LogNamer in case the caller wants to do anything else with it
-  // before destroying it.
-  std::unique_ptr<LogNamer> StopLogging(
-      aos::monotonic_clock::time_point end_time);
-
-  // Returns whether a log is currently being written.
-  bool is_started() const { return static_cast<bool>(log_namer_); }
-
-  // Shortcut to call StartLogging with a LocalLogNamer when event processing
-  // starts.
-  void StartLoggingLocalNamerOnRun(std::string base_name) {
-    event_loop_->OnRun([this, base_name]() {
-      StartLogging(
-          std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
-    });
-  }
-
- private:
-  // Structure to track both a fetcher, and if the data fetched has been
-  // written.  We may want to delay writing data to disk so that we don't let
-  // data get too far out of order when written to disk so we can avoid making
-  // it too hard to sort when reading.
-  struct FetcherStruct {
-    std::unique_ptr<RawFetcher> fetcher;
-    bool written = false;
-
-    // Channel index to log to.
-    int channel_index = -1;
-    const Channel *channel = nullptr;
-    const Node *timestamp_node = nullptr;
-
-    LogType log_type = LogType::kLogMessage;
-
-    // We fill out the metadata at construction, but the actual writers have to
-    // be updated each time we start logging. To avoid duplicating the complex
-    // logic determining whether each writer should be initialized, we just
-    // stash the answer in separate member variables.
-    bool wants_writer = false;
-    DetachedBufferWriter *writer = nullptr;
-    bool wants_timestamp_writer = false;
-    DetachedBufferWriter *timestamp_writer = nullptr;
-    bool wants_contents_writer = false;
-    DetachedBufferWriter *contents_writer = nullptr;
-
-    // Node which this data is from, or -1 if it is unknown.
-    int data_node_index = -1;
-    // Node that this timestamp is for, or -1 if it is known.
-    int timestamp_node_index = -1;
-    // Node that the contents this contents_writer will log are from.
-    int contents_node_index = -1;
-  };
-
-  // Vector mapping from the channel index from the event loop to the logged
-  // channel index.
-  std::vector<int> event_loop_to_logged_channel_index_;
-
-  struct NodeState {
-    aos::monotonic_clock::time_point monotonic_start_time =
-        aos::monotonic_clock::min_time;
-    aos::realtime_clock::time_point realtime_start_time =
-        aos::realtime_clock::min_time;
-
-    bool has_source_node_boot_uuid = false;
-
-    // This is an initial UUID that is a valid UUID4 and is pretty obvious that
-    // it isn't valid.
-    std::string source_node_boot_uuid = "00000000-0000-4000-8000-000000000000";
-
-    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
-        aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
-
-    // True if a header has been written to the start of a log file.
-    bool header_written = false;
-    // True if the current written header represents the contents which will
-    // follow.  This is cleared when boot_uuid is known to not match anymore.
-    bool header_valid = false;
-
-    // Sets the source_node_boot_uuid, properly updating everything.
-    void SetBootUUID(std::string_view new_source_node_boot_uuid) {
-      source_node_boot_uuid = new_source_node_boot_uuid;
-      header_valid = false;
-      has_source_node_boot_uuid = true;
-
-      flatbuffers::String *source_node_boot_uuid_string =
-          log_file_header.mutable_message()->mutable_source_node_boot_uuid();
-      CHECK_EQ(source_node_boot_uuid.size(),
-               source_node_boot_uuid_string->size());
-      memcpy(source_node_boot_uuid_string->data(), source_node_boot_uuid.data(),
-             source_node_boot_uuid.size());
-    }
-  };
-
-  void WriteHeader();
-
-  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
-      const Node *node, std::string_view config_sha256);
-
-  // Writes the header for the provided node if enough information is valid.
-  void MaybeWriteHeader(int node_index);
-  // Overload for when we already know node as well.
-  void MaybeWriteHeader(int node_index, const Node *node);
-
-  bool MaybeUpdateTimestamp(
-      const Node *node, int node_index,
-      aos::monotonic_clock::time_point monotonic_start_time,
-      aos::realtime_clock::time_point realtime_start_time);
-
-  void DoLogData(const monotonic_clock::time_point end_time);
-
-  void WriteMissingTimestamps();
-
-  // Fetches from each channel until all the data is logged.
-  void LogUntil(monotonic_clock::time_point t);
-
-  void RecordFetchResult(aos::monotonic_clock::time_point start,
-                         aos::monotonic_clock::time_point end, bool got_new,
-                         FetcherStruct *fetcher);
-
-  void RecordCreateMessageTime(aos::monotonic_clock::time_point start,
-                               aos::monotonic_clock::time_point end,
-                               FetcherStruct *fetcher);
-
-  // Sets the start time for a specific node.
-  void SetStartTime(
-      size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
-      aos::realtime_clock::time_point realtime_start_time,
-      aos::monotonic_clock::time_point logger_monotonic_start_time,
-      aos::realtime_clock::time_point logger_realtime_start_time);
-
-  EventLoop *const event_loop_;
-  // The configuration to place at the top of the log file.
-  const Configuration *const configuration_;
-
-  UUID log_event_uuid_ = UUID::Zero();
-  const UUID logger_instance_uuid_ = UUID::Random();
-  std::unique_ptr<LogNamer> log_namer_;
-  // Empty indicates there isn't one.
-  std::string log_start_uuid_;
-
-  // Name to save in the log file.  Defaults to hostname.
-  std::string name_;
-
-  std::function<void()> on_logged_period_ = []() {};
-
-  std::chrono::nanoseconds max_message_fetch_time_ =
-      std::chrono::nanoseconds::zero();
-  int max_message_fetch_time_channel_ = -1;
-  int max_message_fetch_time_size_ = -1;
-  std::chrono::nanoseconds total_message_fetch_time_ =
-      std::chrono::nanoseconds::zero();
-  int total_message_fetch_count_ = 0;
-  int64_t total_message_fetch_bytes_ = 0;
-
-  std::chrono::nanoseconds total_nop_fetch_time_ =
-      std::chrono::nanoseconds::zero();
-  int total_nop_fetch_count_ = 0;
-
-  std::chrono::nanoseconds max_copy_time_ = std::chrono::nanoseconds::zero();
-  int max_copy_time_channel_ = -1;
-  int max_copy_time_size_ = -1;
-  std::chrono::nanoseconds total_copy_time_ = std::chrono::nanoseconds::zero();
-  int total_copy_count_ = 0;
-  int64_t total_copy_bytes_ = 0;
-
-  std::vector<FetcherStruct> fetchers_;
-  TimerHandler *timer_handler_;
-
-  // Period to poll the channels.
-  std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
-
-  // Last time that data was written for all channels to disk.
-  monotonic_clock::time_point last_synchronized_time_;
-
-  // Max size that the header has consumed.  This much extra data will be
-  // reserved in the builder to avoid reallocating.
-  size_t max_header_size_ = 0;
-
-  // If true, write the message header into a separate file.
-  bool separate_config_ = true;
-
-  // Fetcher for all the statistics from all the nodes.
-  aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
-
-  std::vector<NodeState> node_state_;
-};
-
-std::vector<std::vector<std::string>> ToLogReaderVector(
-    const std::vector<LogFile> &log_files);
-
 // We end up with one of the following 3 log file types.
 //
 // Single node logged as the source node.
@@ -771,4 +469,4 @@
 }  // namespace logger
 }  // namespace aos
 
-#endif  // AOS_EVENTS_LOGGER_H_
+#endif  // AOS_EVENTS_LOGGING_LOG_READER_H_
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index 47d0ced..d4f04c3 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -1,7 +1,7 @@
 #include <iomanip>
 #include <iostream>
 
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/init.h"
 #include "aos/json_to_flatbuffer.h"
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
new file mode 100644
index 0000000..cb0f7c6
--- /dev/null
+++ b/aos/events/logging/log_writer.cc
@@ -0,0 +1,837 @@
+#include "aos/events/logging/log_writer.h"
+
+#include <functional>
+#include <map>
+#include <vector>
+
+#include "aos/configuration.h"
+#include "aos/events/event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/team_number.h"
+
+namespace aos {
+namespace logger {
+namespace {
+using message_bridge::RemoteMessage;
+}  // namespace
+
+Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
+               std::function<bool(const Channel *)> should_log)
+    : event_loop_(event_loop),
+      configuration_(configuration),
+      name_(network::GetHostname()),
+      timer_handler_(event_loop_->AddTimer(
+          [this]() { DoLogData(event_loop_->monotonic_now()); })),
+      server_statistics_fetcher_(
+          configuration::MultiNode(event_loop_->configuration())
+              ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
+                    "/aos")
+              : aos::Fetcher<message_bridge::ServerStatistics>()) {
+  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
+
+  // Find all the nodes which are logging timestamps on our node.  This may
+  // over-estimate if should_log is specified.
+  std::vector<const Node *> timestamp_logger_nodes =
+      configuration::TimestampNodes(configuration_, event_loop_->node());
+
+  std::map<const Channel *, const Node *> timestamp_logger_channels;
+
+  // Now that we have all the nodes accumulated, make remote timestamp loggers
+  // for them.
+  for (const Node *node : timestamp_logger_nodes) {
+    // Note: since we are doing a find using the event loop channel, we need to
+    // make sure this channel pointer is part of the event loop configuration,
+    // not configuration_.  This only matters when configuration_ !=
+    // event_loop->configuration();
+    const Channel *channel = configuration::GetChannel(
+        event_loop->configuration(),
+        absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
+        RemoteMessage::GetFullyQualifiedName(), event_loop_->name(),
+        event_loop_->node());
+
+    CHECK(channel != nullptr)
+        << ": Remote timestamps are logged on "
+        << event_loop_->node()->name()->string_view()
+        << " but can't find channel /aos/remote_timestamps/"
+        << node->name()->string_view();
+    if (!should_log(channel)) {
+      continue;
+    }
+    timestamp_logger_channels.insert(std::make_pair(channel, node));
+  }
+
+  const size_t our_node_index =
+      configuration::GetNodeIndex(configuration_, event_loop_->node());
+
+  for (size_t channel_index = 0;
+       channel_index < configuration_->channels()->size(); ++channel_index) {
+    const Channel *const config_channel =
+        configuration_->channels()->Get(channel_index);
+    // The MakeRawFetcher method needs a channel which is in the event loop
+    // configuration() object, not the configuration_ object.  Go look that up
+    // from the config.
+    const Channel *channel = aos::configuration::GetChannel(
+        event_loop_->configuration(), config_channel->name()->string_view(),
+        config_channel->type()->string_view(), "", event_loop_->node());
+    CHECK(channel != nullptr)
+        << ": Failed to look up channel "
+        << aos::configuration::CleanedChannelToString(config_channel);
+    if (!should_log(channel)) {
+      continue;
+    }
+
+    FetcherStruct fs;
+    fs.channel_index = channel_index;
+    fs.channel = channel;
+
+    const bool is_local =
+        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
+
+    const bool is_readable =
+        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
+    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
+        channel, event_loop_->node());
+    const bool log_message = is_logged && is_readable;
+
+    bool log_delivery_times = false;
+    if (event_loop_->node() != nullptr) {
+      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+          channel, event_loop_->node(), event_loop_->node());
+    }
+
+    // Now, detect a RemoteMessage timestamp logger where we should just log the
+    // contents to a file directly.
+    const bool log_contents = timestamp_logger_channels.find(channel) !=
+                              timestamp_logger_channels.end();
+
+    if (log_message || log_delivery_times || log_contents) {
+      fs.fetcher = event_loop->MakeRawFetcher(channel);
+      VLOG(1) << "Logging channel "
+              << configuration::CleanedChannelToString(channel);
+
+      if (log_delivery_times) {
+        VLOG(1) << "  Delivery times";
+        fs.wants_timestamp_writer = true;
+        fs.timestamp_node_index = our_node_index;
+      }
+      if (log_message) {
+        VLOG(1) << "  Data";
+        fs.wants_writer = true;
+        if (!is_local) {
+          const Node *source_node = configuration::GetNode(
+              configuration_, channel->source_node()->string_view());
+          fs.data_node_index =
+              configuration::GetNodeIndex(configuration_, source_node);
+          fs.log_type = LogType::kLogRemoteMessage;
+        } else {
+          fs.data_node_index = our_node_index;
+        }
+      }
+      if (log_contents) {
+        VLOG(1) << "Timestamp logger channel "
+                << configuration::CleanedChannelToString(channel);
+        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+        fs.wants_contents_writer = true;
+        fs.contents_node_index =
+            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
+      }
+      fetchers_.emplace_back(std::move(fs));
+    }
+  }
+
+  // When we are logging remote timestamps, we need to be able to translate from
+  // the channel index that the event loop uses to the channel index in the
+  // config in the log file.
+  event_loop_to_logged_channel_index_.resize(
+      event_loop->configuration()->channels()->size(), -1);
+  for (size_t event_loop_channel_index = 0;
+       event_loop_channel_index <
+       event_loop->configuration()->channels()->size();
+       ++event_loop_channel_index) {
+    const Channel *event_loop_channel =
+        event_loop->configuration()->channels()->Get(event_loop_channel_index);
+
+    const Channel *logged_channel = aos::configuration::GetChannel(
+        configuration_, event_loop_channel->name()->string_view(),
+        event_loop_channel->type()->string_view(), "",
+        configuration::GetNode(configuration_, event_loop_->node()));
+
+    if (logged_channel != nullptr) {
+      event_loop_to_logged_channel_index_[event_loop_channel_index] =
+          configuration::ChannelIndex(configuration_, logged_channel);
+    }
+  }
+}
+
+Logger::~Logger() {
+  if (log_namer_) {
+    // If we are replaying a log file, or in simulation, we want to force the
+    // last bit of data to be logged.  The easiest way to deal with this is to
+    // poll everything as we go to destroy the class, ie, shut down the logger,
+    // and write it to disk.
+    StopLogging(event_loop_->monotonic_now());
+  }
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer,
+                          std::string_view log_start_uuid) {
+  CHECK(!log_namer_) << ": Already logging";
+  log_namer_ = std::move(log_namer);
+
+  std::string config_sha256;
+  if (separate_config_) {
+    flatbuffers::FlatBufferBuilder fbb;
+    flatbuffers::Offset<aos::Configuration> configuration_offset =
+        CopyFlatBuffer(configuration_, &fbb);
+    LogFileHeader::Builder log_file_header_builder(fbb);
+    log_file_header_builder.add_configuration(configuration_offset);
+    fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config_header(
+        fbb.Release());
+    config_sha256 = Sha256(config_header.span());
+    LOG(INFO) << "Config sha256 of " << config_sha256;
+    log_namer_->WriteConfiguration(&config_header, config_sha256);
+  }
+
+  log_event_uuid_ = UUID::Random();
+  log_start_uuid_ = log_start_uuid;
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+
+  // We want to do as much work as possible before the initial Fetch. Time
+  // between that and actually starting to log opens up the possibility of
+  // falling off the end of the queue during that time.
+
+  for (FetcherStruct &f : fetchers_) {
+    if (f.wants_writer) {
+      f.writer = log_namer_->MakeWriter(f.channel);
+    }
+    if (f.wants_timestamp_writer) {
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+    }
+    if (f.wants_contents_writer) {
+      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
+          f.channel, CHECK_NOTNULL(f.timestamp_node));
+    }
+  }
+
+  CHECK(node_state_.empty());
+  node_state_.resize(configuration::MultiNode(configuration_)
+                         ? configuration_->nodes()->size()
+                         : 1u);
+
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
+
+    node_state_[node_index].log_file_header = MakeHeader(node, config_sha256);
+  }
+
+  // Grab data from each channel right before we declare the log file started
+  // so we can capture the latest message on each channel.  This lets us have
+  // non periodic messages with configuration that now get logged.
+  for (FetcherStruct &f : fetchers_) {
+    const auto start = event_loop_->monotonic_now();
+    const bool got_new = f.fetcher->Fetch();
+    const auto end = event_loop_->monotonic_now();
+    RecordFetchResult(start, end, got_new, &f);
+
+    // If there is a message, we want to write it.
+    f.written = f.fetcher->context().data == nullptr;
+  }
+
+  // Clear out any old timestamps in case we are re-starting logging.
+  for (size_t i = 0; i < node_state_.size(); ++i) {
+    SetStartTime(i, monotonic_clock::min_time, realtime_clock::min_time,
+                 monotonic_clock::min_time, realtime_clock::min_time);
+  }
+
+  WriteHeader();
+
+  LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+            << " start_time " << last_synchronized_time_;
+
+  // Force logging up until the start of the log file now, so the messages at
+  // the start are always ordered before the rest of the messages.
+  // Note: this ship may have already sailed, but we don't have to make it
+  // worse.
+  // TODO(austin): Test...
+  LogUntil(last_synchronized_time_);
+
+  timer_handler_->Setup(event_loop_->monotonic_now() + polling_period_,
+                        polling_period_);
+}
+
+std::unique_ptr<LogNamer> Logger::StopLogging(
+    aos::monotonic_clock::time_point end_time) {
+  CHECK(log_namer_) << ": Not logging right now";
+
+  if (end_time != aos::monotonic_clock::min_time) {
+    LogUntil(end_time);
+  }
+  timer_handler_->Disable();
+
+  for (FetcherStruct &f : fetchers_) {
+    f.writer = nullptr;
+    f.timestamp_writer = nullptr;
+    f.contents_writer = nullptr;
+  }
+  node_state_.clear();
+
+  log_event_uuid_ = UUID::Zero();
+  log_start_uuid_ = std::string();
+
+  return std::move(log_namer_);
+}
+
+void Logger::WriteHeader() {
+  if (configuration::MultiNode(configuration_)) {
+    server_statistics_fetcher_.Fetch();
+  }
+
+  aos::monotonic_clock::time_point monotonic_start_time =
+      event_loop_->monotonic_now();
+  aos::realtime_clock::time_point realtime_start_time =
+      event_loop_->realtime_now();
+
+  // We need to pick a point in time to declare the log file "started".  This
+  // starts here.  It needs to be after everything is fetched so that the
+  // fetchers are all pointed at the most recent message before the start
+  // time.
+  last_synchronized_time_ = monotonic_start_time;
+
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
+    MaybeUpdateTimestamp(node, node_index, monotonic_start_time,
+                         realtime_start_time);
+    MaybeWriteHeader(node_index, node);
+  }
+}
+
+void Logger::MaybeWriteHeader(int node_index) {
+  if (configuration::MultiNode(configuration_)) {
+    return MaybeWriteHeader(node_index,
+                            configuration_->nodes()->Get(node_index));
+  } else {
+    return MaybeWriteHeader(node_index, nullptr);
+  }
+}
+
+void Logger::MaybeWriteHeader(int node_index, const Node *node) {
+  // This function is responsible for writing the header when the header both
+  // has valid data, and when it needs to be written.
+  if (node_state_[node_index].header_written &&
+      node_state_[node_index].header_valid) {
+    // The header has been written and is valid, nothing to do.
+    return;
+  }
+  if (!node_state_[node_index].has_source_node_boot_uuid) {
+    // Can't write a header if we don't have the boot UUID.
+    return;
+  }
+
+  // WriteHeader writes the first header in a log file.  We want to do this only
+  // once.
+  //
+  // Rotate rewrites the same header with a new part ID, but keeps the same part
+  // UUID.  We don't want that when things reboot, because that implies that
+  // parts go together across a reboot.
+  //
+  // Reboot resets the parts UUID.  So, once we've written a header the first
+  // time, we want to use Reboot to rotate the log and reset the parts UUID.
+  //
+  // header_valid is cleared whenever the remote reboots.
+  if (node_state_[node_index].header_written) {
+    log_namer_->Reboot(node, &node_state_[node_index].log_file_header);
+  } else {
+    log_namer_->WriteHeader(&node_state_[node_index].log_file_header, node);
+
+    node_state_[node_index].header_written = true;
+  }
+  node_state_[node_index].header_valid = true;
+}
+
+void Logger::WriteMissingTimestamps() {
+  if (configuration::MultiNode(configuration_)) {
+    server_statistics_fetcher_.Fetch();
+  } else {
+    return;
+  }
+
+  if (server_statistics_fetcher_.get() == nullptr) {
+    return;
+  }
+
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
+    if (MaybeUpdateTimestamp(
+            node, node_index,
+            server_statistics_fetcher_.context().monotonic_event_time,
+            server_statistics_fetcher_.context().realtime_event_time)) {
+      CHECK(node_state_[node_index].header_written);
+      CHECK(node_state_[node_index].header_valid);
+      log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+    } else {
+      MaybeWriteHeader(node_index, node);
+    }
+  }
+}
+
+void Logger::SetStartTime(
+    size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+    aos::realtime_clock::time_point realtime_start_time,
+    aos::monotonic_clock::time_point logger_monotonic_start_time,
+    aos::realtime_clock::time_point logger_realtime_start_time) {
+  node_state_[node_index].monotonic_start_time = monotonic_start_time;
+  node_state_[node_index].realtime_start_time = realtime_start_time;
+  node_state_[node_index]
+      .log_file_header.mutable_message()
+      ->mutate_monotonic_start_time(
+          std::chrono::duration_cast<std::chrono::nanoseconds>(
+              monotonic_start_time.time_since_epoch())
+              .count());
+
+  // Add logger start times if they are available in the log file header.
+  if (node_state_[node_index]
+          .log_file_header.mutable_message()
+          ->has_logger_monotonic_start_time()) {
+    node_state_[node_index]
+        .log_file_header.mutable_message()
+        ->mutate_logger_monotonic_start_time(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                logger_monotonic_start_time.time_since_epoch())
+                .count());
+  }
+
+  if (node_state_[node_index]
+          .log_file_header.mutable_message()
+          ->has_logger_realtime_start_time()) {
+    node_state_[node_index]
+        .log_file_header.mutable_message()
+        ->mutate_logger_realtime_start_time(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                logger_realtime_start_time.time_since_epoch())
+                .count());
+  }
+
+  if (node_state_[node_index]
+          .log_file_header.mutable_message()
+          ->has_realtime_start_time()) {
+    node_state_[node_index]
+        .log_file_header.mutable_message()
+        ->mutate_realtime_start_time(
+            std::chrono::duration_cast<std::chrono::nanoseconds>(
+                realtime_start_time.time_since_epoch())
+                .count());
+  }
+}
+
+bool Logger::MaybeUpdateTimestamp(
+    const Node *node, int node_index,
+    aos::monotonic_clock::time_point monotonic_start_time,
+    aos::realtime_clock::time_point realtime_start_time) {
+  // Bail early if the start times are already set.
+  if (node_state_[node_index].monotonic_start_time !=
+      monotonic_clock::min_time) {
+    return false;
+  }
+  if (event_loop_->node() == node ||
+      !configuration::MultiNode(configuration_)) {
+    // There are no offsets to compute for ourself, so always succeed.
+    SetStartTime(node_index, monotonic_start_time, realtime_start_time,
+                 monotonic_start_time, realtime_start_time);
+    node_state_[node_index].SetBootUUID(event_loop_->boot_uuid().string_view());
+    return true;
+  } else if (server_statistics_fetcher_.get() != nullptr) {
+    // We must be a remote node now.  Look for the connection and see if it is
+    // connected.
+
+    for (const message_bridge::ServerConnection *connection :
+         *server_statistics_fetcher_->connections()) {
+      if (connection->node()->name()->string_view() !=
+          node->name()->string_view()) {
+        continue;
+      }
+
+      if (connection->state() != message_bridge::State::CONNECTED) {
+        VLOG(1) << node->name()->string_view()
+                << " is not connected, can't start it yet.";
+        break;
+      }
+
+      // Update the boot UUID as soon as we know we are connected.
+      if (!connection->has_boot_uuid()) {
+        VLOG(1) << "Missing boot_uuid for node " << aos::FlatbufferToJson(node);
+        break;
+      }
+
+      if (!node_state_[node_index].has_source_node_boot_uuid ||
+          node_state_[node_index].source_node_boot_uuid !=
+              connection->boot_uuid()->string_view()) {
+        node_state_[node_index].SetBootUUID(
+            connection->boot_uuid()->string_view());
+      }
+
+      if (!connection->has_monotonic_offset()) {
+        VLOG(1) << "Missing monotonic offset for setting start time for node "
+                << aos::FlatbufferToJson(node);
+        break;
+      }
+
+      // Found it and it is connected.  Compensate and go.
+      SetStartTime(node_index,
+                   monotonic_start_time +
+                       std::chrono::nanoseconds(connection->monotonic_offset()),
+                   realtime_start_time, monotonic_start_time,
+                   realtime_start_time);
+      return true;
+    }
+  }
+  return false;
+}
+
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> Logger::MakeHeader(
+    const Node *node, std::string_view config_sha256) {
+  // Now write the header with this timestamp in it.
+  flatbuffers::FlatBufferBuilder fbb;
+  fbb.ForceDefaults(true);
+
+  flatbuffers::Offset<aos::Configuration> configuration_offset;
+  if (!separate_config_) {
+    configuration_offset = CopyFlatBuffer(configuration_, &fbb);
+  } else {
+    CHECK(!config_sha256.empty());
+  }
+
+  const flatbuffers::Offset<flatbuffers::String> name_offset =
+      fbb.CreateString(name_);
+
+  CHECK(log_event_uuid_ != UUID::Zero());
+  const flatbuffers::Offset<flatbuffers::String> log_event_uuid_offset =
+      fbb.CreateString(log_event_uuid_.string_view());
+
+  const flatbuffers::Offset<flatbuffers::String> logger_instance_uuid_offset =
+      fbb.CreateString(logger_instance_uuid_.string_view());
+
+  flatbuffers::Offset<flatbuffers::String> log_start_uuid_offset;
+  if (!log_start_uuid_.empty()) {
+    log_start_uuid_offset = fbb.CreateString(log_start_uuid_);
+  }
+
+  flatbuffers::Offset<flatbuffers::String> config_sha256_offset;
+  if (!config_sha256.empty()) {
+    config_sha256_offset = fbb.CreateString(config_sha256);
+  }
+
+  const flatbuffers::Offset<flatbuffers::String> logger_node_boot_uuid_offset =
+      fbb.CreateString(event_loop_->boot_uuid().string_view());
+
+  const flatbuffers::Offset<flatbuffers::String> source_node_boot_uuid_offset =
+      fbb.CreateString(event_loop_->boot_uuid().string_view());
+
+  const flatbuffers::Offset<flatbuffers::String> parts_uuid_offset =
+      fbb.CreateString("00000000-0000-4000-8000-000000000000");
+
+  flatbuffers::Offset<Node> node_offset;
+  flatbuffers::Offset<Node> logger_node_offset;
+
+  if (configuration::MultiNode(configuration_)) {
+    node_offset = RecursiveCopyFlatBuffer(node, &fbb);
+    logger_node_offset = RecursiveCopyFlatBuffer(event_loop_->node(), &fbb);
+  }
+
+  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
+
+  log_file_header_builder.add_name(name_offset);
+
+  // Only add the node if we are running in a multinode configuration.
+  if (node != nullptr) {
+    log_file_header_builder.add_node(node_offset);
+    log_file_header_builder.add_logger_node(logger_node_offset);
+  }
+
+  if (!configuration_offset.IsNull()) {
+    log_file_header_builder.add_configuration(configuration_offset);
+  }
+  // The worst case theoretical out of order is the polling period times 2.
+  // One message could get logged right after the boundary, but be for right
+  // before the next boundary.  And the reverse could happen for another
+  // message.  Report back 3x to be extra safe, and because the cost isn't
+  // huge on the read side.
+  log_file_header_builder.add_max_out_of_order_duration(
+      std::chrono::nanoseconds(3 * polling_period_).count());
+
+  log_file_header_builder.add_monotonic_start_time(
+      std::chrono::duration_cast<std::chrono::nanoseconds>(
+          monotonic_clock::min_time.time_since_epoch())
+          .count());
+  if (node == event_loop_->node()) {
+    log_file_header_builder.add_realtime_start_time(
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            realtime_clock::min_time.time_since_epoch())
+            .count());
+  } else {
+    log_file_header_builder.add_logger_monotonic_start_time(
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            monotonic_clock::min_time.time_since_epoch())
+            .count());
+    log_file_header_builder.add_logger_realtime_start_time(
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            realtime_clock::min_time.time_since_epoch())
+            .count());
+  }
+
+  log_file_header_builder.add_log_event_uuid(log_event_uuid_offset);
+  log_file_header_builder.add_logger_instance_uuid(logger_instance_uuid_offset);
+  if (!log_start_uuid_offset.IsNull()) {
+    log_file_header_builder.add_log_start_uuid(log_start_uuid_offset);
+  }
+  log_file_header_builder.add_logger_node_boot_uuid(
+      logger_node_boot_uuid_offset);
+  log_file_header_builder.add_source_node_boot_uuid(
+      source_node_boot_uuid_offset);
+
+  log_file_header_builder.add_parts_uuid(parts_uuid_offset);
+  log_file_header_builder.add_parts_index(0);
+
+  log_file_header_builder.add_configuration_sha256(0);
+
+  if (!config_sha256_offset.IsNull()) {
+    log_file_header_builder.add_configuration_sha256(config_sha256_offset);
+  }
+
+  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
+      fbb.Release());
+
+  CHECK(result.Verify()) << ": Built a corrupted header.";
+
+  return result;
+}
+
+void Logger::ResetStatisics() {
+  max_message_fetch_time_ = std::chrono::nanoseconds::zero();
+  max_message_fetch_time_channel_ = -1;
+  max_message_fetch_time_size_ = -1;
+  total_message_fetch_time_ = std::chrono::nanoseconds::zero();
+  total_message_fetch_count_ = 0;
+  total_message_fetch_bytes_ = 0;
+  total_nop_fetch_time_ = std::chrono::nanoseconds::zero();
+  total_nop_fetch_count_ = 0;
+  max_copy_time_ = std::chrono::nanoseconds::zero();
+  max_copy_time_channel_ = -1;
+  max_copy_time_size_ = -1;
+  total_copy_time_ = std::chrono::nanoseconds::zero();
+  total_copy_count_ = 0;
+  total_copy_bytes_ = 0;
+}
+
+void Logger::Rotate() {
+  for (const Node *node : log_namer_->nodes()) {
+    const int node_index = configuration::GetNodeIndex(configuration_, node);
+    log_namer_->Rotate(node, &node_state_[node_index].log_file_header);
+  }
+}
+
+void Logger::LogUntil(monotonic_clock::time_point t) {
+  // Grab the latest ServerStatistics message.  This will always have the
+  // oppertunity to be >= to the current time, so it will always represent any
+  // reboots which may have happened.
+  WriteMissingTimestamps();
+
+  // Write each channel to disk, one at a time.
+  for (FetcherStruct &f : fetchers_) {
+    while (true) {
+      if (f.written) {
+        const auto start = event_loop_->monotonic_now();
+        const bool got_new = f.fetcher->FetchNext();
+        const auto end = event_loop_->monotonic_now();
+        RecordFetchResult(start, end, got_new, &f);
+        if (!got_new) {
+          VLOG(2) << "No new data on "
+                  << configuration::CleanedChannelToString(
+                         f.fetcher->channel());
+          break;
+        }
+        f.written = false;
+      }
+
+      // TODO(james): Write tests to exercise this logic.
+      if (f.fetcher->context().monotonic_event_time >= t) {
+        break;
+      }
+      if (f.writer != nullptr) {
+        // Write!
+        const auto start = event_loop_->monotonic_now();
+        flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
+                                           max_header_size_);
+        fbb.ForceDefaults(true);
+
+        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                           f.channel_index, f.log_type));
+        const auto end = event_loop_->monotonic_now();
+        RecordCreateMessageTime(start, end, &f);
+
+        VLOG(2) << "Writing data as node "
+                << FlatbufferToJson(event_loop_->node()) << " for channel "
+                << configuration::CleanedChannelToString(f.fetcher->channel())
+                << " to " << f.writer->filename() << " data "
+                << FlatbufferToJson(
+                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                           fbb.GetBufferPointer()));
+
+        max_header_size_ = std::max(max_header_size_,
+                                    fbb.GetSize() - f.fetcher->context().size);
+        CHECK(node_state_[f.data_node_index].header_valid)
+            << ": Can't write data before the header on channel "
+            << configuration::CleanedChannelToString(f.fetcher->channel());
+        f.writer->QueueSizedFlatbuffer(&fbb);
+      }
+
+      if (f.timestamp_writer != nullptr) {
+        // And now handle timestamps.
+        const auto start = event_loop_->monotonic_now();
+        flatbuffers::FlatBufferBuilder fbb;
+        fbb.ForceDefaults(true);
+
+        fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+                                           f.channel_index,
+                                           LogType::kLogDeliveryTimeOnly));
+        const auto end = event_loop_->monotonic_now();
+        RecordCreateMessageTime(start, end, &f);
+
+        VLOG(2) << "Writing timestamps as node "
+                << FlatbufferToJson(event_loop_->node()) << " for channel "
+                << configuration::CleanedChannelToString(f.fetcher->channel())
+                << " to " << f.timestamp_writer->filename() << " timestamp "
+                << FlatbufferToJson(
+                       flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+                           fbb.GetBufferPointer()));
+
+        CHECK(node_state_[f.timestamp_node_index].header_valid)
+            << ": Can't write data before the header on channel "
+            << configuration::CleanedChannelToString(f.fetcher->channel());
+        f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+      }
+
+      if (f.contents_writer != nullptr) {
+        const auto start = event_loop_->monotonic_now();
+        // And now handle the special message contents channel.  Copy the
+        // message into a FlatBufferBuilder and save it to disk.
+        // TODO(austin): We can be more efficient here when we start to
+        // care...
+        flatbuffers::FlatBufferBuilder fbb;
+        fbb.ForceDefaults(true);
+
+        const RemoteMessage *msg =
+            flatbuffers::GetRoot<RemoteMessage>(f.fetcher->context().data);
+
+        CHECK(msg->has_boot_uuid()) << ": " << aos::FlatbufferToJson(msg);
+        if (!node_state_[f.contents_node_index].has_source_node_boot_uuid ||
+            node_state_[f.contents_node_index].source_node_boot_uuid !=
+                msg->boot_uuid()->string_view()) {
+          node_state_[f.contents_node_index].SetBootUUID(
+              msg->boot_uuid()->string_view());
+
+          MaybeWriteHeader(f.contents_node_index);
+        }
+
+        logger::MessageHeader::Builder message_header_builder(fbb);
+
+        // TODO(austin): This needs to check the channel_index and confirm
+        // that it should be logged before squirreling away the timestamp to
+        // disk.  We don't want to log irrelevant timestamps.
+
+        // Note: this must match the same order as MessageBridgeServer and
+        // PackMessage.  We want identical headers to have identical
+        // on-the-wire formats to make comparing them easier.
+
+        // Translate from the channel index that the event loop uses to the
+        // channel index in the log file.
+        message_header_builder.add_channel_index(
+            event_loop_to_logged_channel_index_[msg->channel_index()]);
+
+        message_header_builder.add_queue_index(msg->queue_index());
+        message_header_builder.add_monotonic_sent_time(
+            msg->monotonic_sent_time());
+        message_header_builder.add_realtime_sent_time(
+            msg->realtime_sent_time());
+
+        message_header_builder.add_monotonic_remote_time(
+            msg->monotonic_remote_time());
+        message_header_builder.add_realtime_remote_time(
+            msg->realtime_remote_time());
+        message_header_builder.add_remote_queue_index(
+            msg->remote_queue_index());
+
+        message_header_builder.add_monotonic_timestamp_time(
+            f.fetcher->context()
+                .monotonic_event_time.time_since_epoch()
+                .count());
+
+        fbb.FinishSizePrefixed(message_header_builder.Finish());
+        const auto end = event_loop_->monotonic_now();
+        RecordCreateMessageTime(start, end, &f);
+
+        CHECK(node_state_[f.contents_node_index].header_valid)
+            << ": Can't write data before the header on channel "
+            << configuration::CleanedChannelToString(f.fetcher->channel());
+        f.contents_writer->QueueSizedFlatbuffer(&fbb);
+      }
+
+      f.written = true;
+    }
+  }
+  last_synchronized_time_ = t;
+}
+
+void Logger::DoLogData(const monotonic_clock::time_point end_time) {
+  // We want to guarantee that messages aren't out of order by more than
+  // max_out_of_order_duration.  To do this, we need sync points.  Every write
+  // cycle should be a sync point.
+
+  do {
+    // Move the sync point up by at most polling_period.  This forces one sync
+    // per iteration, even if it is small.
+    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
+
+    on_logged_period_();
+
+    // If we missed cycles, we could be pretty far behind.  Spin until we are
+    // caught up.
+  } while (last_synchronized_time_ + polling_period_ < end_time);
+}
+
+void Logger::RecordFetchResult(aos::monotonic_clock::time_point start,
+                               aos::monotonic_clock::time_point end,
+                               bool got_new, FetcherStruct *fetcher) {
+  const auto duration = end - start;
+  if (!got_new) {
+    ++total_nop_fetch_count_;
+    total_nop_fetch_time_ += duration;
+    return;
+  }
+  ++total_message_fetch_count_;
+  total_message_fetch_bytes_ += fetcher->fetcher->context().size;
+  total_message_fetch_time_ += duration;
+  if (duration > max_message_fetch_time_) {
+    max_message_fetch_time_ = duration;
+    max_message_fetch_time_channel_ = fetcher->channel_index;
+    max_message_fetch_time_size_ = fetcher->fetcher->context().size;
+  }
+}
+
+void Logger::RecordCreateMessageTime(aos::monotonic_clock::time_point start,
+                                     aos::monotonic_clock::time_point end,
+                                     FetcherStruct *fetcher) {
+  const auto duration = end - start;
+  total_copy_time_ += duration;
+  ++total_copy_count_;
+  total_copy_bytes_ += fetcher->fetcher->context().size;
+  if (duration > max_copy_time_) {
+    max_copy_time_ = duration;
+    max_copy_time_channel_ = fetcher->channel_index;
+    max_copy_time_size_ = fetcher->fetcher->context().size;
+  }
+}
+
+}  // namespace logger
+}  // namespace aos
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
new file mode 100644
index 0000000..4c7d39f
--- /dev/null
+++ b/aos/events/logging/log_writer.h
@@ -0,0 +1,323 @@
+#ifndef AOS_EVENTS_LOGGING_LOG_WRITER_H_
+#define AOS_EVENTS_LOGGING_LOG_WRITER_H_
+
+#include <chrono>
+#include <string_view>
+#include <vector>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/log_namer.h"
+#include "aos/events/logging/logfile_utils.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/uuid.h"
+#include "aos/events/simulated_event_loop.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/remote_message_generated.h"
+#include "aos/time/time.h"
+#include "flatbuffers/flatbuffers.h"
+
+namespace aos {
+namespace logger {
+
+// Logs all channels available in the event loop to disk every 100 ms.
+// Start by logging one message per channel to capture any state and
+// configuration that is sent rately on a channel and would affect execution.
+class Logger {
+ public:
+  // Constructs a logger.
+  //   event_loop: The event loop used to read the messages.
+  //   configuration: When provided, this is the configuration to log, and the
+  //     configuration to use for the channel list to log.  If not provided,
+  //     this becomes the configuration from the event loop.
+  //   should_log: When provided, a filter for channels to log. If not provided,
+  //     all available channels are logged.
+  Logger(EventLoop *event_loop)
+      : Logger(event_loop, event_loop->configuration()) {}
+  Logger(EventLoop *event_loop, const Configuration *configuration)
+      : Logger(event_loop, configuration,
+               [](const Channel *) { return true; }) {}
+  Logger(EventLoop *event_loop, const Configuration *configuration,
+         std::function<bool(const Channel *)> should_log);
+  ~Logger();
+
+  // Overrides the name in the log file header.
+  void set_name(std::string_view name) { name_ = name; }
+
+  // Sets the callback to run after each period of data is logged. Defaults to
+  // doing nothing.
+  //
+  // This callback may safely do things like call Rotate().
+  void set_on_logged_period(std::function<void()> on_logged_period) {
+    on_logged_period_ = std::move(on_logged_period);
+  }
+
+  void set_separate_config(bool separate_config) {
+    separate_config_ = separate_config;
+  }
+
+  // Sets the period between polling the data. Defaults to 100ms.
+  //
+  // Changing this while a set of files is being written may result in
+  // unreadable files.
+  void set_polling_period(std::chrono::nanoseconds polling_period) {
+    polling_period_ = polling_period;
+  }
+
+  std::string_view log_start_uuid() const { return log_start_uuid_; }
+  UUID logger_instance_uuid() const { return logger_instance_uuid_; }
+
+  // The maximum time for a single fetch which returned a message, or 0 if none
+  // of those have happened.
+  std::chrono::nanoseconds max_message_fetch_time() const {
+    return max_message_fetch_time_;
+  }
+  // The channel for that longest fetch which returned a message, or -1 if none
+  // of those have happened.
+  int max_message_fetch_time_channel() const {
+    return max_message_fetch_time_channel_;
+  }
+  // The size of the message returned by that longest fetch, or -1 if none of
+  // those have happened.
+  int max_message_fetch_time_size() const {
+    return max_message_fetch_time_size_;
+  }
+  // The total time spent fetching messages.
+  std::chrono::nanoseconds total_message_fetch_time() const {
+    return total_message_fetch_time_;
+  }
+  // The total number of fetch calls which returned messages.
+  int total_message_fetch_count() const { return total_message_fetch_count_; }
+  // The total number of bytes fetched.
+  int64_t total_message_fetch_bytes() const {
+    return total_message_fetch_bytes_;
+  }
+
+  // The total time spent in fetches which did not return a message.
+  std::chrono::nanoseconds total_nop_fetch_time() const {
+    return total_nop_fetch_time_;
+  }
+  // The total number of fetches which did not return a message.
+  int total_nop_fetch_count() const { return total_nop_fetch_count_; }
+
+  // The maximum time for a single copy, or 0 if none of those have happened.
+  std::chrono::nanoseconds max_copy_time() const { return max_copy_time_; }
+  // The channel for that longest copy, or -1 if none of those have happened.
+  int max_copy_time_channel() const { return max_copy_time_channel_; }
+  // The size of the message for that longest copy, or -1 if none of those have
+  // happened.
+  int max_copy_time_size() const { return max_copy_time_size_; }
+  // The total time spent copying messages.
+  std::chrono::nanoseconds total_copy_time() const { return total_copy_time_; }
+  // The total number of messages copied.
+  int total_copy_count() const { return total_copy_count_; }
+  // The total number of bytes copied.
+  int64_t total_copy_bytes() const { return total_copy_bytes_; }
+
+  void ResetStatisics();
+
+  // Rotates the log file(s), triggering new part files to be written for each
+  // log file.
+  void Rotate();
+
+  // Starts logging to files with the given naming scheme.
+  //
+  // log_start_uuid may be used to tie this log event to other log events across
+  // multiple nodes. The default (empty string) indicates there isn't one
+  // available.
+  void StartLogging(std::unique_ptr<LogNamer> log_namer,
+                    std::string_view log_start_uuid = "");
+
+  // Stops logging. Ensures any messages through end_time make it into the log.
+  //
+  // If you want to stop ASAP, pass min_time to avoid reading any more messages.
+  //
+  // Returns the LogNamer in case the caller wants to do anything else with it
+  // before destroying it.
+  std::unique_ptr<LogNamer> StopLogging(
+      aos::monotonic_clock::time_point end_time);
+
+  // Returns whether a log is currently being written.
+  bool is_started() const { return static_cast<bool>(log_namer_); }
+
+  // Shortcut to call StartLogging with a LocalLogNamer when event processing
+  // starts.
+  void StartLoggingLocalNamerOnRun(std::string base_name) {
+    event_loop_->OnRun([this, base_name]() {
+      StartLogging(
+          std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
+    });
+  }
+
+ private:
+  // Structure to track both a fetcher, and if the data fetched has been
+  // written.  We may want to delay writing data to disk so that we don't let
+  // data get too far out of order when written to disk so we can avoid making
+  // it too hard to sort when reading.
+  struct FetcherStruct {
+    std::unique_ptr<RawFetcher> fetcher;
+    bool written = false;
+
+    // Channel index to log to.
+    int channel_index = -1;
+    const Channel *channel = nullptr;
+    const Node *timestamp_node = nullptr;
+
+    LogType log_type = LogType::kLogMessage;
+
+    // We fill out the metadata at construction, but the actual writers have to
+    // be updated each time we start logging. To avoid duplicating the complex
+    // logic determining whether each writer should be initialized, we just
+    // stash the answer in separate member variables.
+    bool wants_writer = false;
+    DetachedBufferWriter *writer = nullptr;
+    bool wants_timestamp_writer = false;
+    DetachedBufferWriter *timestamp_writer = nullptr;
+    bool wants_contents_writer = false;
+    DetachedBufferWriter *contents_writer = nullptr;
+
+    // Node which this data is from, or -1 if it is unknown.
+    int data_node_index = -1;
+    // Node that this timestamp is for, or -1 if it is known.
+    int timestamp_node_index = -1;
+    // Node that the contents this contents_writer will log are from.
+    int contents_node_index = -1;
+  };
+
+  // Vector mapping from the channel index from the event loop to the logged
+  // channel index.
+  std::vector<int> event_loop_to_logged_channel_index_;
+
+  struct NodeState {
+    aos::monotonic_clock::time_point monotonic_start_time =
+        aos::monotonic_clock::min_time;
+    aos::realtime_clock::time_point realtime_start_time =
+        aos::realtime_clock::min_time;
+
+    bool has_source_node_boot_uuid = false;
+
+    // This is an initial UUID that is a valid UUID4 and is pretty obvious that
+    // it isn't valid.
+    std::string source_node_boot_uuid = "00000000-0000-4000-8000-000000000000";
+
+    aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
+        aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
+
+    // True if a header has been written to the start of a log file.
+    bool header_written = false;
+    // True if the current written header represents the contents which will
+    // follow.  This is cleared when boot_uuid is known to not match anymore.
+    bool header_valid = false;
+
+    // Sets the source_node_boot_uuid, properly updating everything.
+    void SetBootUUID(std::string_view new_source_node_boot_uuid) {
+      source_node_boot_uuid = new_source_node_boot_uuid;
+      header_valid = false;
+      has_source_node_boot_uuid = true;
+
+      flatbuffers::String *source_node_boot_uuid_string =
+          log_file_header.mutable_message()->mutable_source_node_boot_uuid();
+      CHECK_EQ(source_node_boot_uuid.size(),
+               source_node_boot_uuid_string->size());
+      memcpy(source_node_boot_uuid_string->data(), source_node_boot_uuid.data(),
+             source_node_boot_uuid.size());
+    }
+  };
+
+  void WriteHeader();
+
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+      const Node *node, std::string_view config_sha256);
+
+  // Writes the header for the provided node if enough information is valid.
+  void MaybeWriteHeader(int node_index);
+  // Overload for when we already know node as well.
+  void MaybeWriteHeader(int node_index, const Node *node);
+
+  bool MaybeUpdateTimestamp(
+      const Node *node, int node_index,
+      aos::monotonic_clock::time_point monotonic_start_time,
+      aos::realtime_clock::time_point realtime_start_time);
+
+  void DoLogData(const monotonic_clock::time_point end_time);
+
+  void WriteMissingTimestamps();
+
+  // Fetches from each channel until all the data is logged.
+  void LogUntil(monotonic_clock::time_point t);
+
+  void RecordFetchResult(aos::monotonic_clock::time_point start,
+                         aos::monotonic_clock::time_point end, bool got_new,
+                         FetcherStruct *fetcher);
+
+  void RecordCreateMessageTime(aos::monotonic_clock::time_point start,
+                               aos::monotonic_clock::time_point end,
+                               FetcherStruct *fetcher);
+
+  // Sets the start time for a specific node.
+  void SetStartTime(
+      size_t node_index, aos::monotonic_clock::time_point monotonic_start_time,
+      aos::realtime_clock::time_point realtime_start_time,
+      aos::monotonic_clock::time_point logger_monotonic_start_time,
+      aos::realtime_clock::time_point logger_realtime_start_time);
+
+  EventLoop *const event_loop_;
+  // The configuration to place at the top of the log file.
+  const Configuration *const configuration_;
+
+  UUID log_event_uuid_ = UUID::Zero();
+  const UUID logger_instance_uuid_ = UUID::Random();
+  std::unique_ptr<LogNamer> log_namer_;
+  // Empty indicates there isn't one.
+  std::string log_start_uuid_;
+
+  // Name to save in the log file.  Defaults to hostname.
+  std::string name_;
+
+  std::function<void()> on_logged_period_ = []() {};
+
+  std::chrono::nanoseconds max_message_fetch_time_ =
+      std::chrono::nanoseconds::zero();
+  int max_message_fetch_time_channel_ = -1;
+  int max_message_fetch_time_size_ = -1;
+  std::chrono::nanoseconds total_message_fetch_time_ =
+      std::chrono::nanoseconds::zero();
+  int total_message_fetch_count_ = 0;
+  int64_t total_message_fetch_bytes_ = 0;
+
+  std::chrono::nanoseconds total_nop_fetch_time_ =
+      std::chrono::nanoseconds::zero();
+  int total_nop_fetch_count_ = 0;
+
+  std::chrono::nanoseconds max_copy_time_ = std::chrono::nanoseconds::zero();
+  int max_copy_time_channel_ = -1;
+  int max_copy_time_size_ = -1;
+  std::chrono::nanoseconds total_copy_time_ = std::chrono::nanoseconds::zero();
+  int total_copy_count_ = 0;
+  int64_t total_copy_bytes_ = 0;
+
+  std::vector<FetcherStruct> fetchers_;
+  TimerHandler *timer_handler_;
+
+  // Period to poll the channels.
+  std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
+
+  // Last time that data was written for all channels to disk.
+  monotonic_clock::time_point last_synchronized_time_;
+
+  // Max size that the header has consumed.  This much extra data will be
+  // reserved in the builder to avoid reallocating.
+  size_t max_header_size_ = 0;
+
+  // If true, write the message header into a separate file.
+  bool separate_config_ = true;
+
+  // Fetcher for all the statistics from all the nodes.
+  aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
+  std::vector<NodeState> node_state_;
+};
+
+}  // namespace logger
+}  // namespace aos
+
+#endif  // AOS_EVENTS_LOGGING_LOG_WRITER_H_
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index da84cc9..dd1d8a2 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -2,7 +2,7 @@
 #include <sys/time.h>
 
 #include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/init.h"
 #include "aos/logging/log_namer.h"
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 2929198..bd41663 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -1,7 +1,8 @@
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 
 #include "absl/strings/str_format.h"
 #include "aos/events/event_loop.h"
+#include "aos/events/logging/log_writer.h"
 #include "aos/events/message_counter.h"
 #include "aos/events/ping_lib.h"
 #include "aos/events/pong_lib.h"
@@ -32,6 +33,21 @@
 constexpr std::string_view kConfigSha1(
     "47511a1906dbb59cf9f8ad98ad08e568c718a4deb204c8bbce81ff76cef9095c");
 
+std::vector<std::vector<std::string>> ToLogReaderVector(
+    const std::vector<LogFile> &log_files) {
+  std::vector<std::vector<std::string>> result;
+  for (const LogFile &log_file : log_files) {
+    for (const LogParts &log_parts : log_file.parts) {
+      std::vector<std::string> parts;
+      for (const std::string &part : log_parts.parts) {
+        parts.emplace_back(part);
+      }
+      result.emplace_back(std::move(parts));
+    }
+  }
+  return result;
+}
+
 class LoggerTest : public ::testing::Test {
  public:
   LoggerTest()
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 3cb5c26..85bbf18 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -197,7 +197,7 @@
         ":timestamp_fbs",
         "//aos:unique_malloc_ptr",
         "//aos/events:shm_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_reader",
         "//third_party/lksctp-tools:sctp",
     ],
 )
@@ -275,7 +275,7 @@
         ":sctp_client",
         ":timestamp_fbs",
         "//aos/events:shm_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_reader",
     ],
 )
 
@@ -448,7 +448,7 @@
         ":web_proxy",
         "//aos:init",
         "//aos/events:simulated_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_reader",
         "@com_github_google_flatbuffers//:flatbuffers",
     ],
 )
diff --git a/aos/network/log_web_proxy_main.cc b/aos/network/log_web_proxy_main.cc
index 1942c57..5c099a2 100644
--- a/aos/network/log_web_proxy_main.cc
+++ b/aos/network/log_web_proxy_main.cc
@@ -4,7 +4,7 @@
 // And then opening the plotting webpage at http://localhost:8080/graph.html
 
 #include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/init.h"
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index 38c3831..8a0ec34 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -3,7 +3,7 @@
 #include <chrono>
 #include <string_view>
 
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/network/connect_generated.h"
 #include "aos/network/message_bridge_client_generated.h"
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 1ac6cb5..aaeecff 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -2,7 +2,7 @@
 
 #include "absl/strings/str_cat.h"
 #include "absl/types/span.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/network/connect_generated.h"
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index 55f3fea..fea2d06 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -4,7 +4,7 @@
 #include <deque>
 
 #include "absl/types/span.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/network/connect_generated.h"
diff --git a/frc971/analysis/BUILD b/frc971/analysis/BUILD
index 2901463..b72634a 100644
--- a/frc971/analysis/BUILD
+++ b/frc971/analysis/BUILD
@@ -23,7 +23,7 @@
         "//aos:json_to_flatbuffer",
         "//aos/events:shm_event_loop",
         "//aos/events:simulated_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_reader",
         "@com_github_google_glog//:glog",
         "@python_repo//:python3.7_lib",
     ],
diff --git a/frc971/analysis/py_log_reader.cc b/frc971/analysis/py_log_reader.cc
index 8d3e2a6..5b32ed6 100644
--- a/frc971/analysis/py_log_reader.cc
+++ b/frc971/analysis/py_log_reader.cc
@@ -16,7 +16,7 @@
 #include <errno.h>
 
 #include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/flatbuffer_merge.h"
 #include "aos/init.h"
diff --git a/frc971/control_loops/drivetrain/BUILD b/frc971/control_loops/drivetrain/BUILD
index ad39bcf..8c3274d 100644
--- a/frc971/control_loops/drivetrain/BUILD
+++ b/frc971/control_loops/drivetrain/BUILD
@@ -470,7 +470,7 @@
         ":drivetrain_output_fbs",
         ":drivetrain_test_lib",
         "//aos/controls:control_loop_test",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_writer",
         "//aos/testing:googletest",
         "//frc971/queues:gyro_fbs",
         "//frc971/wpilib:imu_fbs",
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index bfa3d26..b378484 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -6,7 +6,7 @@
 #include "aos/controls/control_loop_test.h"
 #include "aos/controls/polytope.h"
 #include "aos/events/event_loop.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
 #include "aos/time/time.h"
 #include "gflags/gflags.h"
 #include "gtest/gtest.h"
diff --git a/y2019/control_loops/drivetrain/BUILD b/y2019/control_loops/drivetrain/BUILD
index ed6d832..c21433b 100644
--- a/y2019/control_loops/drivetrain/BUILD
+++ b/y2019/control_loops/drivetrain/BUILD
@@ -232,7 +232,8 @@
         "//aos:init",
         "//aos/events:shm_event_loop",
         "//aos/events:simulated_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_reader",
+        "//aos/events/logging:log_writer",
         "//frc971/control_loops/drivetrain:drivetrain_lib",
         "@com_github_gflags_gflags//:gflags",
         "@com_github_google_glog//:glog",
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
index 69cce4f..97b4c2b 100644
--- a/y2019/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -1,7 +1,8 @@
 #include <iostream>
 
 #include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/init.h"
 #include "aos/json_to_flatbuffer.h"
diff --git a/y2020/control_loops/drivetrain/BUILD b/y2020/control_loops/drivetrain/BUILD
index a74b18e..1b98592 100644
--- a/y2020/control_loops/drivetrain/BUILD
+++ b/y2020/control_loops/drivetrain/BUILD
@@ -137,7 +137,7 @@
         ":localizer",
         "//aos/controls:control_loop_test",
         "//aos/events:simulated_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_writer",
         "//aos/network:team_number",
         "//aos/network:testing_time_converter",
         "//frc971/control_loops:team_number_test_environment",
@@ -160,7 +160,8 @@
         "//aos:init",
         "//aos/events:shm_event_loop",
         "//aos/events:simulated_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_reader",
+        "//aos/events/logging:log_writer",
         "//aos/testing:googletest",
         "//frc971/control_loops/drivetrain:drivetrain_lib",
         "@com_github_gflags_gflags//:gflags",
@@ -180,7 +181,8 @@
         "//aos:init",
         "//aos/events:shm_event_loop",
         "//aos/events:simulated_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_reader",
+        "//aos/events/logging:log_writer",
         "//frc971/control_loops/drivetrain:drivetrain_lib",
         "@com_github_gflags_gflags//:gflags",
         "@com_github_google_glog//:glog",
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay.cc b/y2020/control_loops/drivetrain/drivetrain_replay.cc
index 1533316..9b2375b 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay.cc
@@ -5,7 +5,8 @@
 // in some other way. The original drivetrain status data will be on the
 // /original/drivetrain channel.
 #include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/init.h"
 #include "aos/json_to_flatbuffer.h"
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay_test.cc b/y2020/control_loops/drivetrain/drivetrain_replay_test.cc
index 4b02923..2e6b327 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay_test.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay_test.cc
@@ -11,7 +11,8 @@
 #include "gtest/gtest.h"
 
 #include "aos/configuration.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/init.h"
 #include "aos/json_to_flatbuffer.h"
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index d440aaf..d195b5d 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -3,7 +3,7 @@
 #include "gtest/gtest.h"
 
 #include "aos/controls/control_loop_test.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
 #include "aos/network/message_bridge_server_generated.h"
 #include "aos/network/team_number.h"
 #include "aos/network/testing_time_converter.h"
diff --git a/y2020/control_loops/superstructure/BUILD b/y2020/control_loops/superstructure/BUILD
index d659920..3ea3dbf 100644
--- a/y2020/control_loops/superstructure/BUILD
+++ b/y2020/control_loops/superstructure/BUILD
@@ -105,7 +105,7 @@
         ":superstructure_status_fbs",
         "//aos:math",
         "//aos/controls:control_loop_test",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_writer",
         "//aos/testing:googletest",
         "//aos/time",
         "//frc971/control_loops:capped_test_plant",
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index ac1e4a0..c406d09 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -4,7 +4,7 @@
 #include <memory>
 
 #include "aos/controls/control_loop_test.h"
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_writer.h"
 #include "frc971/control_loops/capped_test_plant.h"
 #include "frc971/control_loops/position_sensor_sim.h"
 #include "frc971/control_loops/team_number_test_environment.h"
diff --git a/y2020/vision/BUILD b/y2020/vision/BUILD
index a58ed8c..a5d47fa 100644
--- a/y2020/vision/BUILD
+++ b/y2020/vision/BUILD
@@ -117,7 +117,7 @@
         ":vision_fbs",
         "//aos:init",
         "//aos/events:simulated_event_loop",
-        "//aos/events/logging:logger",
+        "//aos/events/logging:log_reader",
         "//third_party:opencv",
     ],
 )
diff --git a/y2020/vision/viewer_replay.cc b/y2020/vision/viewer_replay.cc
index 82ab11a..234c445 100644
--- a/y2020/vision/viewer_replay.cc
+++ b/y2020/vision/viewer_replay.cc
@@ -3,7 +3,7 @@
 #include <opencv2/highgui/highgui.hpp>
 #include <opencv2/imgproc.hpp>
 
-#include "aos/events/logging/logger.h"
+#include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/init.h"
 #include "y2020/vision/vision_generated.h"