Resend any reliable messages received before startup

We want to be able to publish (and receive) low frequency configuration
messages.  This lets us make those *not* periodic when we move them
across nodes, simplifying the system.

This takes some special tracking at startup.  We very much don't want to
re-send messages already sent.  That would result in 2 receive
timestamps on a single node for 1 sent packet, probably breaking log
sorting.  I'm not interested in learning what would break...

Change-Id: I489460cd4919907516e504e6694d7cef544b0da6
diff --git a/aos/network/BUILD b/aos/network/BUILD
index b142967..8d49f20 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -299,10 +299,11 @@
         ":message_bridge_test_client_config",
         ":message_bridge_test_server_config",
     ],
-    shard_count = 4,
+    shard_count = 5,
     deps = [
         ":message_bridge_client_lib",
         ":message_bridge_server_lib",
+        "//aos:event",
         "//aos:json_to_flatbuffer",
         "//aos/events:ping_fbs",
         "//aos/events:pong_fbs",
diff --git a/aos/network/message_bridge_client.fbs b/aos/network/message_bridge_client.fbs
index 9dddd16..7056506 100644
--- a/aos/network/message_bridge_client.fbs
+++ b/aos/network/message_bridge_client.fbs
@@ -18,6 +18,9 @@
   // to get their monotonic time.
   monotonic_offset:int64 (id: 3);
 
+  // Number of duplicate packets we received and dropped.
+  duplicate_packets:uint (id: 4);
+
   // TODO(austin): Per channel counts?
 }
 
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index e5c18b6..99b2ac1 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -94,7 +94,7 @@
 SctpClientConnection::SctpClientConnection(
     aos::ShmEventLoop *const event_loop, std::string_view remote_name,
     const Node *my_node, std::string_view local_host,
-    std::vector<std::unique_ptr<aos::RawSender>> *channels, int client_index,
+    std::vector<SctpClientChannelState> *channels, int client_index,
     MessageBridgeClientStatus *client_status)
     : event_loop_(event_loop),
       connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
@@ -223,57 +223,71 @@
   CHECK_EQ(message->size, flatbuffers::GetPrefixedSize(message->data()) +
                               sizeof(flatbuffers::uoffset_t));
 
-  connection_->mutate_received_packets(connection_->received_packets() + 1);
-
   const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
+  SctpClientChannelState *channel_state = &((*channels_)[stream_to_channel_[stream]]);
 
-  // Publish the message.
-  RawSender *sender = (*channels_)[stream_to_channel_[stream]].get();
-  sender->Send(message_header->data()->data(), message_header->data()->size(),
-               aos::monotonic_clock::time_point(
-                   chrono::nanoseconds(message_header->monotonic_sent_time())),
-               aos::realtime_clock::time_point(
-                   chrono::nanoseconds(message_header->realtime_sent_time())),
-               message_header->queue_index());
-
-  client_status_->SampleFilter(
-      client_index_,
+  if (message_header->queue_index() == channel_state->last_queue_index &&
       aos::monotonic_clock::time_point(
-          chrono::nanoseconds(message_header->monotonic_sent_time())),
-      sender->monotonic_sent_time());
+          chrono::nanoseconds(message_header->monotonic_sent_time())) ==
+          channel_state->last_timestamp) {
+    LOG(INFO) << "Duplicate message from " << message->PeerAddress();
+    connection_->mutate_duplicate_packets(connection_->duplicate_packets() + 1);
+    // Duplicate message, ignore.
+  } else {
+    connection_->mutate_received_packets(connection_->received_packets() + 1);
 
-  if (stream_reply_with_timestamp_[stream]) {
-    // TODO(austin): Send back less if we are only acking.  Maybe only a
-    // stream id?  Nothing if we are only forwarding?
+    channel_state->last_queue_index = message_header->queue_index();
+    channel_state->last_timestamp = aos::monotonic_clock::time_point(
+        chrono::nanoseconds(message_header->monotonic_sent_time()));
 
-    // Now fill out the message received reply.  This uses a MessageHeader
-    // container so it can be directly logged.
-    message_reception_reply_.mutable_message()->mutate_channel_index(
-        message_header->channel_index());
-    message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
-        message_header->monotonic_sent_time());
-    message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
-        message_header->realtime_sent_time());
-    message_reception_reply_.mutable_message()->mutate_queue_index(
-        message_header->queue_index());
+    // Publish the message.
+    RawSender *sender = channel_state->sender.get();
+    sender->Send(message_header->data()->data(), message_header->data()->size(),
+                 aos::monotonic_clock::time_point(chrono::nanoseconds(
+                     message_header->monotonic_sent_time())),
+                 aos::realtime_clock::time_point(
+                     chrono::nanoseconds(message_header->realtime_sent_time())),
+                 message_header->queue_index());
 
