Improve messaging when MessageReader::ReadMessage detects a corrupted message.
Add tests for the crash_on_corrupt_message and ignore_corrupt_messages flags.
Change-Id: I1ea3ebae7c1e15e0bc5ce5586ac7752bde20cb80
Signed-off-by: Brian Smartt <brian.smartt@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 59b106e..ecdf5d6 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -50,6 +50,17 @@
"Some old log files have two headers at the beginning. Use the "
"last header as the actual header.");
+DEFINE_bool(crash_on_corrupt_message, true,
+ "When true, MessageReader will crash the first time a message "
+ "with corrupted format is found. When false, the crash will be "
+ "suppressed, and any remaining readable messages will be "
+ "evaluated to present verified vs corrupted stats.");
+
+DEFINE_bool(ignore_corrupt_messages, false,
+            "When true, and crash_on_corrupt_message is false, then any "
+            "corrupt message found by MessageReader will be silently ignored, "
+            "providing access to all uncorrupted messages in a logfile.");
+
namespace aos::logger {
namespace {
@@ -389,15 +400,19 @@
}
void SpanReader::ConsumeMessage() {
- consumed_data_ +=
+ size_t consumed_size =
flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
sizeof(flatbuffers::uoffset_t);
+ consumed_data_ += consumed_size;
+ total_consumed_ += consumed_size;
}
absl::Span<const uint8_t> SpanReader::ReadMessage() {
absl::Span<const uint8_t> result = PeekMessage();
if (result != absl::Span<const uint8_t>()) {
ConsumeMessage();
+ } else {
+ is_finished_ = true;
}
return result;
}
@@ -427,6 +442,8 @@
return false;
}
+ total_read_ += count;
+
return true;
}
@@ -504,6 +521,9 @@
: span_reader_(filename),
raw_log_file_header_(
SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
+ set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
+ set_ignore_corrupt_messages_flag(FLAGS_ignore_corrupt_messages);
+
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
raw_log_file_header = ReadHeader(&span_reader_);
@@ -514,6 +534,8 @@
CHECK(raw_log_file_header_.Verify()) << "Log file header is corrupted";
+ total_verified_before_ = span_reader_.TotalConsumed();
+
max_out_of_order_duration_ =
FLAGS_max_out_of_order > 0
? chrono::duration_cast<chrono::nanoseconds>(
@@ -527,11 +549,85 @@
std::shared_ptr<UnpackedMessageHeader> MessageReader::ReadMessage() {
+  // Emits the end-of-data diagnostics: the corruption byte totals (only when
+  // something was corrupted) and a note when SpanReader was left holding
+  // bytes that it could not turn into a message.
+  const auto report_end_of_data = [this]() {
+    if (is_corrupted()) {
+      LOG(ERROR) << "Total corrupted volumes: before = "
+                 << total_verified_before_
+                 << " | corrupted = " << total_corrupted_
+                 << " | during = " << total_verified_during_
+                 << " | after = " << total_verified_after_ << std::endl;
+    }
+
+    if (span_reader_.IsIncomplete()) {
+      LOG(ERROR) << "Unable to access some messages in " << filename()
+                 << " : " << span_reader_.TotalRead() << " bytes read, "
+                 << span_reader_.TotalConsumed() << " bytes usable."
+                 << std::endl;
+    }
+  };
+
absl::Span<const uint8_t> msg_data = span_reader_.ReadMessage();
if (msg_data == absl::Span<const uint8_t>()) {
+    report_end_of_data();
return nullptr;
}
SizePrefixedFlatbufferSpan<MessageHeader> msg(msg_data);
- CHECK(msg.Verify()) << ": Corrupted message from " << filename();
+
+  if (crash_on_corrupt_message_flag_) {
+    CHECK(msg.Verify()) << "Corrupted message at offset "
+                        << total_verified_before_ << " found within "
+                        << filename()
+                        << "; set --nocrash_on_corrupt_message to see summary;"
+                        << " also set --ignore_corrupt_messages to process"
+                        << " anyway";
+  } else if (!msg.Verify()) {
+    LOG(ERROR) << "Corrupted message at offset " << total_verified_before_
+               << " from " << filename() << std::endl;
+
+    // Verified bytes seen since an earlier corruption are no longer "after
+    // the last corrupted message"; they now sit between corrupted ones.
+    total_verified_during_ += total_verified_after_;
+    total_verified_after_ = 0;
+    total_corrupted_ += msg_data.size();
+
+    // Keep reading: either to find the next verifiable message to return
+    // (ignore mode) or to tally the rest of the file for the summary.
+    bool recovered = false;
+    while (!recovered) {
+      absl::Span<const uint8_t> next_data = span_reader_.ReadMessage();
+
+      if (next_data == absl::Span<const uint8_t>()) {
+        // Nothing readable is left. Never fall through from here: msg still
+        // refers to the corrupted message and must not be unpacked.
+        report_end_of_data();
+        return nullptr;
+      }
+
+      SizePrefixedFlatbufferSpan<MessageHeader> next_msg(next_data);
+
+      if (!next_msg.Verify()) {
+        total_corrupted_ += next_data.size();
+        total_verified_during_ += total_verified_after_;
+        total_verified_after_ = 0;
+      } else if (ignore_corrupt_messages_flag_) {
+        // Resume from this message. Its size is tallied below, once, using
+        // the updated msg_data.
+        msg = next_msg;
+        msg_data = next_data;
+        recovered = true;
+      } else {
+        total_verified_after_ += next_data.size();
+      }
+    }
+  }
+
+  if (is_corrupted()) {
+    total_verified_after_ += msg_data.size();
+  } else {
+    total_verified_before_ += msg_data.size();
+  }
auto result = UnpackedMessageHeader::MakeMessage(msg.message());
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index f2bdc42..41d4a9a 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -206,6 +206,11 @@
std::string_view filename() const { return filename_; }
+ size_t TotalRead() const { return total_read_; }
+ size_t TotalConsumed() const { return total_consumed_; }
+ bool IsIncomplete() const { return is_finished_
+ && total_consumed_ < total_read_; }
+
// Returns a span with the data for the next message from the log file,
// including the size. The result is only guarenteed to be valid until
// ReadMessage() or PeekMessage() is called again.
@@ -240,6 +245,17 @@
// Amount of data consumed already in data_.
size_t consumed_data_ = 0;
+
+ // Accumulates the total volume of bytes read from filename_
+ size_t total_read_ = 0;
+
+ // Accumulates the total volume of read bytes that were 'consumed' into
+ // messages. May be less than total_read_, if the last message (span) is
+ // either truncated or somehow corrupt.
+ size_t total_consumed_ = 0;
+
+ // Reached the end, no more readable messages.
+ bool is_finished_ = false;
};
// Reads the last header from a log file. This handles any duplicate headers
@@ -293,6 +309,14 @@
return newest_timestamp() - max_out_of_order_duration();
}
+ // Flag value setters for testing
+ void set_crash_on_corrupt_message_flag(bool b) {
+ crash_on_corrupt_message_flag_ = b;
+ }
+ void set_ignore_corrupt_messages_flag(bool b) {
+ ignore_corrupt_messages_flag_ = b;
+ }
+
private:
// Log chunk reader.
SpanReader span_reader_;
@@ -306,6 +330,30 @@
// Timestamp of the newest message in a channel queue.
monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+
+ // Total volume of verifiable messages from the beginning of the file.
+ // TODO - are message counts also useful?
+ size_t total_verified_before_ = 0;
+
+ // Total volume of messages with corrupted flatbuffer formatting, if any.
+ // Excludes corrupted message content.
+ // TODO - if the layout included something as simple as a CRC (relatively
+ // fast and robust enough) for each span, then corrupted content could be
+ // included in this check.
+ size_t total_corrupted_ = 0;
+
+ // Total volume of verifiable messages intermixed with corrupted messages,
+ // if any. Will be == 0 if total_corrupted_ == 0.
+ size_t total_verified_during_ = 0;
+
+ // Total volume of verifiable messages found after the last corrupted one,
+ // if any. Will be == 0 if total_corrupted_ == 0.
+ size_t total_verified_after_ = 0;
+
+ bool is_corrupted() const { return total_corrupted_ > 0; }
+
+ bool crash_on_corrupt_message_flag_ = true;
+ bool ignore_corrupt_messages_flag_ = false;
};
// A class to seamlessly read messages from a list of part files.
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index c9ec37e..d024ae0 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -9,6 +9,7 @@
#include "aos/flatbuffers.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/testing/tmpdir.h"
+#include "gflags/gflags.h"
#include "gtest/gtest.h"
namespace aos {
@@ -2720,6 +2721,105 @@
"Found overlapping boots on");
}
+// Tests MessageReader's crash, stop, and skip behaviors on a corrupt message.
+TEST(MessageReaderConfirmCrash, ReadWrite) {
+ const std::string logfile = aos::testing::TestTmpDir() + "/log.bfbs";
+ unlink(logfile.c_str());
+
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config =
+ JsonToSizedFlatbuffer<LogFileHeader>(
+ R"({ "max_out_of_order_duration": 100000000 })");
+ const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
+ JsonToSizedFlatbuffer<MessageHeader>(
+ R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
+ const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
+ JsonToSizedFlatbuffer<MessageHeader>(
+ R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
+ const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m4 =
+ JsonToSizedFlatbuffer<MessageHeader>(
+ R"({ "channel_index": 0, "monotonic_sent_time": 4 })");
+
+ // Starts out like a proper flat buffer header, but it breaks down ...
+ std::vector<uint8_t> garbage{8, 0, 0, 0, 16, 0, 0, 0, 4, 0, 0, 0};
+ absl::Span<uint8_t> m3_span(garbage);
+
+ {
+ DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(config.span());
+ writer.QueueSpan(m1.span());
+ writer.QueueSpan(m2.span());
+ writer.QueueSpan(m3_span);
+ writer.QueueSpan(m4.span()); // This message is "hidden"
+ }
+
+ {
+ MessageReader reader(logfile);
+
+ EXPECT_EQ(reader.filename(), logfile);
+
+ EXPECT_EQ(
+ reader.max_out_of_order_duration(),
+ std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
+ EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+ EXPECT_TRUE(reader.ReadMessage());
+ EXPECT_EQ(reader.newest_timestamp(),
+ monotonic_clock::time_point(chrono::nanoseconds(1)));
+ EXPECT_TRUE(reader.ReadMessage());
+ EXPECT_EQ(reader.newest_timestamp(),
+ monotonic_clock::time_point(chrono::nanoseconds(2)));
+ // Confirm default crashing behavior
+ EXPECT_DEATH(reader.ReadMessage(), "Corrupted message at offset");
+ }
+
+ {
+ gflags::FlagSaver fs;
+
+ MessageReader reader(logfile);
+ reader.set_crash_on_corrupt_message_flag(false);
+
+ EXPECT_EQ(reader.filename(), logfile);
+
+ EXPECT_EQ(
+ reader.max_out_of_order_duration(),
+ std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
+ EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+ EXPECT_TRUE(reader.ReadMessage());
+ EXPECT_EQ(reader.newest_timestamp(),
+ monotonic_clock::time_point(chrono::nanoseconds(1)));
+ EXPECT_TRUE(reader.ReadMessage());
+ EXPECT_EQ(reader.newest_timestamp(),
+ monotonic_clock::time_point(chrono::nanoseconds(2)));
+ // Confirm avoiding the corrupted message crash, stopping instead.
+ EXPECT_FALSE(reader.ReadMessage());
+ }
+
+ {
+ gflags::FlagSaver fs;
+
+ MessageReader reader(logfile);
+ reader.set_crash_on_corrupt_message_flag(false);
+ reader.set_ignore_corrupt_messages_flag(true);
+
+ EXPECT_EQ(reader.filename(), logfile);
+
+ EXPECT_EQ(
+ reader.max_out_of_order_duration(),
+ std::chrono::nanoseconds(config.message().max_out_of_order_duration()));
+ EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+ EXPECT_TRUE(reader.ReadMessage());
+ EXPECT_EQ(reader.newest_timestamp(),
+ monotonic_clock::time_point(chrono::nanoseconds(1)));
+ EXPECT_TRUE(reader.ReadMessage());
+ EXPECT_EQ(reader.newest_timestamp(),
+ monotonic_clock::time_point(chrono::nanoseconds(2)));
+ // Confirm skipping of the corrupted message to read the hidden one.
+ EXPECT_TRUE(reader.ReadMessage());
+ EXPECT_EQ(reader.newest_timestamp(),
+ monotonic_clock::time_point(chrono::nanoseconds(4)));
+ EXPECT_FALSE(reader.ReadMessage());
+ }
+}
+
} // namespace testing
} // namespace logger
} // namespace aos