Add a MessageHeader field for monotonic_timestamp_time and extract it

We track the time from send -> republish already by logging delivery
timestamps.  When that time is sent back to the logger node, we actually
measure the time from republish -> logger node too, which would be
useful for timeline reconstruction.  Add a field to log that to and
extract any timestamps we find there.

Next step is to log it and use it again.

Change-Id: I36286903adfa418e830be68382b157794a16ff7a
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 59f478c..00322e1 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -530,6 +530,9 @@
   if (m.realtime_remote_time != realtime_clock::min_time) {
     os << ", .realtime_remote_time=" << m.realtime_remote_time;
   }
+  if (m.monotonic_timestamp_time != monotonic_clock::min_time) {
+    os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
+  }
   if (m.data.Verify()) {
     os << ", .data="
        << aos::FlatbufferToJson(m.data,
@@ -666,9 +669,19 @@
       oldest = m;
       current_ = &parts_sorter;
     } else if (*m == *oldest) {
-      // Found a duplicate.  It doesn't matter which one we return.  It is
-      // easiest to just drop the new one.
-      parts_sorter.PopFront();
+      // Found a duplicate.  If there is a choice, we want the one which has the
+      // timestamp time.
+      if (!m->data.message().has_monotonic_timestamp_time()) {
+        parts_sorter.PopFront();
+      } else if (!oldest->data.message().has_monotonic_timestamp_time()) {
+        current_->PopFront();
+        current_ = &parts_sorter;
+        oldest = m;
+      } else {
+        CHECK_EQ(m->data.message().monotonic_timestamp_time(),
+                 oldest->data.message().monotonic_timestamp_time());
+        parts_sorter.PopFront();
+      }
     }
 
     // PopFront may change this, so compute it down here.
@@ -702,6 +715,7 @@
                .remote_queue_index = 0xffffffff,
                .monotonic_remote_time = monotonic_clock::min_time,
                .realtime_remote_time = realtime_clock::min_time,
+               .monotonic_timestamp_time = monotonic_clock::min_time,
                .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()} {
   for (const LogParts *part : node_merger_.Parts()) {
     if (!configuration_) {
@@ -770,6 +784,7 @@
       .remote_queue_index = 0xffffffff,
       .monotonic_remote_time = monotonic_clock::min_time,
       .realtime_remote_time = realtime_clock::min_time,
+      .monotonic_timestamp_time = monotonic_clock::min_time,
       .data = std::move(m->data)};
 }
 
@@ -843,6 +858,9 @@
                 m->data.message().monotonic_remote_time())),
         .realtime_remote_time = realtime_clock::time_point(
             std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
+        .monotonic_timestamp_time =
+            monotonic_clock::time_point(std::chrono::nanoseconds(
+                m->data.message().monotonic_timestamp_time())),
         .data = std::move(data.data)};
     CHECK_GE(message_.monotonic_event_time, last_message_time_);
     last_message_time_ = message_.monotonic_event_time;
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 088ce73..94525cf 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -358,6 +358,9 @@
   monotonic_clock::time_point monotonic_remote_time = monotonic_clock::min_time;
   realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
 
+  monotonic_clock::time_point monotonic_timestamp_time =
+      monotonic_clock::min_time;
+
   SizePrefixedFlatbufferVector<MessageHeader> data;
 };
 
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 7795e66..979821e 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -376,6 +376,22 @@
   "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
   "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
   "parts_index": 0
+})")),
+        config4_(MakeHeader(config_,
+                            R"({
+  /* 100ms */
+  "max_out_of_order_duration": 100000000,
+  "node": {
+    "name": "pi2"
+  },
+  "logger_node": {
+    "name": "pi1"
+  },
+  "monotonic_start_time": 2000000,
+  "realtime_start_time": 1000000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
+  "parts_index": 0
 })")) {
     unlink(logfile0_.c_str());
     unlink(logfile1_.c_str());
