Add BootMerger to concatenate messages across boots
This wraps multiple NodeMergers in order to take a sorted list of
messages per boot and concatenate them across multiple ordered reboots.
This gives us a nice interface that TimestampMapper can be moved over to
in order to handle reboots.
Change-Id: I9e6e565f39bf7a4a23e340d28eea686f1d53631f
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index e01b4c6..f4d4501 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -527,6 +527,8 @@
}
bool Message::operator<(const Message &m2) const {
+ CHECK_EQ(this->boot_count, m2.boot_count);
+
if (this->timestamp < m2.timestamp) {
return true;
} else if (this->timestamp > m2.timestamp) {
@@ -544,13 +546,16 @@
bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
bool Message::operator==(const Message &m2) const {
+ CHECK_EQ(this->boot_count, m2.boot_count);  // CHECK-fails when the operands carry different boot counts, so equality is only evaluated within one boot.
+
return timestamp == m2.timestamp && channel_index == m2.channel_index &&
queue_index == m2.queue_index;
}
std::ostream &operator<<(std::ostream &os, const Message &m) {
os << "{.channel_index=" << m.channel_index
- << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
+ << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp
+ << ", .boot_count=" << m.boot_count;
if (m.data.Verify()) {
os << ", .data="
<< aos::FlatbufferToJson(m.data,
@@ -608,12 +613,13 @@
break;
}
- messages_.insert(
- {.channel_index = m.value().message().channel_index(),
- .queue_index = m.value().message().queue_index(),
- .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
- m.value().message().monotonic_sent_time())),
- .data = std::move(m.value())});
+ messages_.insert(Message{
+ .channel_index = m.value().message().channel_index(),
+ .queue_index = m.value().message().queue_index(),
+ .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
+ m.value().message().monotonic_sent_time())),
+ .boot_count = parts().boot_count,
+ .data = std::move(m.value())});
// Now, update sorted_until_ to match the new message.
if (parts_message_reader_.newest_timestamp() >
@@ -756,6 +762,49 @@
current_ = nullptr;
}
+BootMerger::BootMerger(std::vector<LogParts> files) {
+  // Bucket the parts by boot: slot N collects every part whose boot_count is N.
+  std::vector<std::vector<LogParts>> parts_by_boot;
+  for (size_t file = 0; file < files.size(); ++file) {
+    LOG(INFO) << "Trying file " << file;
+    const size_t boot = files[file].boot_count;
+    LOG(INFO) << "Boot count " << boot;
+    // Grow the bucket list on demand so that `boot` is always a valid slot.
+    if (parts_by_boot.size() < boot + 1) parts_by_boot.resize(boot + 1);
+    parts_by_boot[boot].emplace_back(std::move(files[file]));
+  }
+
+  // Build one NodeMerger per bucket, stored in increasing boot order.
+  node_mergers_.reserve(parts_by_boot.size());
+  size_t boot = 0;
+  for (std::vector<LogParts> &boot_parts : parts_by_boot) {
+    LOG(INFO) << "Boot " << boot;
+    for (auto &part : boot_parts) {
+      LOG(INFO) << "Part " << part;
+    }
+    node_mergers_.emplace_back(
+        std::make_unique<NodeMerger>(std::move(boot_parts)));
+    ++boot;
+  }
+}
+
+// Returns the oldest pending message across all boots, or nullptr once every
+// boot's merger has been drained (or when no mergers exist at all).
+Message *BootMerger::Front() {
+  // Built from an empty file list: there is no merger to index into.
+  if (node_mergers_.empty()) return nullptr;
+  // Skip forward over exhausted boots. index_ stops on the last merger rather
+  // than running past it, so node_mergers_[index_] stays a valid element.
+  while (true) {
+    Message *result = node_mergers_[index_]->Front();
+    if (result != nullptr || index_ + 1u == node_mergers_.size()) {
+      return result;
+    }
+    ++index_;
+  }
+}
+
+void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }  // Forwards to the merger at index_; index_ is not advanced here.
+
TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
: node_merger_(std::move(parts)),
timestamp_callback_([](TimestampedMessage *) {}) {
@@ -989,6 +1038,7 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
+ .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1003,6 +1053,7 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
+ .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1012,6 +1063,7 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
+ .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1046,6 +1098,7 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
+ .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}