Add LogPartsSorter to sort messages from a log file
Based on https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
we need to start by sorting all the messages from a set of parts. Make
a class and test it. (The testing infrastructure looks like a bit of
overkill, but it will be reused heavily by the following tests.)
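
A rough usage sketch of the new class (the SortParts() plumbing mirrors
the new test; the log path is just a placeholder):

    // Sketch only: assumes logfile_utils.h / logfile_sorting.h and the
    // aos::logger namespace.
    const std::vector<LogFile> parts = SortParts({"/tmp/log0.bfbs"});
    LogPartsSorter sorter(parts[0].parts[0]);
    while (Message *m = sorter.Front()) {
      // Front() only returns a message once its timestamp is before
      // sorted_until(), i.e. newest_timestamp() - max_out_of_order_duration(),
      // or once the whole log has been read, so messages come out in order.
      LOG(INFO) << *m;
      sorter.PopFront();
    }
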
Change-Id: Ifa1ba880ddf7cf923f24826e504b902a4787ad03
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3a39803..83d798d 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -442,8 +442,13 @@
newest_timestamp_ = message_reader_.newest_timestamp();
const monotonic_clock::time_point monotonic_sent_time(
chrono::nanoseconds(message->message().monotonic_sent_time()));
+ // TODO(austin): Does this work with startup? Might need to use the start
+ // time.
+ // TODO(austin): Does this work with startup when we don't know the remote
+ // start time too? Look at one of those logs to compare.
CHECK_GE(monotonic_sent_time,
- newest_timestamp_ - max_out_of_order_duration());
+ newest_timestamp_ - max_out_of_order_duration())
+ << ": Max out of order exceeded.";
return message;
}
NextLog();
@@ -489,6 +494,67 @@
return os;
}
+LogPartsSorter::LogPartsSorter(LogParts log_parts)
+ : parts_message_reader_(log_parts) {}
+
+Message *LogPartsSorter::Front() {
+  // Queue up data until enough has been read that the front message is
+  // guaranteed to be sorted and safe to pop. This may do nothing, so the
+  // do-nothing path needs to be cheap to check.
+ if (sorted_until() != monotonic_clock::max_time) {
+ while (true) {
+ if (!messages_.empty() && messages_.begin()->timestamp < sorted_until()) {
+ break;
+ }
+
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+ parts_message_reader_.ReadMessage();
+      // No data left, so we are sorted forever. Work through what remains.
+ if (!m) {
+ sorted_until_ = monotonic_clock::max_time;
+ break;
+ }
+
+ messages_.insert(
+ {.channel_index = m.value().message().channel_index(),
+ .queue_index = m.value().message().queue_index(),
+ .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
+ m.value().message().monotonic_sent_time())),
+ .data = std::move(m.value())});
+
+ // Now, update sorted_until_ to match the new message.
+ if (parts_message_reader_.newest_timestamp() >
+ monotonic_clock::min_time +
+ parts_message_reader_.max_out_of_order_duration()) {
+ sorted_until_ = parts_message_reader_.newest_timestamp() -
+ parts_message_reader_.max_out_of_order_duration();
+ } else {
+ sorted_until_ = monotonic_clock::min_time;
+ }
+ }
+ }
+
+ // Now that we have enough data queued, return a pointer to the oldest piece
+ // of data if it exists.
+ if (messages_.empty()) {
+ return nullptr;
+ }
+
+ return &(*messages_.begin());
+}
+
+void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
+
+std::string LogPartsSorter::DebugString() const {
+ std::stringstream ss;
+ ss << "messages: [\n";
+ for (const Message &m : messages_) {
+ ss << m << "\n";
+ }
+ ss << "] <- " << parts_message_reader_.filename();
+ return ss.str();
+}
+
SplitMessageReader::SplitMessageReader(
const std::vector<std::string> &filenames)
: filenames_(filenames),
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 25f02f9..13c83cf 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -14,6 +14,7 @@
#include <utility>
#include <vector>
+#include "absl/container/btree_set.h"
#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/event_loop.h"
@@ -280,6 +281,10 @@
std::string_view filename() const { return message_reader_.filename(); }
+ const LogFileHeader *log_file_header() const {
+ return message_reader_.log_file_header();
+ }
+
// Returns the minimum amount of data needed to queue up for sorting before
// we are guarenteed to not see data out of order.
std::chrono::nanoseconds max_out_of_order_duration() const {
@@ -326,6 +331,44 @@
std::ostream &operator<<(std::ostream &os, const Message &m);
+// Class to sort the resulting messages from a PartsMessageReader.
+class LogPartsSorter {
+ public:
+ LogPartsSorter(LogParts log_parts);
+
+ // Returns the current log file header.
+ // TODO(austin): Is this the header we want to report? Do we want a better
+ // start time?
+ // TODO(austin): Report a start time from the LogParts. Figure out how that
+ // all works.
+ const LogFileHeader *log_file_header() const {
+ return parts_message_reader_.log_file_header();
+ }
+
+ // The time this data is sorted until.
+ monotonic_clock::time_point sorted_until() const { return sorted_until_; }
+
+ // Returns the next sorted message from the log file. It is safe to call
+ // std::move() on the result to move the data flatbuffer from it.
+ Message *Front();
+ // Pops the front message. This should only be called after a call to
+ // Front().
+ void PopFront();
+
+ // Returns a debug string representing the contents of this sorter.
+ std::string DebugString() const;
+
+ private:
+ // Log parts reader we are wrapping.
+ PartsMessageReader parts_message_reader_;
+ // Cache of the time we are sorted until.
+ aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
+
+ // Set used for efficient sorting of messages. We can benchmark and evaluate
+ // other data structures if this proves to be the bottleneck.
+ absl::btree_set<Message> messages_;
+};
+
class TimestampMerger;
// A design requirement is that the relevant data for a channel is not more than
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index dafb452..d1025fa 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -5,6 +5,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/test_message_generated.h"
+#include "aos/flatbuffer_merge.h"
#include "aos/flatbuffers.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/testing/tmpdir.h"
@@ -31,11 +32,9 @@
unlink(logfile.c_str());
const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
- JsonToSizedFlatbuffer<TestMessage>(
- R"({ "value": 1 })");
+ JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
- JsonToSizedFlatbuffer<TestMessage>(
- R"({ "value": 2 })");
+ JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
{
DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
@@ -249,6 +248,265 @@
EXPECT_GE(m2, m1);
}
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ const aos::FlatbufferDetachedBuffer<Configuration> &config,
+ const std::string_view json) {
+ flatbuffers::FlatBufferBuilder fbb;
+ flatbuffers::Offset<Configuration> config_offset =
+ aos::CopyFlatBuffer(config, &fbb);
+ LogFileHeader::Builder header_builder(fbb);
+ header_builder.add_configuration(config_offset);
+ fbb.Finish(header_builder.Finish());
+ aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
+
+ aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
+ JsonToFlatbuffer<LogFileHeader>(json));
+ CHECK(header_updates.Verify());
+ flatbuffers::FlatBufferBuilder fbb2;
+ fbb2.FinishSizePrefixed(
+ aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
+ return fbb2.Release();
+}
+
+class SortingElementTest : public ::testing::Test {
+ public:
+ SortingElementTest()
+ : config_(JsonToFlatbuffer<Configuration>(
+ R"({
+ "channels": [
+ {
+ "name": "/a",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2"
+ },
+ {
+ "name": "pi3"
+ }
+ ]
+ },
+ {
+ "name": "/b",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/c",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1"
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1"
+ },
+ {
+ "name": "pi2"
+ },
+ {
+ "name": "pi3"
+ }
+ ]
+}
+)")),
+ config0_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 0,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+ "parts_index": 0
+})")),
+ config1_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 0,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "d4724d35-a6c6-4a30-8a94-d192f4c18260",
+ "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+ "parts_index": 0
+})")) {
+ unlink(logfile0_.c_str());
+ unlink(logfile1_.c_str());
+ queue_index_.resize(kChannels);
+ }
+
+ protected:
+ static constexpr size_t kChannels = 3u;
+
+ flatbuffers::DetachedBuffer MakeLogMessage(
+ const aos::monotonic_clock::time_point monotonic_now, int channel_index,
+ int value) {
+ flatbuffers::FlatBufferBuilder message_fbb;
+ message_fbb.ForceDefaults(true);
+ TestMessage::Builder test_message_builder(message_fbb);
+ test_message_builder.add_value(value);
+ message_fbb.Finish(test_message_builder.Finish());
+
+ aos::Context context;
+ context.monotonic_event_time = monotonic_now;
+ context.realtime_event_time = aos::realtime_clock::epoch() +
+ chrono::seconds(1000) +
+ monotonic_now.time_since_epoch();
+ context.queue_index = queue_index_[channel_index];
+ context.size = message_fbb.GetSize();
+ context.data = message_fbb.GetBufferPointer();
+
+ ++queue_index_[channel_index];
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.FinishSizePrefixed(
+ PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
+
+ return fbb.Release();
+ }
+
+ flatbuffers::DetachedBuffer MakeTimestampMessage(
+ const aos::monotonic_clock::time_point sender_monotonic_now,
+ int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
+ aos::Context context;
+ context.monotonic_remote_time = sender_monotonic_now;
+ context.realtime_remote_time = aos::realtime_clock::epoch() +
+ chrono::seconds(1000) +
+ sender_monotonic_now.time_since_epoch();
+ context.remote_queue_index = queue_index_[channel_index] - 1;
+ context.monotonic_event_time =
+ sender_monotonic_now + receiver_monotonic_offset;
+ context.realtime_event_time =
+ aos::realtime_clock::epoch() + chrono::seconds(1000) +
+ context.monotonic_event_time.time_since_epoch();
+ context.queue_index = queue_index_[channel_index] - 1 + 100;
+ context.size = 0;
+ context.data = nullptr;
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
+ LogType::kLogDeliveryTimeOnly));
+ LOG(INFO) << aos::FlatbufferToJson(
+ aos::SizePrefixedFlatbufferSpan<MessageHeader>(
+ absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
+
+ return fbb.Release();
+ }
+
+ const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
+ const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
+
+ const aos::FlatbufferDetachedBuffer<Configuration> config_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
+
+ std::vector<uint32_t> queue_index_;
+};
+
+using LogPartsSorterTest = SortingElementTest;
+using LogPartsSorterDeathTest = LogPartsSorterTest;
+
+// Tests that we can pull messages out of a log in sorted order.
+TEST_F(LogPartsSorterTest, Pull) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(config0_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_});
+
+ LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Confirm that we don't report being sorted until any time before a
+  // message has been popped. Peeking shouldn't change the sorted-until time.
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+ std::deque<Message> output;
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(parts_sorter.Front() == nullptr);
+
+ EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
+ EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+}
+
+// Tests that messages too far out of order trigger death.
+TEST_F(LogPartsSorterDeathTest, Pull) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(config0_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
+ // The following message is too far out of order and will trigger the CHECK.
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_});
+
+ LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+  // Confirm that we don't report being sorted until any time before a
+  // message has been popped. Peeking shouldn't change the sorted-until time.
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+ std::deque<Message> output;
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ parts_sorter.PopFront();
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ parts_sorter.PopFront();
+
+ EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/flatbuffer_merge.h b/aos/flatbuffer_merge.h
index d98a9eb..0c09b22 100644
--- a/aos/flatbuffer_merge.h
+++ b/aos/flatbuffer_merge.h
@@ -50,6 +50,15 @@
MergeFlatBuffers<T>(&fb1.message(), &fb2.message()));
}
+template <class T>
+inline flatbuffers::Offset<T> MergeFlatBuffers(
+ const aos::Flatbuffer<T> &fb1, const aos::Flatbuffer<T> &fb2,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ return MergeFlatBuffers<T>(
+ reinterpret_cast<const flatbuffers::Table *>(&fb1.message()),
+ reinterpret_cast<const flatbuffers::Table *>(&fb2.message()), fbb);
+}
+
// Copies a flatbuffer by walking the tree and copying all the pieces. This
// converts DAGs to trees.
template <class T>
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 81a8cee..adb9769 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -477,6 +477,37 @@
ResizeableBuffer data_;
};
+// Non-owning flatbuffer backed by an absl::Span.
+template <typename T>
+class SizePrefixedFlatbufferSpan : public SizePrefixedFlatbuffer<T> {
+ public:
+ // Builds a flatbuffer pointing to the contents of a span.
+ SizePrefixedFlatbufferSpan(const absl::Span<const uint8_t> data)
+ : data_(data) {}
+  // Builds a flatbuffer pointing to the contents of another flatbuffer.
+ SizePrefixedFlatbufferSpan(const SizePrefixedFlatbuffer<T> &other) {
+ data_ = other.span();
+ }
+
+ // Points to the data in the other flatbuffer.
+ SizePrefixedFlatbufferSpan &operator=(
+ const SizePrefixedFlatbuffer<T> &other) {
+ data_ = other.span();
+ return *this;
+ }
+
+ ~SizePrefixedFlatbufferSpan() override {}
+
+ absl::Span<uint8_t> span() override {
+ LOG(FATAL) << "Unimplemented";
+ return absl::Span<uint8_t>(nullptr, 0);
+ }
+ absl::Span<const uint8_t> span() const override { return data_; }
+
+ private:
+ absl::Span<const uint8_t> data_;
+};
+
inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
absl::Span<const uint8_t> span) {
// Copy the data from the span.