Support stopping and starting logging at runtime

Change-Id: If1e1b7119808d1f56e96efb71ea7000e0fa13fe8
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index ccec7fa..ad2f2d2 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -291,6 +291,7 @@
         ":multinode_pingpong_config",
         "//aos/events:pingpong_config",
     ],
+    shard_count = 5,
     deps = [
         ":logger",
         "//aos/events:message_counter",
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 5265f24..661f28d 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -126,6 +126,8 @@
                     const Configuration *configuration, const Node *node);
   ~MultiNodeLogNamer() override = default;
 
+  std::string_view base_name() const { return base_name_; }
+
   void WriteHeader(
       aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> *header,
       const Node *node) override;
@@ -150,6 +152,22 @@
   // this method.
   bool ran_out_of_space() const { return ran_out_of_space_; }
 
+  // Returns the maximum total_bytes() value for all existing
+  // DetachedBufferWriters.
+  //
+  // Returns 0 if no files are open.
+  size_t maximum_total_bytes() const {
+    size_t result = 0;
+    for (const std::pair<const Channel *const, DataWriter> &data_writer :
+         data_writers_) {
+      result = std::max(result, data_writer.second.writer->total_bytes());
+    }
+    if (data_writer_) {
+      result = std::max(result, data_writer_->total_bytes());
+    }
+    return result;
+  }
+
   // Closes all existing log files. No more data may be written after this.
   //
   // This may set ran_out_of_space().
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 3032a6f..859e24c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -35,43 +35,30 @@
 namespace logger {
 namespace chrono = std::chrono;
 
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
-               std::chrono::milliseconds polling_period)
-    : Logger(base_name, event_loop, event_loop->configuration(),
-             polling_period) {}
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
-               const Configuration *configuration,
-               std::chrono::milliseconds polling_period)
-    : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
-             event_loop, configuration, polling_period) {}
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-               std::chrono::milliseconds polling_period)
-    : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
-             polling_period) {}
-
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-               const Configuration *configuration,
-               std::chrono::milliseconds polling_period)
+Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
+               std::function<bool(const Channel *)> should_log)
     : event_loop_(event_loop),
-      uuid_(UUID::Random()),
-      log_namer_(std::move(log_namer)),
       configuration_(configuration),
       name_(network::GetHostname()),
-      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
-      polling_period_(polling_period),
+      timer_handler_(event_loop_->AddTimer(
+          [this]() { DoLogData(event_loop_->monotonic_now()); })),
       server_statistics_fetcher_(
           configuration::MultiNode(event_loop_->configuration())
               ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                     "/aos")
               : aos::Fetcher<message_bridge::ServerStatistics>()) {
-  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
-  int channel_index = 0;
+  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
 
   // Find all the nodes which are logging timestamps on our node.
   std::set<const Node *> timestamp_logger_nodes;
   for (const Channel *channel : *configuration_->channels()) {
-    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
-        !channel->has_destination_nodes()) {
+    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
+      continue;
+    }
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+    if (!should_log(channel)) {
       continue;
     }
     for (const Connection *connection : *channel->destination_nodes()) {
@@ -103,44 +90,53 @@
         << event_loop_->node()->name()->string_view()
         << " but can't find channel /aos/remote_timestamps/"
         << node->name()->string_view();
+    if (!should_log(channel)) {
+      continue;
+    }
     timestamp_logger_channels.insert(std::make_pair(channel, node));
   }
 
   const size_t our_node_index =
       configuration::GetNodeIndex(configuration_, event_loop_->node());
 
-  for (const Channel *config_channel : *configuration_->channels()) {
+  for (size_t channel_index = 0;
+       channel_index < configuration_->channels()->size(); ++channel_index) {
+    const Channel *const config_channel =
+        configuration_->channels()->Get(channel_index);
     // The MakeRawFetcher method needs a channel which is in the event loop
     // configuration() object, not the configuration_ object.  Go look that up
     // from the config.
     const Channel *channel = aos::configuration::GetChannel(
         event_loop_->configuration(), config_channel->name()->string_view(),
         config_channel->type()->string_view(), "", event_loop_->node());
+    if (!should_log(channel)) {
+      continue;
+    }
 
     FetcherStruct fs;
     fs.node_index = our_node_index;
+    fs.channel_index = channel_index;
+    fs.channel = channel;
+
     const bool is_local =
         configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
 
     const bool is_readable =
         configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
-    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
-                                 channel, event_loop_->node()) &&
-                             is_readable;
+    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
+        channel, event_loop_->node());
+    const bool log_message = is_logged && is_readable;
 
-    const bool log_delivery_times =
-        (event_loop_->node() == nullptr)
-            ? false
-            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-                  channel, event_loop_->node(), event_loop_->node());
+    bool log_delivery_times = false;
+    if (event_loop_->node() != nullptr) {
+      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+          channel, event_loop_->node(), event_loop_->node());
+    }
 
     // Now, detect a MessageHeader timestamp logger where we should just log the
     // contents to a file directly.
     const bool log_contents = timestamp_logger_channels.find(channel) !=
                               timestamp_logger_channels.end();
-    const Node *timestamp_node =
-        log_contents ? timestamp_logger_channels.find(channel)->second
-                     : nullptr;
 
     if (log_message || log_delivery_times || log_contents) {
       fs.fetcher = event_loop->MakeRawFetcher(channel);
@@ -149,11 +145,11 @@
 
       if (log_delivery_times) {
         VLOG(1) << "  Delivery times";
-        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+        fs.wants_timestamp_writer = true;
       }
       if (log_message) {
         VLOG(1) << "  Data";
-        fs.writer = log_namer_->MakeWriter(channel);
+        fs.wants_writer = true;
         if (!is_local) {
           fs.log_type = LogType::kLogRemoteMessage;
         }
@@ -161,18 +157,50 @@
       if (log_contents) {
         VLOG(1) << "Timestamp logger channel "
                 << configuration::CleanedChannelToString(channel);
-        fs.contents_writer =
-            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+        fs.wants_contents_writer = true;
         fs.node_index =
-            configuration::GetNodeIndex(configuration_, timestamp_node);
+            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
       }
-      fs.channel_index = channel_index;
-      fs.written = false;
       fetchers_.emplace_back(std::move(fs));
     }
-    ++channel_index;
+  }
+}
+
+Logger::~Logger() {
+  if (log_namer_) {
+    // If we are replaying a log file, or in simulation, we want to force the
+    // last bit of data to be logged.  The easiest way to deal with this is to
+    // poll everything as we go to destroy the class, ie, shut down the logger,
+    // and write it to disk.
+    StopLogging(event_loop_->monotonic_now());
+  }
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer) {
+  CHECK(!log_namer_) << ": Already logging";
+  log_namer_ = std::move(log_namer);
+  uuid_ = UUID::Random();
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+
+  // We want to do as much work as possible before the initial Fetch. Time
+  // between that and actually starting to log opens up the possibility of
+  // falling off the end of the queue during that time.
+
+  for (FetcherStruct &f : fetchers_) {
+    if (f.wants_writer) {
+      f.writer = log_namer_->MakeWriter(f.channel);
+    }
+    if (f.wants_timestamp_writer) {
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+    }
+    if (f.wants_contents_writer) {
+      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
+          f.channel, CHECK_NOTNULL(f.timestamp_node));
+    }
   }
 
+  CHECK(node_state_.empty());
   node_state_.resize(configuration::MultiNode(configuration_)
                          ? configuration_->nodes()->size()
                          : 1u);
@@ -183,21 +211,6 @@
     node_state_[node_index].log_file_header = MakeHeader(node);
   }
 
-  // When things start, we want to log the header, then the most recent
-  // messages available on each fetcher to capture the previous state, then
-  // start polling.
-  event_loop_->OnRun([this]() { StartLogging(); });
-}
-
-Logger::~Logger() {
-  // If we are replaying a log file, or in simulation, we want to force the last
-  // bit of data to be logged.  The easiest way to deal with this is to poll
-  // everything as we go to destroy the class, ie, shut down the logger, and
-  // write it to disk.
-  DoLogData();
-}
-
-void Logger::StartLogging() {
   // Grab data from each channel right before we declare the log file started
   // so we can capture the latest message on each channel.  This lets us have
   // non periodic messages with configuration that now get logged.
@@ -219,6 +232,25 @@
                         polling_period_);
 }
 
