Flush log files periodically

For low rate logs, disk bandwidth isn't the thing to worry about.  The
real concern is how long data sits in memory before being written to
disk.  Add a (configurable) threshold after which we flush data to disk
to bound the time.

Change-Id: Ia0be4a13a4840ec6b89c3ecafdeb3a1f1a82d4a2
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 84ea84a..b8b66a2 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -30,6 +30,9 @@
 
 DEFINE_int32(flush_size, 128000,
              "Number of outstanding bytes to allow before flushing to disk.");
+DEFINE_double(
+    flush_period, 5.0,
+    "Max time to let data sit in the queue before flushing in seconds.");
 
 DEFINE_double(
     max_out_of_order, -1,
@@ -98,6 +101,7 @@
     return;
   }
 
+  aos::monotonic_clock::time_point now;
   if (encoder_->may_bypass() && span.size() > 4096u) {
     // Over this threshold, we'll assume it's cheaper to add an extra
     // syscall to write the data immediately instead of copying it to
@@ -114,11 +118,13 @@
     const auto end = aos::monotonic_clock::now();
     HandleWriteReturn(written, span.size());
     UpdateStatsForWrite(end - start, written, 1);
+    now = end;
   } else {
     encoder_->Encode(CopySpanAsDetachedBuffer(span));
+    now = aos::monotonic_clock::now();
   }
 
-  FlushAtThreshold();
+  FlushAtThreshold(now);
 }
 
 void DetachedBufferWriter::Close() {
@@ -209,7 +215,8 @@
   total_write_bytes_ += written;
 }
 
-void DetachedBufferWriter::FlushAtThreshold() {
+void DetachedBufferWriter::FlushAtThreshold(
+    aos::monotonic_clock::time_point now) {
   if (ran_out_of_space_) {
     // We don't want any later data to be written after space becomes available,
     // so refuse to write anything more once we've dropped data because we ran
@@ -223,11 +230,22 @@
     return;
   }
 
+  // We don't want to flush the first time through.  Otherwise we will flush as
+  // the log file header might be compressing, defeating any parallelism and
+  // queueing there.
+  if (last_flush_time_ == aos::monotonic_clock::min_time) {
+    last_flush_time_ = now;
+  }
+
   // Flush if we are at the max number of iovs per writev, because there's no
   // point queueing up any more data in memory. Also flush once we have enough
-  // data queued up.
+  // data queued up or if it has been long enough.
   while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
-         encoder_->queue_size() >= IOV_MAX) {
+         encoder_->queue_size() >= IOV_MAX ||
+         now > last_flush_time_ +
+                   chrono::duration_cast<chrono::nanoseconds>(
+                       chrono::duration<double>(FLAGS_flush_period))) {
+    last_flush_time_ = now;
     Flush();
   }
 }