Add option to run the logger X second behind.

This lets us rotate when an event happens, but trigger the rotation
before the event is written to disk.

Change-Id: Ic275702db768b332a175a8eb86fa37844c7dbff2
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 312a6ae..5ccf15b 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -27,8 +27,9 @@
       node_(configuration::GetNode(configuration_, event_loop->node())),
       node_index_(configuration::GetNodeIndex(configuration_, node_)),
       name_(network::GetHostname()),
-      timer_handler_(event_loop_->AddTimer(
-          [this]() { DoLogData(event_loop_->monotonic_now(), true); })),
+      timer_handler_(event_loop_->AddTimer([this]() {
+        DoLogData(event_loop_->monotonic_now() - logging_delay_, true);
+      })),
       server_statistics_fetcher_(
           configuration::MultiNode(event_loop_->configuration())
               ? event_loop_->MakeFetcher<message_bridge::ServerStatistics>(
@@ -291,6 +292,8 @@
   const aos::monotonic_clock::time_point beginning_time =
       event_loop_->monotonic_now();
 
+  log_until_time_ = beginning_time;
+
   // Grab data from each channel right before we declare the log file started
   // so we can capture the latest message on each channel.  This lets us have
   // non periodic messages with configuration that now get logged.
@@ -338,77 +341,45 @@
 }
 
 std::unique_ptr<LogNamer> Logger::RestartLogging(
-    std::unique_ptr<LogNamer> log_namer, std::optional<UUID> log_start_uuid) {
+    std::unique_ptr<LogNamer> log_namer, std::optional<UUID> log_start_uuid,
+    std::optional<monotonic_clock::time_point> end_time) {
   CHECK(log_namer_) << ": Unexpected restart while not logging";
 
   VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
 
-  // Make sure not to write past now so we don't risk out of order problems.  We
-  // don't want to get into a situation where we write out up to now + 0.1 sec,
-  // and that operation takes ~0.1 seconds, so we end up writing a different
-  // amount of the early and late channels.  That would then result in the next
-  // go around finding more than 0.1 sec of data on the early channels.
-  //
-  // Make sure we read up until "now" and log it.  This sets us up so that we
-  // are unlikely to fetch a message far in the future and have a ton of data
-  // before the offical start time.
-  monotonic_clock::time_point newest_record = monotonic_clock::min_time;
-  while (true) {
-    aos::monotonic_clock::time_point next_time =
-        last_synchronized_time_ + polling_period_;
-    const aos::monotonic_clock::time_point monotonic_now =
-        event_loop_->monotonic_now();
-    if (next_time > monotonic_now) {
-      next_time = monotonic_now;
-    }
+  // Grab a representative time on both the RT and monotonic clock.
+  // Average a monotonic clock before and after to reduce the error.
+  const aos::monotonic_clock::time_point monotonic_now1 =
+      event_loop_->monotonic_now();
+  const aos::realtime_clock::time_point realtime_now =
+      event_loop_->realtime_now();
+  const aos::monotonic_clock::time_point monotonic_now2 =
+      event_loop_->monotonic_now();
 
-    bool wrote_messages = false;
-    std::tie(wrote_messages, newest_record) = LogUntil(next_time);
-
-    if (next_time == monotonic_now &&
-        (!wrote_messages || newest_record < monotonic_now + polling_period_)) {
-      // If we stopped writing messages, then we probably have stopped making
-      // progress. If the newest record (unwritten or written) on a channel is
-      // very close to the current time, then there won't be much data
-      // officially after the end of the last log but before the start of the
-      // current one.  We need to pick the start of the current log to be after
-      // the last message on record so we don't have holes in the log.
-      break;
+  // Log until the provided end time.
+  if (end_time) {
+    CHECK_LE(*end_time, monotonic_now1) << ": Can't log into the future.";
+    // DoLogData is a bit fragile.
+    if (*end_time > last_synchronized_time_) {
+      DoLogData(*end_time, false);
     }
   }
 
-  // We are now synchronized up to last_synchronized_time_.  Our start time can
-  // safely be "newest_record".  But, we need to guarentee that the start time
-  // is after the newest message we have a record of, and that we don't skip any
-  // messages as we rotate.  This means we can't call Fetch anywhere.
+  // We are now synchronized up to last_synchronized_time_.  We only have record
+  // of messages from before last_synchronized_time_, so it is a safe start
+  // time.
 
   std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
   log_namer_ = std::move(log_namer);
 
-  // Now grab a representative time on both the RT and monotonic clock.
-  // Average a monotonic clock before and after to reduce the error.
-  const aos::monotonic_clock::time_point beginning_time =
-      event_loop_->monotonic_now();
-  const aos::realtime_clock::time_point beginning_time_rt =
-      event_loop_->realtime_now();
-  const aos::monotonic_clock::time_point beginning_time2 =
-      event_loop_->monotonic_now();
-
-  if (beginning_time > last_synchronized_time_) {
-    LOG(WARNING) << "Took over " << polling_period_.count()
-                 << "ns to swap log_namer";
-  }
-
-  // Our start time is now the newest message we have a record of.  We will
-  // declare the old log "done", and start in on the new one, double-logging
-  // anything we have a record of so we have all the messages from before the
-  // start.
-  const aos::monotonic_clock::time_point monotonic_start_time = newest_record;
+  // Our start time is now how far we logged until before.
+  const aos::monotonic_clock::time_point monotonic_start_time =
+      last_synchronized_time_;
   const aos::realtime_clock::time_point realtime_start_time =
-      (beginning_time_rt + (monotonic_start_time.time_since_epoch() -
-                            ((beginning_time.time_since_epoch() +
-                              beginning_time2.time_since_epoch()) /
-                             2)));
+      (realtime_now + (last_synchronized_time_.time_since_epoch() -
+                       ((monotonic_now1.time_since_epoch() +
+                         monotonic_now2.time_since_epoch()) /
+                        2)));
 
   auto config_sha256 = WriteConfiguration(log_namer_.get());
 
@@ -459,7 +430,7 @@
 
   VLOG(1) << "Logging node as " << FlatbufferToJson(node_) << " restart_time "
           << last_synchronized_time_ << ", took "
-          << chrono::duration<double>(header_time - beginning_time).count()
+          << chrono::duration<double>(header_time - monotonic_now1).count()
           << " to prepare and write header, "
           << chrono::duration<double>(channel_time - header_time).count()
           << " to write initial channel messages, boot uuid "
@@ -827,6 +798,8 @@
   bool wrote_messages = false;
   monotonic_clock::time_point newest_record = monotonic_clock::min_time;
 
+  log_until_time_ = t;
+
   // Grab the latest ServerStatistics message.  This will always have the
   // oppertunity to be >= to the current time, so it will always represent any
   // reboots which may have happened.
@@ -842,7 +815,8 @@
     while (true) {
       if (f.written) {
         const auto start = event_loop_->monotonic_now();
-        const bool got_new = f.fetcher->FetchNext();
+        const bool got_new =
+            f.fetcher->FetchNextIf(std::ref(fetch_next_if_fn_));
         const auto end = event_loop_->monotonic_now();
         RecordFetchResult(start, end, got_new, &f);
         if (!got_new) {
@@ -857,7 +831,11 @@
       }
 
       // TODO(james): Write tests to exercise this logic.
-      if (f.fetcher->context().monotonic_event_time >= t) {
+      CHECK_LE(f.fetcher->context().monotonic_event_time, t);
+
+      // At startup, we can end up grabbing a message at the current time.
+      // Ignore it.
+      if (f.fetcher->context().monotonic_event_time == t) {
         break;
       }
 
@@ -874,6 +852,9 @@
 
 void Logger::DoLogData(const monotonic_clock::time_point end_time,
                        bool run_on_logged) {
+  if (end_time < last_synchronized_time_) return;
+
+  DCHECK(is_started());
   // We want to guarantee that messages aren't out of order by more than
   // max_out_of_order_duration.  To do this, we need sync points.  Every write
   // cycle should be a sync point.
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 837a02a..3d3d7f2 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -66,6 +66,15 @@
     separate_config_ = separate_config;
   }
 
+  // Sets the amount to run the logger behind the current time.  This lets us
+  // make decisions about rotating or stopping logging before something happens.
+  // Using this to start logging in the past isn't yet supported.  This can be
+  // changed at runtime, but will only influence future writes, not what is
+  // already written.
+  void set_logging_delay(std::chrono::nanoseconds logging_delay) {
+    logging_delay_ = logging_delay;
+  }
+
   // Sets the period between polling the data. Defaults to 100ms.
   //
   // Changing this while a set of files is being written may result in
@@ -139,11 +148,14 @@
   void StartLogging(std::unique_ptr<LogNamer> log_namer,
                     std::optional<UUID> log_start_uuid = std::nullopt);
 
-  // Restart logging using a new naming scheme. Intended for log rotation.
-  // Returns a unique_ptr to the prior log_namer instance.
+  // Restarts logging using a new naming scheme. Intended for log rotation.
+  // Returns a unique_ptr to the prior log_namer instance.  If provided,
+  // end_time is the time to log until.  It must be in the past.  Times before
+  // the last_synchronized_time are ignored.
   std::unique_ptr<LogNamer> RestartLogging(
       std::unique_ptr<LogNamer> log_namer,
-      std::optional<UUID> log_start_uuid = std::nullopt);
+      std::optional<UUID> log_start_uuid = std::nullopt,
+      std::optional<monotonic_clock::time_point> end_time = std::nullopt);
 
   // Stops logging. Ensures any messages through end_time make it into the log.
   //
@@ -331,6 +343,16 @@
 
   // Fetcher for all the statistics from all the nodes.
   aos::Fetcher<message_bridge::ServerStatistics> server_statistics_fetcher_;
+
+  monotonic_clock::time_point log_until_time_ = monotonic_clock::min_time;
+
+  std::function<bool(const Context &)> fetch_next_if_fn_ =
+      [this](const Context &context) {
+        return context.monotonic_event_time < log_until_time_;
+      };
+
+  // Amount of time to run the logger behind now.
+  std::chrono::nanoseconds logging_delay_ = std::chrono::nanoseconds(0);
 };
 
 }  // namespace logger