Improve messaging when MessageReader::ReadMessage detects a corrupted message.

Add tests for the crash_on_corrupt_message and ignore_corrupt_messages flags.

Change-Id: I1ea3ebae7c1e15e0bc5ce5586ac7752bde20cb80
Signed-off-by: Brian Smartt <brian.smartt@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 59b106e..ecdf5d6 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -50,6 +50,17 @@
             "Some old log files have two headers at the beginning.  Use the "
             "last header as the actual header.");
 
+DEFINE_bool(crash_on_corrupt_message, true,
+            "When true, MessageReader will crash the first time a message "
+            "with corrupted format is found. When false, the crash will be "
+            "suppressed, and any remaining readable messages will be "
+            "evaluated to present verified vs corrupted stats.");
+
+DEFINE_bool(ignore_corrupt_messages, false,
+            "When true, and crash_on_corrupt_message is false, then any "
+            "corrupt message found by MessageReader will be silently ignored, "
+            "providing access to all uncorrupted messages in a logfile.");
+
 namespace aos::logger {
 namespace {
 
@@ -389,15 +400,19 @@
 }
 
 void SpanReader::ConsumeMessage() {
-  consumed_data_ +=
+  size_t consumed_size =
       flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
       sizeof(flatbuffers::uoffset_t);
+  consumed_data_ += consumed_size;
+  total_consumed_ += consumed_size;
 }
 
 absl::Span<const uint8_t> SpanReader::ReadMessage() {
   absl::Span<const uint8_t> result = PeekMessage();
   if (result != absl::Span<const uint8_t>()) {
     ConsumeMessage();
+  } else {
+    is_finished_ = true;
   }
   return result;
 }
@@ -427,6 +442,8 @@
     return false;
   }
 
+  total_read_ += count;
+
   return true;
 }
 