-    // And capture the relevant data needed to generate the forwarding
-    // MessageHeader.
-    message_reception_reply_.mutable_message()->mutate_monotonic_remote_time(
-        sender->monotonic_sent_time().time_since_epoch().count());
-    message_reception_reply_.mutable_message()->mutate_realtime_remote_time(
-        sender->realtime_sent_time().time_since_epoch().count());
-    message_reception_reply_.mutable_message()->mutate_remote_queue_index(
-        sender->sent_queue_index());
+    client_status_->SampleFilter(
+        client_index_,
+        aos::monotonic_clock::time_point(
+            chrono::nanoseconds(message_header->monotonic_sent_time())),
+        sender->monotonic_sent_time());
 
-    // Unique ID is channel_index and monotonic clock.
-    // TODO(austin): Depending on if we are the logger node or not, we need to
-    // guarentee that this ack gets received too...  Same path as the logger.
-    client_.Send(kTimestampStream(),
-                 std::string_view(reinterpret_cast<const char *>(
-                                      message_reception_reply_.span().data()),
-                                  message_reception_reply_.span().size()),
-                 0);
+    if (stream_reply_with_timestamp_[stream]) {
+      // TODO(austin): Send back less if we are only acking.  Maybe only a
+      // stream id?  Nothing if we are only forwarding?
+
+      // Now fill out the message received reply.  This uses a MessageHeader
+      // container so it can be directly logged.
+      message_reception_reply_.mutable_message()->mutate_channel_index(
+          message_header->channel_index());
+      message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
+          message_header->monotonic_sent_time());
+      message_reception_reply_.mutable_message()->mutate_realtime_sent_time(
+          message_header->realtime_sent_time());
+      message_reception_reply_.mutable_message()->mutate_queue_index(
+          message_header->queue_index());
+
+      // And capture the relevant data needed to generate the forwarding
+      // MessageHeader.
+      message_reception_reply_.mutable_message()->mutate_monotonic_remote_time(
+          sender->monotonic_sent_time().time_since_epoch().count());
+      message_reception_reply_.mutable_message()->mutate_realtime_remote_time(
+          sender->realtime_sent_time().time_since_epoch().count());
+      message_reception_reply_.mutable_message()->mutate_remote_queue_index(
+          sender->sent_queue_index());
+
+      // Unique ID is channel_index and monotonic clock.
+      // TODO(austin): Depending on if we are the logger node or not, we need to
+      // guarantee that this ack gets received too...  Same path as the logger.
+      client_.Send(kTimestampStream(),
+                   std::string_view(reinterpret_cast<const char *>(
+                                        message_reception_reply_.span().data()),
+                                    message_reception_reply_.span().size()),
+                   0);
+    }
   }
 
   VLOG(1) << "Received data of length " << message->size << " from "
@@ -308,7 +322,24 @@
               event_loop_->configuration(), channel->name()->string_view(),
               channel->type()->string_view(), event_loop_->name(),
               event_loop_->node());
