Close previous logger first when restarting

The LZMA compressor uses a large amount of memory, and having both
the old and new log writers open at once significantly increases peak
memory usage.  Peak usage is what matters here, since the OS doesn't
reliably reclaim memory from the logger afterwards.

Digging in further, there are few to no mallocs in an LZMA logger
once it is up and running.  The increasing memory usage comes from
the kernel paging in memory behind the heap the first time it is
accessed, and from the high peak memory needed while rotating.

If this isn't enough, we should be able to figure out how to make a
pool for the LZMA compressor so it doesn't re-allocate.
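
One way to do that would be through liblzma's custom allocator hook.
A rough, untested sketch (the class name and caching policy are
hypothetical, and not part of this change):

  #include <lzma.h>

  #include <cstdlib>
  #include <unordered_map>
  #include <vector>

  // Hypothetical sketch: cache blocks freed by liblzma, keyed by size, so a
  // restarted encoder can reuse its large dictionary allocations instead of
  // asking the OS for fresh memory.  Blocks are kept for the life of the
  // pool, trading steady-state memory for avoiding re-allocation at restart.
  // Not thread-safe; a multi-threaded encoder would need locking.
  class LzmaAllocationPool {
   public:
    static void *Alloc(void *opaque, size_t nmemb, size_t size) {
      auto *pool = static_cast<LzmaAllocationPool *>(opaque);
      const size_t bytes = nmemb * size;
      std::vector<void *> &cached = pool->cached_[bytes];
      void *result;
      if (!cached.empty()) {
        result = cached.back();
        cached.pop_back();
      } else {
        result = std::malloc(bytes);
      }
      pool->sizes_[result] = bytes;
      return result;
    }

    static void Free(void *opaque, void *ptr) {
      if (ptr == nullptr) return;
      auto *pool = static_cast<LzmaAllocationPool *>(opaque);
      // Return the block to the size-keyed cache instead of freeing it.
      pool->cached_[pool->sizes_.at(ptr)].push_back(ptr);
    }

    lzma_allocator *allocator() { return &allocator_; }

   private:
    // liblzma allocator hook: {alloc, free, opaque}.
    lzma_allocator allocator_{&Alloc, &Free, this};
    std::unordered_map<size_t, std::vector<void *>> cached_;
    std::unordered_map<void *, size_t> sizes_;
  };

  // Usage: point the stream at the pool before initializing the encoder.
  //   LzmaAllocationPool pool;
  //   lzma_stream stream = LZMA_STREAM_INIT;
  //   stream.allocator = pool.allocator();
  //   lzma_easy_encoder(&stream, 9 | LZMA_PRESET_EXTREME, LZMA_CHECK_CRC32);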

Change-Id: Ife2d6a1d51b279aadd99825ce2018d608493d360
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 8ec1e70..9ca1274 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -714,9 +714,13 @@
   return data_writer_.get();
 }
 
-void MultiNodeLogNamer::Close() {
+WriteCode MultiNodeLogNamer::Close() {
   data_writers_.clear();
   data_writer_.reset();
+  if (ran_out_of_space_) {
+    return WriteCode::kOutOfSpace;
+  }
+  return WriteCode::kOk;
 }
 
 void MultiNodeLogNamer::ResetStatistics() {
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 3b54025..84745b6 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -216,6 +216,10 @@
   // Returns all the nodes that data is being written for.
   const std::vector<const Node *> &nodes() const { return nodes_; }
 
+  // Closes all existing log data writers. No more data may be written after
+  // this.
+  virtual WriteCode Close() = 0;
+
   // Returns the node the logger is running on.
   const Node *node() const { return node_; }
   const UUID &logger_node_boot_uuid() const { return logger_node_boot_uuid_; }
@@ -387,7 +391,7 @@
   // Closes all existing log files. No more data may be written after this.
   //
   // This may set ran_out_of_space().
-  void Close();
+  WriteCode Close() override;
 
   // Accessors for various statistics. See the identically-named methods in
   // DetachedBufferWriter for documentation. These are aggregated across all
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 1a333e7..b16bb47 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -348,11 +348,45 @@
 
   VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
 
-  // Force out every currently pending message, pointing all fetchers at the
-  // last (currently available) records.  Note that LogUntil() updates
-  // last_synchronized_time_ to the time value that it receives.
-  while (LogUntil(last_synchronized_time_ + polling_period_))
-    ;
+  // Make sure not to write past now so we don't risk out-of-order problems.  We
+  // don't want to get into a situation where we write out up to now + 0.1 sec,
+  // that operation takes ~0.1 seconds, and we end up writing different amounts
+  // of data for the early and late channels.  That would then result in the
+  // next go-around finding more than 0.1 sec of data on the early channels.
+  //
+  // Make sure we read up until "now" and log it.  This sets us up so that we
+  // are unlikely to fetch a message far in the future and have a ton of data
+  // before the official start time.
+  monotonic_clock::time_point newest_record =
+      monotonic_clock::min_time;
+  while (true) {
+    aos::monotonic_clock::time_point next_time =
+        last_synchronized_time_ + polling_period_;
+    const aos::monotonic_clock::time_point monotonic_now =
+        event_loop_->monotonic_now();
+    if (next_time > monotonic_now) {
+      next_time = monotonic_now;
+    }
+
+    bool wrote_messages = false;
+    std::tie(wrote_messages, newest_record) = LogUntil(next_time);
+
+    if (next_time == monotonic_now &&
+        (!wrote_messages || newest_record < monotonic_now + polling_period_)) {
+      // If we stopped writing messages, then we probably have stopped making
+      // progress. If the newest record (unwritten or written) on a channel is
+      // very close to the current time, then there won't be much data
+      // officially after the end of the last log but before the start of the
+      // current one.  We need to pick the start of the current log to be after
+      // the last message on record so we don't have holes in the log.
+      break;
+    }
+  }
+
+  // We are now synchronized up to last_synchronized_time_.  Our start time can
+  // safely be "newest_record".  But we need to guarantee that the start time
+  // is after the newest message we have a record of, and that we don't skip any
+  // messages as we rotate.  This means we can't call Fetch anywhere.
 
   std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
   log_namer_ = std::move(log_namer);
@@ -371,13 +405,11 @@
                  << "ns to swap log_namer";
   }
 
-  // Since we are going to log all in 1 big go, we need our log start time to
-  // be after the previous LogUntil call finished, but before 1 period after
-  // it. The best way to guarentee that is to pick a start time that is the
-  // earliest of the two.  That covers the case where the OS puts us to sleep
-  // between when we finish LogUntil and capture beginning_time.
-  const aos::monotonic_clock::time_point monotonic_start_time =
-      std::min(last_synchronized_time_, beginning_time);
+  // Our start time is now the newest message we have a record of.  We will
+  // declare the old log "done", and start in on the new one, double-logging
+  // anything we have a record of so we have all the messages from before the
+  // start.
+  const aos::monotonic_clock::time_point monotonic_start_time = newest_record;
   const aos::realtime_clock::time_point realtime_start_time =
       (beginning_time_rt + (monotonic_start_time.time_since_epoch() -
                             ((beginning_time.time_since_epoch() +
@@ -402,66 +434,32 @@
   const aos::monotonic_clock::time_point header_time =
       event_loop_->monotonic_now();
 
-  // Write the transition record(s) for each channel ...
+  // Close out the old writers to free up memory to be used by the new writers.
+  old_log_namer->Close();
+
   for (FetcherStruct &f : fetchers_) {
     // Create writers from the new namer
-    NewDataWriter *next_writer = nullptr;
-    NewDataWriter *next_timestamp_writer = nullptr;
-    NewDataWriter *next_contents_writer = nullptr;
 
     if (f.wants_writer) {
-      next_writer = log_namer_->MakeWriter(f.channel);
+      f.writer = log_namer_->MakeWriter(f.channel);
     }
     if (f.wants_timestamp_writer) {
-      next_timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
     }
     if (f.wants_contents_writer) {
-      next_contents_writer = log_namer_->MakeForwardedTimestampWriter(
+      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
           f.channel, CHECK_NOTNULL(f.timestamp_node));
     }
 
-    if (f.fetcher->context().data != nullptr) {
-      // Write the last message fetched as the first of the new log of this
-      // type. The timestamps on these will all be before the new start time.
-      WriteData(next_writer, f);
-      WriteTimestamps(next_timestamp_writer, f);
-      WriteContent(next_contents_writer, f);
-
-      // It is possible that a few more snuck in. Write them all out also,
-      // including any that should also be in the old log.
-      while (true) {
-        // Get the next message ...
-        const auto start = event_loop_->monotonic_now();
-        const bool got_new = f.fetcher->FetchNext();
-        const auto end = event_loop_->monotonic_now();
-        RecordFetchResult(start, end, got_new, &f);
-
-        if (got_new) {
-          if (f.fetcher->context().monotonic_event_time <=
-              last_synchronized_time_) {
-            WriteFetchedRecord(f);
-            WriteData(next_writer, f);
-            WriteTimestamps(next_timestamp_writer, f);
-            WriteContent(next_contents_writer, f);
-
-          } else {
-            f.written = false;
-            break;
-          }
-
-        } else {
-          f.written = true;
-          break;
-        }
-      }
-    }
-
-    // Switch fully over to the new writers.
-    f.writer = next_writer;
-    f.timestamp_writer = next_timestamp_writer;
-    f.contents_writer = next_contents_writer;
+    // Mark each channel with data as not written.  That triggers each channel
+    // to be re-logged.
+    f.written = f.fetcher->context().data == nullptr;
   }
 
+  // And now log everything up to the start time in one big go so we have all
+  // of it before we let the world start logging normally again.
+  LogUntil(monotonic_start_time);
+
   const aos::monotonic_clock::time_point channel_time =
       event_loop_->monotonic_now();
 
@@ -828,8 +826,10 @@
   WriteContent(f.contents_writer, f);
 }
 
-bool Logger::LogUntil(monotonic_clock::time_point t) {
-  bool has_pending_messages = false;
+std::pair<bool, monotonic_clock::time_point> Logger::LogUntil(
+    monotonic_clock::time_point t) {
+  bool wrote_messages = false;
+  monotonic_clock::time_point newest_record = monotonic_clock::min_time;
 
   // Grab the latest ServerStatistics message.  This will always have the
   // oppertunity to be >= to the current time, so it will always represent any
@@ -838,6 +838,11 @@
 
   // Write each channel to disk, one at a time.
   for (FetcherStruct &f : fetchers_) {
+    if (f.fetcher->context().data != nullptr) {
+      newest_record =
+          std::max(newest_record, f.fetcher->context().monotonic_event_time);
+    }
+
     while (true) {
       if (f.written) {
         const auto start = event_loop_->monotonic_now();
@@ -850,23 +855,25 @@
                          f.fetcher->channel());
           break;
         }
+        newest_record =
+            std::max(newest_record, f.fetcher->context().monotonic_event_time);
         f.written = false;
       }
 
       // TODO(james): Write tests to exercise this logic.
       if (f.fetcher->context().monotonic_event_time >= t) {
-        has_pending_messages = true;
         break;
       }
 
       WriteFetchedRecord(f);
+      wrote_messages = true;
 
       f.written = true;
     }
   }
   last_synchronized_time_ = t;
 
-  return has_pending_messages;
+  return std::make_pair(wrote_messages, newest_record);
 }
 
 void Logger::DoLogData(const monotonic_clock::time_point end_time,
diff --git a/aos/events/logging/log_writer.h b/aos/events/logging/log_writer.h
index 0063c9b..d127f1b 100644
--- a/aos/events/logging/log_writer.h
+++ b/aos/events/logging/log_writer.h
@@ -250,9 +250,12 @@
   // Fetches from each channel until all the data is logged.  This is dangerous
   // because it lets you log for more than 1 period.  All calls need to verify
   // that t isn't greater than 1 period in the future.
-  // Returns true if there is at least one message that has been fetched but
-  // not yet written.
-  bool LogUntil(monotonic_clock::time_point t);
+  //
+  // Returns whether at least one message was written, along with the timestamp
+  // of the newest record that any fetcher is pointing to, or min_time if no
+  // messages have been published on any logged channels.
+  std::pair<bool, monotonic_clock::time_point> LogUntil(
+      monotonic_clock::time_point t);
 
   void RecordFetchResult(aos::monotonic_clock::time_point start,
                          aos::monotonic_clock::time_point end, bool got_new,
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index 18337be..5d18d3d 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -3802,6 +3802,52 @@
   auto result = ConfirmReadable(filenames);
 }
 
+// Tests that RestartLogging works in the simple case.  Unfortunately, the
+// failure cases involve simulating time elapsing in callbacks, which is really
+// hard.  The best we can reasonably do is make sure two back-to-back logs are
+// parseable together.
+TEST_P(MultinodeLoggerTest, RestartLogging) {
+  time_converter_.AddMonotonic(
+      {BootTimestamp::epoch(), BootTimestamp::epoch() + chrono::seconds(1000)});
+  std::vector<std::string> filenames;
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger, logfile_base1_);
+    aos::monotonic_clock::time_point last_rotation_time =
+        pi1_logger.event_loop->monotonic_now();
+    pi1_logger.logger->set_on_logged_period([&] {
+      const auto now = pi1_logger.event_loop->monotonic_now();
+      if (now > last_rotation_time + std::chrono::seconds(5)) {
+        pi1_logger.AppendAllFilenames(&filenames);
+        std::unique_ptr<MultiNodeFilesLogNamer> namer =
+            pi1_logger.MakeLogNamer(logfile_base2_);
+        pi1_logger.log_namer = namer.get();
+
+        pi1_logger.logger->RestartLogging(std::move(namer));
+        last_rotation_time = now;
+      }
+    });
+
+    event_loop_factory_.RunFor(chrono::milliseconds(7000));
+
+    pi1_logger.AppendAllFilenames(&filenames);
+  }
+
+  for (const auto &x : filenames) {
+    LOG(INFO) << x;
+  }
+
+  EXPECT_GE(filenames.size(), 2u);
+
+  ConfirmReadable(filenames);
+
+  // TODO(austin): It would be good to confirm that any one-time messages end up
+  // in both logs correctly.
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/multinode_logger_test_lib.cc b/aos/events/logging/multinode_logger_test_lib.cc
index 22822e7..fdee4d8 100644
--- a/aos/events/logging/multinode_logger_test_lib.cc
+++ b/aos/events/logging/multinode_logger_test_lib.cc
@@ -29,6 +29,16 @@
           params};
 }
 
+std::unique_ptr<MultiNodeFilesLogNamer> LoggerState::MakeLogNamer(
+    std::string logfile_base) {
+  std::unique_ptr<MultiNodeFilesLogNamer> namer =
+      std::make_unique<MultiNodeFilesLogNamer>(logfile_base, configuration,
+                                               event_loop.get(), node);
+  namer->set_extension(params.extension);
+  namer->set_encoder_factory(params.encoder_factory);
+  return namer;
+}
+
 void LoggerState::StartLogger(std::string logfile_base) {
   CHECK(!logfile_base.empty());
 
@@ -41,11 +51,7 @@
   logger->set_logger_version(
       absl::StrCat("logger_version_", event_loop->node()->name()->str()));
   event_loop->OnRun([this, logfile_base]() {
-    std::unique_ptr<MultiNodeFilesLogNamer> namer =
-        std::make_unique<MultiNodeFilesLogNamer>(logfile_base, configuration,
-                                            event_loop.get(), node);
-    namer->set_extension(params.extension);
-    namer->set_encoder_factory(params.encoder_factory);
+    std::unique_ptr<MultiNodeFilesLogNamer> namer = MakeLogNamer(logfile_base);
     log_namer = namer.get();
 
     logger->StartLogging(std::move(namer));
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index 40b5933..06a4a21 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -43,6 +43,9 @@
 struct LoggerState {
   void StartLogger(std::string logfile_base);
 
+  std::unique_ptr<MultiNodeFilesLogNamer> MakeLogNamer(
+      std::string logfile_base);
+
   std::unique_ptr<EventLoop> event_loop;
   std::unique_ptr<Logger> logger;
   const Configuration *configuration;