Logger: Pipe the monotonic_remote_transmit_time through event loop

Populate the field in all the network bridges and pipe it through
all the required spots in the event loop.  Update all the tests to match
the update.

As part of this, we realized that our modeling of network delay was
wrong.  Wakeups don't always take 50 uS if something else triggers the
wakeup and the message is ready in shared memory.  This wasn't being
handled properly.

Change-Id: Idf94c5c6d7c87f4d65868c71b1cceedca7bf3853
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index a3cda43..5dddc60 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -490,11 +490,18 @@
         EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
                   pi2_ping_count * chrono::milliseconds(10) +
                       realtime_clock::epoch());
-        EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time +
-                      chrono::microseconds(150),
+        // The message at the start of each second doesn't have wakeup latency
+        // since timing reports and server statistics wake us up already at that
+        // point in time.
+        chrono::nanoseconds offset = chrono::microseconds(150);
+        if (pi2_event_loop->context().monotonic_remote_time.time_since_epoch() %
+                chrono::seconds(1) ==
+            chrono::seconds(0)) {
+          offset = chrono::microseconds(100);
+        }
+        EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time + offset,
                   pi2_event_loop->context().monotonic_event_time);
-        EXPECT_EQ(pi2_event_loop->context().realtime_remote_time +
-                      chrono::microseconds(150),
+        EXPECT_EQ(pi2_event_loop->context().realtime_remote_time + offset,
                   pi2_event_loop->context().realtime_event_time);
         ++pi2_ping_count;
       });
@@ -502,63 +509,76 @@
   constexpr ssize_t kQueueIndexOffset = -9;
   // Confirm that the ping and pong counts both match, and the value also
   // matches.
-  pi1_event_loop->MakeWatcher(
-      "/test", [&pi1_event_loop, &pi1_ping_count,
-                &pi1_pong_count](const examples::Pong &pong) {
-        VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
-                << pi1_event_loop->context().monotonic_remote_time << " -> "
-                << pi1_event_loop->context().monotonic_event_time;
+  pi1_event_loop->MakeWatcher("/test", [&pi1_event_loop, &pi1_ping_count,
+                                        &pi1_pong_count](
+                                           const examples::Pong &pong) {
+    VLOG(1) << "Pi1 pong " << FlatbufferToJson(&pong) << " at "
+            << pi1_event_loop->context().monotonic_remote_time << " -> "
+            << pi1_event_loop->context().monotonic_event_time;
 
-        EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
-                  pi1_pong_count + kQueueIndexOffset);
-        EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
-                  chrono::microseconds(200) +
-                      pi1_pong_count * chrono::milliseconds(10) +
-                      monotonic_clock::epoch());
-        EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
-                  chrono::microseconds(200) +
-                      pi1_pong_count * chrono::milliseconds(10) +
-                      realtime_clock::epoch());
+    EXPECT_EQ(pi1_event_loop->context().remote_queue_index,
+              pi1_pong_count + kQueueIndexOffset);
 
-        EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
-                      chrono::microseconds(150),
-                  pi1_event_loop->context().monotonic_event_time);
-        EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
-                      chrono::microseconds(150),
-                  pi1_event_loop->context().realtime_event_time);
+    chrono::nanoseconds offset = chrono::microseconds(200);
+    if ((pi1_event_loop->context().monotonic_remote_time.time_since_epoch() -
+         chrono::microseconds(150)) %
+            chrono::seconds(1) ==
+        chrono::seconds(0)) {
+      offset = chrono::microseconds(150);
+    }
 
-        EXPECT_EQ(pong.value(), pi1_pong_count + 1);
-        ++pi1_pong_count;
-        EXPECT_EQ(pi1_ping_count, pi1_pong_count);
-      });
-  pi2_event_loop->MakeWatcher(
-      "/test", [&pi2_event_loop, &pi2_ping_count,
-                &pi2_pong_count](const examples::Pong &pong) {
-        VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
-                << pi2_event_loop->context().monotonic_remote_time << " -> "
-                << pi2_event_loop->context().monotonic_event_time;
+    EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time,
+              offset + pi1_pong_count * chrono::milliseconds(10) +
+                  monotonic_clock::epoch());
+    EXPECT_EQ(pi1_event_loop->context().realtime_remote_time,
+              offset + pi1_pong_count * chrono::milliseconds(10) +
+                  realtime_clock::epoch());
 
-        EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
-                  pi2_pong_count + kQueueIndexOffset);
+    EXPECT_EQ(pi1_event_loop->context().monotonic_remote_time +
+                  chrono::microseconds(150),
+              pi1_event_loop->context().monotonic_event_time);
+    EXPECT_EQ(pi1_event_loop->context().realtime_remote_time +
+                  chrono::microseconds(150),
+              pi1_event_loop->context().realtime_event_time);
 
