Use log source for log reading

Change-Id: I4d6018a6117f5864cda38a5e6485c6d08a782999
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 62c19ae..36bac42 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1376,8 +1376,8 @@
   return result;
 }
 
-MessageReader::MessageReader(std::string_view filename)
-    : span_reader_(filename),
+MessageReader::MessageReader(SpanReader span_reader)
+    : span_reader_(std::move(span_reader)),
       raw_log_file_header_(
           SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
   set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
@@ -1387,7 +1387,8 @@
       raw_log_file_header = ReadHeader(&span_reader_);
 
   // Make sure something was read.
-  CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
+  CHECK(raw_log_file_header)
+      << ": Failed to read header from: " << span_reader_.filename();
 
   raw_log_file_header_ = std::move(*raw_log_file_header);
 
@@ -1401,7 +1402,7 @@
                 chrono::duration<double>(FLAGS_max_out_of_order))
           : chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
 
-  VLOG(1) << "Opened " << filename << " as node "
+  VLOG(1) << "Opened " << span_reader_.filename() << " as node "
           << FlatbufferToJson(log_file_header()->node());
 }
 
@@ -1560,18 +1561,32 @@
                                                 &DestroyAndFree);
 }
 
-PartsMessageReader::PartsMessageReader(LogParts log_parts)
-    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
-  if (parts_.parts.size() >= 2) {
-    next_message_reader_.emplace(parts_.parts[1]);
+SpanReader PartsMessageReader::MakeSpanReader(
+    const LogPartsAccess &log_parts_access, size_t part_number) {
+  const auto part = log_parts_access.GetPartAt(part_number);
+  if (log_parts_access.log_source().has_value()) {
+    return SpanReader(part,
+                      log_parts_access.log_source().value()->GetDecoder(part));
+  } else {
+    return SpanReader(part);
+  }
+}
+
+PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
+    : log_parts_access_(std::move(log_parts_access)),
+      message_reader_(MakeSpanReader(log_parts_access_, 0)) {
+  if (log_parts_access_.size() >= 2) {
+    next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
   }
   ComputeBootCounts();
 }
 
 void PartsMessageReader::ComputeBootCounts() {
-  boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
+  boot_counts_.assign(configuration::NodesCount(log_parts_access_.config()),
                       std::nullopt);
 
+  const auto boots = log_parts_access_.parts().boots;
+
   // We have 3 vintages of log files with different amounts of information.
   if (log_file_header()->has_boot_uuids()) {
     // The new hotness with the boots explicitly listed out.  We can use the log
@@ -1580,10 +1595,10 @@
     size_t node_index = 0;
     for (const flatbuffers::String *boot_uuid :
          *log_file_header()->boot_uuids()) {
-      CHECK(parts_.boots);
+      CHECK(boots);
       if (boot_uuid->size() != 0) {
-        auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
-        if (it != parts_.boots->boot_count_map.end()) {
+        auto it = boots->boot_count_map.find(boot_uuid->str());
+        if (it != boots->boot_count_map.end()) {
           boot_counts_[node_index] = it->second;
         }
       } else if (parts().boots->boots[node_index].size() == 1u) {
@@ -1595,11 +1610,10 @@
     // Older multi-node logs which are guarenteed to have UUIDs logged, or
     // single node log files with boot UUIDs in the header.  We only know how to
     // order certain boots in certain circumstances.
-    if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
+    if (configuration::MultiNode(log_parts_access_.config()) || boots) {
       for (size_t node_index = 0; node_index < boot_counts_.size();
            ++node_index) {
-        CHECK(parts_.boots);
-        if (parts().boots->boots[node_index].size() == 1u) {
+        if (boots->boots[node_index].size() == 1u) {
           boot_counts_[node_index] = 0;
         }
       }
@@ -1624,17 +1638,18 @@
       // start time.
       // TODO(austin): Does this work with startup when we don't know the
       // remote start time too?  Look at one of those logs to compare.
-      if (monotonic_sent_time >
-          parts_.monotonic_start_time + max_out_of_order_duration()) {
+      if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
+                                    max_out_of_order_duration()) {
         after_start_ = true;
       }
       if (after_start_) {
         CHECK_GE(monotonic_sent_time,
                  newest_timestamp_ - max_out_of_order_duration())
             << ": Max out of order of " << max_out_of_order_duration().count()
-            << "ns exceeded. " << parts_ << ", start time is "
-            << parts_.monotonic_start_time << " currently reading "
-            << filename();
+            << "ns exceeded. " << log_parts_access_.parts()
+            << ", start time is "
+            << log_parts_access_.parts().monotonic_start_time
+            << " currently reading " << filename();
       }
       return message;
     }
@@ -1645,7 +1660,7 @@
 }
 
 void PartsMessageReader::NextLog() {
-  if (next_part_index_ == parts_.parts.size()) {
+  if (next_part_index_ == log_parts_access_.size()) {
     CHECK(!next_message_reader_);
     done_ = true;
     return;
@@ -1653,8 +1668,9 @@
   CHECK(next_message_reader_);
   message_reader_ = std::move(*next_message_reader_);
   ComputeBootCounts();
-  if (next_part_index_ + 1 < parts_.parts.size()) {
-    next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
+  if (next_part_index_ + 1 < log_parts_access_.size()) {
+    next_message_reader_.emplace(
+        MakeSpanReader(log_parts_access_, next_part_index_ + 1));
   } else {
     next_message_reader_.reset();
   }
@@ -1748,8 +1764,8 @@
   return os;
 }
 
-MessageSorter::MessageSorter(LogParts log_parts)
-    : parts_message_reader_(log_parts),
+MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
+    : parts_message_reader_(log_parts_access),
       source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
 }
 
@@ -1850,7 +1866,7 @@
   const auto parts = log_files.SelectParts(node_name, boot_count);
   node_ = configuration::GetNodeIndex(parts.config(), node_name);
 
-  for (LogParts part : parts) {
+  for (LogPartsAccess part : parts) {
     message_sorters_.emplace_back(std::move(part));
   }