+std::unique_ptr<LogNamer> Logger::StopLogging(
+    aos::monotonic_clock::time_point end_time) {
+  CHECK(log_namer_) << ": Not logging right now";
+
+  if (end_time != aos::monotonic_clock::min_time) {
+    LogUntil(end_time);
+  }
+  timer_handler_->Disable();
+
+  for (FetcherStruct &f : fetchers_) {
+    f.writer = nullptr;
+    f.timestamp_writer = nullptr;
+    f.contents_writer = nullptr;
+  }
+  node_state_.clear();
+
+  return std::move(log_namer_);
+}
+
 void Logger::WriteHeader() {
   if (configuration::MultiNode(configuration_)) {
     server_statistics_fetcher_.Fetch();
@@ -356,6 +388,7 @@
   flatbuffers::Offset<flatbuffers::String> name_offset =
       fbb.CreateString(name_);
 
+  CHECK(uuid_ != UUID::Zero());
   flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
       fbb.CreateString(uuid_.string_view());
 
@@ -384,8 +417,7 @@
   // message.  Report back 3x to be extra safe, and because the cost isn't
   // huge on the read side.
   log_file_header_builder.add_max_out_of_order_duration(
-      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
-          .count());
+      std::chrono::nanoseconds(3 * polling_period_).count());
 
   log_file_header_builder.add_monotonic_start_time(
       std::chrono::duration_cast<std::chrono::nanoseconds>(
@@ -522,22 +554,21 @@
   last_synchronized_time_ = t;
 }
 
-void Logger::DoLogData() {
-  // We want to guarentee that messages aren't out of order by more than
+void Logger::DoLogData(const monotonic_clock::time_point end_time) {
+  // We want to guarantee that messages aren't out of order by more than
   // max_out_of_order_duration.  To do this, we need sync points.  Every write
   // cycle should be a sync point.
-  const monotonic_clock::time_point monotonic_now =
-      event_loop_->monotonic_now();
 
   do {
     // Move the sync point up by at most polling_period.  This forces one sync
     // per iteration, even if it is small.
-    LogUntil(
-        std::min(last_synchronized_time_ + polling_period_, monotonic_now));
+    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
+
+    on_logged_period_();
 
     // If we missed cycles, we could be pretty far behind.  Spin until we are
     // caught up.
-  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
+  } while (last_synchronized_time_ + polling_period_ < end_time);
 }
 
 std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 2905dfa..2051550 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -45,7 +45,7 @@
   // ---- basename_timestamps/pi1/aos/remote_timestamps/pi2/aos.logger.MessageHeader.part0.bfbs, etc.
   //  \-- basename_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs, etc.
 
-  // All log files and parts from a single run of a logger executable will have
+  // All log files and parts from a single logging event will have
   // the same uuid.  This should be all the files generated on a single node.
   // Used to correlate files recorded together.
   logger_uuid:string;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 70c9f60..e8856f3 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -32,66 +32,69 @@
 class Logger {
  public:
   // Constructs a logger.
-  //   base_name/log_namer: Object used to write data to disk in one or more log
-  //     files.  If a base_name is passed in, a LocalLogNamer is wrapped
-  //     around it.
   //   event_loop: The event loop used to read the messages.
-  //   polling_period: The period used to poll the data.
   //   configuration: When provided, this is the configuration to log, and the
   //     configuration to use for the channel list to log.  If not provided,
   //     this becomes the configuration from the event loop.
-  Logger(std::string_view base_name, EventLoop *event_loop,
-         std::chrono::milliseconds polling_period =
-             std::chrono::milliseconds(100));
-  Logger(std::string_view base_name, EventLoop *event_loop,
-         const Configuration *configuration,
-         std::chrono::milliseconds polling_period =
-             std::chrono::milliseconds(100));
-  Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-         std::chrono::milliseconds polling_period =
-             std::chrono::milliseconds(100));
-  Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-         const Configuration *configuration,
-         std::chrono::milliseconds polling_period =
-             std::chrono::milliseconds(100));
+  //   should_log: When provided, a filter for channels to log. If not provided,
+  //     all available channels are logged.
+  Logger(EventLoop *event_loop)
+      : Logger(event_loop, event_loop->configuration()) {}
+  Logger(EventLoop *event_loop, const Configuration *configuration)
+      : Logger(event_loop, configuration,
+               [](const Channel *) { return true; }) {}
+  Logger(EventLoop *event_loop, const Configuration *configuration,
+         std::function<bool(const Channel *)> should_log);
   ~Logger();
 
   // Overrides the name in the log file header.
   void set_name(std::string_view name) { name_ = name; }
 
+  // Sets the callback to run after each period of data is logged. Defaults to
+  // doing nothing.
+  //
+  // This callback may safely do things like call Rotate().
+  void set_on_logged_period(std::function<void()> on_logged_period) {
+    on_logged_period_ = std::move(on_logged_period);
+  }
+
+  // Sets the period between polling the data. Defaults to 100ms.
+  //
+  // Changing this while a set of files is being written may result in
+  // unreadable files.
+  void set_polling_period(std::chrono::nanoseconds polling_period) {
+    polling_period_ = polling_period;
+  }
+
   // Rotates the log file(s), triggering new part files to be written for each
   // log file.
   void Rotate();
 
+  // Starts logging to files with the given naming scheme.
+  void StartLogging(std::unique_ptr<LogNamer> log_namer);
+
+  // Stops logging. Ensures any messages through end_time make it into the log.
+  //
+  // If you want to stop ASAP, pass min_time to avoid reading any more messages.
+  //
+  // Returns the LogNamer in case the caller wants to do anything else with it
+  // before destroying it.
+  std::unique_ptr<LogNamer> StopLogging(
+      aos::monotonic_clock::time_point end_time);
+
+  // Returns whether a log is currently being written.
+  bool is_started() const { return static_cast<bool>(log_namer_); }
+
+  // Shortcut to call StartLogging with a LocalLogNamer when event processing
+  // starts.
+  void StartLoggingLocalNamerOnRun(std::string base_name) {
+    event_loop_->OnRun([this, base_name]() {
+      StartLogging(
+          std::make_unique<LocalLogNamer>(base_name, event_loop_->node()));
+    });
+  }
+
  private:
-  void WriteHeader();
-  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
-      const Node *node);
-
-  bool MaybeUpdateTimestamp(
-      const Node *node, int node_index,
-      aos::monotonic_clock::time_point monotonic_start_time,
-      aos::realtime_clock::time_point realtime_start_time);
-
-  void DoLogData();
-
-  void WriteMissingTimestamps();
-
-  void StartLogging();
-
-  // Fetches from each channel until all the data is logged.
-  void LogUntil(monotonic_clock::time_point t);
-
-  EventLoop *event_loop_;
-  const UUID uuid_;
-  std::unique_ptr<LogNamer> log_namer_;
-
-  // The configuration to place at the top of the log file.
-  const Configuration *configuration_;
-
-  // Name to save in the log file.  Defaults to hostname.
-  std::string name_;
-
   // Structure to track both a fetcher, and if the data fetched has been
   // written.  We may want to delay writing data to disk so that we don't let
   // data get too far out of order when written to disk so we can avoid making
@@ -101,41 +104,25 @@
     bool written = false;
 
     int channel_index = -1;
+    const Channel *channel = nullptr;
+    const Node *timestamp_node = nullptr;
 
     LogType log_type = LogType::kLogMessage;
 
+    // We fill out the metadata at construction, but the actual writers have to
+    // be updated each time we start logging. To avoid duplicating the complex
+    // logic determining whether each writer should be initialized, we just
+    // stash the answer in separate member variables.
+    bool wants_writer = false;
     DetachedBufferWriter *writer = nullptr;
+    bool wants_timestamp_writer = false;
     DetachedBufferWriter *timestamp_writer = nullptr;
+    bool wants_contents_writer = false;
     DetachedBufferWriter *contents_writer = nullptr;
-    const Node *writer_node = nullptr;
-    const Node *timestamp_node = nullptr;
+
     int node_index = 0;
   };
 
