Fix transmit timestamp logic in SimulatedNetworkBridge

We were seeing a case when monotonic_remote_transmit_times_ got out of
sync with the fetcher.  This happened when another spurrious wakeup
triggered us to FetchNext out of sync with the actual wakeups.

This really just points to the logic being overly convoluted.  Use
monotonic_remote_transmit_times_ as the source of truth for everything
and sync the fetcher to it instead, and only feed it with timestamps on
the actual wakeups (reliable messages at startup + reconnect need some
special handling, but everything else is easy).

Change-Id: I9c1cabda98ea9765129e6b5347d2a676536d3872
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/simulated_event_loop_test.cc b/aos/events/simulated_event_loop_test.cc
index b717ea3..7a910b9 100644
--- a/aos/events/simulated_event_loop_test.cc
+++ b/aos/events/simulated_event_loop_test.cc
@@ -2495,6 +2495,63 @@
   }
 }
 
+// Simple class to call a function at a time with a timer.
+class FunctionScheduler {
+ public:
+  FunctionScheduler(aos::EventLoop *event_loop)
+      : event_loop_(event_loop), timer_(event_loop_->AddTimer([this]() {
+          IncrementTestTimer(event_loop_->context().monotonic_event_time);
+        })) {
+    timer_->set_name("function_timer");
+    event_loop_->OnRun([this]() {
+      IncrementTestTimer(event_loop_->context().monotonic_event_time);
+    });
+  }
+
+  // Schedules the function to be run at the provided time.
+  void ScheduleAt(std::function<void()> &&function,
+                  aos::monotonic_clock::time_point time) {
+    functions_.insert(std::make_pair(time, std::move(function)));
+    timer_->Schedule(functions_.begin()->first);
+  }
+
+ private:
+  void IncrementTestTimer(aos::monotonic_clock::time_point now) {
+    while (true) {
+      if (functions_.empty()) return;
+      if (functions_.begin()->first > now) {
+        break;
+      }
+      CHECK_EQ(functions_.begin()->first, now);
+
+      functions_.begin()->second();
+      functions_.erase(functions_.begin());
+    }
+    timer_->Schedule(functions_.begin()->first);
+  }
+
+  aos::EventLoop *event_loop_;
+  aos::TimerHandler *timer_;
+
+  std::multimap<aos::monotonic_clock::time_point, std::function<void()>>
+      functions_;
+};
+
+// Struct to capture the expected time a message should be received (and it's
+// value).  This is from the perspective of the node receiving the message.
+struct ExpectedTimestamps {
+  // The time that the message was published on the sending node's monotonic
+  // clock.
+  monotonic_clock::time_point remote_time;
+  // The time that the message was virtually transmitted over the virtual
+  // network on the sending node's monotonic clock.
+  monotonic_clock::time_point remote_transmit_time;
+  // The time that the message was received on the receiving node's clock.
+  monotonic_clock::time_point event_time;
+  // The value inside the message.
+  int value;
+};
+
 // Tests that rapidly sent messages get timestamped correctly.
 TEST(SimulatedEventLoopTest, TransmitTimestamps) {
   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
@@ -2517,74 +2574,90 @@
 
   {
     ::std::unique_ptr<EventLoop> ping_event_loop = pi1->MakeEventLoop("ping");
+    FunctionScheduler run_at(ping_event_loop.get());
     aos::Sender<examples::Ping> test_message_sender =
         ping_event_loop->MakeSender<examples::Ping>("/reliable");
+    aos::monotonic_clock::time_point now = ping_event_loop->monotonic_now();
     for (const std::chrono::nanoseconds dt :
          {chrono::microseconds(5000), chrono::microseconds(1),
           chrono::microseconds(2), chrono::microseconds(70),
-          chrono::microseconds(63)}) {
-      factory.RunFor(dt);
-      SendPing(&test_message_sender, 1);
+          chrono::microseconds(63), chrono::microseconds(140)}) {
+      now += dt;
+      run_at.ScheduleAt([&]() { SendPing(&test_message_sender, 1); }, now);
     }
 
-    factory.RunFor(chrono::milliseconds(10));
+    now += chrono::milliseconds(10);
+
+    factory.RunFor(now - ping_event_loop->monotonic_now());
   }
 
