Sort parts by UUID and part_index
Also update log_cat to support this! This makes it significantly more
memory-efficient to read logs with lots of parts.
Change-Id: I5ce70f9342b3ab1c7a7823a878ebd890c00ce04f
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index 4ef9237..f406d24 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -122,12 +122,14 @@
LOG(FATAL) << "Expected at least 1 logfile as an argument.";
}
- std::vector<std::vector<std::string>> logfiles;
-
+ std::vector<std::string> unsorted_logfiles;
for (int i = 1; i < argc; ++i) {
- logfiles.emplace_back(std::vector<std::string>{std::string(argv[i])});
+ unsorted_logfiles.emplace_back(std::string(argv[i]));
}
+ std::vector<std::vector<std::string>> logfiles =
+ aos::logger::SortParts(unsorted_logfiles);
+
aos::logger::LogReader reader(logfiles);
aos::FastStringBuilder builder;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 214fa2b..938ede7 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -278,19 +278,36 @@
FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename) {
SpanReader span_reader(filename);
- // Make sure we have enough to read the size.
absl::Span<const uint8_t> config_data = span_reader.ReadMessage();
// Make sure something was read.
CHECK(config_data != absl::Span<const uint8_t>())
<< ": Failed to read header from: " << filename;
- // And copy the config so we have it forever.
+ // And copy the config so we have it forever, removing the size prefix.
std::vector<uint8_t> data(
config_data.begin() + sizeof(flatbuffers::uoffset_t), config_data.end());
return FlatbufferVector<LogFileHeader>(std::move(data));
}
+FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
+ size_t n) {
+ SpanReader span_reader(filename);
+ absl::Span<const uint8_t> data_span = span_reader.ReadMessage();
+ for (size_t i = 0; i < n + 1; ++i) {
+ data_span = span_reader.ReadMessage();
+
+ // Make sure something was read.
+ CHECK(data_span != absl::Span<const uint8_t>())
+ << ": Failed to read data from: " << filename;
+ }
+
+ // And copy the data so we have it forever.
+ std::vector<uint8_t> data(data_span.begin() + sizeof(flatbuffers::uoffset_t),
+ data_span.end());
+ return FlatbufferVector<MessageHeader>(std::move(data));
+}
+
MessageReader::MessageReader(std::string_view filename)
: span_reader_(filename),
raw_log_file_header_(FlatbufferVector<LogFileHeader>::Empty()) {
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index d1fbeb2..87ea229 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -96,6 +96,8 @@
int channel_index, LogType log_type);
FlatbufferVector<LogFileHeader> ReadHeader(std::string_view filename);
+FlatbufferVector<MessageHeader> ReadNthMessage(std::string_view filename,
+ size_t n);
// Class to read chunks out of a log file.
class SpanReader {
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 85809b1..637d1ae 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -568,6 +568,99 @@
} while (last_synchronized_time_ + polling_period_ < monotonic_now);
}
+std::vector<std::vector<std::string>> SortParts(
+ const std::vector<std::string> &parts) {
+ // Start by grouping all parts by UUID, and extracting the part index.
+ std::map<std::string, std::vector<std::pair<std::string, int>>> parts_list;
+
+ // Sort part files without UUIDs and part indexes as well. Extract everything
+ // useful from the log in the first pass, then sort later.
+ struct LogPart {
+ std::string filename;
+ monotonic_clock::time_point start_time;
+ monotonic_clock::time_point first_message_time;
+ };
+
+ std::vector<LogPart> old_parts;
+
+ for (const std::string &part : parts) {
+ FlatbufferVector<LogFileHeader> log_header = ReadHeader(part);
+
+ // Looks like an old log. No UUID, index, and also single node. There are
+ // few, if any, multi-node log files in the wild without part UUIDs and
+ // indexes that we care much about.
+ if (!log_header.message().has_parts_uuid() &&
+ !log_header.message().has_parts_index() &&
+ !log_header.message().has_node()) {
+ LogPart log_part;
+ log_part.filename = part;
+ log_part.start_time = monotonic_clock::time_point(
+ chrono::nanoseconds(log_header.message().monotonic_start_time()));
+ FlatbufferVector<MessageHeader> first_message = ReadNthMessage(part, 0);
+ log_part.first_message_time = monotonic_clock::time_point(
+ chrono::nanoseconds(first_message.message().monotonic_sent_time()));
+ old_parts.emplace_back(std::move(log_part));
+ continue;
+ }
+
+ CHECK(log_header.message().has_parts_uuid());
+ CHECK(log_header.message().has_parts_index());
+
+ const std::string parts_uuid = log_header.message().parts_uuid()->str();
+ auto it = parts_list.find(parts_uuid);
+ if (it == parts_list.end()) {
+ it = parts_list
+ .insert(std::make_pair(
+ parts_uuid, std::vector<std::pair<std::string, int>>{}))
+ .first;
+ }
+ it->second.emplace_back(
+ std::make_pair(part, log_header.message().parts_index()));
+ }
+
+ CHECK_NE(old_parts.empty(), parts_list.empty())
+ << ": Can't have a mix of old and new parts.";
+
+ if (!old_parts.empty()) {
+ // Confirm they all have the same start time. Old loggers always used the
+ // same start time.
+ for (const LogPart &p : old_parts) {
+ CHECK_EQ(old_parts[0].start_time, p.start_time);
+ }
+ // Sort by the oldest message in each file.
+ std::sort(old_parts.begin(), old_parts.end(),
+ [](const LogPart &a, const LogPart &b) {
+ return a.first_message_time < b.first_message_time;
+ });
+
+ // Produce the final form.
+ std::vector<std::string> sorted_old_parts;
+ sorted_old_parts.reserve(old_parts.size());
+ for (LogPart &p : old_parts) {
+ sorted_old_parts.emplace_back(std::move(p.filename));
+ }
+ return std::vector<std::vector<std::string>>{std::move(sorted_old_parts)};
+ }
+
+ // Now, sort them and produce the final vector form.
+ std::vector<std::vector<std::string>> result;
+ result.reserve(parts_list.size());
+ for (auto &part : parts_list) {
+ std::sort(part.second.begin(), part.second.end(),
+ [](const std::pair<std::string, int> &a,
+ const std::pair<std::string, int> &b) {
+ return a.second < b.second;
+ });
+ std::vector<std::string> result_line;
+ result_line.reserve(part.second.size());
+ for (std::pair<std::string, int> &p : part.second) {
+ result_line.emplace_back(std::move(p.first));
+ }
+ result.emplace_back(std::move(result_line));
+ }
+ return result;
+}
+
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration)
: LogReader(std::vector<std::string>{std::string(filename)},
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index d4ed786..c4ce769 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -375,6 +375,10 @@
std::vector<NodeState> node_state_;
};
+// Takes a bunch of parts and sorts them based on parts_uuid and parts_index.
+std::vector<std::vector<std::string>> SortParts(
+ const std::vector<std::string> &parts);
+
// We end up with one of the following 3 log file types.
//
// Single node logged as the source node.
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index bf062b8..47fe9e2 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -958,6 +958,31 @@
reader.Deregister();
}
+// Tests that we can sort a bunch of parts into the pre-determined sorted parts.
+TEST_F(MultinodeLoggerTest, SortParts) {
+ // Make a bunch of parts.
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(2000));
+ }
+
+ const std::vector<std::vector<std::string>> sorted_parts =
+ SortParts(logfiles_);
+
+ // Test that each list of parts is in order. Don't worry about the ordering
+ // between part file lists though.
+ // (inner vectors all need to be in order, but outer one doesn't matter).
+ EXPECT_THAT(sorted_parts,
+ ::testing::UnorderedElementsAreArray(structured_logfiles_));
+}
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.