Logger: Pipe the monotonic_remote_transmit_time through event loop

Populate the field in all the network bridges and pipe it through
all the required spots in the event loop.  Update all the tests to match
the update.

As part of this, we realized that our modeling of network delay was
wrong.  Wakeups don't always take 50 uS if something else triggers the
wakeup and the message is ready in shared memory.  This wasn't being
handled properly.

Change-Id: Idf94c5c6d7c87f4d65868c71b1cceedca7bf3853
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index 050c9a3..b717ea3 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -715,7 +715,7 @@
 
           EXPECT_EQ(connection->partial_deliveries(), 0);
           EXPECT_TRUE(connection->has_monotonic_offset());
-          EXPECT_EQ(connection->monotonic_offset(), 150000);
+          EXPECT_EQ(connection->monotonic_offset(), 100000);
           EXPECT_EQ(connection->connection_count(), 1u);
           EXPECT_EQ(connection->connected_since_time(), 0);
         }
@@ -735,7 +735,7 @@
         EXPECT_GT(connection->received_packets(), 50);
         EXPECT_EQ(connection->partial_deliveries(), 0);
         EXPECT_TRUE(connection->has_monotonic_offset());
-        EXPECT_EQ(connection->monotonic_offset(), 150000);
+        EXPECT_EQ(connection->monotonic_offset(), 100000);
         EXPECT_EQ(connection->connection_count(), 1u);
         EXPECT_EQ(connection->connected_since_time(), 0);
         ++pi2_client_statistics_count;
@@ -754,7 +754,7 @@
         EXPECT_GE(connection->received_packets(), 5);
         EXPECT_EQ(connection->partial_deliveries(), 0);
         EXPECT_TRUE(connection->has_monotonic_offset());
-        EXPECT_EQ(connection->monotonic_offset(), 150000);
+        EXPECT_EQ(connection->monotonic_offset(), 100000);
         EXPECT_EQ(connection->connection_count(), 1u);
         EXPECT_EQ(connection->connected_since_time(), 0);
         ++pi3_client_statistics_count;
@@ -798,8 +798,10 @@
         [pi1_timestamp_channel, ping_timestamp_channel, &ping_on_pi2_fetcher,
          &ping_on_pi1_fetcher, &pi1_on_pi2_timestamp_fetcher,
          &pi1_on_pi1_timestamp_fetcher, &simulated_event_loop_factory, pi2,
-         channel_index = channel.first](const RemoteMessage &header) {
-          VLOG(1) << aos::FlatbufferToJson(&header);
+         channel_index = channel.first,
+         channel_name = channel.second](const RemoteMessage &header) {
+          VLOG(1) << channel_name << " aos::message_bridge::RemoteMessage -> "
+                  << aos::FlatbufferToJson(&header);
           EXPECT_TRUE(header.has_boot_uuid());
           EXPECT_EQ(UUID::FromVector(header.boot_uuid()),
                     simulated_event_loop_factory.GetNodeEventLoopFactory(pi2)
@@ -811,6 +813,9 @@
               chrono::nanoseconds(header.realtime_sent_time()));
           const aos::monotonic_clock::time_point header_monotonic_remote_time(
               chrono::nanoseconds(header.monotonic_remote_time()));
+          const aos::monotonic_clock::time_point
+              header_monotonic_remote_transmit_time(
+                  chrono::nanoseconds(header.monotonic_remote_transmit_time()));
           const aos::realtime_clock::time_point header_realtime_remote_time(
               chrono::nanoseconds(header.realtime_remote_time()));
 
@@ -836,6 +841,9 @@
 
             pi1_context = &pi1_on_pi1_timestamp_fetcher.context();
             pi2_context = &pi1_on_pi2_timestamp_fetcher.context();
+
+            EXPECT_EQ(header_monotonic_remote_transmit_time,
+                      pi2_context->monotonic_remote_time);
           } else if (header.channel_index() == ping_timestamp_channel) {
             // Find the forwarded message.
             while (ping_on_pi2_fetcher.context().monotonic_event_time <
@@ -851,6 +859,10 @@
 
             pi1_context = &ping_on_pi1_fetcher.context();
             pi2_context = &ping_on_pi2_fetcher.context();
+
+            EXPECT_EQ(header_monotonic_remote_transmit_time,
+                      pi2_context->monotonic_event_time -
+                          simulated_event_loop_factory.network_delay());
           } else {
             LOG(FATAL) << "Unknown channel";
           }
@@ -868,6 +880,8 @@
                     header_realtime_remote_time);
           EXPECT_EQ(pi2_context->monotonic_remote_time,
                     header_monotonic_remote_time);