-  std::vector<FetcherStruct> fetchers_;
-  TimerHandler *timer_handler_;
-
-  // Period to poll the channels.
-  const std::chrono::milliseconds polling_period_;
-
-  // Last time that data was written for all channels to disk.
-  monotonic_clock::time_point last_synchronized_time_;
-
-  monotonic_clock::time_point monotonic_start_time_;
-  realtime_clock::time_point realtime_start_time_;
-
-  // Max size that the header has consumed.  This much extra data will be
-  // reserved in the builder to avoid reallocating.
-  size_t max_header_size_ = 0;
-
-  // Fetcher for all the statistics from all the nodes.
-  aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
-
-  // Sets the start time for a specific node.
-  void SetStartTime(size_t node_index,
-                    aos::monotonic_clock::time_point monotonic_start_time,
-                    aos::realtime_clock::time_point realtime_start_time);
-
   struct NodeState {
     aos::monotonic_clock::time_point monotonic_start_time =
         aos::monotonic_clock::min_time;
@@ -145,6 +132,56 @@
     aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> log_file_header =
         aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader>::Empty();
   };
+
+  void WriteHeader();
+  aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+      const Node *node);
+
+  bool MaybeUpdateTimestamp(
+      const Node *node, int node_index,
+      aos::monotonic_clock::time_point monotonic_start_time,
+      aos::realtime_clock::time_point realtime_start_time);
+
+  void DoLogData(const monotonic_clock::time_point end_time);
+
+  void WriteMissingTimestamps();
+
+  // Fetches from each channel until all the data is logged.
+  void LogUntil(monotonic_clock::time_point t);
+
+  // Sets the start time for a specific node.
+  void SetStartTime(size_t node_index,
+                    aos::monotonic_clock::time_point monotonic_start_time,
+                    aos::realtime_clock::time_point realtime_start_time);
+
+  EventLoop *event_loop_;
+  UUID uuid_ = UUID::Zero();
+  std::unique_ptr<LogNamer> log_namer_;
+
+  // The configuration to place at the top of the log file.
+  const Configuration *const configuration_;
+
+  // Name to save in the log file.  Defaults to hostname.
+  std::string name_;
+
+  std::function<void()> on_logged_period_ = []() {};
+
+  std::vector<FetcherStruct> fetchers_;
+  TimerHandler *timer_handler_;
+
+  // Period to poll the channels.
+  std::chrono::nanoseconds polling_period_ = std::chrono::milliseconds(100);
+
+  // Last time that data was written for all channels to disk.
+  monotonic_clock::time_point last_synchronized_time_;
+
+  // Max size that the header has consumed.  This much extra data will be
+  // reserved in the builder to avoid reallocating.
+  size_t max_header_size_ = 0;
+
+  // Fetcher for all the statistics from all the nodes.
+  aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
   std::vector<NodeState> node_state_;
 };
 
