Sort parts by UUID and part_index

Also update log_cat to support this!  This makes it significantly more
memory efficient to read logs with lots of parts.

Change-Id: I5ce70f9342b3ab1c7a7823a878ebd890c00ce04f
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 4ef9237..f406d24 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -122,12 +122,14 @@
     LOG(FATAL) << "Expected at least 1 logfile as an argument.";
   }
 
-  std::vector<std::vector<std::string>> logfiles;
-
+  std::vector<std::string> unsorted_logfiles;
   for (int i = 1; i < argc; ++i) {
-    logfiles.emplace_back(std::vector<std::string>{std::string(argv[i])});
+    unsorted_logfiles.emplace_back(std::string(argv[i]));
   }
 
+  std::vector<std::vector<std::string>> logfiles =
+      aos::logger::SortParts(unsorted_logfiles);
+
   aos::logger::LogReader reader(logfiles);
 
   aos::FastStringBuilder builder;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 214fa2b..938ede7 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -278,19 +278,36 @@
 
 FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
   SpanReader span_reader(filename);
-  // Make sure we have enough to read the size.
   absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
 
   // Make sure something was read.
   CHECK(config_data != absl::Span<const uint8_t>())
       << ": Failed to read header from: " << filename;
 
-  // And copy the config so we have it forever.
+  // And copy the config so we have it forever, removing the size prefix.
   std::vector<uint8_t> data(
       config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
   return FlatbufferVector<LogFileHeader>(std::move(data));
 }
 
+FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
+                                               size_t n) {
+  SpanReader span_reader(filename);
+  absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
+  for (size_t i = 0; i < n + 1; ++i) {
+    data_span = span_reader.ReadMessage();
+
+    // Make sure something was read.
+    CHECK(data_span != absl::Span<const uint8_t>())
+        << ": Failed to read data from: " << filename;
+  }
+
+  // And copy the data so we have it forever, removing the size prefix.
+  std::vector<uint8_t> data(data_span.begin() + sizeof(flatbuffers::uoffset_t),
+                            data_span.end());
+  return FlatbufferVector<MessageHeader>(std::move(data));
+}
+
 MessageReader::MessageReader(std::string_view filename)
     : span_reader_(filename),
       raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index d1fbeb2..87ea229 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -96,6 +96,8 @@
     int channel_index, LogType log_type);
 
 FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
+FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
+                                               size_t n);
 
 // Class to read chunks out of a log file.
 class SpanReader {
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 85809b1..637d1ae 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -568,6 +568,99 @@
   } while (last_synchronized_time_ + polling_period_ < monotonic_now);
 }
 
+std::vector<std::vector<std::string>> SortParts(
+    const std::vector<std::string> &parts) {
+  // Start by grouping all parts by UUID, and extracting the part index.
+  std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;
+
+  // Sort part files without UUIDs and part indexes as well.  Extract everything
+  // useful from the log in the first pass, then sort later.
+  struct LogPart {
+    std::string filename;
+    monotonic_clock::time_point start_time;
+    monotonic_clock::time_point first_message_time;
+  };
+
+  std::vector<LogPart> old_parts;
+
+  for (const std::string &part : parts) {
+    FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
+
+    // Looks like an old log: no UUID, no index, and single node.  Few, if
+    // any, of the multi-node log files in the wild that we care about lack
+    // part UUIDs and indexes.
+    if (!log_header.message().has_parts_uuid() &&
+        !log_header.message().has_parts_index() &&
+        !log_header.message().has_node()) {
+      LogPart log_part;
+      log_part.filename = part;
+      log_part.start_time = monotonic_clock::time_point(
+          chrono::nanoseconds(log_header.message().monotonic_start_time()));
+      FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
+      log_part.first_message_time = monotonic_clock::time_point(
+          chrono::nanoseconds(first_message.message().monotonic_sent_time()));
+      old_parts.emplace_back(std::move(log_part));
+      continue;
+    }
+
+    CHECK(log_header.message().has_parts_uuid());
+    CHECK(log_header.message().has_parts_index());
+
+    const std::string parts_uuid = log_header.message().parts_uuid()->str();
+    auto it = parts_list.find(parts_uuid);
+    if (it == parts_list.end()) {
+      it = parts_list
+               .insert(std::make_pair(
+                   parts_uuid, std::vector<std::pair<std::string, int>>{}))
+               .first;
+    }
+    it->second.emplace_back(
+        std::make_pair(part, log_header.message().parts_index()));
+  }
+
+  CHECK_NE(old_parts.empty(), parts_list.empty())
+      << ": Can't have a mix of old and new parts.";
+
+  if (!old_parts.empty()) {
+    // Confirm they all have the same start time.  Old loggers always used the
+    // same start time.
+    for (const LogPart &p : old_parts) {
+      CHECK_EQ(old_parts[0].start_time, p.start_time);
+    }
+    // Sort by the time of the first message in each file.
+    std::sort(old_parts.begin(), old_parts.end(),
+              [](const LogPart &a, const LogPart &b) {
+                return a.first_message_time < b.first_message_time;
+              });
+
+    // Produce the final form.
+    std::vector<std::string> sorted_old_parts;
+    sorted_old_parts.reserve(old_parts.size());
+    for (LogPart &p : old_parts) {
+      sorted_old_parts.emplace_back(std::move(p.filename));
+    }
+    return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
+  }
+
+  // Now, sort them and produce the final vector form.
+  std::vector<std::vector<std::string>> result;
+  result.reserve(parts_list.size());
+  for (auto &part : parts_list) {
+    std::sort(part.second.begin(), part.second.end(),
+              [](const std::pair<std::string, int> &a,
+                 const std::pair<std::string, int> &b) {
+                return a.second < b.second;
+              });
+    std::vector<std::string> result_line;
+    result_line.reserve(part.second.size());
+    for (std::pair<std::string, int> &p : part.second) {
+      result_line.emplace_back(std::move(p.first));
+    }
+    result.emplace_back(std::move(result_line));
+  }
+  return result;
+}
+
 LogReader::LogReader(std::string_view filename,
                      const Configuration *replay_configuration)
     : LogReader(std::vector<std::string>{std::string(filename)},
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index d4ed786..c4ce769 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -375,6 +375,10 @@
   std::vector<NodeState> node_state_;
 };
 
+// Takes a bunch of parts and sorts them based on parts_uuid and parts_index.
+std::vector<std::vector<std::string>> SortParts(
+    const std::vector<std::string> &parts);
+
 // We end up with one of the following 3 log file types.
 //
 // Single node logged as the source node.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index bf062b8..47fe9e2 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -958,6 +958,31 @@
   reader.Deregister();
 }
 
+// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
+TEST_F(MultinodeLoggerTest, SortParts) {
+  // Make a bunch of parts.
+  {
+    LoggerState pi1_logger = MakeLogger(pi1_);
+    LoggerState pi2_logger = MakeLogger(pi2_);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+    StartLogger(&pi1_logger);
+    StartLogger(&pi2_logger);
+
+    event_loop_factory_.RunFor(chrono::milliseconds(2000));
+  }
+
+  const std::vector<std::vector<std::string>> sorted_parts =
+      SortParts(logfiles_);
+
+  // Test that each list of parts is in order.  Don't worry about the ordering
+  // between part file lists though.
+  // (inner vectors all need to be in order, but outer one doesn't matter).
+  EXPECT_THAT(sorted_parts,
+              ::testing::UnorderedElementsAreArray(structured_logfiles_));
+}
+
 // TODO(austin): We can write a test which recreates a logfile and confirms that
 // we get it back.  That is the ultimate test.