+          EXPECT_EQ(pi2_context->monotonic_remote_transmit_time,
+                    header_monotonic_remote_transmit_time);
 
           // Confirm the forwarded message also matches the source message.
           EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());
@@ -2195,7 +2209,15 @@
     ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
     aos::Fetcher<examples::Ping> fetcher =
         ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
-    EXPECT_TRUE(fetcher.Fetch());
+    ASSERT_TRUE(fetcher.Fetch());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::epoch());
+    // Message bridge picks up the Ping message immediately on reboot.
+    EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+              monotonic_clock::epoch());
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::epoch() + factory.network_delay());
+    ASSERT_FALSE(fetcher.Fetch());
   }
 
   factory.RunFor(chrono::seconds(1));
@@ -2204,7 +2226,15 @@
     ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
     aos::Fetcher<examples::Ping> fetcher =
         ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
-    EXPECT_TRUE(fetcher.Fetch());
+    ASSERT_TRUE(fetcher.Fetch());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time,
+              monotonic_clock::epoch());
+    // Message bridge picks up the Ping message immediately on reboot.
+    EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+              monotonic_clock::epoch() + chrono::seconds(1));
+    EXPECT_EQ(fetcher.context().monotonic_event_time,
+              monotonic_clock::epoch() + factory.network_delay());
+    ASSERT_FALSE(fetcher.Fetch());
   }
   EXPECT_NE(pi2_boot_uuid, pi2->boot_uuid());
 }
@@ -2260,6 +2290,8 @@
               monotonic_clock::epoch() + factory.network_delay());
     EXPECT_EQ(fetcher.context().monotonic_remote_time,
               monotonic_clock::epoch());
+    EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+              monotonic_clock::epoch() + chrono::seconds(1));
   }
   {
     ::std::unique_ptr<EventLoop> pi1_event_loop = pi1->MakeEventLoop("pong");
@@ -2271,6 +2303,8 @@
                   factory.network_delay());
     EXPECT_EQ(fetcher.context().monotonic_remote_time,
               monotonic_clock::epoch() - std::chrono::seconds(1));
+    EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+              monotonic_clock::epoch());
   }
 }
 
@@ -2461,4 +2495,152 @@
   }
 }
 
