Enforce max out of order duration contract when writing logs.

Actively enforce this contract on the writer side by tracking the newest
message received and the fetcher data for the current message being
logged. If the max out of order duration is violated, rotate the part
file and double the max out of order duration for the next part file.
Repeated doubling eventually reaches a satisfactory value, at which
point logging continues without further rotations.

On the log reader side, choose the highest max out of order duration
value among all part files with the same part uuid. This ensures that
enough messages are queued while reading to guarantee that the max out
of order duration contract is satisfied.

Change-Id: I1666ed54acfcbc59cb9aedbe2f0dee979ceca095
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index f4071e4..a0abcd8 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -216,8 +216,13 @@
     writer.QueueSpan(m2.span());
   }
 
+  // When parts are sorted, we choose the highest max out of order duration for
+  // all parts with the same part uuid.
   const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
 
+  EXPECT_EQ(parts.size(), 1);
+  EXPECT_EQ(parts[0].parts.size(), 1);
+
   PartsMessageReader reader(parts[0].parts[0]);
 
   EXPECT_EQ(reader.filename(), logfile0);
@@ -225,16 +230,18 @@
   // Confirm that the timestamps track, and the filename also updates.
   // Read the first message.
   EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+  // Since config1 has the higher max out of order duration, it will be used to
+  // read part files with the same part uuid, i.e. logfile0 and logfile1.
   EXPECT_EQ(
       reader.max_out_of_order_duration(),
-      std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
+      std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
   EXPECT_TRUE(reader.ReadMessage());
   EXPECT_EQ(reader.filename(), logfile0);
   EXPECT_EQ(reader.newest_timestamp(),
             monotonic_clock::time_point(chrono::nanoseconds(1)));
   EXPECT_EQ(
       reader.max_out_of_order_duration(),
-      std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
+      std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
 
   // Read the second message.
   EXPECT_TRUE(reader.ReadMessage());
@@ -252,6 +259,11 @@
       reader.max_out_of_order_duration(),
       std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
   EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
+
+  // Verify that the parts metadata has the correct max out of order duration.
+  EXPECT_EQ(
+      parts[0].parts[0].max_out_of_order_duration,
+      std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
 }
 
 // Tests that Message's operator < works as expected.