Work around duplicate headers at the start of logs
Somehow, we have managed to log duplicate headers at the beginning of
some of our logs.
log_11_data/11/aos/aos.message_bridge.Timestamp.part0.bfbs.xz and
friends seem to be where we are finding these issues. Hide this
behavior behind a flag.
We urgently need a follow-up commit which fixes the log writer so it
doesn't write duplicate headers. Unfortuantely, we have valuable data
logged with duplicate headers in it so we need this workaround.
Change-Id: I905023fc647547fda5004b303f1a6988ec311a93
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index c2378ab..edd0ac7 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -6,6 +6,7 @@
#include <string_view>
#include <vector>
+#include "absl/strings/escaping.h"
#include "aos/configuration.h"
#include "aos/events/logging/log_reader.h"
#include "aos/events/simulated_event_loop.h"
@@ -91,20 +92,61 @@
if (argc != 2) {
LOG(FATAL) << "Expected 1 logfile as an argument.";
}
- aos::logger::MessageReader reader(argv[1]);
+ aos::logger::SpanReader reader(argv[1]);
+ absl::Span<const uint8_t> raw_log_file_header_span = reader.ReadMessage();
+ if (raw_log_file_header_span == absl::Span<const uint8_t>()) {
+ LOG(WARNING) << "Empty log file on " << reader.filename();
+ return 0;
+ }
+
+ // Now, reproduce the log file header deduplication logic inline so we can
+ // print out all the headers we find.
+ aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>
+ log_file_header(raw_log_file_header_span);
+ if (!log_file_header.Verify()) {
+ LOG(ERROR) << "Header corrupted on " << reader.filename();
+ return 1;
+ }
+ while (true) {
+ absl::Span<const uint8_t> maybe_header_data = reader.PeekMessage();
+ if (maybe_header_data == absl::Span<const uint8_t>()) {
+ break;
+ }
+
+ aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
+ maybe_header_data);
+ if (maybe_header.Verify()) {
+ std::cout << aos::FlatbufferToJson(
+ log_file_header,
+ {.multi_line = FLAGS_pretty,
+ .max_vector_size =
+ static_cast<size_t>(FLAGS_max_vector_size)})
+ << std::endl;
+ LOG(WARNING) << "Found duplicate LogFileHeader in "
+ << reader.filename();
+ log_file_header =
+ aos::SizePrefixedFlatbufferVector<aos::logger::LogFileHeader>(
+ maybe_header_data);
+
+ reader.ConsumeMessage();
+ } else {
+ break;
+ }
+ }
+
+ // And now use the final sha256 to match the raw_header.
std::optional<aos::logger::MessageReader> raw_header_reader;
- const aos::logger::LogFileHeader *full_header = reader.log_file_header();
+ const aos::logger::LogFileHeader *full_header = &log_file_header.message();
if (!FLAGS_raw_header.empty()) {
raw_header_reader.emplace(FLAGS_raw_header);
std::cout << aos::FlatbufferToJson(
- reader.log_file_header(),
- {.multi_line = FLAGS_pretty,
- .max_vector_size =
- static_cast<size_t>(FLAGS_max_vector_size)})
+ full_header, {.multi_line = FLAGS_pretty,
+ .max_vector_size = static_cast<size_t>(
+ FLAGS_max_vector_size)})
<< std::endl;
CHECK_EQ(
- reader.log_file_header()->configuration_sha256()->string_view(),
+ full_header->configuration_sha256()->string_view(),
aos::logger::Sha256(raw_header_reader->raw_log_file_header().span()));
full_header = raw_header_reader->log_file_header();
}
@@ -115,46 +157,45 @@
<< std::endl;
while (true) {
- std::optional<
- aos::SizePrefixedFlatbufferVector<aos::logger::MessageHeader>>
- message = reader.ReadMessage();
- if (!message) {
+ const aos::SizePrefixedFlatbufferSpan<aos::logger::MessageHeader> message(
+ reader.ReadMessage());
+ if (message.span() == absl::Span<const uint8_t>()) {
break;
}
const auto *const channels = full_header->configuration()->channels();
- const size_t channel_index = message.value().message().channel_index();
+ const size_t channel_index = message.message().channel_index();
CHECK_LT(channel_index, channels->size());
const aos::Channel *const channel = channels->Get(channel_index);
- if (message.value().message().data() != nullptr) {
+ CHECK(message.Verify()) << absl::BytesToHexString(std::string_view(
+ reinterpret_cast<const char *>(message.span().data()),
+ message.span().size()));
+
+ if (message.message().data() != nullptr) {
CHECK(channel->has_schema());
- CHECK(flatbuffers::Verify(*channel->schema(),
- *channel->schema()->root_table(),
- message.value().message().data()->data(),
- message.value().message().data()->size()))
+ CHECK(flatbuffers::Verify(
+ *channel->schema(), *channel->schema()->root_table(),
+ message.message().data()->data(), message.message().data()->size()))
<< ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
<< channel->type()->c_str();
}
- if (FLAGS_format_raw && message.value().message().data() != nullptr) {
+ if (FLAGS_format_raw && message.message().data() != nullptr) {
std::cout << aos::configuration::StrippedChannelToString(channel) << " "
- << aos::FlatbufferToJson(
- message.value(),
- {.multi_line = FLAGS_pretty, .max_vector_size = 4})
+ << aos::FlatbufferToJson(message, {.multi_line = FLAGS_pretty,
+ .max_vector_size = 4})
<< ": "
<< aos::FlatbufferToJson(
- channel->schema(),
- message.value().message().data()->data(),
+ channel->schema(), message.message().data()->data(),
{FLAGS_pretty,
static_cast<size_t>(FLAGS_max_vector_size)})
<< std::endl;
} else {
std::cout << aos::configuration::StrippedChannelToString(channel) << " "
<< aos::FlatbufferToJson(
- message.value(),
- {FLAGS_pretty,
- static_cast<size_t>(FLAGS_max_vector_size)})
+ message, {FLAGS_pretty,
+ static_cast<size_t>(FLAGS_max_vector_size)})
<< std::endl;
}
}
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 5f0e372..cb22130 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -38,6 +38,10 @@
max_out_of_order, -1,
"If set, this overrides the max out of order duration for a log file.");
+DEFINE_bool(workaround_double_headers, true,
+ "Some old log files have two headers at the beginning. Use the "
+ "last header as the actual header.");
+
namespace aos::logger {
namespace chrono = std::chrono;
@@ -401,9 +405,8 @@
}
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
- std::string_view filename) {
- SpanReader span_reader(filename);
- absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
+ SpanReader *span_reader) {
+ absl::Span<const uint8_t> config_data = span_reader->ReadMessage();
// Make sure something was read.
if (config_data == absl::Span<const uint8_t>()) {
@@ -415,9 +418,41 @@
if (!result.Verify()) {
return std::nullopt;
}
+
+ if (FLAGS_workaround_double_headers) {
+ while (true) {
+ absl::Span<const uint8_t> maybe_header_data = span_reader->PeekMessage();
+ if (maybe_header_data == absl::Span<const uint8_t>()) {
+ break;
+ }
+
+ aos::SizePrefixedFlatbufferSpan<aos::logger::LogFileHeader> maybe_header(
+ maybe_header_data);
+ if (maybe_header.Verify()) {
+ LOG(WARNING) << "Found duplicate LogFileHeader in "
+ << span_reader->filename();
+ ResizeableBuffer header_data_copy;
+ header_data_copy.resize(maybe_header_data.size());
+ memcpy(header_data_copy.data(), maybe_header_data.begin(),
+ header_data_copy.size());
+ result = SizePrefixedFlatbufferVector<LogFileHeader>(
+ std::move(header_data_copy));
+
+ span_reader->ConsumeMessage();
+ } else {
+ break;
+ }
+ }
+ }
return result;
}
+std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
+ std::string_view filename) {
+ SpanReader span_reader(filename);
+ return ReadHeader(&span_reader);
+}
+
std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
std::string_view filename, size_t n) {
SpanReader span_reader(filename);
@@ -443,19 +478,13 @@
: span_reader_(filename),
raw_log_file_header_(
SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
- // Make sure we have enough to read the size.
- absl::Span<const uint8_t> header_data = span_reader_.ReadMessage();
+ std::optional<SizePrefixedFlatbufferVector<LogFileHeader>>
+ raw_log_file_header = ReadHeader(&span_reader_);
// Make sure something was read.
- CHECK(header_data != absl::Span<const uint8_t>())
- << ": Failed to read header from: " << filename;
+ CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
- // And copy the header data so we have it forever.
- ResizeableBuffer header_data_copy;
- header_data_copy.resize(header_data.size());
- memcpy(header_data_copy.data(), header_data.begin(), header_data_copy.size());
- raw_log_file_header_ =
- SizePrefixedFlatbufferVector<LogFileHeader>(std::move(header_data_copy));
+ raw_log_file_header_ = std::move(*raw_log_file_header);
max_out_of_order_duration_ =
FLAGS_max_out_of_order > 0
@@ -476,6 +505,8 @@
SizePrefixedFlatbufferVector<MessageHeader> result(msg_data);
+ CHECK(result.Verify()) << ": Corrupted message from " << filename();
+
const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
chrono::nanoseconds(result.message().monotonic_sent_time()));
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index ee13a7c..f876d25 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -199,8 +199,12 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type);
+// Reads the last header from a log file. This handles any duplicate headers
+// that were written.
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> ReadHeader(
std::string_view filename);
+// Reads the Nth message from a log file, excluding the header. Note: this
+// doesn't handle duplicate headers.
std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadNthMessage(
std::string_view filename, size_t n);