+// Tests that rapidly sent messages get timestamped correctly.
+TEST(SimulatedEventLoopTest, TransmitTimestamps) {
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(
+          ArtifactPath("aos/events/multinode_pingpong_test_split_config.json"));
+
+  message_bridge::TestingTimeConverter time(
+      configuration::NodesCount(&config.message()));
+  SimulatedEventLoopFactory factory(&config.message());
+  factory.SetTimeConverter(&time);
+  time.StartEqual();
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  ::std::unique_ptr<EventLoop> ping_event_loop = pi2->MakeEventLoop("pong");
+  aos::Fetcher<examples::Ping> fetcher =
+      ping_event_loop->MakeFetcher<examples::Ping>("/reliable");
+  EXPECT_FALSE(fetcher.Fetch());
+
+  {
+    ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    aos::Sender<examples::Ping> test_message_sender =
+        ping_event_loop->MakeSender<examples::Ping>("/reliable");
+    for (const std::chrono::nanoseconds dt :
+         {chrono::microseconds(5000), chrono::microseconds(1),
+          chrono::microseconds(2), chrono::microseconds(70),
+          chrono::microseconds(63)}) {
+      factory.RunFor(dt);
+      SendPing(&test_message_sender, 1);
+    }
+
+    factory.RunFor(chrono::milliseconds(10));
+  }
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  EXPECT_EQ(fetcher.context().monotonic_remote_time,
+            monotonic_clock::epoch() + chrono::microseconds(5000));
+  // First message shows up after wakeup + network delay as expected.
+  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+            monotonic_clock::epoch() + chrono::microseconds(5000) +
+                factory.send_delay());
+  EXPECT_EQ(fetcher.context().monotonic_event_time,
+            monotonic_clock::epoch() + chrono::microseconds(5000) +
+                factory.send_delay() + factory.network_delay());
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  EXPECT_EQ(fetcher.context().monotonic_remote_time,
+            monotonic_clock::epoch() + chrono::microseconds(5001));
+  // Next message is close enough that it gets picked up at the same wakeup.
+  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+            monotonic_clock::epoch() + chrono::microseconds(5000) +
+                factory.send_delay());
+  EXPECT_EQ(fetcher.context().monotonic_event_time,
+            monotonic_clock::epoch() + chrono::microseconds(5000) +
+                factory.send_delay() + factory.network_delay());
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  EXPECT_EQ(fetcher.context().monotonic_remote_time,
+            monotonic_clock::epoch() + chrono::microseconds(5003));
+  // Same for the third.
+  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+            monotonic_clock::epoch() + chrono::microseconds(5000) +
+                factory.send_delay());
+  EXPECT_EQ(fetcher.context().monotonic_event_time,
+            monotonic_clock::epoch() + chrono::microseconds(5000) +
+                factory.send_delay() + factory.network_delay());
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  EXPECT_EQ(fetcher.context().monotonic_remote_time,
+            monotonic_clock::epoch() + chrono::microseconds(5073));
+  // Fourth waits long enough to do the right thing.
+  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+            monotonic_clock::epoch() + chrono::microseconds(5073) +
+                factory.send_delay());
+  EXPECT_EQ(fetcher.context().monotonic_event_time,
+            monotonic_clock::epoch() + chrono::microseconds(5073) +
+                factory.send_delay() + factory.network_delay());
+
+  ASSERT_TRUE(fetcher.FetchNext());
+  EXPECT_EQ(fetcher.context().monotonic_remote_time,
+            monotonic_clock::epoch() + chrono::microseconds(5136));
+  // Fifth waits long enough to do the right thing as well (but kicks off while
+  // the fourth is in flight over the network).
+  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+            monotonic_clock::epoch() + chrono::microseconds(5136) +
+                factory.send_delay());
+  EXPECT_EQ(fetcher.context().monotonic_event_time,
+            monotonic_clock::epoch() + chrono::microseconds(5136) +
+                factory.send_delay() + factory.network_delay());
+
+  ASSERT_FALSE(fetcher.FetchNext());
+}
+
+// Tests that a reliable message gets forwarded if it was sent originally when
+// nodes were disconnected.
+TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageSendsOnConnect) {
+  time.StartEqual();
+  factory.SkipTimingReport();
+  factory.DisableStatistics();
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  // Fully disconnect the nodes.
+  pi1->Disconnect(pi2->node());
+  pi2->Disconnect(pi1->node());
+
+  std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
+
+  std::unique_ptr<aos::EventLoop> pi2_event_loop =
+      pi2->MakeEventLoop("fetcher");
+  aos::Fetcher<examples::Ping> pi2_reliable_fetcher =
+      pi2_event_loop->MakeFetcher<examples::Ping>("/reliable");
+
+  factory.RunFor(chrono::milliseconds(100));
+
+  {
+    aos::Sender<examples::Ping> pi1_reliable_sender =
+        pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+    for (int i = 0; i < 100; ++i) {
+      SendPing(&pi1_reliable_sender, i);
+      factory.RunFor(chrono::milliseconds(100));
+    }
+  }
+
+  factory.RunFor(chrono::milliseconds(50));
+
+  ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
+
+  pi1->Connect(pi2->node());
+  pi2->Connect(pi1->node());
+
+  factory.RunFor(chrono::milliseconds(1));
+
+  ASSERT_TRUE(pi2_reliable_fetcher.Fetch());
+  ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_time,
+            monotonic_clock::epoch() + chrono::milliseconds(10000));
+  ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_remote_transmit_time,
+            monotonic_clock::epoch() + chrono::milliseconds(10150));
+  ASSERT_EQ(pi2_reliable_fetcher.context().monotonic_event_time,
+            monotonic_clock::epoch() + chrono::milliseconds(10150) +
+                factory.network_delay());
+  ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
+
+  ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
+}
+
 }  // namespace aos::testing