-        EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
-                  chrono::microseconds(200) +
-                      pi2_pong_count * chrono::milliseconds(10) +
-                      monotonic_clock::epoch());
-        EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
-                  chrono::microseconds(200) +
-                      pi2_pong_count * chrono::milliseconds(10) +
-                      realtime_clock::epoch());
+    EXPECT_EQ(pong.value(), pi1_pong_count + 1);
+    ++pi1_pong_count;
+    EXPECT_EQ(pi1_ping_count, pi1_pong_count);
+  });
+  pi2_event_loop->MakeWatcher("/test", [&pi2_event_loop, &pi2_ping_count,
+                                        &pi2_pong_count](
+                                           const examples::Pong &pong) {
+    VLOG(1) << "Pi2 pong " << FlatbufferToJson(&pong) << " at "
+            << pi2_event_loop->context().monotonic_remote_time << " -> "
+            << pi2_event_loop->context().monotonic_event_time;
 
-        EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
-                  pi2_event_loop->context().monotonic_event_time);
-        EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
-                  pi2_event_loop->context().realtime_event_time);
+    EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
+              pi2_pong_count + kQueueIndexOffset);
 
-        EXPECT_EQ(pong.value(), pi2_pong_count + 1);
-        ++pi2_pong_count;
-        EXPECT_EQ(pi2_ping_count, pi2_pong_count);
-      });
+    chrono::nanoseconds offset = chrono::microseconds(200);
+    if ((pi2_event_loop->context().monotonic_remote_time.time_since_epoch() -
+         chrono::microseconds(150)) %
+            chrono::seconds(1) ==
+        chrono::seconds(0)) {
+      offset = chrono::microseconds(150);
+    }
+
+    EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+              offset + pi2_pong_count * chrono::milliseconds(10) +
+                  monotonic_clock::epoch());
+    EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+              offset + pi2_pong_count * chrono::milliseconds(10) +
+                  realtime_clock::epoch());
+
+    EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
+              pi2_event_loop->context().monotonic_event_time);
+    EXPECT_EQ(pi2_event_loop->context().realtime_remote_time,
+              pi2_event_loop->context().realtime_event_time);
+
+    EXPECT_EQ(pong.value(), pi2_pong_count + 1);
+    ++pi2_pong_count;
+    EXPECT_EQ(pi2_ping_count, pi2_pong_count);
+  });
 
   log_reader_factory.Run();
   EXPECT_EQ(pi1_ping_count, 2010);
@@ -1908,11 +1928,28 @@
             ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
             pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
             pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
+            // Timestamps don't have wakeup delay, so they show back up after 2
+            // times the network delay on the source node.  Confirm that matches
+            // when we are reading the log.
+            EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
+                      pi1_context->monotonic_event_time + 2 * network_delay);
           } else if (header.channel_index() == ping_timestamp_channel) {
             ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
             ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
             pi1_context = &ping_on_pi1_fetcher.context();
             pi2_context = &ping_on_pi2_fetcher.context();
+            // Ping messages get picked up faster at the start of each message
+            // when timers wake up.  Verify all that behavior matches exactly as
+            // expected when reading the log.
+            EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
+                      pi1_context->monotonic_event_time + 2 * network_delay +
+                          ((pi1_event_loop->context().monotonic_event_time -
+                            2 * network_delay)
+                                           .time_since_epoch() %
+                                       chrono::nanoseconds(1000000000) ==
+                                   chrono::nanoseconds(0)
+                               ? chrono::nanoseconds(0)
+                               : send_delay));
           } else {
             LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
                        << configuration::CleanedChannelToString(
@@ -1942,13 +1979,6 @@
                     header_realtime_remote_time);
           EXPECT_EQ(pi1_context->monotonic_event_time,
                     header_monotonic_remote_time);
-
-          // Time estimation isn't perfect, but we know the clocks were
-          // identical when logged, so we know when this should have come back.
-          // Confirm we got it when we expected.
-          EXPECT_EQ(pi1_event_loop->context().monotonic_event_time,
-                    pi1_context->monotonic_event_time + 2 * network_delay +
-                        send_delay);
         });
   }
   for (std::pair<int, std::string> channel :
@@ -1988,11 +2018,20 @@
             ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
             pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
             pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
+            // Again, timestamps don't have wakeup delay, so they show back up
+            // after 2 times the network delay on the source node.
+            EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
+                      pi2_context->monotonic_event_time + 2 * network_delay);
           } else if (header.channel_index() == pong_timestamp_channel) {
             ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
             ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
             pi2_context = &pong_on_pi2_fetcher.context();
             pi1_context = &pong_on_pi1_fetcher.context();
+            // And Pong messages come back repeatably since they aren't at the
+            // start of a second.
+            EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
+                      pi2_context->monotonic_event_time + 2 * network_delay +
+                          send_delay);
           } else {
             LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
                        << configuration::CleanedChannelToString(
@@ -2022,13 +2061,6 @@
                     header_realtime_remote_time);
           EXPECT_EQ(pi2_context->monotonic_event_time,
                     header_monotonic_remote_time);
-
-          // Time estimation isn't perfect, but we know the clocks were
-          // identical when logged, so we know when this should have come back.
-          // Confirm we got it when we expected.
-          EXPECT_EQ(pi2_event_loop->context().monotonic_event_time,
-                    pi2_context->monotonic_event_time + 2 * network_delay +
-                        send_delay);
         });
   }
 
