Rename of couple log reading utility classes
It is opinionated and hopefully clarifies purpose:
LogPartsSorter => MessageSorter - because it sorts messages
NodeMerger => PartsMerger - it merges parts
Change-Id: Ic79cdcbaa27c90f70a23db482d08098e0cb3e133
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index fbec1b5..cda6561 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1748,12 +1748,12 @@
return os;
}
-LogPartsSorter::LogPartsSorter(LogParts log_parts)
+MessageSorter::MessageSorter(LogParts log_parts)
: parts_message_reader_(log_parts),
source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
}
-Message *LogPartsSorter::Front() {
+Message *MessageSorter::Front() {
// Queue up data until enough data has been queued that the front message is
// sorted enough to be safe to pop. This may do nothing, so we should make
// sure the nothing path is checked quickly.
@@ -1825,9 +1825,9 @@
return &(*messages_.begin());
}
-void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
+void MessageSorter::PopFront() { messages_.erase(messages_.begin()); }
-std::string LogPartsSorter::DebugString() const {
+std::string MessageSorter::DebugString() const {
std::stringstream ss;
ss << "messages: [\n";
int count = 0;
@@ -1845,7 +1845,7 @@
return ss.str();
}
-NodeMerger::NodeMerger(std::vector<LogParts> parts) {
+PartsMerger::PartsMerger(std::vector<LogParts> parts) {
CHECK_GE(parts.size(), 1u);
// Enforce that we are sorting things only from a single node from a single
// boot.
@@ -1860,25 +1860,25 @@
node_ = configuration::GetNodeIndex(parts[0].config.get(), part0_node);
for (LogParts &part : parts) {
- parts_sorters_.emplace_back(std::move(part));
+ message_sorters_.emplace_back(std::move(part));
}
monotonic_start_time_ = monotonic_clock::max_time;
realtime_start_time_ = realtime_clock::min_time;
- for (const LogPartsSorter &parts_sorter : parts_sorters_) {
+ for (const MessageSorter &message_sorter : message_sorters_) {
// We want to capture the earliest meaningful start time here. The start
// time defaults to min_time when there's no meaningful value to report, so
// let's ignore those.
- if (parts_sorter.monotonic_start_time() != monotonic_clock::min_time) {
+ if (message_sorter.monotonic_start_time() != monotonic_clock::min_time) {
bool accept = false;
// We want to prioritize start times from the logger node. Really, we
// want to prioritize start times with a valid realtime_clock time. So,
// if we have a start time without a RT clock, prefer a start time with a
// RT clock, even it if is later.
- if (parts_sorter.realtime_start_time() != realtime_clock::min_time) {
+ if (message_sorter.realtime_start_time() != realtime_clock::min_time) {
// We've got a good one. See if the current start time has a good RT
// clock, or if we should use this one instead.
- if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
+ if (message_sorter.monotonic_start_time() < monotonic_start_time_) {
accept = true;
} else if (realtime_start_time_ == realtime_clock::min_time) {
// The previous start time doesn't have a good RT time, so it is very
@@ -1888,14 +1888,14 @@
}
} else if (realtime_start_time_ == realtime_clock::min_time) {
// We don't have a RT time, so take the oldest.
- if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
+ if (message_sorter.monotonic_start_time() < monotonic_start_time_) {
accept = true;
}
}
if (accept) {
- monotonic_start_time_ = parts_sorter.monotonic_start_time();
- realtime_start_time_ = parts_sorter.realtime_start_time();
+ monotonic_start_time_ = message_sorter.monotonic_start_time();
+ realtime_start_time_ = message_sorter.realtime_start_time();
}
}
}
@@ -1907,16 +1907,16 @@
}
}
-std::vector<const LogParts *> NodeMerger::Parts() const {
+std::vector<const LogParts *> PartsMerger::Parts() const {
std::vector<const LogParts *> p;
- p.reserve(parts_sorters_.size());
- for (const LogPartsSorter &parts_sorter : parts_sorters_) {
- p.emplace_back(&parts_sorter.parts());
+ p.reserve(message_sorters_.size());
+ for (const MessageSorter &message_sorter : message_sorters_) {
+ p.emplace_back(&message_sorter.parts());
}
return p;
}
-Message *NodeMerger::Front() {
+Message *PartsMerger::Front() {
// Return the current Front if we have one, otherwise go compute one.
if (current_ != nullptr) {
Message *result = current_->Front();
@@ -1928,33 +1928,33 @@
// duplicates.
Message *oldest = nullptr;
sorted_until_ = monotonic_clock::max_time;
- for (LogPartsSorter &parts_sorter : parts_sorters_) {
- Message *m = parts_sorter.Front();
+ for (MessageSorter &message_sorter : message_sorters_) {
+ Message *m = message_sorter.Front();
if (!m) {
- sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
+ sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
continue;
}
if (oldest == nullptr || *m < *oldest) {
oldest = m;
- current_ = &parts_sorter;
+ current_ = &message_sorter;
} else if (*m == *oldest) {
// Found a duplicate. If there is a choice, we want the one which has
// the timestamp time.
if (!m->data->has_monotonic_timestamp_time) {
- parts_sorter.PopFront();
+ message_sorter.PopFront();
} else if (!oldest->data->has_monotonic_timestamp_time) {
current_->PopFront();
- current_ = &parts_sorter;
+ current_ = &message_sorter;
oldest = m;
} else {
CHECK_EQ(m->data->monotonic_timestamp_time,
oldest->data->monotonic_timestamp_time);
- parts_sorter.PopFront();
+ message_sorter.PopFront();
}
}
// PopFront may change this, so compute it down here.
- sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
+ sorted_until_ = std::min(sorted_until_, message_sorter.sorted_until());
}
if (oldest) {
@@ -1971,7 +1971,7 @@
return oldest;
}
-void NodeMerger::PopFront() {
+void PartsMerger::PopFront() {
CHECK(current_ != nullptr) << "Popping before calling Front()";
current_->PopFront();
current_ = nullptr;
@@ -1989,25 +1989,25 @@
boots[boot_count].emplace_back(std::move(files[i]));
}
- node_mergers_.reserve(boots.size());
+ parts_mergers_.reserve(boots.size());
for (size_t i = 0; i < boots.size(); ++i) {
VLOG(2) << "Boot " << i;
for (auto &p : boots[i]) {
VLOG(2) << "Part " << p;
}
- node_mergers_.emplace_back(
- std::make_unique<NodeMerger>(std::move(boots[i])));
+ parts_mergers_.emplace_back(
+ std::make_unique<PartsMerger>(std::move(boots[i])));
}
}
Message *BootMerger::Front() {
- Message *result = node_mergers_[index_]->Front();
+ Message *result = parts_mergers_[index_]->Front();
if (result != nullptr) {
return result;
}
- if (index_ + 1u == node_mergers_.size()) {
+ if (index_ + 1u == parts_mergers_.size()) {
// At the end of the last node merger, just return.
return nullptr;
} else {
@@ -2016,12 +2016,12 @@
}
}
-void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
+void BootMerger::PopFront() { parts_mergers_[index_]->PopFront(); }
std::vector<const LogParts *> BootMerger::Parts() const {
std::vector<const LogParts *> results;
- for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
- std::vector<const LogParts *> node_parts = node_merger->Parts();
+ for (const std::unique_ptr<PartsMerger> &parts_merger : parts_mergers_) {
+ std::vector<const LogParts *> node_parts = parts_merger->Parts();
results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
std::make_move_iterator(node_parts.end()));
@@ -2165,7 +2165,7 @@
CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
last_message_time_ = matched_messages_.back().monotonic_event_time;
- // We are thin wrapper around node_merger. Call it directly.
+ // We are thin wrapper around parts_merger. Call it directly.
boot_merger_.PopFront();
timestamp_callback_(&matched_messages_.back());
if (CheckReplayChannelsAndMaybePop(matched_messages_.back())) {
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index b04be9f..1ee09fa 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -535,9 +535,9 @@
std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
// Class to sort the resulting messages from a PartsMessageReader.
-class LogPartsSorter {
+class MessageSorter {
public:
- LogPartsSorter(LogParts log_parts);
+ MessageSorter(LogParts log_parts);
// Returns the parts that this is sorting messages from.
const LogParts &parts() const { return parts_message_reader_.parts(); }
@@ -581,18 +581,18 @@
std::vector<size_t> source_node_index_;
};
-// Class to run merge sort on the messages from multiple LogPartsSorter
-// instances.
-class NodeMerger {
+// Class to run merge sort on the messages associated with specific node and
+// boot.
+class PartsMerger {
public:
- NodeMerger(std::vector<LogParts> parts);
+ PartsMerger(std::vector<LogParts> parts);
// Copying and moving will mess up the internal raw pointers. Just don't do
// it.
- NodeMerger(NodeMerger const &) = delete;
- NodeMerger(NodeMerger &&) = delete;
- void operator=(NodeMerger const &) = delete;
- void operator=(NodeMerger &&) = delete;
+ PartsMerger(PartsMerger const &) = delete;
+ PartsMerger(PartsMerger &&) = delete;
+ void operator=(PartsMerger const &) = delete;
+ void operator=(PartsMerger &&) = delete;
// Node index in the configuration of this node.
int node() const { return node_; }
@@ -601,7 +601,7 @@
std::vector<const LogParts *> Parts() const;
const Configuration *configuration() const {
- return parts_sorters_[0].parts().config.get();
+ return message_sorters_[0].parts().config.get();
}
monotonic_clock::time_point monotonic_start_time() const {
@@ -626,10 +626,10 @@
private:
// Unsorted list of all parts sorters.
- std::vector<LogPartsSorter> parts_sorters_;
+ std::vector<MessageSorter> message_sorters_;
// Pointer to the parts sorter holding the current Front message if one
// exists, or nullptr if a new one needs to be found.
- LogPartsSorter *current_ = nullptr;
+ MessageSorter *current_ = nullptr;
// Cached sorted_until value.
aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
@@ -660,30 +660,31 @@
void operator=(BootMerger &&) = delete;
// Node index in the configuration of this node.
- int node() const { return node_mergers_[0]->node(); }
+ int node() const { return parts_mergers_[0]->node(); }
// List of parts being sorted together.
std::vector<const LogParts *> Parts() const;
const Configuration *configuration() const {
- return node_mergers_[0]->configuration();
+ return parts_mergers_[0]->configuration();
}
monotonic_clock::time_point monotonic_start_time(size_t boot) const {
- CHECK_LT(boot, node_mergers_.size());
- return node_mergers_[boot]->monotonic_start_time();
+ CHECK_LT(boot, parts_mergers_.size());
+ return parts_mergers_[boot]->monotonic_start_time();
}
realtime_clock::time_point realtime_start_time(size_t boot) const {
- CHECK_LT(boot, node_mergers_.size());
- return node_mergers_[boot]->realtime_start_time();
+ CHECK_LT(boot, parts_mergers_.size());
+ return parts_mergers_[boot]->realtime_start_time();
}
monotonic_clock::time_point monotonic_oldest_time(size_t boot) const {
- CHECK_LT(boot, node_mergers_.size());
- return node_mergers_[boot]->monotonic_oldest_time();
+ CHECK_LT(boot, parts_mergers_.size());
+ return parts_mergers_[boot]->monotonic_oldest_time();
}
bool started() const {
- return node_mergers_[index_]->sorted_until() != monotonic_clock::min_time ||
+ return parts_mergers_[index_]->sorted_until() !=
+ monotonic_clock::min_time ||
index_ != 0;
}
@@ -699,7 +700,7 @@
// TODO(austin): Sanjay points out this is pretty inefficient. Don't keep so
// many things open.
- std::vector<std::unique_ptr<NodeMerger>> node_mergers_;
+ std::vector<std::unique_ptr<PartsMerger>> parts_mergers_;
};
// Class to match timestamps with the corresponding data from other nodes.
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index eaeb4b7..8c057e9 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -583,13 +583,13 @@
std::vector<uint32_t> queue_index_;
};
-using LogPartsSorterTest = SortingElementTest;
-using LogPartsSorterDeathTest = LogPartsSorterTest;
-using NodeMergerTest = SortingElementTest;
+using MessageSorterTest = SortingElementTest;
+using MessageSorterDeathTest = MessageSorterTest;
+using PartsMergerTest = SortingElementTest;
using TimestampMapperTest = SortingElementTest;
// Tests that we can pull messages out of a log sorted in order.
-TEST_F(LogPartsSorterTest, Pull) {
+TEST_F(MessageSorterTest, Pull) {
const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
{
TestDetachedBufferWriter writer(logfile0_);
@@ -606,35 +606,35 @@
const std::vector<LogFile> parts = SortParts({logfile0_});
- LogPartsSorter parts_sorter(parts[0].parts[0]);
+ MessageSorter message_sorter(parts[0].parts[0]);
// Confirm we aren't sorted until any time until the message is popped.
// Peeking shouldn't change the sorted until time.
- EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+ EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
std::deque<Message> output;
- ASSERT_TRUE(parts_sorter.Front() != nullptr);
- output.emplace_back(std::move(*parts_sorter.Front()));
- parts_sorter.PopFront();
- EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+ ASSERT_TRUE(message_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*message_sorter.Front()));
+ message_sorter.PopFront();
+ EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
- ASSERT_TRUE(parts_sorter.Front() != nullptr);
- output.emplace_back(std::move(*parts_sorter.Front()));
- parts_sorter.PopFront();
- EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+ ASSERT_TRUE(message_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*message_sorter.Front()));
+ message_sorter.PopFront();
+ EXPECT_EQ(message_sorter.sorted_until(), e + chrono::milliseconds(1900));
- ASSERT_TRUE(parts_sorter.Front() != nullptr);
- output.emplace_back(std::move(*parts_sorter.Front()));
- parts_sorter.PopFront();
- EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+ ASSERT_TRUE(message_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*message_sorter.Front()));
+ message_sorter.PopFront();
+ EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
- ASSERT_TRUE(parts_sorter.Front() != nullptr);
- output.emplace_back(std::move(*parts_sorter.Front()));
- parts_sorter.PopFront();
- EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+ ASSERT_TRUE(message_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*message_sorter.Front()));
+ message_sorter.PopFront();
+ EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::max_time);
- ASSERT_TRUE(parts_sorter.Front() == nullptr);
+ ASSERT_TRUE(message_sorter.Front() == nullptr);
EXPECT_EQ(output[0].timestamp.boot, 0);
EXPECT_EQ(output[0].timestamp.time, e + chrono::milliseconds(1000));
@@ -647,7 +647,7 @@
}
// Tests that we can pull messages out of a log sorted in order.
-TEST_F(LogPartsSorterTest, WayBeforeStart) {
+TEST_F(MessageSorterTest, WayBeforeStart) {
const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
{
TestDetachedBufferWriter writer(logfile0_);
@@ -666,11 +666,11 @@
const std::vector<LogFile> parts = SortParts({logfile0_});
- LogPartsSorter parts_sorter(parts[0].parts[0]);
+ MessageSorter message_sorter(parts[0].parts[0]);
// Confirm we aren't sorted until any time until the message is popped.
// Peeking shouldn't change the sorted until time.
- EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+ EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
std::deque<Message> output;
@@ -678,13 +678,13 @@
{e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
e + chrono::milliseconds(1900), monotonic_clock::max_time,
monotonic_clock::max_time}) {
- ASSERT_TRUE(parts_sorter.Front() != nullptr);
- output.emplace_back(std::move(*parts_sorter.Front()));
- parts_sorter.PopFront();
- EXPECT_EQ(parts_sorter.sorted_until(), t);
+ ASSERT_TRUE(message_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*message_sorter.Front()));
+ message_sorter.PopFront();
+ EXPECT_EQ(message_sorter.sorted_until(), t);
}
- ASSERT_TRUE(parts_sorter.Front() == nullptr);
+ ASSERT_TRUE(message_sorter.Front() == nullptr);
EXPECT_EQ(output[0].timestamp.boot, 0u);
EXPECT_EQ(output[0].timestamp.time, e - chrono::milliseconds(1000));
@@ -699,7 +699,7 @@
}
// Tests that messages too far out of order trigger death.
-TEST_F(LogPartsSorterDeathTest, Pull) {
+TEST_F(MessageSorterDeathTest, Pull) {
const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
{
TestDetachedBufferWriter writer(logfile0_);
@@ -717,25 +717,25 @@
const std::vector<LogFile> parts = SortParts({logfile0_});
- LogPartsSorter parts_sorter(parts[0].parts[0]);
+ MessageSorter message_sorter(parts[0].parts[0]);
// Confirm we aren't sorted until any time until the message is popped.
// Peeking shouldn't change the sorted until time.
- EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+ EXPECT_EQ(message_sorter.sorted_until(), monotonic_clock::min_time);
std::deque<Message> output;
- ASSERT_TRUE(parts_sorter.Front() != nullptr);
- parts_sorter.PopFront();
- ASSERT_TRUE(parts_sorter.Front() != nullptr);
- ASSERT_TRUE(parts_sorter.Front() != nullptr);
- parts_sorter.PopFront();
+ ASSERT_TRUE(message_sorter.Front() != nullptr);
+ message_sorter.PopFront();
+ ASSERT_TRUE(message_sorter.Front() != nullptr);
+ ASSERT_TRUE(message_sorter.Front() != nullptr);
+ message_sorter.PopFront();
- EXPECT_DEATH({ parts_sorter.Front(); },
+ EXPECT_DEATH({ message_sorter.Front(); },
"Max out of order of 100000000ns exceeded.");
}
// Tests that we can merge data from 2 separate files, including duplicate data.
-TEST_F(NodeMergerTest, TwoFileMerger) {
+TEST_F(PartsMergerTest, TwoFileMerger) {
const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
{
TestDetachedBufferWriter writer0(logfile0_);
@@ -766,7 +766,7 @@
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
ASSERT_EQ(parts.size(), 1u);
- NodeMerger merger(FilterPartsForNode(parts, "pi1"));
+ PartsMerger merger(FilterPartsForNode(parts, "pi1"));
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -823,7 +823,7 @@
// Tests that we can merge timestamps with various combinations of
// monotonic_timestamp_time.
-TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
+TEST_F(PartsMergerTest, TwoFileTimestampMerger) {
const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
{
TestDetachedBufferWriter writer0(logfile0_);
@@ -867,7 +867,7 @@
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
ASSERT_EQ(parts.size(), 1u);
- NodeMerger merger(FilterPartsForNode(parts, "pi1"));
+ PartsMerger merger(FilterPartsForNode(parts, "pi1"));
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);