@@ -415,25 +431,45 @@
 
   flatbuffers::DetachedBuffer MakeTimestampMessage(
       const aos::monotonic_clock::time_point sender_monotonic_now,
-      int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
-    aos::Context context;
-    context.monotonic_remote_time = sender_monotonic_now;
-    context.realtime_remote_time = aos::realtime_clock::epoch() +
-                                   chrono::seconds(1000) +
-                                   sender_monotonic_now.time_since_epoch();
-    context.remote_queue_index = queue_index_[channel_index] - 1;
-    context.monotonic_event_time =
+      int channel_index, chrono::nanoseconds receiver_monotonic_offset,
+      monotonic_clock::time_point monotonic_timestamp_time =
+          monotonic_clock::min_time) {
+    const monotonic_clock::time_point monotonic_sent_time =
         sender_monotonic_now + receiver_monotonic_offset;
-    context.realtime_event_time =
-        aos::realtime_clock::epoch() + chrono::seconds(1000) +
-        context.monotonic_event_time.time_since_epoch();
-    context.queue_index = queue_index_[channel_index] - 1 + 100;
-    context.size = 0;
-    context.data = nullptr;
 
     flatbuffers::FlatBufferBuilder fbb;
-    fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
-                                       LogType::kLogDeliveryTimeOnly));
+    fbb.ForceDefaults(true);
+
+    logger::MessageHeader::Builder message_header_builder(fbb);
+
+    message_header_builder.add_channel_index(channel_index);
+
+    message_header_builder.add_queue_index(queue_index_[channel_index] - 1 +
+                                           100);
+    message_header_builder.add_monotonic_sent_time(
+        monotonic_sent_time.time_since_epoch().count());
+    message_header_builder.add_realtime_sent_time(
+        (aos::realtime_clock::epoch() + chrono::seconds(1000) +
+         monotonic_sent_time.time_since_epoch())
+            .time_since_epoch()
+            .count());
+
+    message_header_builder.add_monotonic_remote_time(
+        sender_monotonic_now.time_since_epoch().count());
+    message_header_builder.add_realtime_remote_time(
+        (aos::realtime_clock::epoch() + chrono::seconds(1000) +
+         sender_monotonic_now.time_since_epoch())
+            .time_since_epoch()
+            .count());
+    message_header_builder.add_remote_queue_index(queue_index_[channel_index] -
+                                                  1);
+
+    if (monotonic_timestamp_time != monotonic_clock::min_time) {
+      message_header_builder.add_monotonic_timestamp_time(
+          monotonic_timestamp_time.time_since_epoch().count());
+    }
+
+    fbb.FinishSizePrefixed(message_header_builder.Finish());
     LOG(INFO) << aos::FlatbufferToJson(
         aos::SizePrefixedFlatbufferSpan<MessageHeader>(
             absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
@@ -450,6 +486,7 @@
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
   const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config4_;
 
   std::vector<uint32_t> queue_index_;
 };
@@ -676,6 +713,78 @@
   EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
 }
 
+// Tests that we can merge timestamps with various combinations of
+// monotonic_timestamp_time.
+TEST_F(NodeMergerTest, TwoFileTimestampMerger) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config1_.span());
+
+    // Neither has it.
+    MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
+    writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+    // First only has it.
+    MakeLogMessage(e + chrono::milliseconds(1001), 0, 0x006);
+    writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1001), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(971)));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1001), 0, chrono::seconds(100)));
+
+    // Second only has it.
+    MakeLogMessage(e + chrono::milliseconds(1002), 0, 0x007);
+    writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1002), 0, chrono::seconds(100)));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1002), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(972)));
+
+    // Both have it.
+    MakeLogMessage(e + chrono::milliseconds(1003), 0, 0x008);
+    writer0.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1003), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(973)));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1003), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(973)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+  ASSERT_EQ(parts.size(), 1u);
+
+  NodeMerger merger(FilterPartsForNode(parts, "pi1"));
+
+  EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+
+  std::deque<Message> output;
+
+  for (int i = 0; i < 4; ++i) {
+    ASSERT_TRUE(merger.Front() != nullptr);
+    output.emplace_back(std::move(*merger.Front()));
+    merger.PopFront();
+  }
+  ASSERT_TRUE(merger.Front() == nullptr);
+
+  EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(101000));
+  EXPECT_FALSE(output[0].data.message().has_monotonic_timestamp_time());
+  EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(101001));
+  EXPECT_TRUE(output[1].data.message().has_monotonic_timestamp_time());
+  EXPECT_EQ(output[1].data.message().monotonic_timestamp_time(), 971);
+  EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(101002));
+  EXPECT_TRUE(output[2].data.message().has_monotonic_timestamp_time());
+  EXPECT_EQ(output[2].data.message().monotonic_timestamp_time(), 972);
+  EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(101003));
+  EXPECT_TRUE(output[3].data.message().has_monotonic_timestamp_time());
+  EXPECT_EQ(output[3].data.message().monotonic_timestamp_time(), 973);
+}
+
 // Tests that we can match timestamps on delivered messages.
 TEST_F(TimestampMapperTest, ReadNode0First) {
   const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
@@ -775,6 +884,99 @@
   }
 }
 
