Flush log files periodically

For low rate logs, disk bandwidth isn't the thing to worry about.  The
real concern is how long data sits in memory before being written to
disk.  Add a (configurable) threshold after which we flush data to disk
to bound the time.

Change-Id: Ia0be4a13a4840ec6b89c3ecafdeb3a1f1a82d4a2
diff --git a/aos/events/logging/log_edit.cc b/aos/events/logging/log_edit.cc
index 7eff9e7..78d5eff 100644
--- a/aos/events/logging/log_edit.cc
+++ b/aos/events/logging/log_edit.cc
@@ -42,7 +42,7 @@
 
     aos::logger::DetachedBufferWriter buffer_writer(
         FLAGS_logfile, std::make_unique<aos::logger::DummyEncoder>());
-    buffer_writer.QueueSizedFlatbuffer(&fbb);
+    buffer_writer.QueueSizedFlatbuffer(&fbb, aos::monotonic_clock::min_time);
 
     while (true) {
       absl::Span<const uint8_t> msg_data = span_reader.ReadMessage();
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index f68bbf6..96e3dc8 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -14,6 +14,7 @@
 namespace logger {
 namespace {
 using message_bridge::RemoteMessage;
+namespace chrono = std::chrono;
 }  // namespace
 
 Logger::Logger(EventLoop *event_loop, const Configuration *configuration,
@@ -682,7 +683,7 @@
         CHECK(node_state_[f.data_node_index].header_valid)
             << ": Can't write data before the header on channel "
             << configuration::CleanedChannelToString(f.fetcher->channel());
-        f.writer->QueueSizedFlatbuffer(&fbb);
+        f.writer->QueueSizedFlatbuffer(&fbb, end);
       }
 
       if (f.timestamp_writer != nullptr) {
@@ -708,7 +709,7 @@
         CHECK(node_state_[f.timestamp_node_index].header_valid)
             << ": Can't write data before the header on channel "
             << configuration::CleanedChannelToString(f.fetcher->channel());
-        f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+        f.timestamp_writer->QueueSizedFlatbuffer(&fbb, end);
       }
 
       if (f.contents_writer != nullptr) {
@@ -769,7 +770,7 @@
         CHECK(node_state_[f.contents_node_index].header_valid)
             << ": Can't write data before the header on channel "
             << configuration::CleanedChannelToString(f.fetcher->channel());
-        f.contents_writer->QueueSizedFlatbuffer(&fbb);
+        f.contents_writer->QueueSizedFlatbuffer(&fbb, end);
       }
 
       f.written = true;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 84ea84a..b8b66a2 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -30,6 +30,9 @@
 
 DEFINE_int32(flush_size, 128000,
              "Number of outstanding bytes to allow before flushing to disk.");
+DEFINE_double(
+    flush_period, 5.0,
+    "Max time to let data sit in the queue before flushing in seconds.");
 
 DEFINE_double(
     max_out_of_order, -1,
@@ -98,6 +101,7 @@
     return;
   }
 
+  aos::monotonic_clock::time_point now;
   if (encoder_->may_bypass() && span.size() > 4096u) {
     // Over this threshold, we'll assume it's cheaper to add an extra
     // syscall to write the data immediately instead of copying it to
@@ -114,11 +118,13 @@
     const auto end = aos::monotonic_clock::now();
     HandleWriteReturn(written, span.size());
     UpdateStatsForWrite(end - start, written, 1);
+    now = end;
   } else {
     encoder_->Encode(CopySpanAsDetachedBuffer(span));
+    now = aos::monotonic_clock::now();
   }
 
-  FlushAtThreshold();
+  FlushAtThreshold(now);
 }
 
 void DetachedBufferWriter::Close() {
@@ -209,7 +215,8 @@
   total_write_bytes_ += written;
 }
 
-void DetachedBufferWriter::FlushAtThreshold() {
+void DetachedBufferWriter::FlushAtThreshold(
+    aos::monotonic_clock::time_point now) {
   if (ran_out_of_space_) {
     // We don't want any later data to be written after space becomes available,
     // so refuse to write anything more once we've dropped data because we ran
@@ -223,11 +230,22 @@
     return;
   }
 
+  // We don't want to flush the first time through.  Otherwise we will flush as
+  // the log file header might be compressing, defeating any parallelism and
+  // queueing there.
+  if (last_flush_time_ == aos::monotonic_clock::min_time) {
+    last_flush_time_ = now;
+  }
+
   // Flush if we are at the max number of iovs per writev, because there's no
   // point queueing up any more data in memory. Also flush once we have enough
-  // data queued up.
+  // data queued up or if it has been long enough.
   while (encoder_->queued_bytes() > static_cast<size_t>(FLAGS_flush_size) ||
-         encoder_->queue_size() >= IOV_MAX) {
+         encoder_->queue_size() >= IOV_MAX ||
+         now > last_flush_time_ +
+                   chrono::duration_cast<chrono::nanoseconds>(
+                       chrono::duration<double>(FLAGS_flush_period))) {
+    last_flush_time_ = now;
     Flush();
   }
 }
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 5fe45a5..2219a07 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -71,16 +71,22 @@
   // Triggers a flush if there's enough data queued up.
   //
   // Steals the detached buffer from it.
-  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb) {
-    QueueSizedFlatbuffer(fbb->Release());
+  void QueueSizedFlatbuffer(flatbuffers::FlatBufferBuilder *fbb,
+                            aos::monotonic_clock::time_point now) {
+    QueueSizedFlatbuffer(fbb->Release(), now);
   }
   // May steal the backing storage of buffer, or may leave it alone.
+  void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer,
+                            aos::monotonic_clock::time_point now) {
+    QueueSizedFlatbuffer(std::move(buffer));
+    FlushAtThreshold(now);
+  }
+  // Unconditionally queues the buffer.
   void QueueSizedFlatbuffer(flatbuffers::DetachedBuffer &&buffer) {
     if (ran_out_of_space_) {
       return;
     }
     encoder_->Encode(std::move(buffer));
-    FlushAtThreshold();
   }
 
   // Queues up data in span. May copy or may write it to disk immediately.
@@ -159,8 +165,10 @@
                            ssize_t written, int iovec_size);
 
   // Flushes data if we've reached the threshold to do that as part of normal
-  // operation.
-  void FlushAtThreshold();
+  // operation either due to the outstanding queued data, or because we have
+  // passed our flush period.  now is the current time to save some CPU grabbing
+  // the current time.  It just needs to be close.
+  void FlushAtThreshold(aos::monotonic_clock::time_point now);
 
   std::string filename_;
   std::unique_ptr<DetachedBufferEncoder> encoder_;
@@ -180,6 +188,9 @@
   int total_write_count_ = 0;
   int total_write_messages_ = 0;
   int total_write_bytes_ = 0;
+
+  aos::monotonic_clock::time_point last_flush_time_ =
+      aos::monotonic_clock::min_time;
 };
 
 // Packes a message pointed to by the context into a MessageHeader.