Close previous logger first when restarting

The LZMA compressor is a massive memory user, and having both the old and
the new logger open at once significantly increases peak memory usage.
Peak usage is all that matters, since the OS doesn't reliably reclaim
memory from the logger.

Digging in further, we do very few to no mallocs right now in an LZMA
logger once it is up and running.  The increasing memory usage comes
from the kernel paging memory in behind the heap when it gets accessed
for the first time, and from the high peak memory needed when rotating.

If this isn't enough, we should be able to figure out how to make a pool
for the lzma compressor so it doesn't re-allocate.

Change-Id: Ife2d6a1d51b279aadd99825ce2018d608493d360
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 1a333e7..b16bb47 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -348,11 +348,45 @@
 
   VLOG(1) << "Restarting logger for " << FlatbufferToJson(node_);
 
-  // Force out every currently pending message, pointing all fetchers at the
-  // last (currently available) records.  Note that LogUntil() updates
-  // last_synchronized_time_ to the time value that it receives.
-  while (LogUntil(last_synchronized_time_ + polling_period_))
-    ;
+  // Make sure not to write past now so we don't risk out of order problems.  We
+  // don't want to get into a situation where we write out up to now + 0.1 sec,
+  // and that operation takes ~0.1 seconds, so we end up writing a different
+  // amount of the early and late channels.  That would then result in the next
+  // go around finding more than 0.1 sec of data on the early channels.
+  //
+  // Make sure we read up until "now" and log it.  This sets us up so that we
+  // are unlikely to fetch a message far in the future and have a ton of data
+  // before the official start time.
+  monotonic_clock::time_point newest_record =
+      monotonic_clock::min_time;
+  while (true) {
+    aos::monotonic_clock::time_point next_time =
+        last_synchronized_time_ + polling_period_;
+    const aos::monotonic_clock::time_point monotonic_now =
+        event_loop_->monotonic_now();
+    if (next_time > monotonic_now) {
+      next_time = monotonic_now;
+    }
+
+    bool wrote_messages = false;
+    std::tie(wrote_messages, newest_record) = LogUntil(next_time);
+
+    if (next_time == monotonic_now &&
+        (!wrote_messages || newest_record < monotonic_now + polling_period_)) {
+      // If we stopped writing messages, then we probably have stopped making
+      // progress. If the newest record (unwritten or written) on a channel is
+      // very close to the current time, then there won't be much data
+      // officially after the end of the last log but before the start of the
+      // current one.  We need to pick the start of the current log to be after
+      // the last message on record so we don't have holes in the log.
+      break;
+    }
+  }
+
+  // We are now synchronized up to last_synchronized_time_.  Our start time can
+  // safely be "newest_record".  But, we need to guarantee that the start time
+  // is after the newest message we have a record of, and that we don't skip any
+  // messages as we rotate.  This means we can't call Fetch anywhere.
 
   std::unique_ptr<LogNamer> old_log_namer = std::move(log_namer_);
   log_namer_ = std::move(log_namer);
@@ -371,13 +405,11 @@
                  << "ns to swap log_namer";
   }
 