diff --git a/aos/events/logging/logger_main.cc b/aos/events/logging/logger_main.cc
index d77d72b..b1dce96 100644
--- a/aos/events/logging/logger_main.cc
+++ b/aos/events/logging/logger_main.cc
@@ -33,8 +33,9 @@
         event_loop.node());
   }
 
-  aos::logger::Logger logger(std::move(log_namer), &event_loop,
-                             std::chrono::milliseconds(100));
+  aos::logger::Logger logger(&event_loop);
+  event_loop.OnRun(
+      [&log_namer, &logger]() { logger.StartLogging(std::move(log_namer)); });
 
   event_loop.Run();
 
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 707a232..2095529 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -41,6 +41,8 @@
   Pong pong_;
 };
 
+using LoggerDeathTest = LoggerTest;
+
 // Tests that we can startup at all.  This confirms that the channels are all in
 // the config.
 TEST_F(LoggerTest, Starts) {
@@ -58,8 +60,9 @@
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
-    Logger logger(base_name, logger_event_loop.get(),
-                  std::chrono::milliseconds(100));
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
     event_loop_factory_.RunFor(chrono::milliseconds(20000));
   }
 
@@ -101,6 +104,130 @@
   EXPECT_EQ(ping_count, 2010);
 }
 
+// Tests calling StartLogging twice.
+TEST_F(LoggerDeathTest, ExtraStart) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string base_name1 = tmpdir + "/logfile1";
+  const ::std::string logfile1 = base_name1 + ".part0.bfbs";
+  const ::std::string base_name2 = tmpdir + "/logfile2";
+  const ::std::string logfile2 = base_name2 + ".part0.bfbs";
+  unlink(logfile1.c_str());
+  unlink(logfile2.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile1 << " then " << logfile2;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger_event_loop->OnRun(
+        [base_name1, base_name2, &logger_event_loop, &logger]() {
+          logger.StartLogging(std::make_unique<LocalLogNamer>(
+              base_name1, logger_event_loop->node()));
+          EXPECT_DEATH(logger.StartLogging(std::make_unique<LocalLogNamer>(
+                           base_name2, logger_event_loop->node())),
+                       "Already logging");
+        });
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+}
+
+// Tests calling StopLogging twice.
+TEST_F(LoggerDeathTest, ExtraStop) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string base_name = tmpdir + "/logfile";
+  const ::std::string logfile = base_name + ".part0.bfbs";
+  // Remove it.
+  unlink(logfile.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger_event_loop->OnRun([base_name, &logger_event_loop, &logger]() {
+      logger.StartLogging(std::make_unique<LocalLogNamer>(
+          base_name, logger_event_loop->node()));
+      logger.StopLogging(aos::monotonic_clock::min_time);
+      EXPECT_DEATH(logger.StopLogging(aos::monotonic_clock::min_time),
+                   "Not logging right now");
+    });
+    event_loop_factory_.RunFor(chrono::milliseconds(20000));
+  }
+}
+
+// Tests that we can startup twice.
+TEST_F(LoggerTest, StartsTwice) {
+  const ::std::string tmpdir(getenv("TEST_TMPDIR"));
+  const ::std::string base_name1 = tmpdir + "/logfile1";
+  const ::std::string logfile1 = base_name1 + ".part0.bfbs";
+  const ::std::string base_name2 = tmpdir + "/logfile2";
+  const ::std::string logfile2 = base_name2 + ".part0.bfbs";
+  unlink(logfile1.c_str());
+  unlink(logfile2.c_str());
+
+  LOG(INFO) << "Logging data to " << logfile1 << " then " << logfile2;
+
+  {
+    std::unique_ptr<EventLoop> logger_event_loop =
+        event_loop_factory_.MakeEventLoop("logger");
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLogging(
+        std::make_unique<LocalLogNamer>(base_name1, logger_event_loop->node()));
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+    logger.StopLogging(logger_event_loop->monotonic_now());
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+    logger.StartLogging(
+        std::make_unique<LocalLogNamer>(base_name2, logger_event_loop->node()));
+    event_loop_factory_.RunFor(chrono::milliseconds(10000));
+  }
+
+  for (const auto &logfile :
+       {std::make_tuple(logfile1, 10), std::make_tuple(logfile2, 2010)}) {
+    SCOPED_TRACE(std::get<0>(logfile));
+    LogReader reader(std::get<0>(logfile));
+    reader.Register();
+
+    EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(nullptr));
+
+    std::unique_ptr<EventLoop> test_event_loop =
+        reader.event_loop_factory()->MakeEventLoop("log_reader");
+
+    int ping_count = std::get<1>(logfile);
+    int pong_count = std::get<1>(logfile);
+
+    // Confirm that the ping and pong counts both match, and the value also
+    // matches.
+    test_event_loop->MakeWatcher("/test",
+                                 [&ping_count](const examples::Ping &ping) {
+                                   EXPECT_EQ(ping.value(), ping_count + 1);
+                                   ++ping_count;
+                                 });
+    test_event_loop->MakeWatcher(
+        "/test", [&pong_count, &ping_count](const examples::Pong &pong) {
+          EXPECT_EQ(pong.value(), pong_count + 1);
+          ++pong_count;
+          EXPECT_EQ(ping_count, pong_count);
+        });
+
+    reader.event_loop_factory()->RunFor(std::chrono::seconds(100));
+    EXPECT_EQ(ping_count, std::get<1>(logfile) + 1000);
+  }
+}
+
 // Tests that we can read and write rotated log files.
 TEST_F(LoggerTest, RotatedLogFile) {
   const ::std::string tmpdir(getenv("TEST_TMPDIR"));
@@ -119,9 +246,9 @@
 
     event_loop_factory_.RunFor(chrono::milliseconds(95));
 
-    Logger logger(
-        std::make_unique<LocalLogNamer>(base_name, logger_event_loop->node()),
-        logger_event_loop.get(), std::chrono::milliseconds(100));
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
     logger.Rotate();
     event_loop_factory_.RunFor(chrono::milliseconds(10000));
@@ -218,8 +345,9 @@
                            chrono::microseconds(50));
     });
 
