Use log source for log reading
Change-Id: I4d6018a6117f5864cda38a5e6485c6d08a782999
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 62c19ae..36bac42 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1376,8 +1376,8 @@
return result;
}
-MessageReader::MessageReader(std::string_view filename)
- : span_reader_(filename),
+MessageReader::MessageReader(SpanReader span_reader)
+ : span_reader_(std::move(span_reader)),
raw_log_file_header_(
SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
@@ -1387,7 +1387,8 @@
raw_log_file_header = ReadHeader(&span_reader_);
// Make sure something was read.
- CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
+ CHECK(raw_log_file_header)
+ << ": Failed to read header from: " << span_reader_.filename();
raw_log_file_header_ = std::move(*raw_log_file_header);
@@ -1401,7 +1402,7 @@
chrono::duration<double>(FLAGS_max_out_of_order))
: chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
- VLOG(1) << "Opened " << filename << " as node "
+ VLOG(1) << "Opened " << span_reader_.filename() << " as node "
<< FlatbufferToJson(log_file_header()->node());
}
@@ -1560,18 +1561,32 @@
&DestroyAndFree);
}
-PartsMessageReader::PartsMessageReader(LogParts log_parts)
- : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
- if (parts_.parts.size() >= 2) {
- next_message_reader_.emplace(parts_.parts[1]);
+SpanReader PartsMessageReader::MakeSpanReader(
+ const LogPartsAccess &log_parts_access, size_t part_number) {
+ const auto part = log_parts_access.GetPartAt(part_number);
+ if (log_parts_access.log_source().has_value()) {
+ return SpanReader(part,
+ log_parts_access.log_source().value()->GetDecoder(part));
+ } else {
+ return SpanReader(part);
+ }
+}
+
+PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
+ : log_parts_access_(std::move(log_parts_access)),
+ message_reader_(MakeSpanReader(log_parts_access_, 0)) {
+ if (log_parts_access_.size() >= 2) {
+ next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
}
ComputeBootCounts();
}
void PartsMessageReader::ComputeBootCounts() {
- boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
+ boot_counts_.assign(configuration::NodesCount(log_parts_access_.config()),
std::nullopt);
+ const auto boots = log_parts_access_.parts().boots;
+
// We have 3 vintages of log files with different amounts of information.
if (log_file_header()->has_boot_uuids()) {
// The new hotness with the boots explicitly listed out. We can use the log
@@ -1580,10 +1595,10 @@
size_t node_index = 0;
for (const flatbuffers::String *boot_uuid :
*log_file_header()->boot_uuids()) {
- CHECK(parts_.boots);
+ CHECK(boots);
if (boot_uuid->size() != 0) {
- auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
- if (it != parts_.boots->boot_count_map.end()) {
+ auto it = boots->boot_count_map.find(boot_uuid->str());
+ if (it != boots->boot_count_map.end()) {
boot_counts_[node_index] = it->second;
}
} else if (parts().boots->boots[node_index].size() == 1u) {
@@ -1595,11 +1610,10 @@
// Older multi-node logs which are guarenteed to have UUIDs logged, or
// single node log files with boot UUIDs in the header. We only know how to
// order certain boots in certain circumstances.
- if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
+ if (configuration::MultiNode(log_parts_access_.config()) || boots) {
for (size_t node_index = 0; node_index < boot_counts_.size();
++node_index) {
- CHECK(parts_.boots);
- if (parts().boots->boots[node_index].size() == 1u) {
+ if (boots->boots[node_index].size() == 1u) {
boot_counts_[node_index] = 0;
}
}
@@ -1624,17 +1638,18 @@
// start time.
// TODO(austin): Does this work with startup when we don't know the
// remote start time too? Look at one of those logs to compare.
- if (monotonic_sent_time >
- parts_.monotonic_start_time + max_out_of_order_duration()) {
+ if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
+ max_out_of_order_duration()) {
after_start_ = true;
}
if (after_start_) {
CHECK_GE(monotonic_sent_time,
newest_timestamp_ - max_out_of_order_duration())
<< ": Max out of order of " << max_out_of_order_duration().count()
- << "ns exceeded. " << parts_ << ", start time is "
- << parts_.monotonic_start_time << " currently reading "
- << filename();
+ << "ns exceeded. " << log_parts_access_.parts()
+ << ", start time is "
+ << log_parts_access_.parts().monotonic_start_time
+ << " currently reading " << filename();
}
return message;
}
@@ -1645,7 +1660,7 @@
}
void PartsMessageReader::NextLog() {
- if (next_part_index_ == parts_.parts.size()) {
+ if (next_part_index_ == log_parts_access_.size()) {
CHECK(!next_message_reader_);
done_ = true;
return;
@@ -1653,8 +1668,9 @@
CHECK(next_message_reader_);
message_reader_ = std::move(*next_message_reader_);
ComputeBootCounts();
- if (next_part_index_ + 1 < parts_.parts.size()) {
- next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
+ if (next_part_index_ + 1 < log_parts_access_.size()) {
+ next_message_reader_.emplace(
+ MakeSpanReader(log_parts_access_, next_part_index_ + 1));
} else {
next_message_reader_.reset();
}
@@ -1748,8 +1764,8 @@
return os;
}
-MessageSorter::MessageSorter(LogParts log_parts)
- : parts_message_reader_(log_parts),
+MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
+ : parts_message_reader_(log_parts_access),
source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
}
@@ -1850,7 +1866,7 @@
const auto parts = log_files.SelectParts(node_name, boot_count);
node_ = configuration::GetNodeIndex(parts.config(), node_name);
- for (LogParts part : parts) {
+ for (LogPartsAccess part : parts) {
message_sorters_.emplace_back(std::move(part));
}