@@ -2266,7 +2298,7 @@
           case 3:
             EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
             ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
-                                                chrono::nanoseconds(2322999462))
+                                                chrono::nanoseconds(2323000000))
                 << " on " << file;
             break;
           default:
@@ -2293,7 +2325,7 @@
           case 5:
             EXPECT_EQ(source_node_boot_uuid, pi2_boot1);
             ASSERT_EQ(monotonic_start_time, monotonic_clock::epoch() +
-                                                chrono::nanoseconds(2322999462))
+                                                chrono::nanoseconds(2323000000))
                 << " on " << file;
             break;
           default:
@@ -2439,7 +2471,7 @@
                     monotonic_clock::time_point(chrono::microseconds(100000)))
               << file;
           EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                    monotonic_clock::time_point(chrono::microseconds(100150)))
+                    monotonic_clock::time_point(chrono::microseconds(100100)))
               << file;
           break;
         case 2:
@@ -2479,7 +2511,7 @@
                     monotonic_clock::time_point(chrono::microseconds(1423000)))
               << file;
           EXPECT_EQ(oldest_local_reliable_monotonic_timestamps,
-                    monotonic_clock::time_point(chrono::microseconds(10200150)))
+                    monotonic_clock::time_point(chrono::microseconds(10200100)))
               << file;
           break;
         default:
@@ -3427,7 +3459,7 @@
                                                       chrono::seconds(1)));
   EXPECT_THAT(result[2].second,
               ::testing::ElementsAre(realtime_clock::epoch() +
-                                     chrono::microseconds(34900150)));
+                                     chrono::microseconds(34900100)));
 }
 
 // Tests that local data before remote data after reboot is properly replayed.
@@ -3579,7 +3611,7 @@
   EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
   EXPECT_THAT(result[0].second,
               ::testing::ElementsAre(realtime_clock::epoch() +
-                                     chrono::microseconds(11000350)));
+                                     chrono::microseconds(11000300)));
 
   EXPECT_THAT(result[1].first,
               ::testing::ElementsAre(
@@ -3587,13 +3619,13 @@
                   realtime_clock::epoch() + chrono::microseconds(107005000)));
   EXPECT_THAT(result[1].second,
               ::testing::ElementsAre(
-                  realtime_clock::epoch() + chrono::microseconds(4000150),
-                  realtime_clock::epoch() + chrono::microseconds(111000200)));
+                  realtime_clock::epoch() + chrono::microseconds(4000100),
+                  realtime_clock::epoch() + chrono::microseconds(111000150)));
 
   EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
   EXPECT_THAT(result[2].second,
               ::testing::ElementsAre(realtime_clock::epoch() +
-                                     chrono::microseconds(11000150)));
+                                     chrono::microseconds(11000100)));
 
   auto start_stop_result = ConfirmReadable(
       filenames, realtime_clock::epoch() + chrono::milliseconds(2000),
@@ -3737,7 +3769,7 @@
   EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
   EXPECT_THAT(result[0].second,
               ::testing::ElementsAre(realtime_clock::epoch() +
-                                     chrono::microseconds(11000350)));
+                                     chrono::microseconds(11000300)));
 
   EXPECT_THAT(result[1].first,
               ::testing::ElementsAre(
@@ -3745,13 +3777,13 @@
                   realtime_clock::epoch() + chrono::microseconds(6005000)));
   EXPECT_THAT(result[1].second,
               ::testing::ElementsAre(
-                  realtime_clock::epoch() + chrono::microseconds(4900150),
-                  realtime_clock::epoch() + chrono::microseconds(11000200)));
+                  realtime_clock::epoch() + chrono::microseconds(4900100),
+                  realtime_clock::epoch() + chrono::microseconds(11000150)));
 
   EXPECT_THAT(result[2].first, ::testing::ElementsAre(realtime_clock::epoch()));
   EXPECT_THAT(result[2].second,
               ::testing::ElementsAre(realtime_clock::epoch() +
-                                     chrono::microseconds(11000150)));
+                                     chrono::microseconds(11000100)));
 
   // Confirm we observed the correct start and stop times.  We should see the
   // reboot here.
@@ -3771,7 +3803,7 @@
                   realtime_clock::epoch() + chrono::microseconds(6005000)));
   EXPECT_THAT(start_stop_result[1].second,
               ::testing::ElementsAre(
-                  realtime_clock::epoch() + chrono::microseconds(4900150),
+                  realtime_clock::epoch() + chrono::microseconds(4900100),
                   realtime_clock::epoch() + chrono::seconds(8)));
   EXPECT_THAT(
       start_stop_result[2].first,