-          channels_[channel_index] = event_loop_->MakeRawSender(mapped_channel);
+
+          channels_[channel_index].sender =
+              event_loop_->MakeRawSender(mapped_channel);
+
+          std::unique_ptr<aos::RawFetcher> raw_fetcher =
+              event_loop_->MakeRawFetcher(mapped_channel);
+          raw_fetcher->Fetch();
+          // Dedup keys are the source node's values: use the remote_* fields.
+          if (raw_fetcher->context().data != nullptr) {
+            VLOG(1) << "Found data on "
+                    << configuration::CleanedChannelToString(channel)
+                    << ", won't resend it.";
+            channels_[channel_index].last_queue_index =
+                raw_fetcher->context().remote_queue_index;
+            channels_[channel_index].last_timestamp =
+                raw_fetcher->context().monotonic_remote_time;
+          }
+
           break;
         }
       }
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
index 8134cbd..2b48906 100644
--- a/aos/network/message_bridge_client_lib.h
+++ b/aos/network/message_bridge_client_lib.h
@@ -15,6 +15,16 @@
 namespace aos {
 namespace message_bridge {
 
+// Structure to hold per channel state.
+struct SctpClientChannelState {
+  // The sender for a channel.
+  std::unique_ptr<aos::RawSender> sender;
+  // The last queue index of a message sent.  Used for detecting duplicates.
+  uint32_t last_queue_index = 0xffffffff;
+  // The last timestamp of a message sent.  Used for detecting duplicates.
+  monotonic_clock::time_point last_timestamp = monotonic_clock::min_time;
+};
+
 // See message_bridge_protocol.h for more details about the protocol.
 
 // This class encapsulates all the state required to connect to a server and
@@ -24,7 +34,7 @@
   SctpClientConnection(aos::ShmEventLoop *const event_loop,
                        std::string_view remote_name, const Node *my_node,
                        std::string_view local_host,
-                       std::vector<std::unique_ptr<aos::RawSender>> *channels,
+                       std::vector<SctpClientChannelState> *channels,
                        int client_index,
                        MessageBridgeClientStatus *client_status);
 
@@ -62,7 +72,7 @@
   SctpClient client_;
 
   // Channels to send received messages on.
-  std::vector<std::unique_ptr<aos::RawSender>> *channels_;
+  std::vector<SctpClientChannelState> *channels_;
   // Stream number -> channel lookup.
   std::vector<int> stream_to_channel_;
   // Bitmask signaling if we should be replying back with delivery times.
@@ -97,7 +107,7 @@
   MessageBridgeClientStatus client_status_;
 
   // Channels to send data over.
-  std::vector<std::unique_ptr<aos::RawSender>> channels_;
+  std::vector<SctpClientChannelState> channels_;
 
   // List of connections.  These correspond to the nodes in source_node_names_
   std::vector<std::unique_ptr<SctpClientConnection>> connections_;
diff --git a/aos/network/message_bridge_client_status.cc b/aos/network/message_bridge_client_status.cc
index e7107ea..9406d4b 100644
--- a/aos/network/message_bridge_client_status.cc
+++ b/aos/network/message_bridge_client_status.cc
@@ -31,6 +31,7 @@
     connection_builder.add_state(State::DISCONNECTED);
     // TODO(austin): Track dropped packets.
     connection_builder.add_received_packets(0);
+    connection_builder.add_duplicate_packets(0);
     connection_builder.add_monotonic_offset(0);
     connection_offsets.emplace_back(connection_builder.Finish());
   }
@@ -90,6 +91,10 @@
     client_connection_builder.add_state(connection->state());
     client_connection_builder.add_received_packets(
         connection->received_packets());
+    if (connection->duplicate_packets() != 0) {
+      client_connection_builder.add_duplicate_packets(
+          connection->duplicate_packets());
+    }
 
     // Strip out the monotonic offset if it isn't populated.
     TimestampFilter *filter = &filters_[client_connection_offsets_.size()];
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 50ac97e..b6c94e5 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -24,9 +24,8 @@
       channel_->max_size() == other_channel->max_size());
 }
 