@@ -504,6 +521,9 @@
     : span_reader_(filename),
       raw_log_file_header_(
           SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
+  set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
+  set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
+
   std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
       raw_log_file_header = ReadHeader(&span_reader_);
 
@@ -514,6 +534,8 @@
 
   CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
 
+  total_verified_before_ = span_reader_.TotalConsumed();
+
   max_out_of_order_duration_ =
       FLAGS_max_out_of_order > 0
           ? chrono::duration_cast<chrono::nanoseconds>(
@@ -527,11 +549,84 @@
 std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
   absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
   if (msg_data == absl::Span<const uint8_t>()) {
+    if (is_corrupted()) {
+      LOG(ERROR) << "Total corrupted volumes: before = "
+                 << total_verified_before_
+                 << " | corrupted = " << total_corrupted_
+                 << " | during = " << total_verified_during_
+                 << " | after = " << total_verified_after_ << std::endl;
+    }
+
+    if (span_reader_.IsIncomplete()) {
+      LOG(ERROR) << "Unable to access some messages in " << filename()
+                 << " : " << span_reader_.TotalRead() << " bytes read, "
+                 << span_reader_.TotalConsumed() << " bytes usable."
+                 << std::endl;
+    }
     return nullptr;
   }
 
   SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
-  CHECK(msg.Verify()) << ": Corrupted message from " << filename();
+
+  if (crash_on_corrupt_message_flag_) {
+    CHECK(msg.Verify()) << "Corrupted message at offset "
+                        << total_verified_before_
+                        << " found within " << filename()
+                        << "; set --nocrash_on_corrupt_message to see summary;"
+                        << " also set --ignore_corrupt_messages to process"
+                        << " anyway";
+
+  } else if (!msg.Verify()) {
+    LOG(ERROR) << "Corrupted message at offset "
+               << total_verified_before_
+               << " from " << filename() << std::endl;
+
+    total_corrupted_ += msg_data.size();
+
+    while (true) {
+      absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
+
+      if (msg_data == absl::Span<const uint8_t>()) {
+        if (!ignore_corrupt_messages_flag_) {
+          LOG(ERROR) << "Total corrupted volumes: before = "
+                     << total_verified_before_
+                     << " | corrupted = " << total_corrupted_
+                     << " | during = " << total_verified_during_
+                     << " | after = " << total_verified_after_ << std::endl;
+
+          if (span_reader_.IsIncomplete()) {
+            LOG(ERROR) << "Unable to access some messages in " << filename()
+                       << " : " << span_reader_.TotalRead() << " bytes read, "
+                       << span_reader_.TotalConsumed() << " bytes usable."
+                       << std::endl;
+          }
+          return nullptr;
+        }
+        break;
+      }
+
+      SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
+
+      if (!next_msg.Verify()) {
+        total_corrupted_ += msg_data.size();
+        total_verified_during_ += total_verified_after_;
+        total_verified_after_ = 0;
+
+      } else {
+        total_verified_after_ += msg_data.size();
+        if (ignore_corrupt_messages_flag_) {
+          msg = next_msg;
+          break;
+        }
+      }
+    }
+  }
+
+  if (is_corrupted()) {
+    total_verified_after_ += msg_data.size();
+  } else {
+    total_verified_before_ += msg_data.size();
+  }
 
   auto result = UnpackedMessageHeader::MakeMessage(msg.message());
 
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f2bdc42..41d4a9a 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -206,6 +206,11 @@
 
   std::string_view filename() const { return filename_; }
 
+  size_t TotalRead() const { return total_read_; }
+  size_t TotalConsumed() const { return total_consumed_; }
+  bool IsIncomplete() const { return is_finished_
+      && total_consumed_ < total_read_; }
+
   // Returns a span with the data for the next message from the log file,
   // including the size.  The result is only guarenteed to be valid until
   // ReadMessage() or PeekMessage() is called again.
@@ -240,6 +245,17 @@
 
   // Amount of data consumed already in data_.
   size_t consumed_data_ = 0;
+
+  // Accumulates the total volume of bytes read from filename_
+  size_t total_read_ = 0;
+
+  // Accumulates the total volume of read bytes that were 'consumed' into
+  // messages. May be less than total_read_, if the last message (span) is
+  // either truncated or somehow corrupt.
+  size_t total_consumed_ = 0;
+
+  // Reached the end, no more readable messages.
+  bool is_finished_ = false;
 };
 
 // Reads the last header from a log file.  This handles any duplicate headers
@@ -293,6 +309,14 @@
     return newest_timestamp() - max_out_of_order_duration();
   }
 
+  // Flag value setters for testing
+  void set_crash_on_corrupt_message_flag(bool b) {
+    crash_on_corrupt_message_flag_ = b;
+  }
+  void set_ignore_corrupt_messages_flag(bool b) {
+    ignore_corrupt_messages_flag_ = b;
+  }
+
  private:
   // Log chunk reader.
   SpanReader span_reader_;
@@ -306,6 +330,30 @@
 
   // Timestamp of the newest message in a channel queue.
   monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+
+  // Total volume of verifiable messages from the beginning of the file.
+  // TODO - are message counts also useful?
+  size_t total_verified_before_ = 0;
+
+  // Total volume of messages with corrupted flatbuffer formatting, if any.
+  // Excludes corrupted message content.
+  // TODO - if the layout included something as simple as a CRC (relatively
+  // fast and robust enough) for each span, then corrupted content could be
+  // included in this check.
+  size_t total_corrupted_ = 0;
+
+  // Total volume of verifiable messages intermixed with corrupted messages,
+  // if any. Will be == 0 if total_corrupted_ == 0.
+  size_t total_verified_during_ = 0;
+
+  // Total volume of verifiable messages found after the last corrupted one,
+  // if any. Will be == 0 if total_corrupted_ == 0.
+  size_t total_verified_after_ = 0;
+
+  bool is_corrupted() const { return total_corrupted_ > 0; }
+
+  bool crash_on_corrupt_message_flag_ = true;
+  bool ignore_corrupt_messages_flag_ = false;
 };
 
 // A class to seamlessly read messages from a list of part files.
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index c9ec37e..d024ae0 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -9,6 +9,7 @@
 #include "aos/flatbuffers.h"
 #include "aos/json_to_flatbuffer.h"
 #include "aos/testing/tmpdir.h"
