Improve messaging when ReadMessages detects a corrupted message.

Added tests for various control flag behaviors.

Change-Id: I1ea3ebae7c1e15e0bc5ce5586ac7752bde20cb80
Signed-off-by: Brian Smartt <brian.smartt@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 59b106e..ecdf5d6 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -50,6 +50,17 @@
             "Some old log files have two headers at the beginning.  Use the "
             "last header as the actual header.");
 
+DEFINE_bool(crash_on_corrupt_message, true,
+            "When true, MessageReader will crash the first time a message "
+            "with corrupted format is found. When false, the crash will be "
+            "suppressed, and any remaining readable messages will be "
+            "evaluated to present verified vs corrupted stats.");
+
+DEFINE_bool(ignore_corrupt_messages, false,
+            "When true, and crash_on_corrupt_message is false, then any "
+            "corrupt message found by MessageReader be silently ignored, "
+            "providing access to all uncorrupted messages in a logfile.");
+
 namespace aos::logger {
 namespace {
 
@@ -389,15 +400,19 @@
 }
 
 void SpanReader::ConsumeMessage() {
-  consumed_data_ +=
+  size_t consumed_size =
       flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
       sizeof(flatbuffers::uoffset_t);
+  consumed_data_ += consumed_size;
+  total_consumed_ += consumed_size;
 }
 
 absl::Span<const uint8_t> SpanReader::ReadMessage() {
   absl::Span<const uint8_t> result = PeekMessage();
   if (result != absl::Span<const uint8_t>()) {
     ConsumeMessage();
+  } else {
+    is_finished_ = true;
   }
   return result;
 }
@@ -427,6 +442,8 @@
     return false;
   }
 
+  total_read_ += count;
+
   return true;
 }
 
@@ -504,6 +521,9 @@
     : span_reader_(filename),
       raw_log_file_header_(
           SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
+  set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
+  set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
+
   std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
       raw_log_file_header = ReadHeader(&span_reader_);
 
@@ -514,6 +534,8 @@
 
   CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
 
+  total_verified_before_ = span_reader_.TotalConsumed();
+
   max_out_of_order_duration_ =
       FLAGS_max_out_of_order > 0
           ? chrono::duration_cast<chrono::nanoseconds>(
@@ -527,11 +549,84 @@
 std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
   absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
   if (msg_data == absl::Span<const uint8_t>()) {
+    if (is_corrupted()) {
+      LOG(ERROR) << "Total corrupted volumes: before = "
+                 << total_verified_before_
+                 << " | corrupted = " << total_corrupted_
+                 << " | during = " << total_verified_during_
+                 << " | after = " << total_verified_after_ << std::endl;
+    }
+
+    if (span_reader_.IsIncomplete()) {
+      LOG(ERROR) << "Unable to access some messages in " << filename()
+                 << " : " << span_reader_.TotalRead() << " bytes read, "
+                 << span_reader_.TotalConsumed() << " bytes usable."
+                 << std::endl;
+    }
     return nullptr;
   }
 
   SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
-  CHECK(msg.Verify()) << ": Corrupted message from " << filename();
+
+  if (crash_on_corrupt_message_flag_) {
+    CHECK(msg.Verify()) << "Corrupted message at offset "
+                        << total_verified_before_
+                        << " found within " << filename()
+                        << "; set --nocrash_on_corrupt_message to see summary;"
+                        << " also set --ignore_corrupt_messages to process"
+                        << " anyway";
+
+  } else if (!msg.Verify()) {
+    LOG(ERROR) << "Corrupted message at offset "
+               << total_verified_before_
+               << " from " << filename() << std::endl;
+
+    total_corrupted_ += msg_data.size();
+
+    while (true) {
+      absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
+
+      if (msg_data == absl::Span<const uint8_t>()) {
+        if (!ignore_corrupt_messages_flag_) {
+          LOG(ERROR) << "Total corrupted volumes: before = "
+                     << total_verified_before_
+                     << " | corrupted = " << total_corrupted_
+                     << " | during = " << total_verified_during_
+                     << " | after = " << total_verified_after_ << std::endl;
+
+          if (span_reader_.IsIncomplete()) {
+            LOG(ERROR) << "Unable to access some messages in " << filename()
+                       << " : " << span_reader_.TotalRead() << " bytes read, "
+                       << span_reader_.TotalConsumed() << " bytes usable."
+                       << std::endl;
+          }
+          return nullptr;
+        }
+        break;
+      }
+
+      SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
+
+      if (!next_msg.Verify()) {
+        total_corrupted_ += msg_data.size();
+        total_verified_during_ += total_verified_after_;
+        total_verified_after_ = 0;
+
+      } else {
+        total_verified_after_ += msg_data.size();
+        if (ignore_corrupt_messages_flag_) {
+          msg = next_msg;
+          break;
+        }
+      }
+    }
+  }
+
+  if (is_corrupted()) {
+    total_verified_after_ += msg_data.size();
+  } else {
+    total_verified_before_ += msg_data.size();
+  }
 
   auto result = UnpackedMessageHeader::MakeMessage(msg.message());