-void ChannelState::SendData(SctpServer *server, const Context &context) {
-  // TODO(austin): I don't like allocating this buffer when we are just freeing
-  // it at the end of the function.
+flatbuffers::FlatBufferBuilder ChannelState::PackContext(
+    const Context &context) {
   flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
   fbb.ForceDefaults(true);
   VLOG(1) << "Found " << peers_.size() << " peers on channel "
@@ -38,6 +37,14 @@
   fbb.FinishSizePrefixed(logger::PackMessage(&fbb, context, channel_index_,
                                              logger::LogType::kLogMessage));
 
+  return fbb;
+}
+
+void ChannelState::SendData(SctpServer *server, const Context &context) {
+  // TODO(austin): I don't like allocating this buffer when we are just freeing
+  // it at the end of the function.
+  flatbuffers::FlatBufferBuilder fbb = PackContext(context);
+
   // TODO(austin): Track which connections need to be reliable and handle
   // resending properly.
   size_t sent_count = 0;
@@ -175,6 +182,7 @@
 }
 
 int ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
+  VLOG(1) << "Disconnected " << assoc_id;
   for (ChannelState::Peer &peer : peers_) {
     if (peer.sac_assoc_id == assoc_id) {
       // TODO(austin): This will not handle multiple clients from
@@ -190,12 +198,47 @@
 
 int ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
                                 int stream, SctpServer *server) {
+  VLOG(1) << "Connected to assoc_id: " << assoc_id;
   for (ChannelState::Peer &peer : peers_) {
     if (peer.connection->name()->string_view() == node->name()->string_view()) {
+      // There's a peer already connected.  Disconnect them and take over.
+      if (peer.sac_assoc_id != 0) {
+        LOG(WARNING) << "Peer " << peer.sac_assoc_id
+                     << " already connected, aborting old connection.";
+        server->Abort(peer.sac_assoc_id);
+      }
+
       peer.sac_assoc_id = assoc_id;
       peer.stream = stream;
       peer.server_connection_statistics->mutate_state(State::CONNECTED);
       server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
+      if (last_message_fetcher_ && peer.connection->time_to_live() == 0) {
+        last_message_fetcher_->Fetch();
+        VLOG(1) << "Got a connection on a reliable channel "
+                << configuration::StrippedChannelToString(
+                       last_message_fetcher_->channel())
+                << ", sending? "
+                << (last_message_fetcher_->context().data != nullptr);
+        if (last_message_fetcher_->context().data != nullptr) {
+          // SendData sends to all...  Only send to the new one.
+          // TODO(austin): I don't like allocating this buffer when we are just
+          // freeing it at the end of the function.
+          flatbuffers::FlatBufferBuilder fbb =
+              PackContext(last_message_fetcher_->context());
+
+          if (server->Send(std::string_view(reinterpret_cast<const char *>(
+                                                fbb.GetBufferPointer()),
+                                            fbb.GetSize()),
+                           peer.sac_assoc_id, peer.stream,
+                           peer.connection->time_to_live() / 1000000)) {
+            peer.server_connection_statistics->mutate_sent_packets(
+                peer.server_connection_statistics->sent_packets() + 1);
+          } else {
+            peer.server_connection_statistics->mutate_dropped_packets(
+                peer.server_connection_statistics->dropped_packets() + 1);
+          }
+        }
+      }
       return peer.node_index;
     }
   }
@@ -261,9 +304,17 @@
 
     if (configuration::ChannelIsSendableOnNode(channel, event_loop_->node()) &&
         channel->has_destination_nodes()) {
+
+      bool any_reliable = false;
+      for (const Connection *connection : *channel->destination_nodes()) {
+        if (connection->time_to_live() == 0) {
+          any_reliable = true;
+        }
+      }
       max_size = std::max(channel->max_size(), max_size);
-      std::unique_ptr<ChannelState> state(
-          new ChannelState{channel, channel_index});
+      std::unique_ptr<ChannelState> state(new ChannelState{
+          channel, channel_index,
+          any_reliable ? event_loop_->MakeRawFetcher(channel) : nullptr});
 
       for (const Connection *connection : *channel->destination_nodes()) {
         const Node *other_node = configuration::GetNode(
@@ -305,13 +356,19 @@
               state_ptr->SendData(&server_, context);
             });
       } else {
+        for (const Connection *connection : *channel->destination_nodes()) {
+          CHECK_GE(connection->time_to_live(), 1000u);
+        }
         CHECK(timestamp_state_ == nullptr);
         timestamp_state_ = state.get();
       }
       channels_.emplace_back(std::move(state));
     } else if (channel == timestamp_channel) {
       std::unique_ptr<ChannelState> state(
-          new ChannelState{channel, channel_index});
+          new ChannelState{channel, channel_index, nullptr});
+      for (const Connection *connection : *channel->destination_nodes()) {
+        CHECK_GE(connection->time_to_live(), 1000u);
+      }
       timestamp_state_ = state.get();
       channels_.emplace_back(std::move(state));
     } else {
@@ -338,8 +395,10 @@
       continue;
     }
 
-    node_index = channel_state->NodeDisconnected(assoc_id);
-    CHECK_NE(node_index, -1);
+    int new_node_index = channel_state->NodeDisconnected(assoc_id);
+    if (new_node_index != -1) {
+      node_index = new_node_index;
+    }
   }
 
   if (node_index != -1) {
@@ -360,6 +419,10 @@
     const union sctp_notification *snp =
         (const union sctp_notification *)message->data();
 
+    if (VLOG_IS_ON(2)) {
+      PrintNotification(message.get());
+    }
+
     switch (snp->sn_header.sn_type) {
       case SCTP_ASSOC_CHANGE: {
         const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
@@ -380,10 +443,6 @@
         }
       } break;
     }
