Add LogPartsSorter to sort messages from a log file
Based on https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
we need to start by sorting all the messages from a set of parts. Make
a class and test this. (The testing infrastructure looks a bit
over-kill, but will be re-used a lot for the following tests).
Change-Id: Ifa1ba880ddf7cf923f24826e504b902a4787ad03
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index dafb452..d1025fa 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -5,6 +5,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/test_message_generated.h"
+#include "aos/flatbuffer_merge.h"
#include "aos/flatbuffers.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/testing/tmpdir.h"
@@ -31,11 +32,9 @@
unlink(logfile.c_str());
const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
- JsonToSizedFlatbuffer<TestMessage>(
- R"({ "value": 1 })");
+ JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
- JsonToSizedFlatbuffer<TestMessage>(
- R"({ "value": 2 })");
+ JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
{
DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
@@ -249,6 +248,265 @@
EXPECT_GE(m2, m1);
}
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ const aos::FlatbufferDetachedBuffer<Configuration> &config,
+ const std::string_view json) {
+ flatbuffers::FlatBufferBuilder fbb;
+ flatbuffers::Offset<Configuration> config_offset =
+ aos::CopyFlatBuffer(config, &fbb);
+ LogFileHeader::Builder header_builder(fbb);
+ header_builder.add_configuration(config_offset);
+ fbb.Finish(header_builder.Finish());
+ aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
+
+ aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
+ JsonToFlatbuffer<LogFileHeader>(json));
+ CHECK(header_updates.Verify());
+ flatbuffers::FlatBufferBuilder fbb2;
+ fbb2.FinishSizePrefixed(
+ aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
+ return fbb2.Release();
+}
+
+class SortingElementTest : public ::testing::Test {
+ public:
+ SortingElementTest()
+ : config_(JsonToFlatbuffer<Configuration>(
+ R"({
+ "channels": [
+ {
+ "name": "/a",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2"
+ },
+ {
+ "name": "pi3"
+ }
+ ]
+ },
+ {
+ "name": "/b",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/c",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1"
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1"
+ },
+ {
+ "name": "pi2"
+ },
+ {
+ "name": "pi3"
+ }
+ ]
+}
+)")),
+ config0_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 0,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+ "parts_index": 0
+})")),
+ config1_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 0,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "d4724d35-a6c6-4a30-8a94-d192f4c18260",
+ "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+ "parts_index": 0
+})")) {
+ unlink(logfile0_.c_str());
+ unlink(logfile1_.c_str());
+ queue_index_.resize(kChannels);
+ }
+
+ protected:
+ static constexpr size_t kChannels = 3u;
+
+ flatbuffers::DetachedBuffer MakeLogMessage(
+ const aos::monotonic_clock::time_point monotonic_now, int channel_index,
+ int value) {
+ flatbuffers::FlatBufferBuilder message_fbb;
+ message_fbb.ForceDefaults(true);
+ TestMessage::Builder test_message_builder(message_fbb);
+ test_message_builder.add_value(value);
+ message_fbb.Finish(test_message_builder.Finish());
+
+ aos::Context context;
+ context.monotonic_event_time = monotonic_now;
+ context.realtime_event_time = aos::realtime_clock::epoch() +
+ chrono::seconds(1000) +
+ monotonic_now.time_since_epoch();
+ context.queue_index = queue_index_[channel_index];
+ context.size = message_fbb.GetSize();
+ context.data = message_fbb.GetBufferPointer();
+
+ ++queue_index_[channel_index];
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.FinishSizePrefixed(
+ PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
+
+ return fbb.Release();
+ }
+
+ flatbuffers::DetachedBuffer MakeTimestampMessage(
+ const aos::monotonic_clock::time_point sender_monotonic_now,
+ int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
+ aos::Context context;
+ context.monotonic_remote_time = sender_monotonic_now;
+ context.realtime_remote_time = aos::realtime_clock::epoch() +
+ chrono::seconds(1000) +
+ sender_monotonic_now.time_since_epoch();
+ context.remote_queue_index = queue_index_[channel_index] - 1;
+ context.monotonic_event_time =
+ sender_monotonic_now + receiver_monotonic_offset;
+ context.realtime_event_time =
+ aos::realtime_clock::epoch() + chrono::seconds(1000) +
+ context.monotonic_event_time.time_since_epoch();
+ context.queue_index = queue_index_[channel_index] - 1 + 100;
+ context.size = 0;
+ context.data = nullptr;
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
+ LogType::kLogDeliveryTimeOnly));
+ LOG(INFO) << aos::FlatbufferToJson(
+ aos::SizePrefixedFlatbufferSpan<MessageHeader>(
+ absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
+
+ return fbb.Release();
+ }
+
+ const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
+ const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
+
+ const aos::FlatbufferDetachedBuffer<Configuration> config_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
+
+ std::vector<uint32_t> queue_index_;
+};
+
+using LogPartsSorterTest = SortingElementTest;
+using LogPartsSorterDeathTest = LogPartsSorterTest;
+
+// Tests that we can pull messages out of a log sorted in order.
+TEST_F(LogPartsSorterTest, Pull) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(config0_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_});
+
+ LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+ // Confirm we aren't sorted until any time until the message is popped.
+ // Peeking shouldn't change the sorted until time.
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+ std::deque<Message> output;
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(parts_sorter.Front() == nullptr);
+
+ EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
+ EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+}
+
+// Tests that messages too far out of order trigger death.
+TEST_F(LogPartsSorterDeathTest, Pull) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(config0_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
+ // The following message is too far out of order and will trigger the CHECK.
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_});
+
+ LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+ // Confirm we aren't sorted until any time until the message is popped.
+ // Peeking shouldn't change the sorted until time.
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+ std::deque<Message> output;
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ parts_sorter.PopFront();
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ parts_sorter.PopFront();
+
+ EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
+}
+
} // namespace testing
} // namespace logger
} // namespace aos