Add NodeMerger for merging sorted logs
Following
https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
further, the next step is to sort all the log files together for a
single node, deduplicating here. This implements and tests that.
Change-Id: I506ae65b350d83e41ba011521c0860627ac662df
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 83d798d..4474c7e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -483,6 +483,10 @@
}
bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
+bool Message::operator==(const Message &m2) const {
+ return timestamp == m2.timestamp && channel_index == m2.channel_index &&
+ queue_index == m2.queue_index;
+}
std::ostream &operator<<(std::ostream &os, const Message &m) {
os << "{.channel_index=" << m.channel_index
@@ -555,6 +559,49 @@
return ss.str();
}
+NodeMerger::NodeMerger(std::vector<std::unique_ptr<LogPartsSorter>> parts)
+ : parts_sorters_(std::move(parts)) {}
+
+Message *NodeMerger::Front() {
+ // Return the current Front if we have one, otherwise go compute one.
+ if (current_ != nullptr) {
+ return current_->Front();
+ }
+
+ // Otherwise, do a simple search for the oldest message, deduplicating any
+ // duplicates.
+ Message *oldest = nullptr;
+ sorted_until_ = monotonic_clock::max_time;
+ for (std::unique_ptr<LogPartsSorter> &parts_sorter : parts_sorters_) {
+ Message *m = parts_sorter->Front();
+ if (!m) {
+ sorted_until_ = std::min(sorted_until_, parts_sorter->sorted_until());
+ continue;
+ }
+ if (oldest == nullptr || *m < *oldest) {
+ oldest = m;
+ current_ = parts_sorter.get();
+ } else if (*m == *oldest) {
+ // Found a duplicate. It doesn't matter which one we return. It is
+ // easiest to just drop the new one.
+ parts_sorter->PopFront();
+ }
+
+ // PopFront may change this, so compute it down here.
+ sorted_until_ = std::min(sorted_until_, parts_sorter->sorted_until());
+ }
+
+ // Return the oldest message found. This will be nullptr if nothing was
+ // found, indicating there is nothing left.
+ return oldest;
+}
+
+void NodeMerger::PopFront() {
+ CHECK(current_ != nullptr) << "Popping before calling Front()";
+ current_->PopFront();
+ current_ = nullptr;
+}
+
SplitMessageReader::SplitMessageReader(
const std::vector<std::string> &filenames)
: filenames_(filenames),