-
-    if (VLOG_IS_ON(2)) {
-      PrintNotification(message.get());
-    }
   } else if (message->message_type == Message::kMessage) {
     HandleData(message.get());
   }
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index ad384f4..2a75ad6 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -24,8 +24,11 @@
 // new message from the event loop.
 class ChannelState {
  public:
-  ChannelState(const Channel *channel, int channel_index)
-      : channel_index_(channel_index), channel_(channel) {}
+  ChannelState(const Channel *channel, int channel_index,
+               std::unique_ptr<aos::RawFetcher> last_message_fetcher)
+      : channel_index_(channel_index),
+        channel_(channel),
+        last_message_fetcher_(std::move(last_message_fetcher)) {}
 
   // Class to encapsulate all the state per client on a channel.  A client may
   // be subscribed to multiple channels.
@@ -73,6 +76,9 @@
   // Sends the data in context using the provided server.
   void SendData(SctpServer *server, const Context &context);
 
+  // Packs a context into a size prefixed message header for transmission.
+  flatbuffers::FlatBufferBuilder PackContext(const Context &context);
+
   // Handles reception of delivery times.
   void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
                       absl::Span<const uint8_t> data);
@@ -85,10 +91,14 @@
   const int channel_index_;
   const Channel *const channel_;
 
   std::vector<Peer> peers_;
 
   std::deque<SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader>>
       sent_messages_;
+
+  // A fetcher to use to send the last message when a node connects and is
+  // reliable.
+  std::unique_ptr<aos::RawFetcher> last_message_fetcher_;
 };
 
 // This encapsulates the state required to talk to *all* the clients from this
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index a3e7f63..36bdde9 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -4,6 +4,7 @@
 #include <thread>
 
 #include "absl/strings/str_cat.h"
+#include "aos/event.h"
 #include "aos/events/ping_generated.h"
 #include "aos/events/pong_generated.h"
 #include "aos/network/message_bridge_client_lib.h"
@@ -817,11 +818,323 @@
   pi2_test_thread.join();
 }
 
-// TODO(austin): This test confirms that the external state does the right
+// TODO(austin): The above test confirms that the external state does the right
 // thing, but doesn't confirm that the internal state does.  We either need to
 // expose a way to check the state in a thread-safe way, or need a way to jump
 // time for one node to do that.
 