-    Logger logger(base_name, logger_event_loop.get(),
-                  std::chrono::milliseconds(100));
+    Logger logger(logger_event_loop.get());
+    logger.set_polling_period(std::chrono::milliseconds(100));
+    logger.StartLoggingLocalNamerOnRun(base_name);
 
     event_loop_factory_.RunFor(chrono::milliseconds(1000));
   }
@@ -289,11 +417,13 @@
   }
 
   void StartLogger(LoggerState *logger) {
-    logger->logger = std::make_unique<Logger>(
-        std::make_unique<MultiNodeLogNamer>(logfile_base_,
-                                            logger->event_loop->configuration(),
-                                            logger->event_loop->node()),
-        logger->event_loop.get(), chrono::milliseconds(100));
+    logger->logger = std::make_unique<Logger>(logger->event_loop.get());
+    logger->logger->set_polling_period(std::chrono::milliseconds(100));
+    logger->event_loop->OnRun([this, logger]() {
+      logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
+          logfile_base_, logger->event_loop->configuration(),
+          logger->event_loop->node()));
+    });
   }
 
   // Config and factory.
diff --git a/aos/events/logging/uuid.cc b/aos/events/logging/uuid.cc
index 9298c8b..376fa95 100644
--- a/aos/events/logging/uuid.cc
+++ b/aos/events/logging/uuid.cc
@@ -57,4 +57,10 @@
   return result;
 }
 
