Handle empty and corrupted log parts.
We used to crash. Now, report them as "corrupted" and still use whatever
we can extract.
Corrupted files come from unsafe shutdowns; it is impractical to
guarantee that none ever exist.
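
For example, a reader can now surface the bad parts instead of dying
mid-sort. A minimal sketch of a hypothetical caller (SortParts() and
LogFile::corrupted are part of this change; everything else here is
illustrative):

  #include "aos/events/logging/logfile_sorting.h"
  #include "glog/logging.h"

  void ReportCorruptedParts(const std::vector<std::string> &parts) {
    // Sorting no longer crashes on unreadable parts; they are collected on
    // each LogFile instead.
    for (const aos::logger::LogFile &log : aos::logger::SortParts(parts)) {
      for (const std::string &corrupted : log.corrupted) {
        LOG(WARNING) << "Ignoring corrupted log part " << corrupted;
      }
    }
  }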
Change-Id: I68acc05d482c484f17ad335f4ab2054e180d8a37
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index b7f3b5a..f4c4ceb 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -291,6 +291,11 @@
cc_test(
name = "logger_test",
srcs = ["logger_test.cc"],
+ copts = select({
+ "//tools:cpu_k8": ["-DLZMA=1"],
+ "//tools:cpu_aarch64": ["-DLZMA=1"],
+ "//conditions:default": [],
+ }),
data = [
":multinode_pingpong_config",
"//aos/events:pingpong_config",
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index d3f7f38..43e6aa2 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -15,6 +15,8 @@
namespace chrono = std::chrono;
std::vector<LogFile> SortParts(const std::vector<std::string> &parts) {
+ std::vector<std::string> corrupted;
+
// Start by grouping all parts by UUID, and extracting the part index.
// Data structure to hold all the info extracted from a set of parts which go
// together so we can sort them afterwards.
@@ -63,32 +65,44 @@
// Now extract everything into our datastructures above for sorting.
for (const std::string &part : parts) {
- FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
+ std::optional<FlatbufferVector<LogFileHeader>> log_header =
+ ReadHeader(part);
+ if (!log_header) {
+ LOG(WARNING) << "Skipping " << part << " without a header";
+ corrupted.emplace_back(part);
+ continue;
+ }
const monotonic_clock::time_point monotonic_start_time(
- chrono::nanoseconds(log_header.message().monotonic_start_time()));
+ chrono::nanoseconds(log_header->message().monotonic_start_time()));
const realtime_clock::time_point realtime_start_time(
- chrono::nanoseconds(log_header.message().realtime_start_time()));
+ chrono::nanoseconds(log_header->message().realtime_start_time()));
const std::string_view node =
- log_header.message().has_node()
- ? log_header.message().node()->name()->string_view()
+ log_header->message().has_node()
+ ? log_header->message().node()->name()->string_view()
: "";
const std::string_view logger_node =
- log_header.message().has_logger_node()
- ? log_header.message().logger_node()->name()->string_view()
+ log_header->message().has_logger_node()
+ ? log_header->message().logger_node()->name()->string_view()
: "";
// Looks like an old log. No UUID, index, and also single node. We have
// little to no multi-node log files in the wild without part UUIDs and
// indexes which we care much about.
- if (!log_header.message().has_parts_uuid() &&
- !log_header.message().has_parts_index() &&
- !log_header.message().has_node()) {
- FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
+ if (!log_header->message().has_parts_uuid() &&
+ !log_header->message().has_parts_index() &&
+ !log_header->message().has_node()) {
+ std::optional<FlatbufferVector<MessageHeader>> first_message =
+ ReadNthMessage(part, 0);
+ if (!first_message) {
+ LOG(WARNING) << "Skipping " << part << " without any messages";
+ corrupted.emplace_back(part);
+ continue;
+ }
const monotonic_clock::time_point first_message_time(
- chrono::nanoseconds(first_message.message().monotonic_sent_time()));
+ chrono::nanoseconds(first_message->message().monotonic_sent_time()));
// Find anything with a matching start time. They all go together.
auto result = std::find_if(
@@ -111,17 +125,17 @@
continue;
}
- CHECK(log_header.message().has_log_event_uuid());
- CHECK(log_header.message().has_parts_uuid());
- CHECK(log_header.message().has_parts_index());
+ CHECK(log_header->message().has_log_event_uuid());
+ CHECK(log_header->message().has_parts_uuid());
+ CHECK(log_header->message().has_parts_index());
- CHECK_EQ(log_header.message().has_logger_node(),
- log_header.message().has_node());
+ CHECK_EQ(log_header->message().has_logger_node(),
+ log_header->message().has_node());
const std::string log_event_uuid =
- log_header.message().log_event_uuid()->str();
- const std::string parts_uuid = log_header.message().parts_uuid()->str();
- int32_t parts_index = log_header.message().parts_index();
+ log_header->message().log_event_uuid()->str();
+ const std::string parts_uuid = log_header->message().parts_uuid()->str();
+ int32_t parts_index = log_header->message().parts_index();
auto log_it = parts_list.find(log_event_uuid);
if (log_it == parts_list.end()) {
@@ -170,6 +184,15 @@
it->second.parts.emplace_back(std::make_pair(part, parts_index));
}
+ if (old_parts.empty() && parts_list.empty()) {
+ if (parts.empty()) {
+ return std::vector<LogFile>{};
+ } else {
+ LogFile log_file;
+ log_file.corrupted = std::move(corrupted);
+ return std::vector<LogFile>{log_file};
+ }
+ }
CHECK_NE(old_parts.empty(), parts_list.empty())
<< ": Can't have a mix of old and new parts.";
@@ -192,6 +215,7 @@
log_file.parts.emplace_back(std::move(p.parts));
log_file.monotonic_start_time = log_file.parts[0].monotonic_start_time;
log_file.realtime_start_time = log_file.parts[0].realtime_start_time;
+ log_file.corrupted = corrupted;
result.emplace_back(std::move(log_file));
}
@@ -207,6 +231,7 @@
new_file.logger_node = logs.second.logger_node;
new_file.monotonic_start_time = logs.second.monotonic_start_time;
new_file.realtime_start_time = logs.second.realtime_start_time;
+ new_file.corrupted = corrupted;
for (std::pair<const std::string, UnsortedLogParts> &parts :
logs.second.unsorted_parts) {
LogParts new_parts;
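A sketch of the new early-return shape as a hypothetical test (the
filenames are made up): when every part file exists but has no readable
header, SortParts() returns a single LogFile whose only populated field is
`corrupted`.

  const std::vector<LogFile> sorted =
      SortParts({"corrupt_a.bfbs", "corrupt_b.bfbs"});
  ASSERT_EQ(sorted.size(), 1u);
  EXPECT_TRUE(sorted[0].log_event_uuid.empty());
  EXPECT_THAT(sorted[0].corrupted,
              ::testing::ElementsAre("corrupt_a.bfbs", "corrupt_b.bfbs"));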
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 50bfbd7..0d2a0fb 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -45,6 +45,9 @@
// All the parts, unsorted.
std::vector<LogParts> parts;
+
+ // A list of parts which were corrupted; we don't know where they belong.
+ std::vector<std::string> corrupted;
};
std::ostream &operator<<(std::ostream &stream, const LogFile &file);
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 6b7a598..1c42e17 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -348,13 +348,15 @@
return true;
}
-FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
+std::optional<FlatbufferVector<LogFileHeader>> ReadHeader(
+ std::string_view filename) {
SpanReader span_reader(filename);
absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
// Make sure something was read.
- CHECK(config_data != absl::Span<const uint8_t>())
- << ": Failed to read header from: " << filename;
+ if (config_data == absl::Span<const uint8_t>()) {
+ return std::nullopt;
+ }
// And copy the config so we have it forever, removing the size prefix.
ResizeableBuffer data;
@@ -364,16 +366,17 @@
return FlatbufferVector<LogFileHeader>(std::move(data));
}
-FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
- size_t n) {
+std::optional<FlatbufferVector<MessageHeader>> ReadNthMessage(
+ std::string_view filename, size_t n) {
SpanReader span_reader(filename);
absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
for (size_t i = 0; i < n + 1; ++i) {
data_span = span_reader.ReadMessage();
// Make sure something was read.
- CHECK(data_span != absl::Span<const uint8_t>())
- << ": Failed to read data from: " << filename;
+ if (data_span == absl::Span<const uint8_t>()) {
+ return std::nullopt;
+ }
}
// And copy the config so we have it forever, removing the size prefix.
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 12d7cac..8381a9a 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -181,9 +181,10 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, LogType log_type);
-FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
-FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
- size_t n);
+std::optional<FlatbufferVector<LogFileHeader>> ReadHeader(
+ std::string_view filename);
+std::optional<FlatbufferVector<MessageHeader>> ReadNthMessage(
+ std::string_view filename, size_t n);
// Class to read chunks out of a log file.
class SpanReader {
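Callers of the new optional-returning signatures either skip and record
the file (as SortParts() now does) or assert presence when a missing
header is fatal (as logger.cc does below). A minimal sketch, assuming a
caller-supplied `filename`:

  std::optional<FlatbufferVector<LogFileHeader>> header = ReadHeader(filename);
  if (!header) {
    LOG(WARNING) << "Skipping " << filename << " without a header";
    return;
  }
  const monotonic_clock::time_point monotonic_start_time(
      std::chrono::nanoseconds(header->message().monotonic_start_time()));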
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index c38cf03..977a82f 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -41,7 +41,10 @@
const std::vector<std::vector<std::string>> &filenames) {
CHECK_GE(filenames.size(), 1u) << ": Empty filenames list";
CHECK_GE(filenames[0].size(), 1u) << ": Empty filenames list";
- return ReadHeader(filenames[0][0]);
+ std::optional<FlatbufferVector<LogFileHeader>> result =
+ ReadHeader(filenames[0][0]);
+ CHECK(result);
+ return result.value();
}
namespace chrono = std::chrono;
} // namespace
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 57d274b..c55b18b 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -12,6 +12,10 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
+#ifdef LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
+
namespace aos {
namespace logger {
namespace testing {
@@ -261,7 +265,7 @@
// parts_index increments.
std::vector<FlatbufferVector<LogFileHeader>> log_header;
for (std::string_view f : {logfile0, logfile1}) {
- log_header.emplace_back(ReadHeader(f));
+ log_header.emplace_back(ReadHeader(f).value());
}
EXPECT_EQ(log_header[0].message().log_event_uuid()->string_view(),
@@ -422,20 +426,114 @@
return {factory->MakeEventLoop("logger", node), {}};
}
- void StartLogger(LoggerState *logger, std::string logfile_base = "") {
+ void StartLogger(LoggerState *logger, std::string logfile_base = "",
+ bool compress = false) {
if (logfile_base.empty()) {
logfile_base = logfile_base_;
}
logger->logger = std::make_unique<Logger>(logger->event_loop.get());
logger->logger->set_polling_period(std::chrono::milliseconds(100));
- logger->event_loop->OnRun([logger, logfile_base]() {
- logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
- logfile_base, logger->event_loop->configuration(),
- logger->event_loop->node()));
+ logger->event_loop->OnRun([logger, logfile_base, compress]() {
+ std::unique_ptr<MultiNodeLogNamer> namer =
+ std::make_unique<MultiNodeLogNamer>(
+ logfile_base, logger->event_loop->configuration(),
+ logger->event_loop->node());
+ if (compress) {
+#ifdef LZMA
+ namer->set_extension(".xz");
+ namer->set_encoder_factory(
+ []() { return std::make_unique<aos::logger::LzmaEncoder>(3); });
+#else
+ LOG(FATAL) << "Compression unsupported";
+#endif
+ }
+
+ logger->logger->StartLogging(std::move(namer));
});
}
+ void VerifyParts(const std::vector<LogFile> &sorted_parts,
+ const std::vector<std::string> &corrupted_parts = {}) {
+ EXPECT_EQ(sorted_parts.size(), 2u);
+
+ // Count up the number of UUIDs and make sure they are what we expect as a
+ // sanity check.
+ std::set<std::string> log_event_uuids;
+ std::set<std::string> parts_uuids;
+ std::set<std::string> both_uuids;
+
+ size_t missing_rt_count = 0;
+
+ std::vector<std::string> logger_nodes;
+ for (const LogFile &log_file : sorted_parts) {
+ EXPECT_FALSE(log_file.log_event_uuid.empty());
+ log_event_uuids.insert(log_file.log_event_uuid);
+ logger_nodes.emplace_back(log_file.logger_node);
+ both_uuids.insert(log_file.log_event_uuid);
+
+ for (const LogParts &part : log_file.parts) {
+ EXPECT_NE(part.monotonic_start_time, aos::monotonic_clock::min_time)
+ << ": " << part;
+ missing_rt_count +=
+ part.realtime_start_time == aos::realtime_clock::min_time;
+
+ EXPECT_TRUE(log_event_uuids.find(part.log_event_uuid) !=
+ log_event_uuids.end());
+ EXPECT_NE(part.node, "");
+ parts_uuids.insert(part.parts_uuid);
+ both_uuids.insert(part.parts_uuid);
+ }
+ }
+
+ // We won't have RT timestamps for 5 log files. We don't log the RT start
+ // time on remote nodes because we don't know it and would be guessing. And
+ // the log reader can actually do a better job.
+ EXPECT_EQ(missing_rt_count, 5u);
+
+ EXPECT_EQ(log_event_uuids.size(), 2u);
+ EXPECT_EQ(parts_uuids.size(), ToLogReaderVector(sorted_parts).size());
+ EXPECT_EQ(log_event_uuids.size() + parts_uuids.size(), both_uuids.size());
+
+ // Test that each list of parts is in order. Don't worry about the ordering
+ // between part file lists though.
+ // (inner vectors all need to be in order, but outer one doesn't matter).
+ EXPECT_THAT(ToLogReaderVector(sorted_parts),
+ ::testing::UnorderedElementsAreArray(structured_logfiles_));
+
+ EXPECT_THAT(logger_nodes, ::testing::UnorderedElementsAre("pi1", "pi2"));
+
+ EXPECT_NE(sorted_parts[0].realtime_start_time,
+ aos::realtime_clock::min_time);
+ EXPECT_NE(sorted_parts[1].realtime_start_time,
+ aos::realtime_clock::min_time);
+
+ EXPECT_NE(sorted_parts[0].monotonic_start_time,
+ aos::monotonic_clock::min_time);
+ EXPECT_NE(sorted_parts[1].monotonic_start_time,
+ aos::monotonic_clock::min_time);
+
+ EXPECT_THAT(sorted_parts[0].corrupted, ::testing::Eq(corrupted_parts));
+ EXPECT_THAT(sorted_parts[1].corrupted, ::testing::Eq(corrupted_parts));
+ }
+
+ void AddExtension(std::string_view extension) {
+ std::transform(logfiles_.begin(), logfiles_.end(), logfiles_.begin(),
+ [extension](const std::string &in) {
+ return absl::StrCat(in, extension);
+ });
+
+ std::transform(structured_logfiles_.begin(), structured_logfiles_.end(),
+ structured_logfiles_.begin(),
+ [extension](std::vector<std::string> in) {
+ std::transform(in.begin(), in.end(), in.begin(),
+ [extension](const std::string &in_str) {
+ return absl::StrCat(in_str, extension);
+ });
+ return in;
+ });
+ }
+
// Config and factory.
aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
SimulatedEventLoopFactory event_loop_factory_;
@@ -540,7 +638,7 @@
// UUIDs and parts UUIDs.
std::vector<FlatbufferVector<LogFileHeader>> log_header;
for (std::string_view f : logfiles_) {
- log_header.emplace_back(ReadHeader(f));
+ log_header.emplace_back(ReadHeader(f).value());
logfile_uuids.insert(log_header.back().message().log_event_uuid()->str());
parts_uuids.insert(log_header.back().message().parts_uuid()->str());
}
@@ -1116,64 +1214,96 @@
}
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ VerifyParts(sorted_parts);
+}
- EXPECT_EQ(sorted_parts.size(), 2u);
+// Tests that we can sort a bunch of parts when one part is empty. We should
+// skip it and report it as corrupted.
+TEST_F(MultinodeLoggerTest, SortEmptyParts) {
+ // Make a bunch of parts.
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
- // Count up the number of UUIDs and make sure they are what we expect as a
- // sanity check.
- std::set<std::string> log_event_uuids;
- std::set<std::string> parts_uuids;
- std::set<std::string> both_uuids;
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
- size_t missing_rt_count = 0;
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
- std::vector<std::string> logger_nodes;
- for (const LogFile &log_file : sorted_parts) {
- EXPECT_FALSE(log_file.log_event_uuid.empty());
- log_event_uuids.insert(log_file.log_event_uuid);
- logger_nodes.emplace_back(log_file.logger_node);
- both_uuids.insert(log_file.log_event_uuid);
-
- for (const LogParts &part : log_file.parts) {
- EXPECT_NE(part.monotonic_start_time, aos::monotonic_clock::min_time)
- << ": " << part;
- missing_rt_count +=
- part.realtime_start_time == aos::realtime_clock::min_time;
-
- EXPECT_TRUE(log_event_uuids.find(part.log_event_uuid) !=
- log_event_uuids.end());
- EXPECT_NE(part.node, "");
- parts_uuids.insert(part.parts_uuid);
- both_uuids.insert(part.parts_uuid);
- }
+ event_loop_factory_.RunFor(chrono::milliseconds(2000));
}
- // We won't have RT timestamps for 5 log files. We don't log the RT start
- // time on remote nodes because we don't know it and would be guessing. And
- // the log reader can actually do a better job.
- EXPECT_EQ(missing_rt_count, 5u);
+ // TODO(austin): Should we flip out if the file can't open?
+ const std::string kEmptyFile("foobarinvalidfiledoesnotexist.bfbs");
- EXPECT_EQ(log_event_uuids.size(), 2u);
- EXPECT_EQ(parts_uuids.size(), ToLogReaderVector(sorted_parts).size());
- EXPECT_EQ(log_event_uuids.size() + parts_uuids.size(), both_uuids.size());
+ aos::util::WriteStringToFileOrDie(kEmptyFile, "");
+ logfiles_.emplace_back(kEmptyFile);
- // Test that each list of parts is in order. Don't worry about the ordering
- // between part file lists though.
- // (inner vectors all need to be in order, but outer one doesn't matter).
- EXPECT_THAT(ToLogReaderVector(sorted_parts),
- ::testing::UnorderedElementsAreArray(structured_logfiles_));
-
- EXPECT_THAT(logger_nodes, ::testing::UnorderedElementsAre("pi1", "pi2"));
-
- EXPECT_NE(sorted_parts[0].realtime_start_time, aos::realtime_clock::min_time);
- EXPECT_NE(sorted_parts[1].realtime_start_time, aos::realtime_clock::min_time);
-
- EXPECT_NE(sorted_parts[0].monotonic_start_time,
- aos::monotonic_clock::min_time);
- EXPECT_NE(sorted_parts[1].monotonic_start_time,
- aos::monotonic_clock::min_time);
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ VerifyParts(sorted_parts, {kEmptyFile});
}
+#ifdef LZMA
+// Tests that we can sort a bunch of parts with an empty .xz file in there.
+// The empty file should be reported as corrupted.
+TEST_F(MultinodeLoggerTest, SortEmptyCompressedParts) {
+ // Make a bunch of parts.
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger, "", true);
+ StartLogger(&pi2_logger, "", true);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(2000));
+ }
+
+ // TODO(austin): Should we flip out if the file can't open?
+ const std::string kEmptyFile("foobarinvalidfiledoesnotexist.bfbs.xz");
+
+ AddExtension(".xz");
+
+ aos::util::WriteStringToFileOrDie(kEmptyFile, "");
+ logfiles_.emplace_back(kEmptyFile);
+
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ VerifyParts(sorted_parts, {kEmptyFile});
+}
+
+// Tests that we can sort a bunch of parts when the end of a compressed file
+// is missing. We should use the portion we can read.
+TEST_F(MultinodeLoggerTest, SortTruncatedCompressedParts) {
+ // Make a bunch of parts.
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger, "", true);
+ StartLogger(&pi2_logger, "", true);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(2000));
+ }
+
+ // Append everything with .xz.
+ AddExtension(".xz");
+
+ // Strip off the end of one of the files. Pick one with a lot of data.
+ ::std::string compressed_contents =
+ aos::util::ReadFileToStringOrDie(logfiles_[0]);
+
+ aos::util::WriteStringToFileOrDie(
+ logfiles_[0],
+ compressed_contents.substr(0, compressed_contents.size() - 100));
+
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ VerifyParts(sorted_parts);
+}
+#endif
+
// Tests that if we remap a remapped channel, it shows up correctly.
TEST_F(MultinodeLoggerTest, RemapLoggedChannel) {
{
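Wiring up compression outside the test harness looks the same as the
StartLogger() change above. A minimal sketch, assuming a build with
-DLZMA=1 and an existing event loop and Logger:

  #ifdef LZMA
    auto namer = std::make_unique<aos::logger::MultiNodeLogNamer>(
        logfile_base, event_loop->configuration(), event_loop->node());
    namer->set_extension(".xz");
    namer->set_encoder_factory(
        []() { return std::make_unique<aos::logger::LzmaEncoder>(3); });
    logger->StartLogging(std::move(namer));
  #endif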
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index a8cafda..d3f2e11 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -5,13 +5,13 @@
namespace aos::logger {
namespace {
-// Returns if `status` is not an error code, otherwise logs the appropriate
-// error message and crashes.
-void CheckLzmaCodeIsOk(lzma_ret status) {
+// Returns true if `status` is not an error code, false if the error is
+// recoverable, and otherwise logs the appropriate error message and crashes.
+bool LzmaCodeIsOk(lzma_ret status) {
switch (status) {
case LZMA_OK:
case LZMA_STREAM_END:
- return;
+ return true;
case LZMA_MEM_ERROR:
LOG(FATAL) << "Memory allocation failed:" << status;
case LZMA_OPTIONS_ERROR:
@@ -31,9 +31,11 @@
case LZMA_FORMAT_ERROR:
LOG(FATAL) << "File format not recognized: " << status;
case LZMA_DATA_ERROR:
- LOG(FATAL) << "Compressed file is corrupt: " << status;
+ LOG(WARNING) << "Compressed file is corrupt: " << status;
+ return false;
case LZMA_BUF_ERROR:
- LOG(FATAL) << "Compressed file is truncated or corrupt: " << status;
+ LOG(WARNING) << "Compressed file is truncated or corrupt: " << status;
+ return false;
default:
LOG(FATAL) << "Unexpected return value: " << status;
}
@@ -50,7 +52,7 @@
lzma_ret status =
lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
- CheckLzmaCodeIsOk(status);
+ CHECK(LzmaCodeIsOk(status));
stream_.avail_out = 0;
VLOG(2) << "LzmaEncoder: Initialization succeeded.";
}
@@ -125,7 +127,7 @@
// Encode the data.
lzma_ret status = lzma_code(&stream_, action);
- CheckLzmaCodeIsOk(status);
+ CHECK(LzmaCodeIsOk(status));
if (action == LZMA_FINISH) {
if (status == LZMA_STREAM_END) {
// This is returned when lzma_code is all done.
@@ -143,12 +145,12 @@
}
LzmaDecoder::LzmaDecoder(std::string_view filename)
- : dummy_decoder_(filename), stream_(LZMA_STREAM_INIT) {
+ : dummy_decoder_(filename), stream_(LZMA_STREAM_INIT), filename_(filename) {
compressed_data_.resize(kBufSize);
lzma_ret status =
lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
- CheckLzmaCodeIsOk(status);
+ CHECK(LzmaCodeIsOk(status)) << "Failed initializing LZMA stream decoder.";
stream_.avail_out = 0;
VLOG(2) << "LzmaDecoder: Initialization succeeded.";
}
@@ -186,7 +188,14 @@
finished_ = true;
return (end - begin) - stream_.avail_out;
}
- CheckLzmaCodeIsOk(status);
+
+ // If we fail to decompress, give up. Return everything that has been
+ // produced so far.
+ if (!LzmaCodeIsOk(status)) {
+ finished_ = true;
+ LOG(WARNING) << filename_ << " is truncated or corrupted.";
+ return (end - begin) - stream_.avail_out;
+ }
}
return end - begin;
}
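The decoder's Read() contract is now: return whatever decompressed
cleanly, warn once on truncation or corruption, and return 0 from then on.
A hypothetical drain loop (the filename is made up):

  aos::logger::LzmaDecoder decoder("truncated_log.bfbs.xz");
  std::vector<uint8_t> buffer(4096);
  while (true) {
    const size_t count =
        decoder.Read(buffer.data(), buffer.data() + buffer.size());
    if (count == 0) break;  // Clean EOF, or stopped after a corruption warning.
    // Consume buffer[0, count).
  }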
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 8caca1b..1b6a0fe 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -73,6 +73,9 @@
// Flag that represents whether or not all the data from the file has been
// successfully decoded.
bool finished_ = false;
+
+ // Filename we are decompressing.
+ std::string filename_;
};
} // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index 2d619c4..2a56740 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -1,6 +1,8 @@
#include "aos/events/logging/lzma_encoder.h"
#include "aos/events/logging/buffer_encoder_param_test.h"
+#include "aos/util/file.h"
+#include "gmock/gmock.h"
#include "gtest/gtest.h"
namespace aos::logger::testing {
@@ -15,4 +17,64 @@
}),
::testing::Range(0, 100)));
+// Tests that we return as much of the file as we can read if the end is
+// corrupted.
+TEST_F(BufferEncoderBaseTest, CorruptedBuffer) {
+ std::uniform_int_distribution<int> quantity_distribution(20, 60);
+ const char *const test_dir = CHECK_NOTNULL(getenv("TEST_TMPDIR"));
+ const std::string file_path = std::string(test_dir) + "/foo";
+
+ std::vector<std::vector<uint8_t>> encoded_buffers;
+ {
+ const int encode_chunks = quantity_distribution(*random_number_generator());
+ const auto encoder = std::make_unique<LzmaEncoder>(2);
+ encoded_buffers = CreateAndEncode(encode_chunks, encoder.get());
+ encoder->Finish();
+
+ std::string contents = "";
+ for (auto span : encoder->queue()) {
+ absl::StrAppend(
+ &contents,
+ std::string_view(reinterpret_cast<const char *>(span.data()),
+ span.size()));
+ }
+ aos::util::WriteStringToFileOrDie(
+ file_path, contents.substr(0, contents.size() - 200));
+ }
+
+ const size_t total_encoded_size = TotalSize(encoded_buffers);
+
+ // Try decoding in multiple random chunkings.
+ for (int i = 0; i < 20; ++i) {
+ const auto decoder = std::make_unique<LzmaDecoder>(file_path);
+ std::vector<std::vector<uint8_t>> decoded_buffers;
+ size_t total_decoded_size = 0;
+ while (true) {
+ const int chunk_size = quantity_distribution(*random_number_generator());
+ std::vector<uint8_t> chunk(chunk_size);
+ const size_t read_result =
+ decoder->Read(chunk.data(), chunk.data() + chunk_size);
+ // Eventually we'll get here, once the decoder is really sure it's done.
+ if (read_result == 0) {
+ // Sanity check the math in the test code.
+ LOG(INFO) << "Decoded " << total_decoded_size << " encoded "
+ << total_encoded_size;
+ CHECK_EQ(total_decoded_size, TotalSize(decoded_buffers));
+ break;
+ }
+ // If we're at the end, trim off the 0s so our comparison later works out.
+ chunk.resize(read_result);
+ total_decoded_size += read_result;
+ decoded_buffers.emplace_back(std::move(chunk));
+ }
+ auto flattened_encoded = Flatten(encoded_buffers);
+ auto flattened_decoded = Flatten(decoded_buffers);
+
+ ASSERT_LE(flattened_decoded.size(), flattened_encoded.size());
+ flattened_encoded.resize(flattened_decoded.size());
+
+ ASSERT_THAT(flattened_decoded, ::testing::Eq(flattened_encoded));
+ }
+}
+
} // namespace aos::logger::testing