+void SendPing(aos::Sender<examples::Ping> *sender, int value) {
+  aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
+  examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
+  ping_builder.add_value(value);
+  builder.Send(ping_builder.Finish());
+}
+
+// Tests that when a message is sent before the bridge starts up, but is
+// configured as reliable, we forward it.  Confirm this survives a client reset.
+TEST_F(MessageBridgeTest, ReliableSentBeforeClientStartup) {
+  DoSetShmBase("pi1");
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+
+  FLAGS_application_name = "sender";
+  aos::ShmEventLoop send_event_loop(&pi1_config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/test");
+  SendPing(&ping_sender, 1);
+  aos::Sender<examples::Ping> unreliable_ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/unreliable");
+  SendPing(&unreliable_ping_sender, 1);
+
+  FLAGS_application_name = "pi1_message_bridge_server";
+  aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+  MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  FLAGS_application_name = "pi1_timestamp";
+  aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
+
+  // Now do it for "raspberrypi2", the client.
+  DoSetShmBase("pi2");
+  FLAGS_override_hostname = "raspberrypi2";
+
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+  aos::ShmEventLoop receive_event_loop(&pi2_config.message());
+  aos::Fetcher<examples::Ping> ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
+  aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+      receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+  const size_t ping_channel_index = configuration::ChannelIndex(
+      receive_event_loop.configuration(), ping_fetcher.channel());
+
+  std::atomic<int> ping_timestamp_count{0};
+  pi1_remote_timestamp_event_loop.MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [ping_channel_index,
+       &ping_timestamp_count](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+        if (header.channel_index() == ping_channel_index) {
+          ++ping_timestamp_count;
+        }
+      });
+
+  // Before everything starts up, confirm there is no message.
+  EXPECT_FALSE(ping_fetcher.Fetch());
+  EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+  // Spin up the persistent pieces.
+  std::thread pi1_server_thread(
+      [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+
+  // Event used to wait for the timestamp counting thread to start.
+  aos::Event event;
+  std::thread pi1_remote_timestamp_thread(
+      [&pi1_remote_timestamp_event_loop, &event]() {
+        pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
+        pi1_remote_timestamp_event_loop.Run();
+      });
+
+  event.Wait();
+
+  {
+    // Now, spin up a client for 2 seconds.
+    LOG(INFO) << "Starting first pi2 MessageBridgeClient";
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(2050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    // Confirm there is no detected duplicate packet.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              0u);
+
+    EXPECT_TRUE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+    EXPECT_EQ(ping_timestamp_count, 1);
+    LOG(INFO) << "Shutting down first pi2 MessageBridgeClient";
+  }
+
+  {
+    // Now, spin up a second client for 5 seconds.
+    LOG(INFO) << "Starting second pi2 MessageBridgeClient";
+    FLAGS_application_name = "pi2_message_bridge_client";
+    aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+    MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+    aos::TimerHandler *quit = pi2_client_event_loop.AddTimer(
+        [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+    pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi2_client_event_loop.monotonic_now() +
+                  chrono::milliseconds(5050));
+    });
+
+    // And go!
+    pi2_client_event_loop.Run();
+
+    // Confirm we detect the duplicate packet correctly.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              1u);
+
+    EXPECT_EQ(ping_timestamp_count, 1);
+    EXPECT_FALSE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+  }
+
+  // Shut everyone else down
+  pi1_server_event_loop.Exit();
+  pi1_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
+  pi1_remote_timestamp_event_loop.Exit();
+  pi1_remote_timestamp_thread.join();
+  pi1_server_thread.join();
+  pi1_client_thread.join();
+  pi2_server_thread.join();
+}
+
+// Tests that when a message is sent before the bridge starts up, but is
+// configured as reliable, we forward it.  Confirm this works across server
+// resets.
+TEST_F(MessageBridgeTest, ReliableSentBeforeServerStartup) {
+  // Now do it for "raspberrypi2", the client.
+  DoSetShmBase("pi2");
+  FLAGS_override_hostname = "raspberrypi2";
+
+  FLAGS_application_name = "pi2_message_bridge_server";
+  aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+  MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+  FLAGS_application_name = "pi2_message_bridge_client";
+  aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+  MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+  aos::ShmEventLoop receive_event_loop(&pi2_config.message());
+  aos::Fetcher<examples::Ping> ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<examples::Ping> unreliable_ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/unreliable");
+  aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+      receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+  DoSetShmBase("pi1");
+  // Force ourselves to be "raspberrypi" and allocate everything.
+  FLAGS_override_hostname = "raspberrypi";
+
+  FLAGS_application_name = "sender";
+  aos::ShmEventLoop send_event_loop(&pi1_config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/test");
+  {
+    aos::Sender<examples::Ping>::Builder builder = ping_sender.MakeBuilder();
+    examples::Ping::Builder ping_builder =
+        builder.MakeBuilder<examples::Ping>();
+    ping_builder.add_value(1);
+    builder.Send(ping_builder.Finish());
+  }
+
+  FLAGS_application_name = "pi1_message_bridge_client";
+  aos::ShmEventLoop pi1_client_event_loop(&pi1_config.message());
+  MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+  FLAGS_application_name = "pi1_timestamp";
+  aos::ShmEventLoop pi1_remote_timestamp_event_loop(&pi1_config.message());
+
+  const size_t ping_channel_index = configuration::ChannelIndex(
+      receive_event_loop.configuration(), ping_fetcher.channel());
+
+  std::atomic<int> ping_timestamp_count{0};
+  pi1_remote_timestamp_event_loop.MakeWatcher(
+      "/pi1/aos/remote_timestamps/pi2",
+      [ping_channel_index,
+       &ping_timestamp_count](const logger::MessageHeader &header) {
+        VLOG(1) << aos::FlatbufferToJson(&header);
+        if (header.channel_index() == ping_channel_index) {
+          ++ping_timestamp_count;
+        }
+      });
+
+  // Before everything starts up, confirm there is no message.
+  EXPECT_FALSE(ping_fetcher.Fetch());
+  EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+
+  // Spin up the persistent pieces.
+  std::thread pi1_client_thread(
+      [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+  std::thread pi2_server_thread(
+      [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+  std::thread pi2_client_thread(
+      [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+
+  // Event used to wait for the timestamp counting thread to start.
+  aos::Event event;
+  std::thread pi1_remote_timestamp_thread(
+      [&pi1_remote_timestamp_event_loop, &event]() {
+        pi1_remote_timestamp_event_loop.OnRun([&event]() { event.Set(); });
+        pi1_remote_timestamp_event_loop.Run();
+      });
+
+  event.Wait();
+
+  {
+    // Now, spin up a server for 2 seconds.
+    FLAGS_application_name = "pi1_message_bridge_server";
+    aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+    MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+    aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
+        [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
+    pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi1_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(2050));
+    });
+
+    // And go!
+    pi1_server_event_loop.Run();
+
+    // Confirm there is no detected duplicate packet.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              0u);
+
+    EXPECT_TRUE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+    EXPECT_EQ(ping_timestamp_count, 1);
+    LOG(INFO) << "Shutting down first pi1 MessageBridgeServer";
+  }
+
+  {
+    // Now, spin up a second server for 2 seconds.
+    FLAGS_application_name = "pi1_message_bridge_server";
+    aos::ShmEventLoop pi1_server_event_loop(&pi1_config.message());
+    MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+    aos::TimerHandler *quit = pi1_server_event_loop.AddTimer(
+        [&pi1_server_event_loop]() { pi1_server_event_loop.Exit(); });
+    pi1_server_event_loop.OnRun([quit, &pi1_server_event_loop]() {
+      // Stop between timestamps, not exactly on them.
+      quit->Setup(pi1_server_event_loop.monotonic_now() +
+                  chrono::milliseconds(2050));
+    });
+
+    // And go!
+    pi1_server_event_loop.Run();
+
+    // Confirm we detect the duplicate packet correctly.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              1u);
+
+    EXPECT_EQ(ping_timestamp_count, 1);
+    EXPECT_FALSE(ping_fetcher.Fetch());
+    EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
+    LOG(INFO) << "Shutting down second pi1 MessageBridgeServer";
+  }
+
+  // Shut everyone else down
+  pi1_client_event_loop.Exit();
+  pi2_server_event_loop.Exit();
+  pi2_client_event_loop.Exit();
+  pi1_remote_timestamp_event_loop.Exit();
+  pi1_remote_timestamp_thread.join();
+  pi1_client_thread.join();
+  pi2_server_thread.join();
+  pi2_client_thread.join();
+}
+
 }  // namespace testing
 }  // namespace message_bridge
 }  // namespace aos
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index 3ac2796..a7c0d1b 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -27,7 +27,8 @@
           "name": "pi2",
           "priority": 1,
           "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
-          "timestamp_logger_nodes": ["pi1"]
+          "timestamp_logger_nodes": ["pi1"],
+          "time_to_live": 5000000
         }
       ]
     },
@@ -42,7 +43,8 @@
           "name": "pi1",
           "priority": 1,
           "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
-          "timestamp_logger_nodes": ["pi2"]
+          "timestamp_logger_nodes": ["pi2"],
+          "time_to_live": 5000000
         }
       ]
     },
