Add a MessageHeader field for monotonic_timestamp_time and extract it
We already track the time from send -> republish by logging delivery
timestamps. When that time is sent back to the logger node, we actually
measure the time from republish -> logger node too, which would be
useful for timeline reconstruction. Add a field to log that time to, and
extract any timestamps we find there.
Next step is to log it and use it again.
Change-Id: I36286903adfa418e830be68382b157794a16ff7a
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 7795e66..979821e 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -376,6 +376,22 @@
"log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
"parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
"parts_index": 0
+})")),
+ config4_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi2"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 2000000,
+ "realtime_start_time": 1000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
+ "parts_index": 0
})")) {
unlink(logfile0_.c_str());
unlink(logfile1_.c_str());
@@ -415,25 +431,45 @@
flatbuffers::DetachedBuffer MakeTimestampMessage(
const aos::monotonic_clock::time_point sender_monotonic_now,
- int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
- aos::Context context;
- context.monotonic_remote_time = sender_monotonic_now;
- context.realtime_remote_time = aos::realtime_clock::epoch() +
- chrono::seconds(1000) +
- sender_monotonic_now.time_since_epoch();
- context.remote_queue_index = queue_index_[channel_index] - 1;
- context.monotonic_event_time =
+ int channel_index, chrono::nanoseconds receiver_monotonic_offset,
+ monotonic_clock::time_point monotonic_timestamp_time =
+ monotonic_clock::min_time) {
+ const monotonic_clock::time_point monotonic_sent_time =
sender_monotonic_now + receiver_monotonic_offset;
- context.realtime_event_time =
- aos::realtime_clock::epoch() + chrono::seconds(1000) +
- context.monotonic_event_time.time_since_epoch();
- context.queue_index = queue_index_[channel_index] - 1 + 100;
- context.size = 0;
- context.data = nullptr;
flatbuffers::FlatBufferBuilder fbb;
- fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
- LogType::kLogDeliveryTimeOnly));
+ fbb.ForceDefaults(true);
+
+ logger::MessageHeader::Builder message_header_builder(fbb);
+
+ message_header_builder.add_channel_index(channel_index);
+
+ message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
+ 100);
+ message_header_builder.add_monotonic_sent_time(
+ monotonic_sent_time.time_since_epoch().count());
+ message_header_builder.add_realtime_sent_time(
+ (aos::realtime_clock::epoch() + chrono::seconds(1000) +
+ monotonic_sent_time.time_since_epoch())
+ .time_since_epoch()
+ .count());
+
+ message_header_builder.add_monotonic_remote_time(
+ sender_monotonic_now.time_since_epoch().count());
+ message_header_builder.add_realtime_remote_time(
+ (aos::realtime_clock::epoch() + chrono::seconds(1000) +
+ sender_monotonic_now.time_since_epoch())
+ .time_since_epoch()
+ .count());
+ message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
+ 1);
+
+ if (monotonic_timestamp_time != monotonic_clock::min_time) {
+ message_header_builder.add_monotonic_timestamp_time(
+ monotonic_timestamp_time.time_since_epoch().count());
+ }
+
+ fbb.FinishSizePrefixed(message_header_builder.Finish());
LOG(INFO) << aos::FlatbufferToJson(
aos::SizePrefixedFlatbufferSpan<MessageHeader>(
absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
@@ -450,6 +486,7 @@
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
std::vector<uint32_t> queue_index_;
};
@@ -676,6 +713,78 @@
EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
}
+// Tests that NodeMerger collapses duplicate timestamps from two log files and
+// keeps monotonic_timestamp_time whenever either copy has it set.
+TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config1_.span());
+
+ // Neither file's copy has monotonic_timestamp_time set.
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
+ writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ // Only the first file's copy has monotonic_timestamp_time set.
+ MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
+ writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1001), 0, chrono::seconds(100),
+ e + chrono::nanoseconds(971)));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
+
+ // Only the second file's copy has monotonic_timestamp_time set.
+ MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
+ writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1002), 0, chrono::seconds(100),
+ e + chrono::nanoseconds(972)));
+
+ // Both copies have monotonic_timestamp_time set (to the same value).
+ MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
+ writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1003), 0, chrono::seconds(100),
+ e + chrono::nanoseconds(973)));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1003), 0, chrono::seconds(100),
+ e + chrono::nanoseconds(973)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ ASSERT_EQ(parts.size(), 1u);
+
+ NodeMerger merger(FilterPartsForNode(parts, "pi1"));
+
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+
+ std::deque<Message> output;
+
+ for (int i = 0; i < 4; ++i) {
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ }
+ ASSERT_TRUE(merger.Front() == nullptr);
+
+ EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(101000));
+ EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
+ EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(101001));
+ EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
+ EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
+ EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(101002));
+ EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
+ EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
+ EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(101003));
+ EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
+ EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
+}
+
// Tests that we can match timestamps on delivered messages.
TEST_F(TimestampMapperTest, ReadNode0First) {
const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
@@ -775,6 +884,99 @@
}
}
+// Tests that TimestampMapper surfaces monotonic_timestamp_time from the
+// MessageHeader, and reports min_time when the field is absent.
+TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config4_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100),
+ e + chrono::nanoseconds(971)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100),
+ e + chrono::nanoseconds(5458)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ for (const auto &p : parts) {
+ LOG(INFO) << p;
+ }
+
+ ASSERT_EQ(parts.size(), 1u);
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ }
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output0[0].monotonic_timestamp_time, monotonic_clock::min_time);
+ EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+ EXPECT_EQ(output0[1].monotonic_timestamp_time, monotonic_clock::min_time);
+ EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+ EXPECT_EQ(output0[2].monotonic_timestamp_time, monotonic_clock::min_time);
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ for (int i = 0; i < 3; ++i) {
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ }
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_EQ(output1[0].monotonic_timestamp_time,
+ e + chrono::nanoseconds(971));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[1].monotonic_timestamp_time,
+ e + chrono::nanoseconds(5458));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[2].monotonic_timestamp_time, monotonic_clock::min_time);
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+}
+
// Tests that we can match timestamps on delivered messages. By doing this in
// the reverse order, the second node needs to queue data up from the first node
// to find the matching timestamp.