Populate boot_count for existing logs

This sets us up to figure out which boot comes when so we can sort all
the parts per boot, and then order the boots correctly.

Given that we might get multiple logs from multiple boots from multiple
nodes, we need to be a bit careful how we construct the problem.  The
only way we know how to order a boot from only the data in the header is
to look at remote data from a node that didn't reboot and look at how
the parts_index increases.  Those observations can then be extracted as
constraints and used to generate and sort a DAG, and to check the
connectivity of the DAG to make sure there is 1 valid ordering.

Change-Id: Ic6fadc8538f66c60f3142fcc22d1a91d7822cd1f
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index acb2219..65e8aba 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -157,6 +157,16 @@
   std::string name;
 };
 
+struct NodeBootState {
+  // Maps each boot to its constraining boots. If the bool in the value is true,
+  // then the boot uuid in the key chronologically precedes the boot uuid in
+  // the value, and vice versa if false.
+  std::map<std::string, std::vector<std::pair<std::string, bool>>> constraints;
+
+  // All the boots we know about.
+  std::set<std::string> boots;
+};
+
 // Helper class to make it easier to sort a list of log files into
 // std::vector<LogFile>
 struct PartsSorter {
@@ -178,6 +188,9 @@
   // Populates the class's datastructures from the input file list.
   void PopulateFromFiles(const std::vector<std::string> &parts);
 
+  // Wrangle parts_list into a map of boot uuids -> boot counts.
+  std::map<std::string, int> ComputeBootCounts();
+
   // Reformats old_parts into a list of logfiles and returns it.  This destroys
   // state in PartsSorter.
   std::vector<LogFile> FormatOldParts();
@@ -435,7 +448,268 @@
   return result;
 }
 
