Add helper classes for log reading
The goal is to inject span reader factory deep into the log reading
and sorting. LogFilesContainer encapsulates log files and provides
a couple of useful validations and abstractions to simplify log sorting.
It is the first part of the change. A follow-on change will push
SelectedLogParts even deeper into the log reader.
Change-Id: Ic5253bd2b7c87fbbf55ad8d39a480af2871ddb71
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index d0957bf..af8a4e8 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -238,33 +238,28 @@
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration,
const ReplayChannels *replay_channels)
- : LogReader(SortParts({std::string(filename)}), replay_configuration,
- replay_channels) {}
+ : LogReader(LogFilesContainer(SortParts({std::string(filename)})),
+ replay_configuration, replay_channels) {}
LogReader::LogReader(std::vector<LogFile> log_files,
const Configuration *replay_configuration,
const ReplayChannels *replay_channels)
+ : LogReader(LogFilesContainer(std::move(log_files)), replay_configuration,
+ replay_channels) {}
+
+LogReader::LogReader(LogFilesContainer log_files,
+ const Configuration *replay_configuration,
+ const ReplayChannels *replay_channels)
: log_files_(std::move(log_files)),
replay_configuration_(replay_configuration),
replay_channels_(replay_channels) {
SetStartTime(FLAGS_start_time);
SetEndTime(FLAGS_end_time);
- CHECK_GT(log_files_.size(), 0u);
{
- // Validate that we have the same config everwhere. This will be true if
- // all the parts were sorted together and the configs match.
- const Configuration *config = nullptr;
- for (const LogFile &log_file : log_files_) {
- if (log_file.config.get() == nullptr) {
- LOG(FATAL) << "Couldn't find a config in " << log_file;
- }
- if (config == nullptr) {
- config = log_file.config.get();
- } else {
- CHECK_EQ(config, log_file.config.get());
- }
- }
+    // The log files container validates that all log files share the same config.
+ const Configuration *config = log_files_.config();
+ CHECK_NOTNULL(config);
}
if (replay_channels_ != nullptr) {
@@ -398,7 +393,7 @@
}
const Configuration *LogReader::logged_configuration() const {
- return log_files_[0].config.get();
+ return log_files_.config();
}
const Configuration *LogReader::configuration() const {
@@ -603,25 +598,23 @@
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop_factory_->configuration(), logged_configuration(),
- log_files_[0].boots, FLAGS_skip_order_validation,
+ log_files_.front_boots(), FLAGS_skip_order_validation,
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
std::vector<TimestampMapper *> timestamp_mappers;
for (const Node *node : configuration::GetNodes(configuration())) {
- const size_t node_index =
- configuration::GetNodeIndex(configuration(), node);
- std::vector<LogParts> filtered_parts = FilterPartsForNode(
- log_files_, node != nullptr ? node->name()->string_view() : "");
+ size_t node_index = configuration::GetNodeIndex(configuration(), node);
+ std::string_view node_name = MaybeNodeName(node);
// We don't run with threading on the buffering for simulated event loops
// because we haven't attempted to validate how the interactions beteen the
// buffering and the timestamp mapper works when running multiple nodes
// concurrently.
states_[node_index] = std::make_unique<State>(
- filtered_parts.size() == 0u
+ !log_files_.ContainsPartsForNode(node_name)
? nullptr
- : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+ : std::make_unique<TimestampMapper>(node_name, log_files_),
filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node),
before_send_callbacks_);
@@ -792,21 +785,20 @@
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop->configuration(), logged_configuration(),
- log_files_[0].boots, FLAGS_skip_order_validation,
+ log_files_.front_boots(), FLAGS_skip_order_validation,
chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
std::vector<TimestampMapper *> timestamp_mappers;
for (const Node *node : configuration::GetNodes(configuration())) {
+ auto node_name = MaybeNodeName(node);
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
- std::vector<LogParts> filtered_parts = FilterPartsForNode(
- log_files_, node != nullptr ? node->name()->string_view() : "");
states_[node_index] = std::make_unique<State>(
- filtered_parts.size() == 0u
+ !log_files_.ContainsPartsForNode(node_name)
? nullptr
- : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+ : std::make_unique<TimestampMapper>(node_name, log_files_),
filters_.get(), std::bind(&LogReader::NoticeRealtimeEnd, this), node,
State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node),
before_send_callbacks_);
@@ -1115,8 +1107,8 @@
// case and tell the user what is happening so they can either update
// their config to log the channel or can find a log with the data.
const std::vector<std::string> logger_nodes =
- FindLoggerNodes(log_files_);
- if (logger_nodes.size()) {
+ log_files_.logger_nodes();
+ if (!logger_nodes.empty()) {
// We have old logs which don't have the logger nodes logged. In
// that case, we can't be helpful :(
bool data_logged = false;
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index b691b30..0486665 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -9,6 +9,8 @@
#include <vector>
#include "flatbuffers/flatbuffers.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
#include "aos/condition.h"
#include "aos/events/event_loop.h"
@@ -102,6 +104,9 @@
LogReader(std::vector<LogFile> log_files,
const Configuration *replay_configuration = nullptr,
const ReplayChannels *replay_channels = nullptr);
+ LogReader(LogFilesContainer log_files,
+ const Configuration *replay_configuration = nullptr,
+ const ReplayChannels *replay_channels = nullptr);
~LogReader();
// Registers all the callbacks to send the log file data out on an event loop
@@ -334,10 +339,7 @@
return event_loop_factory_;
}
- std::string_view name() const { return log_files_[0].name; }
- std::string_view log_event_uuid() const {
- return log_files_[0].log_event_uuid;
- }
+ std::string_view name() const { return log_files_.name(); }
// Set whether to exit the SimulatedEventLoopFactory when we finish reading
// the logfile.
@@ -420,7 +422,7 @@
// entire event loop once all nodes are stopped.
void NoticeRealtimeEnd();
- const std::vector<LogFile> log_files_;
+ const LogFilesContainer log_files_;
// Class to manage sending RemoteMessages on the provided node after the
// correct delay.
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 582c3f5..0566120 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -1998,45 +1998,6 @@
return sorter.SortParts();
}
-std::vector<std::string> FindNodes(const std::vector<LogFile> &parts) {
- std::set<std::string> nodes;
- for (const LogFile &log_file : parts) {
- for (const LogParts &part : log_file.parts) {
- nodes.insert(part.node);
- }
- }
- std::vector<std::string> node_list;
- while (!nodes.empty()) {
- node_list.emplace_back(std::move(nodes.extract(nodes.begin()).value()));
- }
- return node_list;
-}
-
-std::vector<std::string> FindLoggerNodes(const std::vector<LogFile> &parts) {
- std::set<std::string> nodes;
- for (const LogFile &log_file : parts) {
- nodes.insert(log_file.logger_node);
- }
- std::vector<std::string> node_list;
- while (!nodes.empty()) {
- node_list.emplace_back(nodes.extract(nodes.begin()).value());
- }
- return node_list;
-}
-
-std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
- std::string_view node) {
- std::vector<LogParts> result;
- for (const LogFile &log_file : parts) {
- for (const LogParts &part : log_file.parts) {
- if (part.node == node) {
- result.emplace_back(part);
- }
- }
- }
- return result;
-}
-
std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
stream << "{\n";
if (!file.log_event_uuid.empty()) {
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 266619b..41a3498 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -4,6 +4,9 @@
#include <iostream>
#include <map>
#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
#include <vector>
#include "aos/configuration.h"
@@ -137,15 +140,6 @@
// Sort parts of a single log.
std::vector<LogFile> SortParts(const LogSource &log_source);
-// Finds all the nodes which have parts logged from their point of view.
-std::vector<std::string> FindNodes(const std::vector<LogFile> &parts);
-// Finds all the parts which are from the point of view of a single node.
-std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
- std::string_view node);
-
-// Finds all the nodes on which the loggers which generated these log files ran.
-std::vector<std::string> FindLoggerNodes(const std::vector<LogFile> &parts);
-
// Recursively searches the file/folder for .bfbs and .bfbs.xz files and adds
// them to the vector.
void FindLogs(std::vector<std::string> *files, std::string filename);
@@ -157,6 +151,186 @@
// Recursively searches for logfiles in argv[1] and onward.
std::vector<std::string> FindLogs(int argc, char **argv);
+// Validates that a collection of log files or log parts shares the same config.
+template <typename TCollection>
+bool CheckMatchingConfigs(const TCollection &items) {
+ const Configuration *config = nullptr;
+ for (const auto &item : items) {
+ VLOG(1) << item;
+ if (config == nullptr) {
+ config = item.config.get();
+ } else {
+ if (config != item.config.get()) {
+ LOG(ERROR) << ": Config mismatched: " << config << " vs. "
+ << item.config.get();
+ return false;
+ }
+ }
+ }
+ if (config == nullptr) {
+ LOG(ERROR) << ": No configs are found";
+ return false;
+ }
+ return true;
+}
+
+// Collection of log parts associated with a (node, boot) pair.
+class SelectedLogParts {
+ public:
+ SelectedLogParts(std::optional<const LogSource *> log_source,
+ std::string_view node_name, size_t boot_count,
+ std::vector<LogParts> log_parts)
+ : log_source_(log_source),
+ node_name_(node_name),
+ boot_count_(boot_count),
+ log_parts_(std::move(log_parts)) {
+ CHECK_GT(log_parts_.size(), 0u) << ": Nothing was selected for node "
+ << node_name << " boot " << boot_count;
+ configs_matched_ = CheckMatchingConfigs(log_parts_);
+
+ // Enforce that we are sorting things only from a single node from a single
+ // boot.
+ const std::string_view part0_source_boot_uuid =
+ log_parts_.front().source_boot_uuid;
+ for (const auto &part : log_parts_) {
+ CHECK_EQ(node_name_, part.node) << ": Can't merge different nodes.";
+ CHECK_EQ(part0_source_boot_uuid, part.source_boot_uuid)
+ << ": Can't merge different boots.";
+ CHECK_EQ(boot_count_, part.boot_count);
+ }
+ }
+
+  // Iterator access so the parts can be used in range-based for loops.
+ auto begin() { return log_parts_.begin(); }
+ auto end() { return log_parts_.end(); }
+ auto cbegin() const { return log_parts_.cbegin(); }
+ auto cend() const { return log_parts_.cend(); }
+ auto begin() const { return log_parts_.begin(); }
+ auto end() const { return log_parts_.end(); }
+
+ const Configuration *config() const {
+    // TODO (Alexei): this is the first use of the assumption that all parts
+    // have matching configs. Should that be a strong requirement validated in
+    // the constructor?
+ CHECK(configs_matched_);
+ return log_parts_.front().config.get();
+ }
+
+ const std::string &node_name() const { return node_name_; }
+
+  // Boot number (index) that these log parts were selected for.
+ size_t boot_count() const { return boot_count_; }
+
+ private:
+ std::optional<const LogSource *> log_source_;
+ std::string node_name_;
+ size_t boot_count_;
+ std::vector<LogParts> log_parts_;
+
+  // Indicates that all parts share the same config.
+ bool configs_matched_;
+};
+
+// Container that keeps a sorted list of log files and provides functions that
+// are commonly used during log reading.
+class LogFilesContainer {
+ public:
+ // Initializes log file container with the list of sorted files (results of
+ // SortParts).
+ explicit LogFilesContainer(std::vector<LogFile> log_files)
+ : LogFilesContainer(std::nullopt, std::move(log_files)) {}
+
+ // Sorts and initializes log container with files from an abstract log source.
+ explicit LogFilesContainer(const LogSource *log_source)
+ : LogFilesContainer(log_source, SortParts(*log_source)) {}
+
+  // Returns true when at least one of the log files is associated with the node.
+ bool ContainsPartsForNode(std::string_view node_name) const {
+ // TODO (Alexei): Implement
+ // https://en.cppreference.com/w/cpp/container/unordered_map/find with C++20
+ return nodes_boots_.count(std::string(node_name)) > 0;
+ }
+
+  // Returns the number of reboots found in the log files associated with the node.
+ size_t BootsForNode(std::string_view node_name) const {
+ const auto &node_item = nodes_boots_.find(std::string(node_name));
+ CHECK(node_item != nodes_boots_.end())
+ << ": Missing parts associated with node " << node_name;
+ CHECK_GT(node_item->second, 0u) << ": No boots for node " << node_name;
+ return node_item->second;
+ }
+
+  // Gets only the log parts associated with the given node and boot number.
+ SelectedLogParts SelectParts(std::string_view node_name,
+ size_t boot_count) const {
+ std::vector<LogParts> result;
+ for (const LogFile &log_file : log_files_) {
+ for (const LogParts &part : log_file.parts) {
+ if (part.node == node_name && part.boot_count == boot_count) {
+ result.emplace_back(part);
+ }
+ }
+ }
+ return SelectedLogParts(log_source_, node_name, boot_count, result);
+ }
+
+  // Provides access to the boots of the first log file. TODO: document why callers need exactly the first one.
+ const auto &front_boots() const { return log_files_.front().boots; }
+
+ // Access the configuration shared with all log files in the container.
+ const Configuration *config() const {
+    // TODO (Alexei): this is the first use of the assumption that all parts
+    // have matching configs. Should that be a strong requirement validated in
+    // the constructor?
+ CHECK(configs_matched_);
+ return log_files_.front().config.get();
+ }
+
+ // List of logger nodes for given set of log files.
+ const auto &logger_nodes() const { return logger_nodes_; }
+
+ // TODO (Alexei): it is not clear what it represents for multiple log events.
+ // Review its usage.
+ std::string_view name() const { return log_files_[0].name; }
+
+ private:
+ LogFilesContainer(std::optional<const LogSource *> log_source,
+ std::vector<LogFile> log_files)
+ : log_source_(log_source), log_files_(std::move(log_files)) {
+ CHECK_GT(log_files_.size(), 0u);
+
+ std::unordered_set<std::string> logger_nodes;
+
+ // Scan and collect all related nodes and number of reboots per node.
+ for (const LogFile &log_file : log_files_) {
+ for (const LogParts &part : log_file.parts) {
+ auto node_item = nodes_boots_.find(part.node);
+ if (node_item != nodes_boots_.end()) {
+ node_item->second = std::max(node_item->second, part.boot_count + 1);
+ } else {
+ nodes_boots_[part.node] = part.boot_count + 1;
+ }
+ }
+ logger_nodes.insert(log_file.logger_node);
+ }
+ while (!logger_nodes.empty()) {
+ logger_nodes_.emplace_back(
+ logger_nodes.extract(logger_nodes.begin()).value());
+ }
+ configs_matched_ = CheckMatchingConfigs(log_files_);
+ }
+
+ std::optional<const LogSource *> log_source_;
+ std::vector<LogFile> log_files_;
+
+ // Keeps information about nodes and number of reboots per node.
+ std::unordered_map<std::string, size_t> nodes_boots_;
+ std::vector<std::string> logger_nodes_;
+
+  // Indicates that all parts share the same config.
+ bool configs_matched_;
+};
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index b76bcad..62c19ae 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1845,21 +1845,12 @@
return ss.str();
}
-PartsMerger::PartsMerger(std::vector<LogParts> parts) {
- CHECK_GE(parts.size(), 1u);
- // Enforce that we are sorting things only from a single node from a single
- // boot.
- const std::string_view part0_node = parts[0].node;
- const std::string_view part0_source_boot_uuid = parts[0].source_boot_uuid;
- for (size_t i = 1; i < parts.size(); ++i) {
- CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
- CHECK_EQ(part0_source_boot_uuid, parts[i].source_boot_uuid)
- << ": Can't merge different boots.";
- }
+PartsMerger::PartsMerger(std::string_view node_name, size_t boot_count,
+ const LogFilesContainer &log_files) {
+ const auto parts = log_files.SelectParts(node_name, boot_count);
+ node_ = configuration::GetNodeIndex(parts.config(), node_name);
- node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
-
- for (LogParts &part : parts) {
+ for (LogParts part : parts) {
message_sorters_.emplace_back(std::move(part));
}
@@ -1977,26 +1968,14 @@
current_ = nullptr;
}
-BootMerger::BootMerger(std::vector<LogParts> files) {
- std::vector<std::vector<LogParts>> boots;
-
- // Now, we need to split things out by boot.
- for (size_t i = 0; i < files.size(); ++i) {
- const size_t boot_count = files[i].boot_count;
- if (boot_count + 1 > boots.size()) {
- boots.resize(boot_count + 1);
- }
- boots[boot_count].emplace_back(std::move(files[i]));
- }
-
- parts_mergers_.reserve(boots.size());
- for (size_t i = 0; i < boots.size(); ++i) {
+BootMerger::BootMerger(std::string_view node_name,
+ const LogFilesContainer &log_files) {
+ size_t number_of_boots = log_files.BootsForNode(node_name);
+ parts_mergers_.reserve(number_of_boots);
+ for (size_t i = 0; i < number_of_boots; ++i) {
VLOG(2) << "Boot " << i;
- for (auto &p : boots[i]) {
- VLOG(2) << "Part " << p;
- }
parts_mergers_.emplace_back(
- std::make_unique<PartsMerger>(std::move(boots[i])));
+ std::make_unique<PartsMerger>(node_name, i, log_files));
}
}
@@ -2030,8 +2009,9 @@
return results;
}
-TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
- : boot_merger_(std::move(parts)),
+TimestampMapper::TimestampMapper(std::string_view node_name,
+ const LogFilesContainer &log_files)
+ : boot_merger_(node_name, log_files),
timestamp_callback_([](TimestampedMessage *) {}) {
for (const LogParts *part : boot_merger_.Parts()) {
if (!configuration_) {
@@ -2041,7 +2021,7 @@
}
}
const Configuration *config = configuration_.get();
- // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
+ // Only fill out nodes_data_ if there are nodes. Otherwise, everything is
// pretty simple.
if (configuration::MultiNode(config)) {
nodes_data_.resize(config->nodes()->size());
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 67b6b84..3281bc8 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -592,7 +592,8 @@
// boot.
class PartsMerger {
public:
- PartsMerger(std::vector<LogParts> parts);
+ PartsMerger(std::string_view node_name, size_t boot_count,
+ const LogFilesContainer &log_files);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
@@ -657,7 +658,7 @@
// stream.
class BootMerger {
public:
- BootMerger(std::vector<LogParts> file);
+ BootMerger(std::string_view node_name, const LogFilesContainer &log_files);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
@@ -716,7 +717,8 @@
// notifying when new data is queued as well as queueing until a point in time.
class TimestampMapper {
public:
- TimestampMapper(std::vector<LogParts> file);
+ TimestampMapper(std::string_view node_name,
+ const LogFilesContainer &log_files);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 8c057e9..cd6da67 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -764,9 +764,10 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
- PartsMerger merger(FilterPartsForNode(parts, "pi1"));
+ PartsMerger merger("pi1", 0, log_files);
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -865,9 +866,10 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
- PartsMerger merger(FilterPartsForNode(parts, "pi1"));
+ PartsMerger merger("pi1", 0, log_files);
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -929,16 +931,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1071,20 +1074,21 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
// mapper0 will not provide any messages while mapper1 will provide all
// messages due to the channel filter callbacks used
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
mapper0.set_replay_channels_callback(
[&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
mapper1.set_replay_channels_callback(
@@ -1211,19 +1215,15 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
-
- for (const auto &p : parts) {
- LOG(INFO) << p;
- }
-
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1335,16 +1335,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1454,16 +1455,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1537,16 +1539,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1621,16 +1624,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1695,16 +1699,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1761,9 +1766,10 @@
const std::vector<LogFile> parts =
SortParts({logfile0_, logfile1_, logfile2_});
+ LogFilesContainer log_files(parts);
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
@@ -1799,12 +1805,13 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -1872,16 +1879,17 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts[0].logger_node, "pi1");
ASSERT_EQ(parts[1].logger_node, "pi2");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -2130,10 +2138,11 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ LogFilesContainer log_files(parts);
ASSERT_EQ(parts.size(), 1u);
ASSERT_EQ(parts[0].parts.size(), 2u);
- BootMerger merger(FilterPartsForNode(parts, "pi2"));
+ BootMerger merger("pi2", log_files);
EXPECT_EQ(merger.node(), 1u);
@@ -2308,6 +2317,7 @@
const std::vector<LogFile> parts =
SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+ LogFilesContainer log_files(parts);
for (const auto &x : parts) {
LOG(INFO) << x;
@@ -2316,11 +2326,11 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
@@ -2526,6 +2536,7 @@
const std::vector<LogFile> parts =
SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+ LogFilesContainer log_files(parts);
for (const auto &x : parts) {
LOG(INFO) << x;
@@ -2534,11 +2545,11 @@
ASSERT_EQ(parts[0].logger_node, "pi1");
size_t mapper0_count = 0;
- TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper0("pi1", log_files);
mapper0.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper0_count; });
size_t mapper1_count = 0;
- TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ TimestampMapper mapper1("pi2", log_files);
mapper1.set_timestamp_callback(
[&](TimestampedMessage *) { ++mapper1_count; });
diff --git a/aos/events/logging/single_node_merge.cc b/aos/events/logging/single_node_merge.cc
index cc78c2b..afb88af 100644
--- a/aos/events/logging/single_node_merge.cc
+++ b/aos/events/logging/single_node_merge.cc
@@ -19,30 +19,10 @@
namespace chrono = std::chrono;
-std::string LogFileVectorToString(std::vector<logger::LogFile> log_files) {
- std::stringstream ss;
- for (const auto &f : log_files) {
- ss << f << "\n";
- }
- return ss.str();
-}
-
int Main(int argc, char **argv) {
const std::vector<std::string> unsorted_logfiles = FindLogs(argc, argv);
- const std::vector<LogFile> log_files = SortParts(unsorted_logfiles);
-
- CHECK_GT(log_files.size(), 0u);
- // Validate that we have the same config everwhere. This will be true if
- // all the parts were sorted together and the configs match.
- const Configuration *config = nullptr;
- for (const LogFile &log_file : log_files) {
- VLOG(1) << log_file;
- if (config == nullptr) {
- config = log_file.config.get();
- } else {
- CHECK_EQ(config, log_file.config.get());
- }
- }
+ const LogFilesContainer log_files(SortParts(unsorted_logfiles));
+ const Configuration *config = log_files.config();
// Haven't tested this on a single node log, and don't really see a need to
// right now. The higher layers just work.
@@ -53,16 +33,14 @@
TimestampMapper *node_mapper = nullptr;
for (const Node *node : configuration::GetNodes(config)) {
- std::vector<LogParts> filtered_parts =
- FilterPartsForNode(log_files, node->name()->string_view());
-
+ const auto node_name = MaybeNodeName(node);
// Confirm that all the parts are from the same boot if there are enough
// parts to not be from the same boot.
- if (!filtered_parts.empty()) {
+ if (log_files.ContainsPartsForNode(node_name)) {
// Filter the parts relevant to each node when building the mapper.
mappers.emplace_back(
- std::make_unique<TimestampMapper>(std::move(filtered_parts)));
- if (node->name()->string_view() == FLAGS_node) {
+ std::make_unique<TimestampMapper>(node_name, log_files));
+ if (node_name == FLAGS_node) {
node_mapper = mappers.back().get();
}
} else {
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index 67cbc44..cf0ea5a 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -17,30 +17,10 @@
namespace chrono = std::chrono;
-std::string LogFileVectorToString(std::vector<logger::LogFile> log_files) {
- std::stringstream ss;
- for (const auto &f : log_files) {
- ss << f << "\n";
- }
- return ss.str();
-}
-
int Main(int argc, char **argv) {
const std::vector<std::string> unsorted_logfiles = FindLogs(argc, argv);
- const std::vector<LogFile> log_files = SortParts(unsorted_logfiles);
-
- CHECK_GT(log_files.size(), 0u);
- // Validate that we have the same config everwhere. This will be true if
- // all the parts were sorted together and the configs match.
- const Configuration *config = nullptr;
- for (const LogFile &log_file : log_files) {
- VLOG(1) << log_file;
- if (config == nullptr) {
- config = log_file.config.get();
- } else {
- CHECK_EQ(config, log_file.config.get());
- }
- }
+ const LogFilesContainer log_files(SortParts(unsorted_logfiles));
+ const Configuration *config = log_files.config();
CHECK(configuration::MultiNode(config))
<< ": Timestamps only make sense in a multi-node world.";
@@ -49,15 +29,13 @@
std::vector<std::unique_ptr<TimestampMapper>> mappers;
for (const Node *node : configuration::GetNodes(config)) {
- std::vector<LogParts> filtered_parts =
- FilterPartsForNode(log_files, node->name()->string_view());
-
+ auto node_name = MaybeNodeName(node);
// Confirm that all the parts are from the same boot if there are enough
// parts to not be from the same boot.
- if (!filtered_parts.empty()) {
+  if (log_files.ContainsPartsForNode(node_name)) {
// Filter the parts relevant to each node when building the mapper.
mappers.emplace_back(
- std::make_unique<TimestampMapper>(std::move(filtered_parts)));
+ std::make_unique<TimestampMapper>(node_name, log_files));
} else {
mappers.emplace_back(nullptr);
}
@@ -65,7 +43,7 @@
// Now, build up the estimator used to solve for time.
message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
- config, config, log_files[0].boots, FLAGS_skip_order_validation,
+ config, config, log_files.front_boots(), FLAGS_skip_order_validation,
chrono::seconds(0));
multinode_estimator.set_reboot_found(
[config](distributed_clock::time_point reboot_time,