Support stopping and starting logging at runtime

Change-Id: If1e1b7119808d1f56e96efb71ea7000e0fa13fe8
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 3032a6f..859e24c 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -35,43 +35,30 @@
 namespace logger {
 namespace chrono = std::chrono;
 
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
-               std::chrono::milliseconds polling_period)
-    : Logger(base_name, event_loop, event_loop->configuration(),
-             polling_period) {}
-Logger::Logger(std::string_view base_name, EventLoop *event_loop,
-               const Configuration *configuration,
-               std::chrono::milliseconds polling_period)
-    : Logger(std::make_unique<LocalLogNamer>(base_name, event_loop->node()),
-             event_loop, configuration, polling_period) {}
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-               std::chrono::milliseconds polling_period)
-    : Logger(std::move(log_namer), event_loop, event_loop->configuration(),
-             polling_period) {}
-
-Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
-               const Configuration *configuration,
-               std::chrono::milliseconds polling_period)
+Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
+               std::function<bool(const Channel *)> should_log)
     : event_loop_(event_loop),
-      uuid_(UUID::Random()),
-      log_namer_(std::move(log_namer)),
       configuration_(configuration),
       name_(network::GetHostname()),
-      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
-      polling_period_(polling_period),
+      timer_handler_(event_loop_->AddTimer(
+          [this]() { DoLogData(event_loop_->monotonic_now()); })),
       server_statistics_fetcher_(
           configuration::MultiNode(event_loop_->configuration())
               ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
                     "/aos")
               : aos::Fetcher<message_bridge::ServerStatistics>()) {
-  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
-  int channel_index = 0;
+  VLOG(1) << "Creating logger for " << FlatbufferToJson(event_loop_->node());
 
   // Find all the nodes which are logging timestamps on our node.
   std::set<const Node *> timestamp_logger_nodes;
   for (const Channel *channel : *configuration_->channels()) {
-    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) ||
-        !channel->has_destination_nodes()) {
+    if (!configuration::ChannelIsSendableOnNode(channel, event_loop_->node())) {
+      continue;
+    }
+    if (!channel->has_destination_nodes()) {
+      continue;
+    }
+    if (!should_log(channel)) {
       continue;
     }
     for (const Connection *connection : *channel->destination_nodes()) {
@@ -103,44 +90,53 @@
         << event_loop_->node()->name()->string_view()
         << " but can't find channel /aos/remote_timestamps/"
         << node->name()->string_view();
+    if (!should_log(channel)) {
+      continue;
+    }
     timestamp_logger_channels.insert(std::make_pair(channel, node));
   }
 
   const size_t our_node_index =
       configuration::GetNodeIndex(configuration_, event_loop_->node());
 
-  for (const Channel *config_channel : *configuration_->channels()) {
+  for (size_t channel_index = 0;
+       channel_index < configuration_->channels()->size(); ++channel_index) {
+    const Channel *const config_channel =
+        configuration_->channels()->Get(channel_index);
     // The MakeRawFetcher method needs a channel which is in the event loop
     // configuration() object, not the configuration_ object.  Go look that up
     // from the config.
     const Channel *channel = aos::configuration::GetChannel(
         event_loop_->configuration(), config_channel->name()->string_view(),
         config_channel->type()->string_view(), "", event_loop_->node());
+    if (!should_log(channel)) {
+      continue;
+    }
 
     FetcherStruct fs;
     fs.node_index = our_node_index;
+    fs.channel_index = channel_index;
+    fs.channel = channel;
+
     const bool is_local =
         configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
 
     const bool is_readable =
         configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
-    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
-                                 channel, event_loop_->node()) &&
-                             is_readable;
+    const bool is_logged = configuration::ChannelMessageIsLoggedOnNode(
+        channel, event_loop_->node());
+    const bool log_message = is_logged && is_readable;
 
-    const bool log_delivery_times =
-        (event_loop_->node() == nullptr)
-            ? false
-            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
-                  channel, event_loop_->node(), event_loop_->node());
+    bool log_delivery_times = false;
+    if (event_loop_->node() != nullptr) {
+      log_delivery_times = configuration::ConnectionDeliveryTimeIsLoggedOnNode(
+          channel, event_loop_->node(), event_loop_->node());
+    }
 
     // Now, detect a MessageHeader timestamp logger where we should just log the
     // contents to a file directly.
     const bool log_contents = timestamp_logger_channels.find(channel) !=
                               timestamp_logger_channels.end();
-    const Node *timestamp_node =
-        log_contents ? timestamp_logger_channels.find(channel)->second
-                     : nullptr;
 
     if (log_message || log_delivery_times || log_contents) {
       fs.fetcher = event_loop->MakeRawFetcher(channel);
@@ -149,11 +145,11 @@
 
       if (log_delivery_times) {
         VLOG(1) << "  Delivery times";
-        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+        fs.wants_timestamp_writer = true;
       }
       if (log_message) {
         VLOG(1) << "  Data";
-        fs.writer = log_namer_->MakeWriter(channel);
+        fs.wants_writer = true;
         if (!is_local) {
           fs.log_type = LogType::kLogRemoteMessage;
         }
@@ -161,18 +157,50 @@
       if (log_contents) {
         VLOG(1) << "Timestamp logger channel "
                 << configuration::CleanedChannelToString(channel);
-        fs.contents_writer =
-            log_namer_->MakeForwardedTimestampWriter(channel, timestamp_node);
+        fs.timestamp_node = timestamp_logger_channels.find(channel)->second;
+        fs.wants_contents_writer = true;
         fs.node_index =
-            configuration::GetNodeIndex(configuration_, timestamp_node);
+            configuration::GetNodeIndex(configuration_, fs.timestamp_node);
       }
-      fs.channel_index = channel_index;
-      fs.written = false;
       fetchers_.emplace_back(std::move(fs));
     }
-    ++channel_index;
+  }
+}
+
+Logger::~Logger() {
+  if (log_namer_) {
+    // If we are replaying a log file, or in simulation, we want to force the
+    // last bit of data to be logged.  The easiest way to deal with this is to
+    // poll everything as we go to destroy the class, ie, shut down the logger,
+    // and write it to disk.
+    StopLogging(event_loop_->monotonic_now());
+  }
+}
+
+void Logger::StartLogging(std::unique_ptr<LogNamer> log_namer) {
+  CHECK(!log_namer_) << ": Already logging";
+  log_namer_ = std::move(log_namer);
+  uuid_ = UUID::Random();
+  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+
+  // We want to do as much work as possible before the initial Fetch.  Any
+  // delay between that Fetch and actually starting to log risks messages
+  // falling off the end of the queue in the meantime.
+
+  for (FetcherStruct &f : fetchers_) {
+    if (f.wants_writer) {
+      f.writer = log_namer_->MakeWriter(f.channel);
+    }
+    if (f.wants_timestamp_writer) {
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+    }
+    if (f.wants_contents_writer) {
+      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
+          f.channel, CHECK_NOTNULL(f.timestamp_node));
+    }
   }
 
+  CHECK(node_state_.empty());
   node_state_.resize(configuration::MultiNode(configuration_)
                          ? configuration_->nodes()->size()
                          : 1u);
@@ -183,21 +211,6 @@
     node_state_[node_index].log_file_header = MakeHeader(node);
   }
 
-  // When things start, we want to log the header, then the most recent
-  // messages available on each fetcher to capture the previous state, then
-  // start polling.
-  event_loop_->OnRun([this]() { StartLogging(); });
-}
-
-Logger::~Logger() {
-  // If we are replaying a log file, or in simulation, we want to force the last
-  // bit of data to be logged.  The easiest way to deal with this is to poll
-  // everything as we go to destroy the class, ie, shut down the logger, and
-  // write it to disk.
-  DoLogData();
-}
-
-void Logger::StartLogging() {
   // Grab data from each channel right before we declare the log file started
   // so we can capture the latest message on each channel.  This lets us have
   // non periodic messages with configuration that now get logged.
@@ -219,6 +232,25 @@
                         polling_period_);
 }
 
+std::unique_ptr<LogNamer> Logger::StopLogging(
+    aos::monotonic_clock::time_point end_time) {
+  CHECK(log_namer_) << ": Not logging right now";
+
+  if (end_time != aos::monotonic_clock::min_time) {
+    LogUntil(end_time);
+  }
+  timer_handler_->Disable();
+
+  for (FetcherStruct &f : fetchers_) {
+    f.writer = nullptr;
+    f.timestamp_writer = nullptr;
+    f.contents_writer = nullptr;
+  }
+  node_state_.clear();
+
+  return std::move(log_namer_);
+}
+
 void Logger::WriteHeader() {
   if (configuration::MultiNode(configuration_)) {
     server_statistics_fetcher_.Fetch();
@@ -356,6 +388,7 @@
   flatbuffers::Offset<flatbuffers::String> name_offset =
       fbb.CreateString(name_);
 
+  CHECK(uuid_ != UUID::Zero());
   flatbuffers::Offset<flatbuffers::String> logger_uuid_offset =
       fbb.CreateString(uuid_.string_view());
 
@@ -384,8 +417,7 @@
   // message.  Report back 3x to be extra safe, and because the cost isn't
   // huge on the read side.
   log_file_header_builder.add_max_out_of_order_duration(
-      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
-          .count());
+      std::chrono::nanoseconds(3 * polling_period_).count());
 
   log_file_header_builder.add_monotonic_start_time(
       std::chrono::duration_cast<std::chrono::nanoseconds>(
@@ -522,22 +554,21 @@
   last_synchronized_time_ = t;
 }
 
-void Logger::DoLogData() {
-  // We want to guarentee that messages aren't out of order by more than
+void Logger::DoLogData(const monotonic_clock::time_point end_time) {
+  // We want to guarantee that messages aren't out of order by more than
   // max_out_of_order_duration.  To do this, we need sync points.  Every write
   // cycle should be a sync point.
-  const monotonic_clock::time_point monotonic_now =
-      event_loop_->monotonic_now();
 
   do {
     // Move the sync point up by at most polling_period.  This forces one sync
     // per iteration, even if it is small.
-    LogUntil(
-        std::min(last_synchronized_time_ + polling_period_, monotonic_now));
+    LogUntil(std::min(last_synchronized_time_ + polling_period_, end_time));
+
+    on_logged_period_();
 
     // If we missed cycles, we could be pretty far behind.  Spin until we are
     // caught up.
-  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
+  } while (last_synchronized_time_ + polling_period_ < end_time);
 }
 
 std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {