Work around duplicate headers at the start of logs

Somehow, we have managed to log duplicate headers at the beginning of
some of our logs.
log_11_data/11/aos/aos.message_bridge.Timestamp.part0.bfbs.xz and
friends seem to be where we are finding these issues.  Hide this
behavior behind a flag.

We urgently need a follow-up commit which fixes the log writer so it
doesn't write duplicate headers.  Unfortuantely, we have valuable data
logged with duplicate headers in it so we need this workaround.

Change-Id: I905023fc647547fda5004b303f1a6988ec311a93
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index c2378ab..edd0ac7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -6,6 +6,7 @@
 #include <string_view>
 #include <vector>
 
+#include "absl/strings/escaping.h"
 #include "aos/configuration.h"
 #include "aos/events/logging/log_reader.h"
 #include "aos/events/simulated_event_loop.h"
@@ -91,20 +92,61 @@
     if (argc != 2) {
       LOG(FATAL) << "Expected 1 logfile as an argument.";
     }
-    aos::logger::MessageReader reader(argv[1]);
+    aos::logger::SpanReader reader(argv[1]);
+    absl::Span<const uint8_t> raw_log_file_header_span = reader.ReadMessage();
 
+    if (raw_log_file_header_span == absl::Span<const uint8_t>()) {
+      LOG(WARNING) << "Empty log file on " << reader.filename();
+      return 0;
+    }
+
+    // Now, reproduce the log file header deduplication logic inline so we can
+    // print out all the headers we find.
+    aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>
+        log_file_header(raw_log_file_header_span);
+    if (!log_file_header.Verify()) {
+      LOG(ERROR) << "Header corrupted on " << reader.filename();
+      return 1;
+    }
+    while (true) {
+      absl::Span<const uint8_t> maybe_header_data = reader.PeekMessage();
+      if (maybe_header_data == absl::Span<const uint8_t>()) {
+        break;
+      }
+
+      aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
+          maybe_header_data);
+      if (maybe_header.Verify()) {
+        std::cout << aos::FlatbufferToJson(
+                         log_file_header,
+                         {.multi_line = FLAGS_pretty,
+                          .max_vector_size =
+                              static_cast<size_t>(FLAGS_max_vector_size)})
+                  << std::endl;
+        LOG(WARNING) << "Found duplicate LogFileHeader in "
+                     << reader.filename();
+        log_file_header =
+            aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>(
+                maybe_header_data);
+
+        reader.ConsumeMessage();
+      } else {
+        break;
+      }
+    }
+
+    // And now use the final sha256 to match the raw_header.
     std::optional<aos::logger::MessageReader> raw_header_reader;
-    const aos::logger::LogFileHeader *full_header = reader.log_file_header();
+    const aos::logger::LogFileHeader *full_header = &log_file_header.message();
     if (!FLAGS_raw_header.empty()) {
       raw_header_reader.emplace(FLAGS_raw_header);
       std::cout << aos::FlatbufferToJson(
-                       reader.log_file_header(),
-                       {.multi_line = FLAGS_pretty,
-                        .max_vector_size =
-                            static_cast<size_t>(FLAGS_max_vector_size)})
+                       full_header, {.multi_line = FLAGS_pretty,
+                                     .max_vector_size = static_cast<size_t>(
+                                         FLAGS_max_vector_size)})
                 << std::endl;
       CHECK_EQ(
-          reader.log_file_header()->configuration_sha256()->string_view(),
+          full_header->configuration_sha256()->string_view(),
           aos::logger::Sha256(raw_header_reader->raw_log_file_header().span()));
       full_header = raw_header_reader->log_file_header();
     }
@@ -115,46 +157,45 @@
               << std::endl;
 
     while (true) {
-      std::optional<
-          aos::SizePrefixedFlatbufferVector<aos::logger::MessageHeader>>
-          message = reader.ReadMessage();
-      if (!message) {
+      const aos::SizePrefixedFlatbufferSpan<aos::logger::MessageHeader> message(
+          reader.ReadMessage());
+      if (message.span() == absl::Span<const uint8_t>()) {
         break;
       }
       const auto *const channels = full_header->configuration()->channels();
-      const size_t channel_index = message.value().message().channel_index();
+      const size_t channel_index = message.message().channel_index();
       CHECK_LT(channel_index, channels->size());
       const aos::Channel *const channel = channels->Get(channel_index);
 
-      if (message.value().message().data() != nullptr) {
+      CHECK(message.Verify()) << absl::BytesToHexString(std::string_view(
+          reinterpret_cast<const char *>(message.span().data()),
+          message.span().size()));
+
+      if (message.message().data() != nullptr) {
         CHECK(channel->has_schema());
 
-        CHECK(flatbuffers::Verify(*channel->schema(),
-                                  *channel->schema()->root_table(),
-                                  message.value().message().data()->data(),
-                                  message.value().message().data()->size()))
+        CHECK(flatbuffers::Verify(
+            *channel->schema(), *channel->schema()->root_table(),
+            message.message().data()->data(), message.message().data()->size()))
             << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
             << channel->type()->c_str();
       }
 
-      if (FLAGS_format_raw && message.value().message().data() != nullptr) {
+      if (FLAGS_format_raw && message.message().data() != nullptr) {
         std::cout << aos::configuration::StrippedChannelToString(channel) << " "
-                  << aos::FlatbufferToJson(
-                         message.value(),
-                         {.multi_line = FLAGS_pretty, .max_vector_size = 4})
+                  << aos::FlatbufferToJson(message, {.multi_line = FLAGS_pretty,
+                                                     .max_vector_size = 4})
                   << ": "
                   << aos::FlatbufferToJson(
-                         channel->schema(),
-                         message.value().message().data()->data(),
+                         channel->schema(), message.message().data()->data(),
                          {FLAGS_pretty,
                           static_cast<size_t>(FLAGS_max_vector_size)})
                   << std::endl;
       } else {
         std::cout << aos::configuration::StrippedChannelToString(channel) << " "
                   << aos::FlatbufferToJson(
-                         message.value(),
-                         {FLAGS_pretty,
-                          static_cast<size_t>(FLAGS_max_vector_size)})
+                         message, {FLAGS_pretty,
+                                   static_cast<size_t>(FLAGS_max_vector_size)})
                   << std::endl;
       }
     }
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 5f0e372..cb22130 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -38,6 +38,10 @@
     max_out_of_order, -1,
     "If set, this overrides the max out of order duration for a log file.");
 
+DEFINE_bool(workaround_double_headers, true,
+            "Some old log files have two headers at the beginning.  Use the "
+            "last header as the actual header.");
+
 namespace aos::logger {
 
 namespace chrono = std::chrono;
@@ -401,9 +405,8 @@
 }
 
 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
-    std::string_view filename) {
-  SpanReader span_reader(filename);
-  absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
+    SpanReader *span_reader) {
+  absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
 
   // Make sure something was read.
   if (config_data == absl::Span<const uint8_t>()) {
@@ -415,9 +418,41 @@
   if (!result.Verify()) {
     return std::nullopt;
   }
+
+  if (FLAGS_workaround_double_headers) {
+    while (true) {
+      absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
+      if (maybe_header_data == absl::Span<const uint8_t>()) {
+        break;
+      }
+
+      aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
+          maybe_header_data);
+      if (maybe_header.Verify()) {
+        LOG(WARNING) << "Found duplicate LogFileHeader in "
+                     << span_reader->filename();
+        ResizeableBuffer header_data_copy;
+        header_data_copy.resize(maybe_header_data.size());
+        memcpy(header_data_copy.data(), maybe_header_data.begin(),
+               header_data_copy.size());
+        result = SizePrefixedFlatbufferVector<LogFileHeader>(
+            std::move(header_data_copy));
+
+        span_reader->ConsumeMessage();
+      } else {
+        break;
+      }
+    }
+  }
   return result;
 }
 
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
+    std::string_view filename) {
+  SpanReader span_reader(filename);
+  return ReadHeader(&span_reader);
+}
+
 std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
     std::string_view filename, size_t n) {
   SpanReader span_reader(filename);
@@ -443,19 +478,13 @@
     : span_reader_(filename),
       raw_log_file_header_(
           SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
-  // Make sure we have enough to read the size.
-  absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();
+  std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
+      raw_log_file_header = ReadHeader(&span_reader_);
 
   // Make sure something was read.
-  CHECK(header_data != absl::Span<const uint8_t>())
-      << ": Failed to read header from: " << filename;
+  CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
 
-  // And copy the header data so we have it forever.
-  ResizeableBuffer header_data_copy;
-  header_data_copy.resize(header_data.size());
-  memcpy(header_data_copy.data(), header_data.begin(), header_data_copy.size());
-  raw_log_file_header_ =
-      SizePrefixedFlatbufferVector<LogFileHeader>(std::move(header_data_copy));
+  raw_log_file_header_ = std::move(*raw_log_file_header);
 
   max_out_of_order_duration_ =
       FLAGS_max_out_of_order > 0
@@ -476,6 +505,8 @@
 
   SizePrefixedFlatbufferVector<MessageHeader> result(msg_data);
 
+  CHECK(result.Verify()) << ": Corrupted message from " << filename();
+
   const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
       chrono::nanoseconds(result.message().monotonic_sent_time()));
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index ee13a7c..f876d25 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -199,8 +199,12 @@
     flatbuffers::FlatBufferBuilder *fbb, const Context &context,
     int channel_index, LogType log_type);
 
+// Reads the last header from a log file.  This handles any duplicate headers
+// that were written.
 std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
     std::string_view filename);
+// Reads the Nth message from a log file, excluding the header.  Note: this
+// doesn't handle duplicate headers.
 std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
     std::string_view filename, size_t n);