-  // Since we are going to log all in 1 big go, we need our log start time to
-  // be after the previous LogUntil call finished, but before 1 period after
-  // it. The best way to guarentee that is to pick a start time that is the
-  // earliest of the two.  That covers the case where the OS puts us to sleep
-  // between when we finish LogUntil and capture beginning_time.
-  const aos::monotonic_clock::time_point monotonic_start_time =
-      std::min(last_synchronized_time_, beginning_time);
+  // Our start time is now the newest message we have a record of.  We will
+  // declare the old log "done", and start in on the new one, double-logging
+  // anything we have a record of so we have all the messages from before the
+  // start.
+  const aos::monotonic_clock::time_point monotonic_start_time = newest_record;
   const aos::realtime_clock::time_point realtime_start_time =
       (beginning_time_rt + (monotonic_start_time.time_since_epoch() -
                             ((beginning_time.time_since_epoch() +
@@ -402,66 +434,32 @@
   const aos::monotonic_clock::time_point header_time =
       event_loop_->monotonic_now();
 
-  // Write the transition record(s) for each channel ...
+  // Close out the old writers to free up memory to be used by the new writers.
+  old_log_namer->Close();
+
   for (FetcherStruct &f : fetchers_) {
     // Create writers from the new namer
-    NewDataWriter *next_writer = nullptr;
-    NewDataWriter *next_timestamp_writer = nullptr;
-    NewDataWriter *next_contents_writer = nullptr;
 
     if (f.wants_writer) {
-      next_writer = log_namer_->MakeWriter(f.channel);
+      f.writer = log_namer_->MakeWriter(f.channel);
     }
     if (f.wants_timestamp_writer) {
-      next_timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
+      f.timestamp_writer = log_namer_->MakeTimestampWriter(f.channel);
     }
     if (f.wants_contents_writer) {
-      next_contents_writer = log_namer_->MakeForwardedTimestampWriter(
+      f.contents_writer = log_namer_->MakeForwardedTimestampWriter(
           f.channel, CHECK_NOTNULL(f.timestamp_node));
     }
 
-    if (f.fetcher->context().data != nullptr) {
-      // Write the last message fetched as the first of the new log of this
-      // type. The timestamps on these will all be before the new start time.
-      WriteData(next_writer, f);
-      WriteTimestamps(next_timestamp_writer, f);
-      WriteContent(next_contents_writer, f);
-
-      // It is possible that a few more snuck in. Write them all out also,
-      // including any that should also be in the old log.
-      while (true) {
-        // Get the next message ...
-        const auto start = event_loop_->monotonic_now();
-        const bool got_new = f.fetcher->FetchNext();
-        const auto end = event_loop_->monotonic_now();
-        RecordFetchResult(start, end, got_new, &f);
-
-        if (got_new) {
-          if (f.fetcher->context().monotonic_event_time <=
-              last_synchronized_time_) {
-            WriteFetchedRecord(f);
-            WriteData(next_writer, f);
-            WriteTimestamps(next_timestamp_writer, f);
-            WriteContent(next_contents_writer, f);
-
-          } else {
-            f.written = false;
-            break;
-          }
-
-        } else {
-          f.written = true;
-          break;
-        }
-      }
-    }
-
-    // Switch fully over to the new writers.
-    f.writer = next_writer;
-    f.timestamp_writer = next_timestamp_writer;
-    f.contents_writer = next_contents_writer;
+    // Mark each channel with data as not written.  That triggers each channel
+    // to be re-logged.
+    f.written = f.fetcher->context().data == nullptr;
   }
 
+  // And now make sure to log everything up to the start time in 1 big go so we
+  // make sure we have it before we let the world start logging normally again.
+  LogUntil(monotonic_start_time);
+
   const aos::monotonic_clock::time_point channel_time =
       event_loop_->monotonic_now();
 
@@ -828,8 +826,10 @@
   WriteContent(f.contents_writer, f);
 }
 
-bool Logger::LogUntil(monotonic_clock::time_point t) {
-  bool has_pending_messages = false;
+std::pair<bool, monotonic_clock::time_point> Logger::LogUntil(
+    monotonic_clock::time_point t) {
+  bool wrote_messages = false;
+  monotonic_clock::time_point newest_record = monotonic_clock::min_time;
 
   // Grab the latest ServerStatistics message.  This will always have the
   // oppertunity to be >= to the current time, so it will always represent any
@@ -838,6 +838,11 @@
 
   // Write each channel to disk, one at a time.
   for (FetcherStruct &f : fetchers_) {
+    if (f.fetcher->context().data != nullptr) {
+      newest_record =
+          std::max(newest_record, f.fetcher->context().monotonic_event_time);
+    }
+
     while (true) {
       if (f.written) {
         const auto start = event_loop_->monotonic_now();
@@ -850,23 +855,25 @@
                          f.fetcher->channel());
           break;
         }
+        newest_record =
+            std::max(newest_record, f.fetcher->context().monotonic_event_time);
         f.written = false;
       }
 
       // TODO(james): Write tests to exercise this logic.
       if (f.fetcher->context().monotonic_event_time >= t) {
-        has_pending_messages = true;
         break;
       }
 
       WriteFetchedRecord(f);
+      wrote_messages = true;
 
       f.written = true;
     }
   }
   last_synchronized_time_ = t;
 
-  return has_pending_messages;
+  return std::make_pair(wrote_messages, newest_record);
 }
 
 void Logger::DoLogData(const monotonic_clock::time_point end_time,