+UUID UUID::Zero() {
+  UUID result;
+  result.data_.fill(0);
+  return result;
+}
+
 }  // namespace aos
diff --git a/aos/events/logging/uuid.h b/aos/events/logging/uuid.h
index b81b811..0387a4b 100644
--- a/aos/events/logging/uuid.h
+++ b/aos/events/logging/uuid.h
@@ -13,6 +13,8 @@
   // Returns a randomly generated UUID.  This is known as a UUID4.
   static UUID Random();
 
+  static UUID Zero();
+
   std::string_view string_view() const {
     return std::string_view(data_.data(), data_.size());
   }
diff --git a/aos/events/logging/uuid_test.cc b/aos/events/logging/uuid_test.cc
index d0320de..4ea351c 100644
--- a/aos/events/logging/uuid_test.cc
+++ b/aos/events/logging/uuid_test.cc
@@ -12,6 +12,8 @@
   LOG(INFO) << UUID::Random().string_view();
 
   EXPECT_NE(UUID::Random(), UUID::Random());
+  EXPECT_NE(UUID::Random(), UUID::Zero());
+  EXPECT_EQ(UUID::Zero(), UUID::Zero());
 }
 
 }  // namespace testing
diff --git a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
index 1f362f3..faed4af 100644
--- a/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_lib_test.cc
@@ -64,8 +64,8 @@
     if (!FLAGS_output_file.empty()) {
       unlink(FLAGS_output_file.c_str());
       logger_event_loop_ = MakeEventLoop("logger");
-      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
-                                                      logger_event_loop_.get());
+      logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+      logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
     }
 
     // Run for enough time to allow the gyro/imu zeroing code to run.
@@ -125,11 +125,9 @@
     // TODO(james): Handle Euler angle singularities...
     const double down_estimator_yaw =
         CHECK_NOTNULL(drivetrain_status_fetcher_->down_estimator())->yaw();
-    const double localizer_yaw =
-        drivetrain_status_fetcher_->theta();
-    EXPECT_LT(
-        std::abs(aos::math::DiffAngle(down_estimator_yaw, localizer_yaw)),
-        1e-2);
+    const double localizer_yaw = drivetrain_status_fetcher_->theta();
+    EXPECT_LT(std::abs(aos::math::DiffAngle(down_estimator_yaw, localizer_yaw)),
+              1e-2);
     const double true_yaw = (drivetrain_plant_.GetRightPosition() -
                              drivetrain_plant_.GetLeftPosition()) /
                             (dt_config_.robot_radius * 2.0);
@@ -558,9 +556,11 @@
     auto builder = drivetrain_goal_sender_.MakeBuilder();
 
     flatbuffers::Offset<flatbuffers::Vector<float>> spline_x_offset =
-        builder.fbb()->CreateVector<float>({0.0, -0.25, -0.5, -0.5, -0.75, -1.0});
+        builder.fbb()->CreateVector<float>(
+            {0.0, -0.25, -0.5, -0.5, -0.75, -1.0});
     flatbuffers::Offset<flatbuffers::Vector<float>> spline_y_offset =
-        builder.fbb()->CreateVector<float>({0.0, 0.0, -0.25, -0.75, -1.0, -1.0});
+        builder.fbb()->CreateVector<float>(
+            {0.0, 0.0, -0.25, -0.75, -1.0, -1.0});
 
     MultiSpline::Builder multispline_builder =
         builder.MakeBuilder<MultiSpline>();
diff --git a/y2019/control_loops/drivetrain/drivetrain_replay.cc b/y2019/control_loops/drivetrain/drivetrain_replay.cc
index f7b69e4..98f3eeb 100644
--- a/y2019/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2019/control_loops/drivetrain/drivetrain_replay.cc
@@ -42,7 +42,8 @@
   log_writer_event_loop->SkipTimingReport();
   log_writer_event_loop->SkipAosLog();
   CHECK(nullptr == log_writer_event_loop->node());
-  aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
+  aos::logger::Logger writer(log_writer_event_loop.get());
+  writer.StartLoggingLocalNamerOnRun(FLAGS_output_file);
 
   std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
       reader.event_loop_factory()->MakeEventLoop("drivetrain");
diff --git a/y2020/control_loops/drivetrain/drivetrain_replay.cc b/y2020/control_loops/drivetrain/drivetrain_replay.cc
index 0dd5a1a..0ed57c5 100644
--- a/y2020/control_loops/drivetrain/drivetrain_replay.cc
+++ b/y2020/control_loops/drivetrain/drivetrain_replay.cc
@@ -49,7 +49,8 @@
   std::unique_ptr<aos::EventLoop> log_writer_event_loop =
       reader.event_loop_factory()->MakeEventLoop("log_writer", node);
   log_writer_event_loop->SkipTimingReport();
-  aos::logger::Logger writer(FLAGS_output_file, log_writer_event_loop.get());
+  aos::logger::Logger writer(log_writer_event_loop.get());
+  writer.StartLoggingLocalNamerOnRun(FLAGS_output_file);
 
   std::unique_ptr<aos::EventLoop> drivetrain_event_loop =
       reader.event_loop_factory()->MakeEventLoop("drivetrain", node);
diff --git a/y2020/control_loops/drivetrain/localizer_test.cc b/y2020/control_loops/drivetrain/localizer_test.cc
index ae9f887..dbb4f2f 100644
--- a/y2020/control_loops/drivetrain/localizer_test.cc
+++ b/y2020/control_loops/drivetrain/localizer_test.cc
@@ -26,10 +26,10 @@
 using frc971::control_loops::drivetrain::DrivetrainConfig;
 using frc971::control_loops::drivetrain::Goal;
 using frc971::control_loops::drivetrain::LocalizerControl;
+using frc971::vision::sift::CameraCalibrationT;
+using frc971::vision::sift::CameraPoseT;
 using frc971::vision::sift::ImageMatchResult;
 using frc971::vision::sift::ImageMatchResultT;
-using frc971::vision::sift::CameraPoseT;
-using frc971::vision::sift::CameraCalibrationT;
 using frc971::vision::sift::TransformationMatrixT;
 
 namespace {
@@ -90,9 +90,9 @@
 }  // namespace
 
 namespace chrono = std::chrono;
-using frc971::control_loops::drivetrain::testing::DrivetrainSimulation;
-using frc971::control_loops::drivetrain::DrivetrainLoop;
 using aos::monotonic_clock;
+using frc971::control_loops::drivetrain::DrivetrainLoop;
+using frc971::control_loops::drivetrain::testing::DrivetrainSimulation;
 
 class LocalizedDrivetrainTest : public aos::testing::ControlLoopTest {
  protected:
@@ -136,8 +136,8 @@
 
     if (!FLAGS_output_file.empty()) {
       logger_event_loop_ = MakeEventLoop("logger", roborio_);
-      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
-                                                      logger_event_loop_.get());
+      logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+      logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
     }
 
     test_event_loop_->MakeWatcher(
@@ -152,7 +152,7 @@
               last_frame_ = monotonic_now();
             }
           }
-          });
+        });
 
     test_event_loop_->AddPhasedLoop(
         [this](int) {
@@ -337,8 +337,7 @@
   aos::Sender<aos::message_bridge::ServerStatistics> server_statistics_sender_;
 
   std::unique_ptr<aos::EventLoop> drivetrain_event_loop_;
-  const frc971::control_loops::drivetrain::DrivetrainConfig<double>
-      dt_config_;
+  const frc971::control_loops::drivetrain::DrivetrainConfig<double> dt_config_;
 
   std::unique_ptr<aos::EventLoop> pi1_event_loop_;
   aos::Sender<ImageMatchResult> camera_sender_;
diff --git a/y2020/control_loops/superstructure/superstructure_lib_test.cc b/y2020/control_loops/superstructure/superstructure_lib_test.cc
index 8a0589c..59296d9 100644
--- a/y2020/control_loops/superstructure/superstructure_lib_test.cc
+++ b/y2020/control_loops/superstructure/superstructure_lib_test.cc
@@ -440,8 +440,8 @@
     if (!FLAGS_output_file.empty()) {
       unlink(FLAGS_output_file.c_str());
       logger_event_loop_ = MakeEventLoop("logger", roborio_);
-      logger_ = std::make_unique<aos::logger::Logger>(FLAGS_output_file,
-                                                      logger_event_loop_.get());
+      logger_ = std::make_unique<aos::logger::Logger>(logger_event_loop_.get());
+      logger_->StartLoggingLocalNamerOnRun(FLAGS_output_file);
     }
   }