+std::map<std::string, int> PartsSorter::ComputeBootCounts() {
+  std::map<std::string, NodeBootState> boot_constraints;
+
+  // TODO(austin): This is the "old" way.  Once we have better ordering info in
+  // headers, we should use it.
+  for (std::pair<const std::string, UnsortedLogPartsMap> &logs : parts_list) {
+    std::map<std::string,
+             std::vector<std::pair<std::string, std::pair<int, int>>>>
+        node_boot_parts_index_ranges;
+
+    for (std::pair<const std::string, UnsortedLogParts> &parts :
+         logs.second.unsorted_parts) {
+      CHECK_GT(parts.second.parts.size(), 0u);
+
+      // Track that this boot exists so we know the overall set of boots we need
+      // to link.
+      {
+        auto node_boot_constraints_it =
+            boot_constraints.find(parts.second.node);
+        if (node_boot_constraints_it == boot_constraints.end()) {
+          node_boot_constraints_it =
+              boot_constraints
+                  .insert(std::make_pair(parts.second.node, NodeBootState()))
+                  .first;
+        }
+        node_boot_constraints_it->second.boots.insert(
+            parts.second.source_boot_uuid);
+      }
+
+      // Since the logger node is guarenteed to be on a single boot, we can look
+      // for reboots on the remote nodes and use the parts_index to track when
+      // they reboot.
+      //
+      // All the parts files rotate at the same time, so the parts_index will
+      // match.  We can track the min and max parts_index for a
+      // source_node_boot_uuid and use that to order the boot uuids.
+      auto it = node_boot_parts_index_ranges.find(parts.second.node);
+      if (it == node_boot_parts_index_ranges.end()) {
+        it =
+            node_boot_parts_index_ranges
+                .insert(std::make_pair(
+                    parts.second.node,
+                    std::vector<std::pair<std::string, std::pair<int, int>>>()))
+                .first;
+      }
+      std::vector<std::pair<std::string, std::pair<int, int>>>
+          &boot_parts_index_ranges = it->second;
+
+      // Find any existing ranges to extend, or make a new one otherwise.
+      auto parts_index_ranges_it = std::find_if(
+          boot_parts_index_ranges.begin(), boot_parts_index_ranges.end(),
+          [&](const std::pair<std::string, std::pair<int, int>> &val) {
+            return val.first == parts.second.source_boot_uuid;
+          });
+      if (parts_index_ranges_it == boot_parts_index_ranges.end()) {
+        boot_parts_index_ranges.emplace_back(
+            std::make_pair(parts.second.source_boot_uuid,
+                           std::make_pair(std::numeric_limits<int>::max(),
+                                          std::numeric_limits<int>::min())));
+        parts_index_ranges_it = boot_parts_index_ranges.end() - 1;
+      }
+
+      std::pair<int, int> &ranges = parts_index_ranges_it->second;
+
+      for (const std::pair<std::string, int> &part : parts.second.parts) {
+        ranges.first = std::min(ranges.first, part.second);
+        ranges.second = std::max(ranges.second, part.second);
+      }
+    }
+
+    // Now that we have ranges per node per boot, iterate through those and
+    // populate boot_constraints.
+    for (auto &boot_parts_index_ranges : node_boot_parts_index_ranges) {
+      auto it = boot_constraints.find(boot_parts_index_ranges.first);
+      if (it == boot_constraints.end()) {
+        it = boot_constraints
+                 .insert(std::make_pair(boot_parts_index_ranges.first,
+                                        NodeBootState()))
+                 .first;
+      }
+
+      // Sort the list by parts_index, look for overlaps, then insert
+      // everything as constraints.
+      std::sort(boot_parts_index_ranges.second.begin(),
+                boot_parts_index_ranges.second.end(), [](auto a, auto b) {
+                  return a.second.first < b.second.second;
+                });
+
+      std::map<std::string, std::vector<std::pair<std::string, bool>>>
+          &per_node_boot_constraints = it->second.constraints;
+
+      if (boot_parts_index_ranges.second.size() < 2) {
+        VLOG(1) << "Found only one boot for this node in this log.";
+        continue;
+      }
+
+      for (size_t i = 1; i < boot_parts_index_ranges.second.size(); ++i) {
+        CHECK_LT(boot_parts_index_ranges.second[i - 1].second.second,
+                 boot_parts_index_ranges.second[i].second.first)
+            << ": Overlapping parts_index, please investigate";
+        auto first_per_boot_constraints = per_node_boot_constraints.find(
+            boot_parts_index_ranges.second[i - 1].first);
+        if (first_per_boot_constraints == per_node_boot_constraints.end()) {
+          first_per_boot_constraints =
+              per_node_boot_constraints
+                  .insert(std::make_pair(
+                      boot_parts_index_ranges.second[i - 1].first,
+                      std::vector<std::pair<std::string, bool>>()))
+                  .first;
+        }
+
+        auto second_per_boot_constraints = per_node_boot_constraints.find(
+            boot_parts_index_ranges.second[i].first);
+        if (second_per_boot_constraints == per_node_boot_constraints.end()) {
+          second_per_boot_constraints =
+              per_node_boot_constraints
+                  .insert(std::make_pair(
+                      boot_parts_index_ranges.second[i].first,
+                      std::vector<std::pair<std::string, bool>>()))
+                  .first;
+        }
+
+        first_per_boot_constraints->second.emplace_back(
+            std::make_pair(boot_parts_index_ranges.second[i].first, true));
+        second_per_boot_constraints->second.emplace_back(
+            std::make_pair(boot_parts_index_ranges.second[i - 1].first, false));
+      }
+    }
+  }
+
+  // Print out our discovered constraints on request.
+  if (VLOG_IS_ON(2)) {
+    for (const std::pair<const std::string, NodeBootState> &node_state :
+         boot_constraints) {
+      LOG(INFO) << "Node " << node_state.first;
+      CHECK_GT(node_state.second.boots.size(), 0u)
+          << ": Need a boot from each node.";
+
+      for (const std::string &boot : node_state.second.boots) {
+        LOG(INFO) << "  boot " << boot;
+      }
+      for (const std::pair<std::string,
+                           std::vector<std::pair<std::string, bool>>>
+               &constraints : node_state.second.constraints) {
+        for (const std::pair<std::string, bool> &constraint :
+             constraints.second) {
+          if (constraint.second) {
+            LOG(INFO) << constraints.first << " < " << constraint.first;
+          } else {
+            LOG(INFO) << constraints.first << " > " << constraint.first;
+          }
+        }
+      }
+    }
+  }
+
+  // And now walk the constraint graph we have generated to order the boots.
+  // This doesn't need to catch all the cases, it just needs to report when it
+  // fails.
+  std::map<std::string, int> boot_count_map;
+
+  for (std::pair<const std::string, NodeBootState> &node_state :
+       boot_constraints) {
+    CHECK_GT(node_state.second.boots.size(), 0u)
+        << ": Need a boot from each node.";
+    if (node_state.second.boots.size() == 1u) {
+      boot_count_map.insert(
+          std::make_pair(*node_state.second.boots.begin(), 0));
+      continue;
+    }
+
+    // Now, walk the dag to find the earliest boot.  Pick a node and start
+    // traversing.
+    std::string current_boot = *node_state.second.boots.begin();
+    size_t update_count = 0;
+
+    while (true) {
+      auto it = node_state.second.constraints.find(current_boot);
+      if (it == node_state.second.constraints.end()) {
+        LOG(WARNING) << "Unconnected boot in set > 1";
+        break;
+      }
+
+      bool updated = false;
+      for (const std::pair<std::string, bool> &constraint : it->second) {
+        if (!constraint.second) {
+          updated = true;
+          current_boot = constraint.first;
+          break;
+        }
+      }
+      if (!updated) {
+        break;
+      }
+
+      ++update_count;
+
+      CHECK_LE(update_count, node_state.second.boots.size())
+          << ": Found a cyclic boot graph, giving up.";
+    }
+
+    // Now, walk the tree the other way to actually sort the boots.
+    // TODO(austin): We can probably drop sorted_boots and directly insert into
+    // the map with some more careful book-keeping.
+    std::vector<std::string> sorted_boots;
+    sorted_boots.emplace_back(current_boot);
+
+    update_count = 0;
+    while (true) {
+      auto it = node_state.second.constraints.find(current_boot);
+      if (it == node_state.second.constraints.end()) {
+        LOG(WARNING) << "Unconnected boot in set > 1";
+        break;
+      }
+
+      // This is a bit simplistic.  If you have a dag which isn't just a list,
+      // we aren't guarenteed to walk the tree correctly.
+      //
+      //  a < b
+      //  a < c
+      //  b < c
+      //
+      // That is rare enough in practice that we can CHECK and fix it if someone
+      // produces a valid use case.
+      bool updated = false;
+      for (const std::pair<std::string, bool> &constraint : it->second) {
+        if (constraint.second) {
+          updated = true;
+          current_boot = constraint.first;
+          break;
+        }
+      }
+      if (!updated) {
+        break;
+      }
+
+      sorted_boots.emplace_back(current_boot);
+
+      ++update_count;
+
+      CHECK_LE(update_count, node_state.second.boots.size())
+          << ": Found a cyclic boot graph, giving up.";
+    }
+
+    CHECK_EQ(sorted_boots.size(), node_state.second.boots.size())
+        << ": Graph failed to reach all the nodes.";
+
+    VLOG(1) << "Node " << node_state.first;
+    size_t boot_count = 0;
+    for (const std::string &boot : sorted_boots) {
+      VLOG(1) << "  Boot " << boot;
+      boot_count_map.insert(std::make_pair(std::move(boot), boot_count));
+      ++boot_count;
+    }
+  }
+
+  return boot_count_map;
+}
+
 std::vector<LogFile> PartsSorter::FormatNewParts() {
+  const std::map<std::string, int> boot_counts = ComputeBootCounts();
+
   std::map<std::string, std::shared_ptr<const Configuration>>
       copied_config_sha256;
   // Now, sort them and produce the final vector form.
@@ -466,6 +740,12 @@
       new_parts.parts_uuid = parts.first;
       new_parts.node = std::move(parts.second.node);
 
+      {
+        auto boot_count_it = boot_counts.find(new_parts.source_boot_uuid);
+        CHECK(boot_count_it != boot_counts.end());
+        new_parts.boot_count = boot_count_it->second;
+      }
+
       std::sort(parts.second.parts.begin(), parts.second.parts.end(),
                 [](const std::pair<std::string, int> &a,
                    const std::pair<std::string, int> &b) {
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 71196c8..03e8970 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -54,7 +54,9 @@
 // Datastructure to hold parts from the same run of the logger which have no
 // ordering constraints relative to each other.
 struct LogFile {
-  // The UUID tying them all together (if available)
+  // The UUID tying them all together (if available).  This is set to a random
+  // uuid everytime a set of log files is started, regardless of if the logger
+  // starts and stops.
   std::string log_event_uuid;
 
   // The node the logger was running on (if available)
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 384772c..cdbc332 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -1675,6 +1675,77 @@
   }
 }
 
+// This tests that we can properly sort a multi-node log file which has the old
+// (and buggy) timestamps in the header, and the non-resetting parts_index.
+// These make it so we can just bairly figure out what happened first and what
+// happened second, but not in a way that is robust to multiple nodes rebooting.
+TEST_F(SortingElementTest, OldReboot) {
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0 =
+      MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "1a9e5ca2-31b2-475b-8282-88f6d1ce5109",
+  "parts_index": 0,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
+})");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1 =
+      MakeHeader(config_, R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 1000000,
+  "realtime_start_time": 1000000000000,
+  "logger_monotonic_start_time": 1000000,
+  "logger_realtime_start_time": 1000000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+  "parts_index": 1,
+  "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+  "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+  "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
+})");
+
+  {
+    DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(boot0.span());
+  }
+  {
+    DetachedBufferWriter writer(logfile1_, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(boot1.span());
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  ASSERT_EQ(parts.size(), 1u);
+  ASSERT_EQ(parts[0].parts.size(), 2u);
+
+  EXPECT_EQ(parts[0].parts[0].boot_count, 0);
+  EXPECT_EQ(parts[0].parts[0].source_boot_uuid,
+            boot0.message().source_node_boot_uuid()->string_view());
+
+  EXPECT_EQ(parts[0].parts[1].boot_count, 1);
+  EXPECT_EQ(parts[0].parts[1].source_boot_uuid,
+            boot1.message().source_node_boot_uuid()->string_view());
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos