Track matching forwarded times for log file sorting

Previously, the only thing we knew was a loose ordering based on part
index.  That isn't very conclusive.  This now provides us with
corresponding timestamps for when things cross the network boundary in a
way that we think should tell us definitively what came first.

There are 2 main problem cases.  Let's say we have 2 channels: /a, which
is reliable, and /b, which isn't, both sent from the same remote node.

case 1:  /a -> boot 0 received on boot 0.
         /b -> boot 1 received on boot 0.
 We start logging after both messages arrive.

case 2:  /a -> boot 0 received on boot 0.
         /b -> boot 0 received on boot 0.
 We log for a bit, then reboot.  More messages show up when we reconnect.
         /a -> boot 0 received on boot 1.
         /b -> boot 0 received on boot 1.

In case 1: we only have a reliable timestamp from boot 0, but that
reliable timestamp makes it clear that /a was before /b, so boot 0 was
before boot 1.

In case 2: we have the same reliable timestamp, so that tells us nothing.
The unreliable timestamps, though, tell a different story.  /b will be
after /a, since any messages on /b generated before the reboot won't get
delivered.  So, we get an ordering constraint saying that any /b's sent
on the second boot were after the /b's on the first boot.

We believe that any other cases are covered by the same mechanism.
Without fully implementing and debugging the sorting code, we won't know
for certain.  Sanjay and I have been unable to break the logic so far.

Change-Id: I990bf249b18bf43072997cdb099ac66c2fa8fc57
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a4b15e8..93dd02d 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -25,9 +25,8 @@
       log_namer_(log_namer),
       reopen_(std::move(reopen)),
       close_(std::move(close)) {
-  boot_uuids_.resize(configuration::NodesCount(log_namer->configuration_),
-                     UUID::Zero());
-  CHECK_LT(node_index_, boot_uuids_.size());
+  state_.resize(configuration::NodesCount(log_namer->configuration_));
+  CHECK_LT(node_index_, state_.size());
   reopen_(this);
 }
 
@@ -54,14 +53,44 @@
   header_written_ = false;
 }
 
-void NewDataWriter::UpdateRemote(size_t remote_node_index,
-                                 const UUID &remote_node_boot_uuid) {
-  CHECK_LT(remote_node_index, boot_uuids_.size());
-  if (boot_uuids_[remote_node_index] != remote_node_boot_uuid) {
+void NewDataWriter::UpdateRemote(
+    const size_t remote_node_index, const UUID &remote_node_boot_uuid,
+    const monotonic_clock::time_point monotonic_remote_time,
+    const monotonic_clock::time_point monotonic_event_time,
+    const bool reliable) {
+  bool rotate = false;
+  CHECK_LT(remote_node_index, state_.size());
+  State &state = state_[remote_node_index];
+  if (state.boot_uuid != remote_node_boot_uuid) {
     VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
-              << remote_node_boot_uuid << " from "
-              << boot_uuids_[remote_node_index];
-    boot_uuids_[remote_node_index] = remote_node_boot_uuid;
+            << remote_node_boot_uuid << " from " << state.boot_uuid;
+    state.boot_uuid = remote_node_boot_uuid;
+    state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
+    state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
+    state.oldest_remote_unreliable_monotonic_timestamp =
+        monotonic_clock::max_time;
+    state.oldest_local_unreliable_monotonic_timestamp =
+        monotonic_clock::max_time;
+    rotate = true;
+  }
+
+  if (!reliable) {
+    if (state.oldest_remote_unreliable_monotonic_timestamp >
+        monotonic_remote_time) {
+      state.oldest_remote_unreliable_monotonic_timestamp =
+          monotonic_remote_time;
+      state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
+      rotate = true;
+    }
+  }
+
+  if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
+    state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
+    state.oldest_local_monotonic_timestamp = monotonic_event_time;
+    rotate = true;
+  }
+
+  if (rotate) {
     Rotate();
   }
 }
@@ -70,15 +99,15 @@
                                  const UUID &source_node_boot_uuid,
                                  aos::monotonic_clock::time_point now) {
   // TODO(austin): Handle remote nodes changing too, not just the source node.
-  if (boot_uuids_[node_index_] != source_node_boot_uuid) {
-    boot_uuids_[node_index_] = source_node_boot_uuid;
+  if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
+    state_[node_index_].boot_uuid = source_node_boot_uuid;
     if (header_written_) {
       Reboot();
     }
 
     QueueHeader(MakeHeader());
   }
-  CHECK_EQ(boot_uuids_[node_index_], source_node_boot_uuid);
+  CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
   CHECK(header_written_) << ": Attempting to write message before header to "
                          << writer->filename();
   writer->QueueSizedFlatbuffer(fbb, now);
@@ -88,14 +117,14 @@
 NewDataWriter::MakeHeader() {
   const size_t logger_node_index = log_namer_->logger_node_index();
   const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
-  if (boot_uuids_[logger_node_index] == UUID::Zero()) {
+  if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
     VLOG(1) << filename() << " Logger node is " << logger_node_index
             << " and uuid is " << logger_node_boot_uuid;
-    boot_uuids_[logger_node_index] = logger_node_boot_uuid;
+    state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
   } else {
-    CHECK_EQ(boot_uuids_[logger_node_index], logger_node_boot_uuid);
+    CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
   }
-  return log_namer_->MakeHeader(node_index_, boot_uuids_, parts_uuid(),
+  return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
                                 parts_index_);
 }
 
@@ -104,7 +133,7 @@
   CHECK(!header_written_) << ": Attempting to write duplicate header to "
                           << writer->filename();
   CHECK(header.message().has_source_node_boot_uuid());
-  CHECK_EQ(boot_uuids_[node_index_],
+  CHECK_EQ(state_[node_index_].boot_uuid,
            UUID::FromString(header.message().source_node_boot_uuid()));
   // TODO(austin): This triggers a dummy allocation that we don't need as part
   // of releasing.  Can we skip it?
@@ -120,12 +149,12 @@
 }
 
 aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
-    size_t node_index, const std::vector<UUID> &boot_uuids,
+    size_t node_index, const std::vector<NewDataWriter::State> &state,
     const UUID &parts_uuid, int parts_index) const {
-  const UUID &source_node_boot_uuid = boot_uuids[node_index];
+  const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
   const Node *const source_node =
       configuration::GetNode(configuration_, node_index);
-  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 20u);
+  CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 24u);
   flatbuffers::FlatBufferBuilder fbb;
   fbb.ForceDefaults(true);
 
@@ -181,10 +210,10 @@
   }
 
   std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
-  boot_uuid_offsets.reserve(boot_uuids.size());
-  for (const UUID &uuid : boot_uuids) {
-    if (uuid != UUID::Zero()) {
-      boot_uuid_offsets.emplace_back(uuid.PackString(&fbb));
+  boot_uuid_offsets.reserve(state.size());
+  for (const NewDataWriter::State &state : state) {
+    if (state.boot_uuid != UUID::Zero()) {
+      boot_uuid_offsets.emplace_back(state.boot_uuid.PackString(&fbb));
     } else {
       boot_uuid_offsets.emplace_back(fbb.CreateString(""));
     }
@@ -194,6 +223,43 @@
       flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
       boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
 
+  int64_t *oldest_remote_monotonic_timestamps;
+  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+      oldest_remote_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
+          state.size(), &oldest_remote_monotonic_timestamps);
+
+  int64_t *oldest_local_monotonic_timestamps;
+  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+      oldest_local_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
+          state.size(), &oldest_local_monotonic_timestamps);
+
+  int64_t *oldest_remote_unreliable_monotonic_timestamps;
+  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+      oldest_remote_unreliable_monotonic_timestamps_offset =
+          fbb.CreateUninitializedVector(
+              state.size(), &oldest_remote_unreliable_monotonic_timestamps);
+
+  int64_t *oldest_local_unreliable_monotonic_timestamps;
+  flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+      oldest_local_unreliable_monotonic_timestamps_offset =
+          fbb.CreateUninitializedVector(
+              state.size(), &oldest_local_unreliable_monotonic_timestamps);
+
+  for (size_t i = 0; i < state.size(); ++i) {
+    oldest_remote_monotonic_timestamps[i] =
+        state[i].oldest_remote_monotonic_timestamp.time_since_epoch().count();
+    oldest_local_monotonic_timestamps[i] =
+        state[i].oldest_local_monotonic_timestamp.time_since_epoch().count();
+    oldest_remote_unreliable_monotonic_timestamps[i] =
+        state[i]
+            .oldest_remote_unreliable_monotonic_timestamp.time_since_epoch()
+            .count();
+    oldest_local_unreliable_monotonic_timestamps[i] =
+        state[i]
+            .oldest_local_unreliable_monotonic_timestamp.time_since_epoch()
+            .count();
+  }
+
   aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
 
   log_file_header_builder.add_name(name_offset);
@@ -263,6 +329,14 @@
       std::chrono::duration_cast<std::chrono::nanoseconds>(
           event_loop_->realtime_now().time_since_epoch())
           .count());
+  log_file_header_builder.add_oldest_remote_monotonic_timestamps(
+      oldest_remote_monotonic_timestamps_offset);
+  log_file_header_builder.add_oldest_local_monotonic_timestamps(
+      oldest_local_monotonic_timestamps_offset);
+  log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
+      oldest_remote_unreliable_monotonic_timestamps_offset);
+  log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
+      oldest_local_unreliable_monotonic_timestamps_offset);
   fbb.FinishSizePrefixed(log_file_header_builder.Finish());
   aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
       fbb.Release());