+#include "gflags/gflags.h"
 #include "gtest/gtest.h"
 
 namespace aos {
@@ -2720,6 +2721,105 @@
       "Found overlapping boots on");
 }
 
+// Tests MessageReader crash, stop, and skip behaviors on a corrupted message.
+TEST(MessageReaderConfirmCrash, ReadWrite) {
+  const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
+  unlink(logfile.c_str());
+
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
+      JsonToSizedFlatbuffer<LogFileHeader>(
+          R"({ "max_out_of_order_duration": 100000000 })");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
+
+  // Starts out like a proper flat buffer header, but it breaks down ...
+  std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
+  absl::Span<uint8_t> m3_span(garbage);
+
+  {
+    DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config.span());
+    writer.QueueSpan(m1.span());
+    writer.QueueSpan(m2.span());
+    writer.QueueSpan(m3_span);
+    writer.QueueSpan(m4.span());  // This message is "hidden"
+  }
+
+  {
+    MessageReader reader(logfile);
+
+    EXPECT_EQ(reader.filename(), logfile);
+
+    EXPECT_EQ(
+        reader.max_out_of_order_duration(),
+        std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
+    EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+    EXPECT_TRUE(reader.ReadMessage());
+    EXPECT_EQ(reader.newest_timestamp(),
+              monotonic_clock::time_point(chrono::nanoseconds(1)));
+    EXPECT_TRUE(reader.ReadMessage());
+    EXPECT_EQ(reader.newest_timestamp(),
+              monotonic_clock::time_point(chrono::nanoseconds(2)));
+    // Confirm default crashing behavior
+    EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
+  }
+
+  {
+    gflags::FlagSaver fs;
+
+    MessageReader reader(logfile);
+    reader.set_crash_on_corrupt_message_flag(false);
+
+    EXPECT_EQ(reader.filename(), logfile);
+
+    EXPECT_EQ(
+        reader.max_out_of_order_duration(),
+        std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
+    EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+    EXPECT_TRUE(reader.ReadMessage());
+    EXPECT_EQ(reader.newest_timestamp(),
+              monotonic_clock::time_point(chrono::nanoseconds(1)));
+    EXPECT_TRUE(reader.ReadMessage());
+    EXPECT_EQ(reader.newest_timestamp(),
+              monotonic_clock::time_point(chrono::nanoseconds(2)));
+    // Confirm avoiding the corrupted message crash, stopping instead.
+    EXPECT_FALSE(reader.ReadMessage());
+  }
+
+  {
+    gflags::FlagSaver fs;
+
+    MessageReader reader(logfile);
+    reader.set_crash_on_corrupt_message_flag(false);
+    reader.set_ignore_corrupt_messages_flag(true);
+
+    EXPECT_EQ(reader.filename(), logfile);
+
+    EXPECT_EQ(
+        reader.max_out_of_order_duration(),
+        std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
+    EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+    EXPECT_TRUE(reader.ReadMessage());
+    EXPECT_EQ(reader.newest_timestamp(),
+              monotonic_clock::time_point(chrono::nanoseconds(1)));
+    EXPECT_TRUE(reader.ReadMessage());
+    EXPECT_EQ(reader.newest_timestamp(),
+              monotonic_clock::time_point(chrono::nanoseconds(2)));
+    // Confirm skipping of the corrupted message to read the hidden one.
+    EXPECT_TRUE(reader.ReadMessage());
+    EXPECT_EQ(reader.newest_timestamp(),
+              monotonic_clock::time_point(chrono::nanoseconds(4)));
+    EXPECT_FALSE(reader.ReadMessage());
+  }
+}
+
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos