Logger: Pipe the monotonic_remote_transmit_time through the event loop

Populate the field in all the network bridges and pipe it through
all the required spots in the event loop.  Update all the tests to
match.

As part of this, we realized that our modeling of network delay was
wrong.  Wakeups don't always take 50 µs: if something else triggers the
wakeup and the message is already waiting in shared memory, the full
wakeup latency isn't paid.  This wasn't being handled properly.
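
Concretely, the delivered time for a forwarded message is now computed
from the time the message was handed off for transmit rather than from
the time it was sent on the source node.  A rough sketch of the model
change (simplified from DeliveredTime() below, not the literal code):

    // Before: every delivery paid the full send delay plus the network
    // delay, measured from the original send time.
    delivered = ToDistributedClock(monotonic_event_time) +
                send_delay() + network_delay();

    // After: the send delay is folded into the captured transmit time,
    // so a message that is already sitting in shared memory when some
    // other event wakes the bridge doesn't pay it again.
    delivered = ToDistributedClock(transmit_time) + network_delay();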

Change-Id: Idf94c5c6d7c87f4d65868c71b1cceedca7bf3853
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 5c97292..82ee6d1 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -1,6 +1,7 @@
 #include "aos/events/simulated_network_bridge.h"
 
 #include "absl/strings/str_cat.h"
+#include "glog/logging.h"
 
 #include "aos/configuration.h"
 #include "aos/events/event_loop.h"
@@ -42,6 +43,8 @@
                          MessageBridgeServerStatus *server_status,
                          ChannelTimestampSender *timestamp_loggers) {
     sent_ = false;
+    reliable_scheduled_ = false;
+    published_ = false;
     fetch_event_loop_ = fetch_event_loop;
     if (fetch_event_loop_) {
       fetcher_ = fetch_event_loop_->MakeRawFetcher(channel_);
@@ -50,7 +53,7 @@
     }
 
     server_status_ = server_status;
-    if (server_status) {
+    if (server_status_) {
       server_connection_ =
           server_status_->FindServerConnection(send_node_factory_->node());
       server_index_ = configuration::GetNodeIndex(
@@ -84,7 +87,6 @@
 
   void SetSendEventLoop(aos::EventLoop *send_event_loop,
                         MessageBridgeClientStatus *client_status) {
-    sent_ = false;
     send_event_loop_ = send_event_loop;
     if (send_event_loop_ && !forwarding_disabled_) {
       sender_ = send_event_loop_->MakeRawSender(channel_);
@@ -126,14 +128,43 @@
         ->time_to_live();
   }
 
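+  // Returns a human-readable "source -> destination channel" description of
+  // this delayer for log and check-failure messages.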
+  std::string Name() {
+    std::string result;
+    result +=
+        (fetch_event_loop_ ? fetch_event_loop_->node()->name()->string_view()
+                           : std::string_view("?"));
+    result += " -> ";
+    result +=
+        (send_event_loop_ ? send_event_loop_->node()->name()->string_view()
+                          : std::string_view("?"));
+    result += " ";
+    result += aos::configuration::StrippedChannelToString(channel());
+    return result;
+  }
+
   void ScheduleReliable() {
-    if (forwarding_disabled()) return;
+    if (forwarding_disabled()) {
+      return;
+    }
 
     if (!fetcher_) {
       return;
     }
     if (fetcher_->context().data == nullptr || sent_) {
-      sent_ = !fetcher_->Fetch();
+      fetcher_->Fetch();
+      sent_ = fetcher_->context().data == nullptr;
+      published_ = sent_;
+      reliable_scheduled_ = true;
+    }
+
+    if (!timer_) {
+      return;
+    }
+
+    if (server_connection_->state() != State::CONNECTED) {
+      reliable_scheduled_ = false;
+      sent_ = true;
+      return;
     }
 
     FetchNext();
@@ -154,17 +185,87 @@
         << " at " << fetch_node_factory_->monotonic_now();
 
     if (timer_) {
-      server_status_->AddSentPacket(server_index_, channel_);
-      timer_->Schedule(monotonic_delivered_time);
-      timer_scheduled_ = true;
+      if (!timer_scheduled_) {
+        server_status_->AddSentPacket(server_index_, channel_);
+        timer_->Schedule(monotonic_delivered_time);
+        timer_scheduled_ = true;
+
+        QueueTransmitTimestamp(fetcher_->context().queue_index,
+                               fetcher_->context().monotonic_event_time,
+                               fetch_event_loop_->monotonic_now());
+      }
     } else {
+      // TODO(austin): When do we hit this?  Can we add a test to make sure this
+      // is right?
       server_status_->AddDroppedPacket(server_index_, channel_);
       sent_ = true;
+      reliable_scheduled_ = false;
+      published_ = false;
     }
   }
 
   bool timer_scheduled_ = false;
 
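+  // Called by the watcher when a message shows up on the source node.
+  // Records when the message was handed off for transmit, unless the
+  // reliable-message path already recorded it, and schedules delivery.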
+  void MessageWatcherCallback(uint32_t sent_queue_index,
+                              monotonic_clock::time_point monotonic_sent_time,
+                              monotonic_clock::time_point transmit_time) {
+    if (!reliable_scheduled_) {
+      QueueTransmitTimestamp(sent_queue_index, monotonic_sent_time,
+                             transmit_time);
+    } else {
+      reliable_scheduled_ = false;
+    }
+    Schedule();
+  }
+
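+  // Records when the message with the given queue index and send time was
+  // handed to the kernel to be transmitted to the destination node.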
+  void QueueTransmitTimestamp(uint32_t sent_queue_index,
+                              monotonic_clock::time_point monotonic_sent_time,
+                              monotonic_clock::time_point transmit_time) {
+    if (forwarding_disabled()) return;
+
+    if (!monotonic_remote_transmit_times_.empty()) {
+      // FetchNext can discover messages before we do in the same nanosecond.
+      // In that case, make sure the contents match and don't add it a second
+      // time.
+      const TransmitTime &back = monotonic_remote_transmit_times_.back();
+      if (back.sent_queue_index == sent_queue_index) {
+        CHECK_EQ(back.monotonic_sent_time, monotonic_sent_time) << this;
+        CHECK_EQ(back.transmit_time, transmit_time) << this;
+        return;
+      }
+    }
+
+    // Capture the time this message was published over the network on the
+    // remote node.
+    monotonic_remote_transmit_times_.push_back(TransmitTime{
+        .monotonic_sent_time = monotonic_sent_time,
+        .sent_queue_index = sent_queue_index,
+        .transmit_time = transmit_time,
+    });
+  }
+
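+  // Called when the server connection to the destination node comes up.  If
+  // the latest reliable message hasn't been published yet, fetches it and
+  // schedules it for delivery.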
+  void Connect() {
+    if (time_to_live() == 0 && !published_) {
+      if (forwarding_disabled()) {
+        return;
+      }
+      CHECK(fetcher_);
+
+      fetcher_->Fetch();
+      sent_ = fetcher_->context().data == nullptr;
+      reliable_scheduled_ = true;
+
+      QueueTransmitTimestamp(fetcher_->context().queue_index,
+                             fetcher_->context().monotonic_event_time,
+                             fetch_event_loop_->monotonic_now());
+      Schedule();
+    }
+  }
+
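+  // Returns true if this delayer sends to the provided destination node.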
+  bool SendingTo(const Node *destination) {
+    return send_event_loop_ && send_event_loop_->node() == destination;
+  }
+
   // Kicks us to re-fetch and schedule the timer.
   void Schedule() {
     CHECK(!forwarding_disabled());
@@ -179,9 +280,13 @@
       return;
     }
 
+    CHECK_GT(monotonic_remote_transmit_times_.size(), 0u) << this;
+    const monotonic_clock::time_point transmit_time =
+        monotonic_remote_transmit_times_[0].transmit_time;
+
     // Compute the time to publish this message.
     const monotonic_clock::time_point monotonic_delivered_time =
-        DeliveredTime(fetcher_->context());
+        DeliveredTime(transmit_time);
 
     CHECK_GE(monotonic_delivered_time, send_node_factory_->monotonic_now())
         << ": Trying to deliver message in the past on channel "
@@ -197,6 +302,8 @@
     } else {
       server_status_->AddDroppedPacket(server_index_, channel_);
       sent_ = true;
+      reliable_scheduled_ = false;
+      published_ = false;
       Schedule();
     }
   }
@@ -208,20 +315,42 @@
     while (true) {
       if (fetcher_->context().data == nullptr || sent_) {
         sent_ = !fetcher_->FetchNext();
+        if (!sent_) {
+          published_ = false;
+          if (monotonic_remote_transmit_times_.empty()) {
+            QueueTransmitTimestamp(fetcher_->context().queue_index,
+                                   fetcher_->context().monotonic_event_time,
+                                   fetch_event_loop_->monotonic_now());
+          }
+        }
       }
       if (sent_) {
         break;
       }
 
       if (server_connection_->state() != State::CONNECTED) {
+        CHECK_GT(monotonic_remote_transmit_times_.size(), 0u) << this;
+        CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
+                 fetcher_->context().monotonic_event_time)
+            << this << " " << Name();
+        CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
+                 fetcher_->context().queue_index)
+            << this << " " << Name();
+
+        monotonic_remote_transmit_times_.erase(
+            monotonic_remote_transmit_times_.begin());
         sent_ = true;
+        reliable_scheduled_ = false;
+        published_ = false;
         server_status_->AddDroppedPacket(server_index_, channel_);
         continue;
       }
 
       if (fetcher_->context().monotonic_event_time +
                   send_node_factory_->network_delay() +
-                  send_node_factory_->send_delay() >
+                  send_node_factory_->send_delay() >=
               fetch_node_factory_->monotonic_now() ||
           time_to_live() == 0) {
         break;
@@ -229,14 +358,14 @@
 
       // TODO(austin): Not cool.  We want to actually forward these.  This means
       // we need a more sophisticated concept of what is running.
-      // TODO(james): This fails if multiple messages are sent on the same
-      // channel within the same callback.
       LOG(WARNING) << "Not forwarding message on "
                    << configuration::CleanedChannelToString(fetcher_->channel())
                    << " because we aren't running.  Sent at "
                    << fetcher_->context().monotonic_event_time << " now is "
                    << fetch_node_factory_->monotonic_now();
       sent_ = true;
+      reliable_scheduled_ = false;
+      published_ = false;
       server_status_->AddDroppedPacket(server_index_, channel_);
     }
   }
@@ -246,22 +375,40 @@
     timer_scheduled_ = false;
     CHECK(sender_);
     CHECK(client_status_);
+
+    // Confirm that the first element in the times list is ours, and pull the
+    // transmit time out of it.
+    CHECK(!monotonic_remote_transmit_times_.empty());
+    CHECK_EQ(monotonic_remote_transmit_times_[0].monotonic_sent_time,
+             fetcher_->context().monotonic_event_time);
+    CHECK_EQ(monotonic_remote_transmit_times_[0].sent_queue_index,
+             fetcher_->context().queue_index);
+
+    const monotonic_clock::time_point monotonic_remote_transmit_time =
+        monotonic_remote_transmit_times_[0].transmit_time;
+
+    monotonic_remote_transmit_times_.erase(
+        monotonic_remote_transmit_times_.begin());
+
     if (server_connection_->state() != State::CONNECTED) {
       sent_ = true;
+      reliable_scheduled_ = false;
+      published_ = false;
       Schedule();
       return;
     }
+
     // Fill out the send times.
     sender_->CheckOk(sender_->Send(
         fetcher_->context().data, fetcher_->context().size,
         fetcher_->context().monotonic_event_time,
-        fetcher_->context().realtime_event_time,
+        fetcher_->context().realtime_event_time, monotonic_remote_transmit_time,
         fetcher_->context().queue_index, fetcher_->context().source_boot_uuid));
 
     // And simulate message_bridge's offset recovery.
-    client_status_->SampleFilter(
-        client_index_, fetcher_->context().monotonic_event_time,
-        sender_->monotonic_sent_time(), fetcher_->context().source_boot_uuid);
+    client_status_->SampleFilter(client_index_, monotonic_remote_transmit_time,
+                                 sender_->monotonic_sent_time(),
+                                 fetcher_->context().source_boot_uuid);
 
     client_connection_->mutate_received_packets(
         client_connection_->received_packets() + 1);
@@ -294,7 +441,8 @@
           fetcher_->context().realtime_event_time.time_since_epoch().count());
       message_header_builder.add_remote_queue_index(
           fetcher_->context().queue_index);
-
+      message_header_builder.add_monotonic_remote_transmit_time(
+          monotonic_remote_transmit_time.time_since_epoch().count());
       message_header_builder.add_monotonic_sent_time(
           sender_->monotonic_sent_time().time_since_epoch().count());
       message_header_builder.add_realtime_sent_time(
@@ -312,6 +460,8 @@
     }
 
     sent_ = true;
+    reliable_scheduled_ = false;
+    published_ = true;
     Schedule();
   }
 
@@ -354,13 +504,13 @@
   }
 
   // Converts from time on the sending node to time on the receiving node.
-  monotonic_clock::time_point DeliveredTime(const Context &context) const {
+  monotonic_clock::time_point DeliveredTime(
+      const monotonic_clock::time_point transmit_time) const {
     const distributed_clock::time_point distributed_sent_time =
-        fetch_node_factory_->ToDistributedClock(context.monotonic_event_time);
+        fetch_node_factory_->ToDistributedClock(transmit_time);
 
     const logger::BootTimestamp t = send_node_factory_->FromDistributedClock(
-        distributed_sent_time + send_node_factory_->network_delay() +
-        send_node_factory_->send_delay());
+        distributed_sent_time + send_node_factory_->network_delay());
     CHECK_EQ(t.boot, send_node_factory_->boot_count());
     return t.time;
   }
@@ -393,6 +543,8 @@
   const size_t destination_node_index_;
   // True if we have sent the message in the fetcher.
   bool sent_ = false;
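+  // True if the message in the fetcher has been published on the destination
+  // node (or there was nothing to publish).  Reliable messages are re-sent
+  // on connect while this is false.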
+  bool published_ = false;
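+  // True if ScheduleReliable() or Connect() already queued the transmit
+  // timestamp for the message in the fetcher, so the watcher callback
+  // shouldn't queue a duplicate.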
+  bool reliable_scheduled_ = false;
 
   ServerConnection *server_connection_ = nullptr;
   int server_index_ = -1;
@@ -403,6 +555,16 @@
   size_t channel_index_;
   aos::Sender<RemoteMessage> *timestamp_logger_ = nullptr;
 
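+  // Timestamps for a single forwarded message: when it was sent on the
+  // source node, its queue index there, and when it was handed off to the
+  // network.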
+  struct TransmitTime {
+    monotonic_clock::time_point monotonic_sent_time;
+    uint32_t sent_queue_index;
+    monotonic_clock::time_point transmit_time;
+  };
+
+  // Stores the time each message was handed to the kernel to be published
+  // over the network on the remote node, for all relevant forwarded messages.
+  std::vector<TransmitTime> monotonic_remote_transmit_times_;
+
   struct Timestamp {
     Timestamp(FlatbufferDetachedBuffer<RemoteMessage> new_remote_message,
               monotonic_clock::time_point new_monotonic_timestamp_time)
@@ -440,9 +602,9 @@
 
           size_t node_index = 0;
           for (const std::optional<MessageBridgeServerStatus::NodeState>
-                   &connection : node_state->server_status->nodes()) {
+                   &connection : node_state->server_status_->nodes()) {
             if (connection.has_value()) {
-              node_state->server_status->ResetFilter(node_index);
+              node_state->server_status_->ResetFilter(node_index);
             }
             ++node_index;
           }
@@ -514,9 +676,14 @@
 
     if (channel == timestamp_channel) {
       source_event_loop->second.SetSendData(
-          [captured_delayers = delayers.get()]() {
+          [source_event_loop, captured_delayers = delayers.get()](
+              uint32_t sent_queue_index,
+              monotonic_clock::time_point monotonic_sent_time) {
             for (std::unique_ptr<RawMessageDelayer> &delayer :
                  captured_delayers->v) {
+              delayer->QueueTransmitTimestamp(
+                  sent_queue_index, monotonic_sent_time,
+                  source_event_loop->second.event_loop->monotonic_now());
               delayer->Schedule();
             }
           });
@@ -588,11 +755,21 @@
   it->second.EnableStatistics();
 }
 
+void SimulatedMessageBridge::State::MakeEventLoop() {
+  // Message bridge isn't the thing that should be catching sent-too-fast,
+  // and may need to be able to forward too-fast messages replayed from old
+  // logfiles.
+  SetEventLoop(node_factory_->MakeEventLoop(
+      "message_bridge", {NodeEventLoopFactory::CheckSentTooFast::kNo,
+                         NodeEventLoopFactory::ExclusiveSenders::kNo,
+                         {}}));
+}
+
 void SimulatedMessageBridge::State::SetEventLoop(
     std::unique_ptr<aos::EventLoop> loop) {
   if (!loop) {
     timestamp_loggers = ChannelTimestampSender(nullptr);
-    server_status.reset();
+    server_status_.reset();
     client_status.reset();
     for (RawMessageDelayer *source_delayer : source_delayers_) {
       source_delayer->SetFetchEventLoop(nullptr, nullptr, nullptr);
@@ -615,38 +792,42 @@
     // Don't register watchers if we know we aren't forwarding.
     if (watcher.second->disable_forwarding) continue;
     event_loop->MakeRawNoArgWatcher(
-        watcher.first, [captured_delayers = watcher.second](const Context &) {
+        watcher.first,
+        [this, captured_delayers = watcher.second](const Context &context) {
           // We might get told after registering, so don't forward at that point
           // too.
           for (std::unique_ptr<RawMessageDelayer> &delayer :
                captured_delayers->v) {
-            delayer->Schedule();
+            delayer->MessageWatcherCallback(context.queue_index,
+                                            context.monotonic_event_time,
+                                            event_loop->monotonic_now());
           }
         });
   }
 
   timestamp_loggers = ChannelTimestampSender(event_loop.get());
-  server_status = std::make_unique<MessageBridgeServerStatus>(event_loop.get());
+  server_status_ =
+      std::make_unique<MessageBridgeServerStatus>(event_loop.get());
   if (disable_statistics_) {
-    server_status->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
+    server_status_->DisableStatistics(destroy_senders_ == DestroySenders::kYes);
   }
 
   {
     size_t node_index = 0;
     for (const std::optional<MessageBridgeServerStatus::NodeState> &connection :
-         server_status->nodes()) {
+         server_status_->nodes()) {
       if (connection.has_value()) {
         if (boot_uuids_[node_index] != UUID::Zero()) {
           switch (server_state_[node_index]) {
             case message_bridge::State::DISCONNECTED:
-              server_status->Disconnect(node_index);
+              server_status_->Disconnect(node_index);
               break;
             case message_bridge::State::CONNECTED:
-              server_status->Connect(node_index, event_loop->monotonic_now());
+              server_status_->Connect(node_index, event_loop->monotonic_now());
               break;
           }
         } else {
-          server_status->Disconnect(node_index);
+          server_status_->Disconnect(node_index);
         }
       }
       ++node_index;
@@ -655,11 +836,11 @@
 
   for (size_t i = 0; i < boot_uuids_.size(); ++i) {
     if (boot_uuids_[i] != UUID::Zero()) {
-      server_status->SetBootUUID(i, boot_uuids_[i]);
+      server_status_->SetBootUUID(i, boot_uuids_[i]);
     }
   }
   if (fn_) {
-    server_status->set_send_data(fn_);
+    server_status_->set_send_data(fn_);
   }
   client_status = std::make_unique<MessageBridgeClientStatus>(event_loop.get());
   if (disable_statistics_) {
@@ -724,7 +905,7 @@
   }
 
   for (RawMessageDelayer *source_delayer : source_delayers_) {
-    source_delayer->SetFetchEventLoop(event_loop.get(), server_status.get(),
+    source_delayer->SetFetchEventLoop(event_loop.get(), server_status_.get(),
                                       &timestamp_loggers);
   }
   for (RawMessageDelayer *destination_delayer : destination_delayers_) {
@@ -759,4 +940,110 @@
   });
 }
 
+void SimulatedMessageBridge::State::SetSendData(
+    std::function<void(uint32_t, monotonic_clock::time_point)> fn) {
+  CHECK(!fn_);
+  fn_ = std::move(fn);
+  if (server_status_) {
+    server_status_->set_send_data(fn_);
+  }
+}
+
+void SimulatedMessageBridge::State::SetBootUUID(size_t node_index,
+                                                const UUID &boot_uuid) {
+  boot_uuids_[node_index] = boot_uuid;
+  const Node *node = node_factory_->configuration()->nodes()->Get(node_index);
+  if (server_status_) {
+    ServerConnection *connection = server_status_->FindServerConnection(node);
+    if (connection) {
+      if (boot_uuid == UUID::Zero()) {
+        server_status_->Disconnect(node_index);
+        server_status_->ResetFilter(node_index);
+      } else {
+        switch (server_state_[node_index]) {
+          case message_bridge::State::DISCONNECTED:
+            server_status_->Disconnect(node_index);
+            break;
+          case message_bridge::State::CONNECTED:
+            server_status_->Connect(node_index, event_loop->monotonic_now());
+            break;
+        }
+        server_status_->ResetFilter(node_index);
+        server_status_->SetBootUUID(node_index, boot_uuid);
+      }
+    }
+  }
+  if (client_status) {
+    const int client_index =
+        client_status->FindClientIndex(node->name()->string_view());
+    client_status->SampleReset(client_index);
+    if (boot_uuid == UUID::Zero()) {
+      client_status->Disconnect(client_index);
+    } else {
+      switch (client_state_[node_index]) {
+        case message_bridge::State::CONNECTED:
+          client_status->Connect(client_index);
+          break;
+        case message_bridge::State::DISCONNECTED:
+          client_status->Disconnect(client_index);
+          break;
+      }
+    }
+  }
+}
+
+void SimulatedMessageBridge::State::SetServerState(
+    const Node *destination, message_bridge::State state) {
+  const size_t node_index =
+      configuration::GetNodeIndex(node_factory_->configuration(), destination);
+  server_state_[node_index] = state;
+  if (server_status_) {
+    ServerConnection *connection =
+        server_status_->FindServerConnection(destination);
+    if (connection == nullptr) return;
+
+    if (state == connection->state()) {
+      return;
+    }
+    switch (state) {
+      case message_bridge::State::DISCONNECTED:
+        server_status_->Disconnect(node_index);
+        break;
+      case message_bridge::State::CONNECTED:
+        server_status_->Connect(node_index, event_loop->monotonic_now());
+        for (RawMessageDelayer *delayer : source_delayers_) {
+          if (delayer->SendingTo(destination)) {
+            delayer->Connect();
+          }
+        }
+        break;
+    }
+  }
+}
+
+void SimulatedMessageBridge::State::SetClientState(
+    const Node *source, message_bridge::State state) {
+  const size_t node_index =
+      configuration::GetNodeIndex(node_factory_->configuration(), source);
+  client_state_[node_index] = state;
+  if (client_status) {
+    const int client_index =
+        client_status->FindClientIndex(source->name()->string_view());
+    ClientConnection *connection = client_status->GetClientConnection(source);
+
+    // TODO(austin): Are there cases where we want to dedup 2 CONNECTED
+    // calls?
+    if (connection->state() != state) {
+      switch (state) {
+        case message_bridge::State::CONNECTED:
+          client_status->Connect(client_index);
+          break;
+        case message_bridge::State::DISCONNECTED:
+          client_status->Disconnect(client_index);
+          break;
+      }
+    }
+  }
+}
+
 }  // namespace aos::message_bridge