Enforce max out-of-order duration contract when writing logs.

Actively enforce this contract on the writer side by tracking the newest
message received and the fetcher data for the current message being
logged. If the max out-of-order duration is violated, rotate the part
file and double the max out-of-order duration value for the next part
file. Eventually we'll reach a satisfactory value and continue logging.

On the log reader side, choose the highest max out-of-order duration
value among all part files with the same part UUID. This ensures we have
sufficient messages in the queue while reading to guarantee that the max
out-of-order duration contract is satisfied.

Change-Id: I1666ed54acfcbc59cb9aedbe2f0dee979ceca095
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 56d0c4f..1e0f7c8 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -269,6 +269,7 @@
 
   log_event_uuid_ = UUID::Random();
   log_start_uuid_ = log_start_uuid;
+  log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
 
   // We want to do as much work as possible before the initial Fetch. Time
   // between that and actually starting to log opens up the possibility of
@@ -287,8 +288,6 @@
     }
   }
 
-  log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
-
   const aos::monotonic_clock::time_point beginning_time =
       event_loop_->monotonic_now();
 
@@ -705,7 +704,11 @@
     ContextDataCopier coppier(f.fetcher->context(), f.channel_index, f.log_type,
                               event_loop_);
 
-    writer->CopyMessage(&coppier, source_node_boot_uuid, start);
+    aos::monotonic_clock::time_point message_time =
+        static_cast<int>(node_index_) != f.data_node_index
+            ? f.fetcher->context().monotonic_remote_time
+            : f.fetcher->context().monotonic_event_time;
+    writer->CopyMessage(&coppier, source_node_boot_uuid, start, message_time);
     RecordCreateMessageTime(start, coppier.end_time(), f);
 
     VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
@@ -730,7 +733,8 @@
     ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
                               LogType::kLogDeliveryTimeOnly, event_loop_);
 
-    timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start);
+    timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start,
+                                  f.fetcher->context().monotonic_event_time);
     RecordCreateMessageTime(start, coppier.end_time(), f);
 
     VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
@@ -780,8 +784,10 @@
     RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
                                 event_loop_);
 
-    contents_writer->CopyMessage(&coppier, UUID::FromVector(msg->boot_uuid()),
-                                 start);
+    contents_writer->CopyMessage(
+        &coppier, UUID::FromVector(msg->boot_uuid()), start,
+        monotonic_clock::time_point(
+            chrono::nanoseconds(msg->monotonic_sent_time())));
 
     RecordCreateMessageTime(start, coppier.end_time(), f);
   }