+// Tests that a MessageHeader with monotonic_timestamp_time set gets properly
+// returned.
+TEST_F(TimestampMapperTest, MessageWithTimestampTime) {
+  const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+  {
+    DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+    writer0.QueueSpan(config0_.span());
+    DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+    writer1.QueueSpan(config4_.span());
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(1000), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(971)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(2000), 0, chrono::seconds(100),
+        e + chrono::nanoseconds(5458)));
+
+    writer0.QueueSizedFlatbuffer(
+        MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+    writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+        e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+  for (const auto &p : parts) {
+    LOG(INFO) << p;
+  }
+
+  ASSERT_EQ(parts.size(), 1u);
+
+  TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+  TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+  mapper0.AddPeer(&mapper1);
+  mapper1.AddPeer(&mapper0);
+
+  {
+    std::deque<TimestampedMessage> output0;
+
+    for (int i = 0; i < 3; ++i) {
+      ASSERT_TRUE(mapper0.Front() != nullptr) << ": " << i;
+      output0.emplace_back(std::move(*mapper0.Front()));
+      mapper0.PopFront();
+    }
+
+    ASSERT_TRUE(mapper0.Front() == nullptr);
+
+    EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+    EXPECT_EQ(output0[0].monotonic_timestamp_time, monotonic_clock::min_time);
+    EXPECT_TRUE(output0[0].data.Verify());
+    EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+    EXPECT_EQ(output0[1].monotonic_timestamp_time, monotonic_clock::min_time);
+    EXPECT_TRUE(output0[1].data.Verify());
+    EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+    EXPECT_EQ(output0[2].monotonic_timestamp_time, monotonic_clock::min_time);
+    EXPECT_TRUE(output0[2].data.Verify());
+  }
+
+  {
+    SCOPED_TRACE("Trying node1 now");
+    std::deque<TimestampedMessage> output1;
+
+    for (int i = 0; i < 3; ++i) {
+      ASSERT_TRUE(mapper1.Front() != nullptr);
+      output1.emplace_back(std::move(*mapper1.Front()));
+      mapper1.PopFront();
+    }
+
+    ASSERT_TRUE(mapper1.Front() == nullptr);
+
+    EXPECT_EQ(output1[0].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(1000));
+    EXPECT_EQ(output1[0].monotonic_timestamp_time,
+              e + chrono::nanoseconds(971));
+    EXPECT_TRUE(output1[0].data.Verify());
+    EXPECT_EQ(output1[1].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(2000));
+    EXPECT_EQ(output1[1].monotonic_timestamp_time,
+              e + chrono::nanoseconds(5458));
+    EXPECT_TRUE(output1[1].data.Verify());
+    EXPECT_EQ(output1[2].monotonic_event_time,
+              e + chrono::seconds(100) + chrono::milliseconds(3000));
+    EXPECT_EQ(output1[2].monotonic_timestamp_time, monotonic_clock::min_time);
+    EXPECT_TRUE(output1[2].data.Verify());
+  }
+}
+
 // Tests that we can match timestamps on delivered messages.  By doing this in
 // the reverse order, the second node needs to queue data up from the first node
 // to find the matching timestamp.
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 33702e0..2896963 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -124,6 +124,10 @@
   realtime_remote_time:int64 = -9223372036854775808 (id: 6);
   // Queue index of this message on the remote node.
   remote_queue_index:uint32 = 4294967295 (id: 7);
+
+  // Time this timestamp was received on the monotonic clock of the logger node
+  // in nanoseconds.
+  monotonic_timestamp_time:int64 = -9223372036854775808 (id: 8);
 }
 
 root_type MessageHeader;