@@ -125,6 +127,20 @@
           "timestamp_logger_nodes": ["pi1"]
         }
       ]
+    },
+    {
+      "name": "/unreliable",
+      "type": "aos.examples.Ping",
+      "source_node": "pi1",
+      "destination_nodes": [
+        {
+          "name": "pi2",
+          "priority": 1,
+          "timestamp_logger": "REMOTE_LOGGER",
+          "timestamp_logger_nodes": ["pi1"],
+          "time_to_live": 5000000
+        }
+      ]
     }
   ],
   "maps": [
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
index c8563d5..e18d0d8 100644
--- a/aos/network/sctp_server.cc
+++ b/aos/network/sctp_server.cc
@@ -80,6 +80,44 @@
   return ReadSctpMessage(fd_, max_size_);
 }
 
+// Aborts the SCTP association identified by snd_assoc_id by sending a
+// payload-free message flagged SCTP_ABORT.  Returns true if sendmsg succeeded
+// and false on any failure.
+bool SctpServer::Abort(sctp_assoc_t snd_assoc_id) {
+  // Zero the header so msg_name and msg_iov are never handed to the kernel
+  // uninitialized.  The assoc_id selects the destination, not the msg_name.
+  struct msghdr outmsg;
+  memset(&outmsg, 0, sizeof(outmsg));
+
+  // Build up the sndinfo message.
+  char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+  outmsg.msg_control = outcmsg;
+  outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
+
+  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+  cmsg->cmsg_level = IPPROTO_SCTP;
+  cmsg->cmsg_type = SCTP_SNDRCV;
+  cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+  struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+  memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+  sinfo->sinfo_ppid = ++ppid_;
+  sinfo->sinfo_stream = 0;
+  sinfo->sinfo_flags = SCTP_ABORT;
+  sinfo->sinfo_assoc_id = snd_assoc_id;
+
+  // And send.  Every sendmsg failure (EPIPE, EAGAIN, ESHUTDOWN, or anything
+  // else) is reported to the caller as false.
+  const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+  if (size == -1) {
+    return false;
+  }
+
+  // No payload was attached, so a successful send must report 0 bytes.
+  CHECK_EQ(0, size);
+  return true;
+}
+
 bool SctpServer::Send(std::string_view data, sctp_assoc_t snd_assoc_id,
                       int stream, int timetolive) {
   struct iovec iov;
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
index 8fa3d15..e18517a 100644
--- a/aos/network/sctp_server.h
+++ b/aos/network/sctp_server.h
@@ -36,6 +36,9 @@
   bool Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
             int timetolive);
 
+  // Sends an SCTP_ABORT for the given association.  Returns true on success.
+  bool Abort(sctp_assoc_t snd_assoc_id);
+
   int fd() { return fd_; }
 
   // Enables the priority scheduler.  This is a SCTP feature which lets us