-  ASSERT_TRUE(fetcher.FetchNext());
-  EXPECT_EQ(fetcher.context().monotonic_remote_time,
-            monotonic_clock::epoch() + chrono::microseconds(5000));
-  // First message shows up after wakeup + network delay as expected.
-  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
-            monotonic_clock::epoch() + chrono::microseconds(5000) +
-                factory.send_delay());
-  EXPECT_EQ(fetcher.context().monotonic_event_time,
-            monotonic_clock::epoch() + chrono::microseconds(5000) +
-                factory.send_delay() + factory.network_delay());
+  const monotonic_clock::time_point e = monotonic_clock::epoch();
+  const chrono::nanoseconds send_delay = factory.send_delay();
+  const chrono::nanoseconds network_delay = factory.network_delay();
 
-  ASSERT_TRUE(fetcher.FetchNext());
-  EXPECT_EQ(fetcher.context().monotonic_remote_time,
-            monotonic_clock::epoch() + chrono::microseconds(5001));
-  // Next message is close enough that it gets picked up at the same wakeup.
-  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
-            monotonic_clock::epoch() + chrono::microseconds(5000) +
-                factory.send_delay());
-  EXPECT_EQ(fetcher.context().monotonic_event_time,
-            monotonic_clock::epoch() + chrono::microseconds(5000) +
-                factory.send_delay() + factory.network_delay());
+  const std::vector<ExpectedTimestamps> expected_values = {
+      // First message shows up after wakeup + network delay as expected.
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(5000),
+          .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
+          .event_time =
+              e + chrono::microseconds(5000) + send_delay + network_delay,
+          .value = 1,
+      },
+      // Next message is close enough that it gets picked up at the same wakeup.
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(5001),
+          .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
+          .event_time =
+              e + chrono::microseconds(5000) + send_delay + network_delay,
+          .value = 1,
+      },
+      // Same for the third.
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(5003),
+          .remote_transmit_time = e + chrono::microseconds(5000) + send_delay,
+          .event_time =
+              e + chrono::microseconds(5000) + send_delay + network_delay,
+          .value = 1,
+      },
+      // Fourth waits long enough to do the right thing.
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(5073),
+          .remote_transmit_time = e + chrono::microseconds(5073) + send_delay,
+          .event_time =
+              e + chrono::microseconds(5073) + send_delay + network_delay,
+          .value = 1,
+      },
+      // Fifth waits long enough to do the right thing as well (but kicks off
+      // while the fourth is in flight over the network).
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(5136),
+          .remote_transmit_time = e + chrono::microseconds(5136) + send_delay,
+          .event_time =
+              e + chrono::microseconds(5136) + send_delay + network_delay,
+          .value = 1,
+      },
+      // Sixth waits long enough to do the right thing as well (but kicks off
+      // while the fifth is in flight over the network and has almost landed).
+      // The timer wakeup for the Timestamp message coming back will find the
+      // sixth message a little bit early.
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(5276),
+          .remote_transmit_time = e + chrono::microseconds(5273) + send_delay,
+          .event_time =
+              e + chrono::microseconds(5273) + send_delay + network_delay,
+          .value = 1,
+      },
+  };
 
-  ASSERT_TRUE(fetcher.FetchNext());
-  EXPECT_EQ(fetcher.context().monotonic_remote_time,
-            monotonic_clock::epoch() + chrono::microseconds(5003));
-  // Same for the third.
-  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
-            monotonic_clock::epoch() + chrono::microseconds(5000) +
-                factory.send_delay());
-  EXPECT_EQ(fetcher.context().monotonic_event_time,
-            monotonic_clock::epoch() + chrono::microseconds(5000) +
-                factory.send_delay() + factory.network_delay());
-
-  ASSERT_TRUE(fetcher.FetchNext());
-  EXPECT_EQ(fetcher.context().monotonic_remote_time,
-            monotonic_clock::epoch() + chrono::microseconds(5073));
-  // Fourth waits long enough to do the right thing.
-  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
-            monotonic_clock::epoch() + chrono::microseconds(5073) +
-                factory.send_delay());
-  EXPECT_EQ(fetcher.context().monotonic_event_time,
-            monotonic_clock::epoch() + chrono::microseconds(5073) +
-                factory.send_delay() + factory.network_delay());
-
-  ASSERT_TRUE(fetcher.FetchNext());
-  EXPECT_EQ(fetcher.context().monotonic_remote_time,
-            monotonic_clock::epoch() + chrono::microseconds(5136));
-  // Fifth waits long enough to do the right thing as well (but kicks off while
-  // the fourth is in flight over the network).
-  EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
-            monotonic_clock::epoch() + chrono::microseconds(5136) +
-                factory.send_delay());
-  EXPECT_EQ(fetcher.context().monotonic_event_time,
-            monotonic_clock::epoch() + chrono::microseconds(5136) +
-                factory.send_delay() + factory.network_delay());
+  for (const ExpectedTimestamps value : expected_values) {
+    ASSERT_TRUE(fetcher.FetchNext());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
+    EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+              value.remote_transmit_time);
+    EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
+    EXPECT_EQ(fetcher->value(), value.value);
+  }
 
   ASSERT_FALSE(fetcher.FetchNext());
 }
