Improve messaging when ReadMessages detects a corrupted message.
Added tests for various control flag behaviors.
Change-Id: I1ea3ebae7c1e15e0bc5ce5586ac7752bde20cb80
Signed-off-by: Brian Smartt <brian.smartt@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 59b106e..ecdf5d6 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -50,6 +50,17 @@
"Some old log files have two headers at the beginning. Use the "
"last header as the actual header.");
+DEFINE_bool(crash_on_corrupt_message, true,
+ "When true, MessageReader will crash the first time a message "
+ "with corrupted format is found. When false, the crash will be "
+ "suppressed, and any remaining readable messages will be "
+ "evaluated to present verified vs corrupted stats.");
+
+DEFINE_bool(ignore_corrupt_messages, false,
+ "When true, and crash_on_corrupt_message is false, then any "
+ "corrupt message found by MessageReader be silently ignored, "
+ "providing access to all uncorrupted messages in a logfile.");
+
namespace aos::logger {
namespace {
@@ -389,15 +400,19 @@
}
void SpanReader::ConsumeMessage() {
- consumed_data_ +=
+ size_t consumed_size =
flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
sizeof(flatbuffers::uoffset_t);
+ consumed_data_ += consumed_size;
+ total_consumed_ += consumed_size;
}
absl::Span<const uint8_t> SpanReader::ReadMessage() {
absl::Span<const uint8_t> result = PeekMessage();
if (result != absl::Span<const uint8_t>()) {
ConsumeMessage();
+ } else {
+ is_finished_ = true;
}
return result;
}
@@ -427,6 +442,8 @@
return false;
}
+ total_read_ += count;
+
return true;
}
@@ -504,6 +521,9 @@
: span_reader_(filename),
raw_log_file_header_(
SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
+ set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
+ set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
+
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
raw_log_file_header = ReadHeader(&span_reader_);
@@ -514,6 +534,8 @@
CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
+ total_verified_before_ = span_reader_.TotalConsumed();
+
max_out_of_order_duration_ =
FLAGS_max_out_of_order > 0
? chrono::duration_cast<chrono::nanoseconds>(
@@ -527,11 +549,84 @@
std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
if (msg_data == absl::Span<const uint8_t>()) {
+ if (is_corrupted()) {
+ LOG(ERROR) << "Total corrupted volumes: before = "
+ << total_verified_before_
+ << " | corrupted = " << total_corrupted_
+ << " | during = " << total_verified_during_
+ << " | after = " << total_verified_after_ << std::endl;
+ }
+
+ if (span_reader_.IsIncomplete()) {
+ LOG(ERROR) << "Unable to access some messages in " << filename()
+ << " : " << span_reader_.TotalRead() << " bytes read, "
+ << span_reader_.TotalConsumed() << " bytes usable."
+ << std::endl;
+ }
return nullptr;
}
SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
- CHECK(msg.Verify()) << ": Corrupted message from " << filename();
+
+ if (crash_on_corrupt_message_flag_) {
+ CHECK(msg.Verify()) << "Corrupted message at offset "
+ << total_verified_before_
+ << " found within " << filename()
+ << "; set --nocrash_on_corrupt_message to see summary;"
+ << " also set --ignore_corrupt_messages to process"
+ << " anyway";
+
+ } else if (!msg.Verify()) {
+ LOG(ERROR) << "Corrupted message at offset "
+ << total_verified_before_
+ << " from " << filename() << std::endl;
+
+ total_corrupted_ += msg_data.size();
+
+ while (true) {
+ absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
+
+ if (msg_data == absl::Span<const uint8_t>()) {
+ if (!ignore_corrupt_messages_flag_) {
+ LOG(ERROR) << "Total corrupted volumes: before = "
+ << total_verified_before_
+ << " | corrupted = " << total_corrupted_
+ << " | during = " << total_verified_during_
+ << " | after = " << total_verified_after_ << std::endl;
+
+ if (span_reader_.IsIncomplete()) {
+ LOG(ERROR) << "Unable to access some messages in " << filename()
+ << " : " << span_reader_.TotalRead() << " bytes read, "
+ << span_reader_.TotalConsumed() << " bytes usable."
+ << std::endl;
+ }
+ return nullptr;
+ }
+ break;
+ }
+
+ SizePrefixedFlatbufferSpan<MessageHeader> next_msg(msg_data);
+
+ if (!next_msg.Verify()) {
+ total_corrupted_ += msg_data.size();
+ total_verified_during_ += total_verified_after_;
+ total_verified_after_ = 0;
+
+ } else {
+ total_verified_after_ += msg_data.size();
+ if (ignore_corrupt_messages_flag_) {
+ msg = next_msg;
+ break;
+ }
+ }
+ }
+ }
+
+ if (is_corrupted()) {
+ total_verified_after_ += msg_data.size();
+ } else {
+ total_verified_before_ += msg_data.size();
+ }
auto result = UnpackedMessageHeader::MakeMessage(msg.message());