Use log source for log reading
Change-Id: I4d6018a6117f5864cda38a5e6485c6d08a782999
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index af8a4e8..874fe43 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -598,7 +598,7 @@
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop_factory_->configuration(), logged_configuration(),
- log_files_.front_boots(), FLAGS_skip_order_validation,
+ log_files_.boots(), FLAGS_skip_order_validation,
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
@@ -785,7 +785,7 @@
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop->configuration(), logged_configuration(),
- log_files_.front_boots(), FLAGS_skip_order_validation,
+ log_files_.boots(), FLAGS_skip_order_validation,
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 0566120..dc7078a 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -2084,5 +2084,113 @@
return stream;
}
+// Validates that collection of log files or log parts shares the same configs.
+template <typename TCollection>
+bool CheckMatchingConfigs(const TCollection &items) {
+ const Configuration *config = nullptr;
+ for (const auto &item : items) {
+ VLOG(1) << item;
+ if (config == nullptr) {
+ config = GetConfig(item);
+ } else {
+ if (config != GetConfig(item)) {
+ LOG(ERROR) << ": Config mismatched: " << config << " vs. "
+ << GetConfig(item);
+ return false;
+ }
+ }
+ }
+ if (config == nullptr) {
+ LOG(ERROR) << ": No configs are found";
+ return false;
+ }
+ return true;
+}
+
+// Provides unified access to config field stored in LogFile. It is used in
+// CheckMatchingConfigs.
+inline const Configuration *GetConfig(const LogFile &log_file) {
+ return log_file.config.get();
+}
+
+// Output of LogPartsAccess for debug purposes.
+std::ostream &operator<<(std::ostream &stream,
+ const LogPartsAccess &log_parts_access) {
+ stream << log_parts_access.parts();
+ return stream;
+}
+
+SelectedLogParts::SelectedLogParts(std::string_view node_name,
+ size_t boot_index,
+ std::vector<LogPartsAccess> log_parts)
+ : node_name_(node_name),
+ boot_index_(boot_index),
+ log_parts_(std::move(log_parts)) {
+ CHECK_GT(log_parts_.size(), 0u) << ": Nothing was selected for node "
+ << node_name_ << " boot " << boot_index_;
+ CHECK(CheckMatchingConfigs(log_parts_));
+ config_ = log_parts_.front().config();
+
+ // Enforce that we are sorting things only from a single node from a single
+ // boot.
+ const std::string_view part0_source_boot_uuid =
+ log_parts_.front().source_boot_uuid();
+ for (const auto &part : log_parts_) {
+ CHECK_EQ(node_name_, part.node_name()) << ": Can't merge different nodes.";
+ CHECK_EQ(part0_source_boot_uuid, part.source_boot_uuid())
+ << ": Can't merge different boots.";
+ CHECK_EQ(boot_index_, part.boot_count());
+ }
+}
+
+LogFilesContainer::LogFilesContainer(
+ std::optional<const LogSource *> log_source, std::vector<LogFile> log_files)
+ : log_source_(log_source), log_files_(std::move(log_files)) {
+ CHECK_GT(log_files_.size(), 0u);
+ CHECK(CheckMatchingConfigs(log_files_));
+ config_ = log_files_.front().config.get();
+ boots_ = log_files_.front().boots;
+
+ std::unordered_set<std::string> logger_nodes;
+
+ // Scan and collect all related nodes and number of reboots per node.
+ for (const LogFile &log_file : log_files_) {
+ for (const LogParts &part : log_file.parts) {
+ auto node_item = nodes_boots_.find(part.node);
+ if (node_item != nodes_boots_.end()) {
+ node_item->second = std::max(node_item->second, part.boot_count + 1);
+ } else {
+ nodes_boots_[part.node] = part.boot_count + 1;
+ }
+ }
+ logger_nodes.insert(log_file.logger_node);
+ }
+ while (!logger_nodes.empty()) {
+ logger_nodes_.emplace_back(
+ logger_nodes.extract(logger_nodes.begin()).value());
+ }
+}
+
+size_t LogFilesContainer::BootsForNode(std::string_view node_name) const {
+ const auto &node_item = nodes_boots_.find(std::string(node_name));
+ CHECK(node_item != nodes_boots_.end())
+ << ": Missing parts associated with node " << node_name;
+ CHECK_GT(node_item->second, 0u) << ": No boots for node " << node_name;
+ return node_item->second;
+}
+
+SelectedLogParts LogFilesContainer::SelectParts(std::string_view node_name,
+ size_t boot_index) const {
+ std::vector<LogPartsAccess> result;
+ for (const LogFile &log_file : log_files_) {
+ for (const LogParts &part : log_file.parts) {
+ if (part.node == node_name && part.boot_count == boot_index) {
+ result.emplace_back(log_source_, part);
+ }
+ }
+ }
+ return SelectedLogParts(node_name, boot_index, result);
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 41a3498..7461e89 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -151,84 +151,77 @@
// Recursively searches for logfiles in argv[1] and onward.
std::vector<std::string> FindLogs(int argc, char **argv);
-// Validates that collection of log files or log parts shares the same configs.
-template <typename TCollection>
-bool CheckMatchingConfigs(const TCollection &items) {
- const Configuration *config = nullptr;
- for (const auto &item : items) {
- VLOG(1) << item;
- if (config == nullptr) {
- config = item.config.get();
- } else {
- if (config != item.config.get()) {
- LOG(ERROR) << ": Config mismatched: " << config << " vs. "
- << item.config.get();
- return false;
- }
- }
+// Proxy container to bind log parts with log source. It helps with reading logs
+// from virtual media such as memory or S3.
+class LogPartsAccess {
+ public:
+ LogPartsAccess(std::optional<const LogSource *> log_source,
+ LogParts log_parts)
+ : log_source_(std::move(log_source)), log_parts_(std::move(log_parts)) {
+ CHECK(!log_parts_.parts.empty());
}
- if (config == nullptr) {
- LOG(ERROR) << ": No configs are found";
- return false;
+
+ std::string_view node_name() const { return log_parts_.node; }
+
+ std::string_view source_boot_uuid() const {
+ return log_parts_.source_boot_uuid;
}
- return true;
+
+ size_t boot_count() const { return log_parts_.boot_count; }
+
+ const Configuration *config() const { return log_parts_.config.get(); }
+
+ std::optional<const LogSource *> log_source() const { return log_source_; }
+
+ std::string GetPartAt(size_t index) const {
+ CHECK_LT(index, log_parts_.parts.size());
+ return log_parts_.parts[index];
+ }
+
+ // TODO (Alexei): do we need to reduce it to concrete operations?
+ const LogParts &parts() const { return log_parts_; }
+
+ size_t size() const { return log_parts_.parts.size(); }
+
+ private:
+ std::optional<const LogSource *> log_source_;
+ LogParts log_parts_;
+};
+
+// Provides unified access to config field stored in LogPartsAccess. It is used
+// in CheckMatchingConfigs.
+inline const Configuration *GetConfig(const LogPartsAccess &log_parts_access) {
+ return log_parts_access.config();
}
+// Output of LogPartsAccess for debug purposes.
+std::ostream &operator<<(std::ostream &stream,
+ const LogPartsAccess &log_parts_access);
+
// Collection of log parts that associated with pair: node and boot.
class SelectedLogParts {
public:
- SelectedLogParts(std::optional<const LogSource *> log_source,
- std::string_view node_name, size_t boot_count,
- std::vector<LogParts> log_parts)
- : log_source_(log_source),
- node_name_(node_name),
- boot_count_(boot_count),
- log_parts_(std::move(log_parts)) {
- CHECK_GT(log_parts_.size(), 0u) << ": Nothing was selected for node "
- << node_name << " boot " << boot_count;
- configs_matched_ = CheckMatchingConfigs(log_parts_);
-
- // Enforce that we are sorting things only from a single node from a single
- // boot.
- const std::string_view part0_source_boot_uuid =
- log_parts_.front().source_boot_uuid;
- for (const auto &part : log_parts_) {
- CHECK_EQ(node_name_, part.node) << ": Can't merge different nodes.";
- CHECK_EQ(part0_source_boot_uuid, part.source_boot_uuid)
- << ": Can't merge different boots.";
- CHECK_EQ(boot_count_, part.boot_count);
- }
- }
+ SelectedLogParts(std::string_view node_name, size_t boot_index,
+ std::vector<LogPartsAccess> log_parts);
// Use items in fancy loops.
- auto begin() { return log_parts_.begin(); }
- auto end() { return log_parts_.end(); }
- auto cbegin() const { return log_parts_.cbegin(); }
- auto cend() const { return log_parts_.cend(); }
auto begin() const { return log_parts_.begin(); }
auto end() const { return log_parts_.end(); }
- const Configuration *config() const {
- // TODO (Alexei): it is a first usage of assumption that all parts have
- // matching configs. Should it be strong requirement and validated in the
- // constructor?
- CHECK(configs_matched_);
- return log_parts_.front().config.get();
- }
+ // Config that shared across all log parts.
+ const Configuration *config() const { return config_; }
const std::string &node_name() const { return node_name_; }
// Number of boots found in the log parts.
- size_t boot_count() const { return boot_count_; }
+ size_t boot_count() const { return boot_index_; }
private:
- std::optional<const LogSource *> log_source_;
std::string node_name_;
- size_t boot_count_;
- std::vector<LogParts> log_parts_;
+ size_t boot_index_;
+ std::vector<LogPartsAccess> log_parts_;
- // Indicates that all parts shared the same config.
- bool configs_matched_;
+ const Configuration *config_;
};
// Container that keeps a sorted list of log files and provides functions that
@@ -252,39 +245,17 @@
}
// Returns numbers of reboots found in log files associated with the node.
- size_t BootsForNode(std::string_view node_name) const {
- const auto &node_item = nodes_boots_.find(std::string(node_name));
- CHECK(node_item != nodes_boots_.end())
- << ": Missing parts associated with node " << node_name;
- CHECK_GT(node_item->second, 0u) << ": No boots for node " << node_name;
- return node_item->second;
- }
+ size_t BootsForNode(std::string_view node_name) const;
// Get only log parts that associated with node and boot number.
SelectedLogParts SelectParts(std::string_view node_name,
- size_t boot_count) const {
- std::vector<LogParts> result;
- for (const LogFile &log_file : log_files_) {
- for (const LogParts &part : log_file.parts) {
- if (part.node == node_name && part.boot_count == boot_count) {
- result.emplace_back(part);
- }
- }
- }
- return SelectedLogParts(log_source_, node_name, boot_count, result);
- }
+ size_t boot_index) const;
- // It provides access to boots of the first element. I'm not sure why...
- const auto &front_boots() const { return log_files_.front().boots; }
+ // It provides access to boots logged by all log files in the container.
+ const std::shared_ptr<const Boots> &boots() const { return boots_; }
// Access the configuration shared with all log files in the container.
- const Configuration *config() const {
- // TODO (Alexei): it is a first usage of assumption that all parts have
- // matching configs. Should it be strong requirement and validated in the
- // constructor?
- CHECK(configs_matched_);
- return log_files_.front().config.get();
- }
+ const Configuration *config() const { return config_; }
// List of logger nodes for given set of log files.
const auto &logger_nodes() const { return logger_nodes_; }
@@ -295,40 +266,17 @@
private:
LogFilesContainer(std::optional<const LogSource *> log_source,
- std::vector<LogFile> log_files)
- : log_source_(log_source), log_files_(std::move(log_files)) {
- CHECK_GT(log_files_.size(), 0u);
-
- std::unordered_set<std::string> logger_nodes;
-
- // Scan and collect all related nodes and number of reboots per node.
- for (const LogFile &log_file : log_files_) {
- for (const LogParts &part : log_file.parts) {
- auto node_item = nodes_boots_.find(part.node);
- if (node_item != nodes_boots_.end()) {
- node_item->second = std::max(node_item->second, part.boot_count + 1);
- } else {
- nodes_boots_[part.node] = part.boot_count + 1;
- }
- }
- logger_nodes.insert(log_file.logger_node);
- }
- while (!logger_nodes.empty()) {
- logger_nodes_.emplace_back(
- logger_nodes.extract(logger_nodes.begin()).value());
- }
- configs_matched_ = CheckMatchingConfigs(log_files_);
- }
+ std::vector<LogFile> log_files);
std::optional<const LogSource *> log_source_;
std::vector<LogFile> log_files_;
+ const Configuration *config_;
+ std::shared_ptr<const Boots> boots_;
+
// Keeps information about nodes and number of reboots per node.
std::unordered_map<std::string, size_t> nodes_boots_;
std::vector<std::string> logger_nodes_;
-
- // Indicates that all parts shared the same config.
- bool configs_matched_;
};
} // namespace logger
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 62c19ae..36bac42 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1376,8 +1376,8 @@
return result;
}
-MessageReader::MessageReader(std::string_view filename)
- : span_reader_(filename),
+MessageReader::MessageReader(SpanReader span_reader)
+ : span_reader_(std::move(span_reader)),
raw_log_file_header_(
SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
set_crash_on_corrupt_message_flag(FLAGS_crash_on_corrupt_message);
@@ -1387,7 +1387,8 @@
raw_log_file_header = ReadHeader(&span_reader_);
// Make sure something was read.
- CHECK(raw_log_file_header) << ": Failed to read header from: " << filename;
+ CHECK(raw_log_file_header)
+ << ": Failed to read header from: " << span_reader_.filename();
raw_log_file_header_ = std::move(*raw_log_file_header);
@@ -1401,7 +1402,7 @@
chrono::duration<double>(FLAGS_max_out_of_order))
: chrono::nanoseconds(log_file_header()->max_out_of_order_duration());
- VLOG(1) << "Opened " << filename << " as node "
+ VLOG(1) << "Opened " << span_reader_.filename() << " as node "
<< FlatbufferToJson(log_file_header()->node());
}
@@ -1560,18 +1561,32 @@
&DestroyAndFree);
}
-PartsMessageReader::PartsMessageReader(LogParts log_parts)
- : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
- if (parts_.parts.size() >= 2) {
- next_message_reader_.emplace(parts_.parts[1]);
+SpanReader PartsMessageReader::MakeSpanReader(
+ const LogPartsAccess &log_parts_access, size_t part_number) {
+ const auto part = log_parts_access.GetPartAt(part_number);
+ if (log_parts_access.log_source().has_value()) {
+ return SpanReader(part,
+ log_parts_access.log_source().value()->GetDecoder(part));
+ } else {
+ return SpanReader(part);
+ }
+}
+
+PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
+ : log_parts_access_(std::move(log_parts_access)),
+ message_reader_(MakeSpanReader(log_parts_access_, 0)) {
+ if (log_parts_access_.size() >= 2) {
+ next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
}
ComputeBootCounts();
}
void PartsMessageReader::ComputeBootCounts() {
- boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
+ boot_counts_.assign(configuration::NodesCount(log_parts_access_.config()),
std::nullopt);
+ const auto boots = log_parts_access_.parts().boots;
+
// We have 3 vintages of log files with different amounts of information.
if (log_file_header()->has_boot_uuids()) {
// The new hotness with the boots explicitly listed out. We can use the log
@@ -1580,10 +1595,10 @@
size_t node_index = 0;
for (const flatbuffers::String *boot_uuid :
*log_file_header()->boot_uuids()) {
- CHECK(parts_.boots);
+ CHECK(boots);
if (boot_uuid->size() != 0) {
- auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
- if (it != parts_.boots->boot_count_map.end()) {
+ auto it = boots->boot_count_map.find(boot_uuid->str());
+ if (it != boots->boot_count_map.end()) {
boot_counts_[node_index] = it->second;
}
} else if (parts().boots->boots[node_index].size() == 1u) {
@@ -1595,11 +1610,10 @@
// Older multi-node logs which are guarenteed to have UUIDs logged, or
// single node log files with boot UUIDs in the header. We only know how to
// order certain boots in certain circumstances.
- if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
+ if (configuration::MultiNode(log_parts_access_.config()) || boots) {
for (size_t node_index = 0; node_index < boot_counts_.size();
++node_index) {
- CHECK(parts_.boots);
- if (parts().boots->boots[node_index].size() == 1u) {
+ if (boots->boots[node_index].size() == 1u) {
boot_counts_[node_index] = 0;
}
}
@@ -1624,17 +1638,18 @@
// start time.
// TODO(austin): Does this work with startup when we don't know the
// remote start time too? Look at one of those logs to compare.
- if (monotonic_sent_time >
- parts_.monotonic_start_time + max_out_of_order_duration()) {
+ if (monotonic_sent_time > log_parts_access_.parts().monotonic_start_time +
+ max_out_of_order_duration()) {
after_start_ = true;
}
if (after_start_) {
CHECK_GE(monotonic_sent_time,
newest_timestamp_ - max_out_of_order_duration())
<< ": Max out of order of " << max_out_of_order_duration().count()
- << "ns exceeded. " << parts_ << ", start time is "
- << parts_.monotonic_start_time << " currently reading "
- << filename();
+ << "ns exceeded. " << log_parts_access_.parts()
+ << ", start time is "
+ << log_parts_access_.parts().monotonic_start_time
+ << " currently reading " << filename();
}
return message;
}
@@ -1645,7 +1660,7 @@
}
void PartsMessageReader::NextLog() {
- if (next_part_index_ == parts_.parts.size()) {
+ if (next_part_index_ == log_parts_access_.size()) {
CHECK(!next_message_reader_);
done_ = true;
return;
@@ -1653,8 +1668,9 @@
CHECK(next_message_reader_);
message_reader_ = std::move(*next_message_reader_);
ComputeBootCounts();
- if (next_part_index_ + 1 < parts_.parts.size()) {
- next_message_reader_.emplace(parts_.parts[next_part_index_ + 1]);
+ if (next_part_index_ + 1 < log_parts_access_.size()) {
+ next_message_reader_.emplace(
+ MakeSpanReader(log_parts_access_, next_part_index_ + 1));
} else {
next_message_reader_.reset();
}
@@ -1748,8 +1764,8 @@
return os;
}
-MessageSorter::MessageSorter(LogParts log_parts)
- : parts_message_reader_(log_parts),
+MessageSorter::MessageSorter(const LogPartsAccess log_parts_access)
+ : parts_message_reader_(log_parts_access),
source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
}
@@ -1850,7 +1866,7 @@
const auto parts = log_files.SelectParts(node_name, boot_count);
node_ = configuration::GetNodeIndex(parts.config(), node_name);
- for (LogParts part : parts) {
+ for (LogPartsAccess part : parts) {
message_sorters_.emplace_back(std::move(part));
}
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 3281bc8..9dc7d88 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -269,7 +269,11 @@
// handles any per-file state left before merging below.
class MessageReader {
public:
- MessageReader(std::string_view filename);
+ // TODO (Alexei): it's deprecated and needs to be removed.
+ explicit MessageReader(std::string_view filename)
+ : MessageReader(SpanReader(filename)) {}
+
+ explicit MessageReader(SpanReader span_reader);
std::string_view filename() const { return span_reader_.filename(); }
@@ -353,12 +357,17 @@
// A class to seamlessly read messages from a list of part files.
class PartsMessageReader {
public:
- PartsMessageReader(LogParts log_parts);
+ // TODO (Alexei): it's deprecated, need to removed.
+ explicit PartsMessageReader(LogParts log_parts)
+ : PartsMessageReader(LogPartsAccess(std::nullopt, std::move(log_parts))) {
+ }
+
+ explicit PartsMessageReader(LogPartsAccess log_parts_access);
std::string_view filename() const { return message_reader_.filename(); }
// Returns the LogParts that holds the filenames we are reading.
- const LogParts &parts() const { return parts_; }
+ const LogParts &parts() const { return log_parts_access_.parts(); }
const LogFileHeader *log_file_header() const {
return message_reader_.log_file_header();
@@ -389,12 +398,15 @@
}
private:
+ static SpanReader MakeSpanReader(const LogPartsAccess &log_parts_access,
+ size_t part_number);
+
// Opens the next log and updates message_reader_. Sets done_ if there is
// nothing more to do.
void NextLog();
void ComputeBootCounts();
- const LogParts parts_;
+ const LogPartsAccess log_parts_access_;
size_t next_part_index_ = 1u;
bool done_ = false;
@@ -544,7 +556,11 @@
// Class to sort the resulting messages from a PartsMessageReader.
class MessageSorter {
public:
- MessageSorter(LogParts log_parts);
+ // TODO (Alexei): it's deperecated and need to be removed.
+ explicit MessageSorter(LogParts log_parts)
+ : MessageSorter(LogPartsAccess(std::nullopt, std::move(log_parts))) {}
+
+ explicit MessageSorter(const LogPartsAccess log_parts_access);
// Returns the parts that this is sorting messages from.
const LogParts &parts() const { return parts_message_reader_.parts(); }
@@ -635,6 +651,7 @@
private:
// Unsorted list of all parts sorters.
std::vector<MessageSorter> message_sorters_;
+
// Pointer to the parts sorter holding the current Front message if one
// exists, or nullptr if a new one needs to be found.
MessageSorter *current_ = nullptr;
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index cf0ea5a..72f9de8 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -43,7 +43,7 @@
// Now, build up the estimator used to solve for time.
message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
- config, config, log_files.front_boots(), FLAGS_skip_order_validation,
+ config, config, log_files.boots(), FLAGS_skip_order_validation,
chrono::seconds(0));
multinode_estimator.set_reboot_found(
[config](distributed_clock::time_point reboot_time,