@@ -2615,13 +2688,17 @@
   {
     aos::Sender<examples::Ping> pi1_reliable_sender =
         pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+    FunctionScheduler run_at(pi1_event_loop.get());
+    aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
     for (int i = 0; i < 100; ++i) {
-      SendPing(&pi1_reliable_sender, i);
-      factory.RunFor(chrono::milliseconds(100));
+      run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_reliable_sender, i); },
+                        now);
+      now += chrono::milliseconds(100);
     }
-  }
+    now += chrono::milliseconds(50);
 
-  factory.RunFor(chrono::milliseconds(50));
+    factory.RunFor(now - pi1_event_loop->monotonic_now());
+  }
 
   ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
 
@@ -2640,7 +2717,345 @@
                 factory.network_delay());
   ASSERT_EQ(pi2_reliable_fetcher->value(), 99);
 
+  // TODO(austin): Verify that the dropped packet count increases.
+
   ASSERT_FALSE(pi2_reliable_fetcher.Fetch());
 }
 
+// Tests that if we disconnect while a message is in various states of being
+// queued, it gets either dropped or sent as expected.
+TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringDisconnect) {
+  time.StartEqual();
+  factory.SkipTimingReport();
+  factory.DisableStatistics();
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
+
+  std::unique_ptr<aos::EventLoop> pi2_event_loop =
+      pi2->MakeEventLoop("fetcher");
+  aos::Fetcher<examples::Ping> fetcher =
+      pi2_event_loop->MakeFetcher<examples::Ping>("/unreliable");
+
+  ASSERT_FALSE(fetcher.Fetch());
+
+  aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
+  {
+    FunctionScheduler run_at(pi1_event_loop.get());
+    aos::Sender<examples::Ping> pi1_sender =
+        pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
+
+    int i = 0;
+    for (const std::chrono::nanoseconds dt :
+         {chrono::microseconds(5000), chrono::microseconds(1),
+          chrono::microseconds(2), chrono::microseconds(70),
+          chrono::microseconds(63), chrono::microseconds(140),
+          chrono::microseconds(160)}) {
+      run_at.ScheduleAt(
+          [&]() {
+            pi1->Connect(pi2->node());
+            pi2->Connect(pi1->node());
+          },
+          now);
+
+      now += chrono::milliseconds(100);
+
+      run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); }, now);
+
+      now += dt;
+
+      run_at.ScheduleAt(
+          [&]() {
+            // Fully disconnect the nodes.
+            pi1->Disconnect(pi2->node());
+            pi2->Disconnect(pi1->node());
+          },
+          now);
+
+      now += chrono::milliseconds(100) - dt;
+      ++i;
+    }
+
+    factory.RunFor(now - pi1_event_loop->monotonic_now());
+  }
+
+  const monotonic_clock::time_point e = monotonic_clock::epoch();
+  const chrono::nanoseconds send_delay = factory.send_delay();
+  const chrono::nanoseconds network_delay = factory.network_delay();
+
+  const std::vector<ExpectedTimestamps> expected_values = {
+      ExpectedTimestamps{
+          .remote_time = e + chrono::milliseconds(100),
+          .remote_transmit_time = e + chrono::milliseconds(100) + send_delay,
+          .event_time =
+              e + chrono::milliseconds(100) + send_delay + network_delay,
+          .value = 0,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::milliseconds(1300),
+          .remote_transmit_time = e + chrono::milliseconds(1300) + send_delay,
+          .event_time =
+              e + chrono::milliseconds(1300) + send_delay + network_delay,
+          .value = 6,
+      },
+  };
+
+  for (const ExpectedTimestamps value : expected_values) {
+    ASSERT_TRUE(fetcher.FetchNext());
+    EXPECT_EQ(fetcher.context().monotonic_remote_time, value.remote_time);
+    EXPECT_EQ(fetcher.context().monotonic_remote_transmit_time,
+              value.remote_transmit_time);
+    EXPECT_EQ(fetcher.context().monotonic_event_time, value.event_time);
+    EXPECT_EQ(fetcher->value(), value.value);
+  }
+
+  // TODO(austin): Verify that the dropped packet count increases.
+
+  ASSERT_FALSE(fetcher.Fetch());
+}
+
+class PingLogger {
+ public:
+  PingLogger(aos::EventLoop *event_loop, std::string_view channel,
+             std::vector<std::pair<aos::Context, int>> *msgs)
+      : event_loop_(event_loop),
+        fetcher_(event_loop_->MakeFetcher<examples::Ping>(channel)),
+        msgs_(msgs) {
+    event_loop_->OnRun([this]() { CHECK(!fetcher_.Fetch()); });
+  }
+
+  ~PingLogger() {
+    while (fetcher_.FetchNext()) {
+      msgs_->emplace_back(fetcher_.context(), fetcher_->value());
+    }
+  }
+
+ private:
+  aos::EventLoop *event_loop_;
+  aos::Fetcher<examples::Ping> fetcher_;
+  std::vector<std::pair<aos::Context, int>> *msgs_;
+};
+
+// Tests that rebooting while a message is in flight works as expected.
+TEST_F(SimulatedEventLoopDisconnectTest, MessageInFlightDuringReboot) {
+  time.StartEqual();
+  for (int i = 0; i < 8; ++i) {
+    time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
+  }
+
+  factory.SkipTimingReport();
+  factory.DisableStatistics();
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
+
+  aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
+  FunctionScheduler run_at(pi1_event_loop.get());
+  aos::Sender<examples::Ping> pi1_sender =
+      pi1_event_loop->MakeSender<examples::Ping>("/unreliable");
+
+  int i = 0;
+  for (const std::chrono::nanoseconds dt :
+       {chrono::microseconds(5000), chrono::microseconds(1),
+        chrono::microseconds(2), chrono::microseconds(70),
+        chrono::microseconds(63), chrono::microseconds(140),
+        chrono::microseconds(160)}) {
+    run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
+                      now + chrono::seconds(10) - dt);
+
+    now += chrono::seconds(10);
+    ++i;
+  }
+
+  std::vector<std::pair<aos::Context, int>> msgs;
+
+  pi2->OnStartup([pi2, &msgs]() {
+    pi2->AlwaysStart<PingLogger>("ping_logger", "/unreliable", &msgs);
+  });
+
+  factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
+
+  const monotonic_clock::time_point e = monotonic_clock::epoch();
+  const chrono::nanoseconds send_delay = factory.send_delay();
+  const chrono::nanoseconds network_delay = factory.network_delay();
+
+  const std::vector<ExpectedTimestamps> expected_values = {
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(9995000),
+          .remote_transmit_time =
+              e + chrono::microseconds(9995000) + send_delay,
+          .event_time =
+              e + chrono::microseconds(9995000) + send_delay + network_delay,
+          .value = 0,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(19999999),
+          .remote_transmit_time =
+              e + chrono::microseconds(19999999) + send_delay,
+          .event_time =
+              e + chrono::microseconds(-1) + send_delay + network_delay,
+          .value = 1,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(29999998),
+          .remote_transmit_time =
+              e + chrono::microseconds(29999998) + send_delay,
+          .event_time =
+              e + chrono::microseconds(-2) + send_delay + network_delay,
+          .value = 2,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(69999840),
+          .remote_transmit_time =
+              e + chrono::microseconds(69999840) + send_delay,
+          .event_time =
+              e + chrono::microseconds(9999840) + send_delay + network_delay,
+          .value = 6,
+      },
+  };
+
+  ASSERT_EQ(msgs.size(), expected_values.size());
+
+  for (size_t i = 0; i < msgs.size(); ++i) {
+    EXPECT_EQ(msgs[i].first.monotonic_remote_time,
+              expected_values[i].remote_time);
+    EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
+              expected_values[i].remote_transmit_time);
+    EXPECT_EQ(msgs[i].first.monotonic_event_time,
+              expected_values[i].event_time);
+    EXPECT_EQ(msgs[i].second, expected_values[i].value);
+  }
+
+  // TODO(austin): Verify that the dropped packet count increases.
+}
+
+// Tests that rebooting while a message is in flight works as expected.
+TEST_F(SimulatedEventLoopDisconnectTest, ReliableMessageInFlightDuringReboot) {
+  time.StartEqual();
+  for (int i = 0; i < 8; ++i) {
+    time.RebootAt(1, distributed_clock::epoch() + chrono::seconds(10 * i));
+  }
+
+  factory.SkipTimingReport();
+  factory.DisableStatistics();
+
+  NodeEventLoopFactory *pi1 = factory.GetNodeEventLoopFactory("pi1");
+  NodeEventLoopFactory *pi2 = factory.GetNodeEventLoopFactory("pi2");
+
+  std::unique_ptr<aos::EventLoop> pi1_event_loop = pi1->MakeEventLoop("sender");
+
+  aos::monotonic_clock::time_point now = pi1_event_loop->monotonic_now();
+  FunctionScheduler run_at(pi1_event_loop.get());
+  aos::Sender<examples::Ping> pi1_sender =
+      pi1_event_loop->MakeSender<examples::Ping>("/reliable");
+
+  int i = 0;
+  for (const std::chrono::nanoseconds dt :
+       {chrono::microseconds(5000), chrono::microseconds(1),
+        chrono::microseconds(2), chrono::microseconds(70),
+        chrono::microseconds(63), chrono::microseconds(140),
+        chrono::microseconds(160)}) {
+    run_at.ScheduleAt([&, i = i]() { SendPing(&pi1_sender, i); },
+                      now + chrono::seconds(10) - dt);
+
+    now += chrono::seconds(10);
+    ++i;
+  }
+
+  std::vector<std::pair<aos::Context, int>> msgs;
+
+  PingLogger *logger;
+  pi2->OnStartup([pi2, &msgs, &logger]() {
+    logger = pi2->AlwaysStart<PingLogger>("ping_logger", "/reliable", &msgs);
+  });
+
+  factory.RunFor(now - pi1_event_loop->monotonic_now() + chrono::seconds(10));
+
+  // Stop the logger to flush the last boot of data.
+  pi2->Stop(logger);
+
+  const monotonic_clock::time_point e = monotonic_clock::epoch();
+  const chrono::nanoseconds send_delay = factory.send_delay();
+  const chrono::nanoseconds network_delay = factory.network_delay();
+
+  // Verified using --vmodule=simulated_event_loop=1 and looking at the actual
+  // event times to confirm what should have been forwarded when.
+  const std::vector<ExpectedTimestamps> expected_values = {
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(9995000),
+          .remote_transmit_time =
+              e + chrono::microseconds(9995000) + send_delay,
+          .event_time =
+              e + chrono::microseconds(9995000) + send_delay + network_delay,
+          .value = 0,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(9995000),
+          .remote_transmit_time = e + chrono::microseconds(10000000),
+          .event_time = e + network_delay,
+          .value = 0,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(19999999),
+          .remote_transmit_time = e + chrono::microseconds(20000000),
+          .event_time = e + network_delay,
+          .value = 1,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(29999998),
+          .remote_transmit_time = e + chrono::microseconds(30000000),
+          .event_time = e + network_delay,
+          .value = 2,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(39999930),
+          .remote_transmit_time = e + chrono::microseconds(40000000),
+          .event_time = e + network_delay,
+          .value = 3,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(49999937),
+          .remote_transmit_time = e + chrono::microseconds(50000000),
+          .event_time = e + network_delay,
+          .value = 4,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(59999860),
+          .remote_transmit_time = e + chrono::microseconds(60000000),
+          .event_time = e + network_delay,
+          .value = 5,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(69999840),
+          .remote_transmit_time = e + chrono::microseconds(69999890),
+          .event_time = e + chrono::microseconds(9999890) + network_delay,
+          .value = 6,
+      },
+      ExpectedTimestamps{
+          .remote_time = e + chrono::microseconds(69999840),
+          .remote_transmit_time = e + chrono::microseconds(70000000),
+          .event_time = e + network_delay,
+          .value = 6,
+      },
+  };
+
+  ASSERT_EQ(msgs.size(), expected_values.size());
+
+  for (size_t i = 0; i < msgs.size(); ++i) {
+    EXPECT_EQ(msgs[i].first.monotonic_remote_time,
+              expected_values[i].remote_time);
+    EXPECT_EQ(msgs[i].first.monotonic_remote_transmit_time,
+              expected_values[i].remote_transmit_time);
+    EXPECT_EQ(msgs[i].first.monotonic_event_time,
+              expected_values[i].event_time);
+    EXPECT_EQ(msgs[i].second, expected_values[i].value);
+  }
+
+  // TODO(austin): Verify that the dropped packet count increases.
+}
+
 }  // namespace aos::testing