Add LogPartsSorter to sort messages from a log file
Based on https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
we need to start by sorting all the messages from a set of parts. Make
a class and test this. (The testing infrastructure looks a bit
over-kill, but will be re-used a lot for the following tests).
Change-Id: Ifa1ba880ddf7cf923f24826e504b902a4787ad03
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3a39803..83d798d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -442,8 +442,13 @@
newest_timestamp_ = message_reader_.newest_timestamp();
const monotonic_clock::time_point monotonic_sent_time(
chrono::nanoseconds(message->message().monotonic_sent_time()));
+ // TODO(austin): Does this work with startup? Might need to use the start
+ // time.
+ // TODO(austin): Does this work with startup when we don't know the remote
+ // start time too? Look at one of those logs to compare.
CHECK_GE(monotonic_sent_time,
- newest_timestamp_ - max_out_of_order_duration());
+ newest_timestamp_ - max_out_of_order_duration())
+ << ": Max out of order exceeded.";
return message;
}
NextLog();
@@ -489,6 +494,67 @@
return os;
}
+LogPartsSorter::LogPartsSorter(LogParts log_parts)
+ : parts_message_reader_(log_parts) {}
+
+Message *LogPartsSorter::Front() {
+ // Queue up data until enough data has been queued that the front message is
+ // sorted enough to be safe to pop. This may do nothing, so we should make
+ // sure the nothing path is checked quickly.
+ if (sorted_until() != monotonic_clock::max_time) {
+ while (true) {
+ if (!messages_.empty() && messages_.begin()->timestamp < sorted_until()) {
+ break;
+ }
+
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+ parts_message_reader_.ReadMessage();
+ // No data left, sorted forever, work through what is left.
+ if (!m) {
+ sorted_until_ = monotonic_clock::max_time;
+ break;
+ }
+
+ messages_.insert(
+ {.channel_index = m.value().message().channel_index(),
+ .queue_index = m.value().message().queue_index(),
+ .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
+ m.value().message().monotonic_sent_time())),
+ .data = std::move(m.value())});
+
+ // Now, update sorted_until_ to match the new message.
+ if (parts_message_reader_.newest_timestamp() >
+ monotonic_clock::min_time +
+ parts_message_reader_.max_out_of_order_duration()) {
+ sorted_until_ = parts_message_reader_.newest_timestamp() -
+ parts_message_reader_.max_out_of_order_duration();
+ } else {
+ sorted_until_ = monotonic_clock::min_time;
+ }
+ }
+ }
+
+ // Now that we have enough data queued, return a pointer to the oldest piece
+ // of data if it exists.
+ if (messages_.empty()) {
+ return nullptr;
+ }
+
+ return &(*messages_.begin());
+}
+
+void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
+
+std::string LogPartsSorter::DebugString() const {
+ std::stringstream ss;
+ ss << "messages: [\n";
+ for (const Message &m : messages_) {
+ ss << m << "\n";
+ }
+ ss << "] <- " << parts_message_reader_.filename();
+ return ss.str();
+}
+
SplitMessageReader::SplitMessageReader(
const std::vector<std::string> &filenames)
: filenames_(filenames),