Add retry logic to message bridge server

This makes the message bridge server attempt to retry failed sends on any
reliable connections. This strengthens our guarantees around reliable
message channels, and is particularly relevant during startup when there
may be large bursts of reliable messages being sent.
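
As a rough illustration of the intended behavior (a sketch, not code from
this patch), the retry schedule backs off additively from a minimum period
up to a cap and resets once a send succeeds. The constants below mirror the
default flag values added in message_bridge_server_lib.cc:

    #include <algorithm>
    #include <chrono>
    #include <iostream>

    int main() {
      using std::chrono::milliseconds;
      const milliseconds kMinRetryPeriod{10};     // --min_retry_period_ms
      const milliseconds kAdditiveBackoff{100};   // --retry_period_additive_backoff_ms
      const milliseconds kMaxRetryPeriod{10000};  // --max_retry_period_ms

      milliseconds retry_period = kMinRetryPeriod;
      for (int failed_retries = 0; failed_retries < 5; ++failed_retries) {
        std::cout << "next retry in " << retry_period.count() << " ms\n";
        // Every failed retry adds a fixed increment, capped at the maximum.
        retry_period = std::min(retry_period + kAdditiveBackoff, kMaxRetryPeriod);
      }
      // A successful send resets the period back to kMinRetryPeriod.
      return 0;
    }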

Wrote a test that triggers the retry logic and ran it manually on my
laptop. I have not attempted to run the test on any other platforms, and am
operating on the assumption that it will be quite flaky elsewhere; the test
target is marked flaky and is mainly intended for engineers wanting to
exercise this logic themselves.
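
To exercise the retry logic locally, the new test target added in this
change can be run directly (assuming a standard Bazel setup for this repo):

    bazel test //aos/network:message_bridge_retry_test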

Change-Id: Ic4bdfe4799b902883e1626542dff77fcbf3fb913
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index a2aa4af..aaba8cb 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -60,13 +60,13 @@
 };
 
 constexpr std::string_view kCombinedConfigSha1() {
-  return "a72e2a1e21ac07b27648825151ff9b436fd80b62254839d4ac47ee3400fa9dc1";
+  return "433bcf2bddfbbd2745a4e0c3c9dda2f9832bb61c5b311e3efdd357b9a19e1b76";
 }
 constexpr std::string_view kSplitConfigSha1() {
-  return "6e585268f58791591f48b1e6d00564f49e6dcec46d18c4809ec49d94afbb3b1c";
+  return "6956d86e4eeda28d6857c3365f79a7fb0344c74de44bcb5ebe4d51398a4a26d5";
 }
 constexpr std::string_view kReloggedSplitConfigSha1() {
-  return "6aa4cbc21e2382ea8b9ef0145e9031bf542827e29b93995dd5e203ed0c198ef7";
+  return "db53e99234ecec2cde4d6b9f7b77c8f5150e0a58f6a441030eebfc1e76a2c89c";
 }
 
 LoggerState MakeLoggerState(NodeEventLoopFactory *node,
diff --git a/aos/events/simulated_network_bridge.cc b/aos/events/simulated_network_bridge.cc
index 35cc3a6..c8a8150 100644
--- a/aos/events/simulated_network_bridge.cc
+++ b/aos/events/simulated_network_bridge.cc
@@ -515,7 +515,7 @@
 
     if (channel == timestamp_channel) {
       source_event_loop->second.SetSendData(
-          [captured_delayers = delayers.get()](const Context &) {
+          [captured_delayers = delayers.get()]() {
             for (std::unique_ptr<RawMessageDelayer> &delayer :
                  captured_delayers->v) {
               delayer->Schedule();
diff --git a/aos/events/simulated_network_bridge.h b/aos/events/simulated_network_bridge.h
index 9565029..faa6398 100644
--- a/aos/events/simulated_network_bridge.h
+++ b/aos/events/simulated_network_bridge.h
@@ -99,7 +99,7 @@
 
     void SetEventLoop(std::unique_ptr<aos::EventLoop> loop);
 
-    void SetSendData(std::function<void(const Context &)> fn) {
+    void SetSendData(std::function<void()> fn) {
       CHECK(!fn_);
       fn_ = std::move(fn);
       if (server_status) {
@@ -209,7 +209,7 @@
 
     std::vector<std::pair<const Channel *, DelayersVector *>> delayer_watchers_;
 
-    std::function<void(const Context &)> fn_;
+    std::function<void()> fn_;
 
     NodeEventLoopFactory *node_factory_;
     std::unique_ptr<aos::EventLoop> event_loop;
diff --git a/aos/network/BUILD b/aos/network/BUILD
index d8d5756..a6e2c51 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -408,6 +408,50 @@
     deps = ["//aos/events:aos_config"],
 )
 
+cc_library(
+    name = "message_bridge_test_lib",
+    testonly = True,
+    srcs = ["message_bridge_test_lib.cc"],
+    hdrs = ["message_bridge_test_lib.h"],
+    deps = [
+        ":message_bridge_client_lib",
+        ":message_bridge_server_lib",
+        "//aos:json_to_flatbuffer",
+        "//aos:sha256",
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/events:shm_event_loop",
+        "//aos/ipc_lib:event",
+        "//aos/testing:googletest",
+        "//aos/testing:path",
+    ],
+)
+
+cc_test(
+    name = "message_bridge_retry_test",
+    srcs = [
+        "message_bridge_retry_test.cc",
+    ],
+    data = [
+        ":message_bridge_test_common_config",
+    ],
+    # Somewhat flaky due to relying on the behavior & timing of the host system's network stack.
+    flaky = True,
+    target_compatible_with = ["@platforms//os:linux"],
+    deps = [
+        ":message_bridge_server_lib",
+        ":message_bridge_test_lib",
+        "//aos:json_to_flatbuffer",
+        "//aos:sha256",
+        "//aos/events:ping_fbs",
+        "//aos/events:pong_fbs",
+        "//aos/events:shm_event_loop",
+        "//aos/ipc_lib:event",
+        "//aos/testing:googletest",
+        "//aos/testing:path",
+    ],
+)
+
 cc_test(
     name = "message_bridge_test",
     srcs = [
@@ -418,11 +462,12 @@
         ":message_bridge_test_common_config",
     ],
     flaky = True,
-    shard_count = 10,
+    shard_count = 16,
     target_compatible_with = ["@platforms//os:linux"],
     deps = [
         ":message_bridge_client_lib",
         ":message_bridge_server_lib",
+        ":message_bridge_test_lib",
         "//aos:json_to_flatbuffer",
         "//aos:sha256",
         "//aos/events:ping_fbs",
diff --git a/aos/network/message_bridge_retry_test.cc b/aos/network/message_bridge_retry_test.cc
new file mode 100644
index 0000000..4110a76
--- /dev/null
+++ b/aos/network/message_bridge_retry_test.cc
@@ -0,0 +1,141 @@
+#include <chrono>
+#include <thread>
+
+#include "absl/strings/str_cat.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/ping_generated.h"
+#include "aos/events/pong_generated.h"
+#include "aos/ipc_lib/event.h"
+#include "aos/network/message_bridge_client_lib.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/message_bridge_server_lib.h"
+#include "aos/network/message_bridge_test_lib.h"
+#include "aos/network/team_number.h"
+#include "aos/sha256.h"
+#include "aos/testing/path.h"
+#include "aos/util/file.h"
+
+DECLARE_int32(force_wmem_max);
+
+namespace aos {
+
+namespace message_bridge {
+namespace testing {
+
+void SendPing(aos::Sender<examples::Ping> *sender, int value) {
+  aos::Sender<examples::Ping>::Builder builder = sender->MakeBuilder();
+  // Artificially inflate message size by adding a bunch of padding.
+  builder.fbb()->CreateVector(std::vector<int>(1000, 0));
+  examples::Ping::Builder ping_builder = builder.MakeBuilder<examples::Ping>();
+  ping_builder.add_value(value);
+  builder.CheckOk(builder.Send(ping_builder.Finish()));
+}
+
+// Test that if we fill up the kernel send buffers, the message bridge code
+// does indeed trigger its retry logic and that the retries succeed. Separated
+// from the normal message bridge tests because triggering this originally
+// seemed likely to be prone to extreme flakiness depending on the platform it
+// is run on. In practice, it actually seems to be *more* reliable than the
+// normal message_bridge_test, but we kept it separate anyway.
+TEST_P(MessageBridgeParameterizedTest, ReliableRetries) {
+  // Set an absurdly small wmem max. This will help to trigger retries.
+  FLAGS_force_wmem_max = 1024;
+  OnPi1();
+
+  FLAGS_application_name = "sender";
+  aos::ShmEventLoop send_event_loop(&config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/test");
+  SendPing(&ping_sender, 1);
+  aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+      send_event_loop.MakeFetcher<ServerStatistics>("/aos");
+
+  MakePi1Server();
+  MakePi1Client();
+
+  // Now do it for "raspberrypi2", the client.
+  OnPi2();
+
+  MakePi2Server();
+
+  aos::ShmEventLoop receive_event_loop(&config.message());
+  aos::Fetcher<examples::Ping> ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+      receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+  // Before everything starts up, confirm there is no message.
+  EXPECT_FALSE(ping_fetcher.Fetch());
+
+  // Spin up the persistent pieces.
+  StartPi1Server();
+  StartPi1Client();
+  StartPi2Server();
+
+  {
+    constexpr size_t kNumPingMessages = 25;
+    // Now, spin up a client for 2 seconds.
+    MakePi2Client();
+    StartPi2Client();
+
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+
+    for (size_t i = 0; i < kNumPingMessages; ++i) {
+      SendPing(&ping_sender, i);
+    }
+
+    // Give plenty of time for retries to succeed.
+    std::this_thread::sleep_for(std::chrono::seconds(5));
+
+    StopPi2Client();
+
+    // Confirm that packets were received and that no duplicates were detected.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_GT(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->received_packets(),
+              kNumPingMessages);
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              0u);
+
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->partial_deliveries(),
+              0u);
+
+    // Check that we received the reliable message that was sent before
+    // starting.
+    EXPECT_TRUE(ping_fetcher.FetchNext());
+    EXPECT_EQ(ping_fetcher->value(), 1);
+
+    // Check that we got all the messages sent while running.
+    for (size_t i = 0; i < kNumPingMessages; ++i) {
+      EXPECT_TRUE(ping_fetcher.FetchNext());
+      EXPECT_EQ(ping_fetcher->value(), i);
+    }
+
+    EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+    EXPECT_GT(
+        pi1_server_statistics_fetcher->connections()->Get(0)->sent_packets(),
+        kNumPingMessages);
+    EXPECT_GT(
+        pi1_server_statistics_fetcher->connections()->Get(0)->retry_count(), 0u)
+        << FlatbufferToJson(pi1_server_statistics_fetcher.get());
+  }
+
+  // Shut everyone else down.
+  StopPi1Client();
+  StopPi2Server();
+  StopPi1Server();
+}
+
+INSTANTIATE_TEST_SUITE_P(MessageBridgeTests, MessageBridgeParameterizedTest,
+                         ::testing::Values(Param{
+                             "message_bridge_test_common_config.json", false}));
+
+}  // namespace testing
+}  // namespace message_bridge
+}  // namespace aos
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
index e936828..b05d933 100644
--- a/aos/network/message_bridge_server.fbs
+++ b/aos/network/message_bridge_server.fbs
@@ -13,6 +13,8 @@
   // Total number of messages that were dropped while sending (e.g.,
   // those dropped by the kernel).
   dropped_packets:uint (id: 2);
+  // Count of the total number of retries attempted on this channel.
+  retry_count:uint (id: 3);
 }
 
 // State of the connection.
@@ -62,6 +64,10 @@
   // Statistics for every channel being forwarded to this node. Ordering is arbitrary;
   // the channels are identified by an index in the ServerChannelStatistics.
   channels:[ServerChannelStatistics] (id: 10);
+
+  // Total number of retries attempted on all channels. Typically due to kernel
+  // send buffers filling up.
+  retry_count:uint (id: 11);
 }
 
 // Statistics for all connections to all the clients.
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 22b2f3f..debfeeb 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -16,6 +16,39 @@
 #include "aos/network/sctp_server.h"
 #include "aos/network/timestamp_channel.h"
 
+// For retrying sends on reliable channels, we will do an additive backoff
+// strategy where we start at FLAGS_min_retry_period_ms and then add
+// FLAGS_retry_period_additive_backoff_ms every time a retry fails, up until
+// FLAGS_max_retry_period_ms. These numbers are somewhat arbitrarily chosen.
+// For the minimum retry period, choose 10ms as that is slow enough that the
+// kernel should have had time to clear its buffers, while being fast enough
+// that hopefully it is a relatively minor blip for anything that isn't
+// timing-critical (and timing-critical things that hit the retry logic are
+// probably in trouble).
+DEFINE_uint32(min_retry_period_ms, 10,
+              "Maximum retry timer period--the exponential backoff will not "
+              "exceed this period, in milliseconds.");
+// Amount of backoff to add every time a retry fails. Chosen semi-arbitrarily;
+// 100ms is large enough that the backoff actually does increase at a reasonable
+// rate, while preventing the period from growing so fast that it can readily
+// take multiple seconds for a retry to occur.
+DEFINE_uint32(retry_period_additive_backoff_ms, 100,
+              "Amount of time to add to the retry period every time a retry "
+              "fails, in milliseconds.");
+// Max out retry period at 10 seconds---this is generally a much longer
+// timescale than anything normally happening on our systems, while still being
+// short enough that the retries will regularly happen (basically, the maximum
+// should be short enough that a human trying to debug issues with the system
+// will still see the retries regularly happening as they debug, rather than
+// having to wait minutes or hours for a retry to occur).
+DEFINE_uint32(max_retry_period_ms, 10000,
+              "Maximum retry timer period--the additive backoff will not "
+              "exceed this period, in milliseconds.");
+
+DEFINE_int32(force_wmem_max, -1,
+             "If set to a nonnegative numbers, the wmem buffer size to use, in "
+             "bytes. Intended solely for testing purposes.");
+
 namespace aos {
 namespace message_bridge {
 namespace chrono = std::chrono;
@@ -28,9 +61,9 @@
 }
 
 flatbuffers::FlatBufferBuilder ChannelState::PackContext(
-    FixedAllocator *allocator, const Context &context) {
+    const Context &context) {
   flatbuffers::FlatBufferBuilder fbb(
-      channel_->max_size() + kHeaderSizeOverhead(), allocator);
+      channel_->max_size() + kHeaderSizeOverhead(), allocator_);
   fbb.ForceDefaults(true);
   VLOG(2) << "Found " << peers_.size() << " peers on channel "
           << channel_->name()->string_view() << " "
@@ -60,34 +93,137 @@
   return fbb;
 }
 
-void ChannelState::SendData(SctpServer *server, FixedAllocator *allocator,
-                            const Context &context) {
-  // TODO(austin): I don't like allocating this buffer when we are just freeing
-  // it at the end of the function.
-  flatbuffers::FlatBufferBuilder fbb = PackContext(allocator, context);
+ChannelState::ChannelState(aos::EventLoop *event_loop, const Channel *channel,
+                           int channel_index, SctpServer *server,
+                           FixedAllocator *allocator)
+    : event_loop_(event_loop),
+      channel_index_(channel_index),
+      channel_(channel),
+      server_(server),
+      allocator_(allocator),
+      last_message_fetcher_(event_loop->MakeRawFetcher(channel)),
+      retry_timer_(event_loop->AddTimer([this]() { SendData(); })),
+      retry_period_(std::chrono::milliseconds(FLAGS_min_retry_period_ms)) {
+  retry_timer_->set_name(absl::StrFormat("retry%d", channel_index));
+}
 
-  // TODO(austin): Track which connections need to be reliable and handle
-  // resending properly.
+bool ChannelState::PeerReadyToFetchNext(const Peer &peer,
+                                        const Context &context) const {
+  if (peer.sac_assoc_id == 0) {
+    // The peer is not connected; don't wait for it.
+    return true;
+  }
+  if (context.data == nullptr) {
+    // There is nothing on the Fetcher that we could still be trying to send.
+    return true;
+  }
+  if (!peer.last_sent_index.has_value()) {
+    // Nothing has been sent yet, so we can't possibly be ready to fetch the
+    // next message.
+    return false;
+  }
+  if (peer.last_sent_index.value() != context.queue_index) {
+    return false;
+  }
+  return true;
+}
+
+bool ChannelState::ReadyToFetchNext() const {
+  for (const Peer &peer : peers_) {
+    if (!PeerReadyToFetchNext(peer, last_message_fetcher_->context())) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool ChannelState::AnyNodeConnected() const {
+  for (const Peer &peer : peers_) {
+    if (peer.sac_assoc_id != 0) {
+      return true;
+    }
+  }
+  return false;
+}
+
+void ChannelState::SendData() {
+  // The goal of this logic is to continually send out any message data
+  // available on the current channel until either (a) we run out of messages
+  // to send or (b) sends start to fail.
+  do {
+    if (ReadyToFetchNext()) {
+      retry_period_ = std::chrono::milliseconds(FLAGS_min_retry_period_ms);
+      if (!last_message_fetcher_->FetchNext()) {
+        return;
+      }
+    }
+  } while (TrySendData(last_message_fetcher_->context()));
+}
+
+bool ChannelState::TrySendData(const Context &context) {
+  CHECK(context.data != nullptr)
+      << configuration::StrippedChannelToString(channel_);
+  // TODO(austin): I don't like allocating this buffer when we are just
+  // freeing it at the end of the function.
+  flatbuffers::FlatBufferBuilder fbb = PackContext(context);
+
   size_t sent_count = 0;
   bool logged_remotely = false;
+  bool retry_required = false;
+  VLOG(1) << "Send for " << configuration::StrippedChannelToString(channel_)
+          << " with " << context.queue_index << " and data " << context.data;
   for (Peer &peer : peers_) {
+    if (PeerReadyToFetchNext(peer, context)) {
+      VLOG(1) << "Skipping send for "
+              << configuration::StrippedChannelToString(channel_) << " to "
+              << FlatbufferToJson(peer.connection) << " with queue index of "
+              << context.queue_index;
+      // Either:
+      // * We already sent on this connection; we do not need to do anything
+      //   further.
+      // * The peer in question is not connected.
+      continue;
+    }
     logged_remotely = logged_remotely || peer.logged_remotely;
 
+    const int time_to_live_ms = peer.connection->time_to_live() / 1000000;
+    CHECK((time_to_live_ms == 0) == (peer.connection->time_to_live() == 0))
+        << ": TTLs below 1ms are not supported, as they would get rounded "
+           "down "
+           "to zero, which is used to indicate a reliable connection.";
+
     if (peer.sac_assoc_id != 0 &&
-        server->Send(std::string_view(
-                         reinterpret_cast<const char *>(fbb.GetBufferPointer()),
-                         fbb.GetSize()),
-                     peer.sac_assoc_id, peer.stream,
-                     peer.connection->time_to_live() / 1000000)) {
+        server_->Send(std::string_view(reinterpret_cast<const char *>(
+                                           fbb.GetBufferPointer()),
+                                       fbb.GetSize()),
+                      peer.sac_assoc_id, peer.stream, time_to_live_ms)) {
       peer.server_status->AddSentPacket(peer.node_index, channel_);
       if (peer.logged_remotely) {
         ++sent_count;
       }
+      peer.last_sent_index = context.queue_index;
     } else {
-      peer.server_status->AddDroppedPacket(peer.node_index, channel_);
+      if (time_to_live_ms == 0) {
+        // Reliable connection that failed to send; set a retry timer.
+        retry_required = true;
+        peer.server_status->AddPacketRetry(peer.node_index, channel_);
+      } else {
+        // Unreliable connection that failed to send; losses
+        // are permitted, so just mark it as sent.
+        peer.server_status->AddDroppedPacket(peer.node_index, channel_);
+        peer.last_sent_index = context.queue_index;
+      }
     }
   }
 
+  if (retry_required) {
+    retry_timer_->Schedule(event_loop_->monotonic_now() + retry_period_);
+    retry_period_ = std::min(
+        retry_period_ +
+            std::chrono::milliseconds(FLAGS_retry_period_additive_backoff_ms),
+        std::chrono::milliseconds(FLAGS_max_retry_period_ms));
+  }
+
   if (logged_remotely) {
     if (sent_count == 0) {
       VLOG(1)
@@ -110,6 +246,7 @@
   // TODO(austin): ~10 MB chunks on disk and push them over the logging
   // channel?  Threadsafe disk backed queue object which can handle restarts
   // and flushes.  Whee.
+  return !retry_required;
 }
 
 void ChannelState::HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t /*ssn*/,
@@ -181,13 +318,17 @@
 }
 
 int ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
-  VLOG(1) << "Disconnected " << assoc_id;
+  VLOG(1) << "Disconnected " << assoc_id << " for "
+          << configuration::StrippedChannelToString(channel_);
   for (ChannelState::Peer &peer : peers_) {
     if (peer.sac_assoc_id == assoc_id) {
       // TODO(austin): This will not handle multiple clients from
       // a single node.  But that should be rare.
       peer.sac_assoc_id = 0;
       peer.stream = 0;
+      // We do not guarantee the consistent delivery of reliable channels
+      // through node disconnects.
+      peer.last_sent_index.reset();
       return peer.node_index;
     }
   }
@@ -195,8 +336,7 @@
 }
 
 int ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
-                                int stream, SctpServer *server,
-                                FixedAllocator *allocator,
+                                int stream,
                                 aos::monotonic_clock::time_point monotonic_now,
                                 std::vector<sctp_assoc_t> *reconnected) {
   VLOG(1) << "Channel " << channel_->name()->string_view() << " "
@@ -226,38 +366,35 @@
                 << " already connected on " << peer.sac_assoc_id
                 << " aborting old connection and switching to " << assoc_id;
           }
-          server->Abort(peer.sac_assoc_id);
+          server_->Abort(peer.sac_assoc_id);
+          // sac_assoc_id will be set again before NodeConnected() exits, but we
+          // clear it here so that AnyNodeConnected() (or any other similar
+          // function calls) observe the node as having had an aborted
+          // connection.
+          peer.sac_assoc_id = 0;
         }
       }
 
+      // Clear the last sent index to force a retry of any reliable channels.
+      peer.last_sent_index.reset();
+
+      if (!AnyNodeConnected()) {
+        // If no one else is connected yet, reset the Fetcher.
+        last_message_fetcher_->Fetch();
+        retry_period_ = std::chrono::milliseconds(FLAGS_min_retry_period_ms);
+      }
+      // Unreliable channels aren't supposed to send out the latest fetched
+      // message.
+      if (peer.connection->time_to_live() != 0 &&
+          last_message_fetcher_->context().data != nullptr) {
+        peer.last_sent_index = last_message_fetcher_->context().queue_index;
+      }
       peer.sac_assoc_id = assoc_id;
       peer.stream = stream;
       peer.server_status->Connect(peer.node_index, monotonic_now);
 
-      server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
-      if (last_message_fetcher_ && peer.connection->time_to_live() == 0) {
-        last_message_fetcher_->Fetch();
-        VLOG(1) << "Got a connection on a reliable channel "
-                << configuration::StrippedChannelToString(
-                       last_message_fetcher_->channel())
-                << ", sending? "
-                << (last_message_fetcher_->context().data != nullptr);
-        if (last_message_fetcher_->context().data != nullptr) {
-          // SendData sends to all...  Only send to the new one.
-          flatbuffers::FlatBufferBuilder fbb =
-              PackContext(allocator, last_message_fetcher_->context());
-
-          if (server->Send(std::string_view(reinterpret_cast<const char *>(
-                                                fbb.GetBufferPointer()),
-                                            fbb.GetSize()),
-                           peer.sac_assoc_id, peer.stream,
-                           peer.connection->time_to_live() / 1000000)) {
-            peer.server_status->AddSentPacket(peer.node_index, channel_);
-          } else {
-            peer.server_status->AddDroppedPacket(peer.node_index, channel_);
-          }
-        }
-      }
+      server_->SetStreamPriority(assoc_id, stream, peer.connection->priority());
+      SendData();
       return peer.node_index;
     }
   }
@@ -270,11 +407,7 @@
       timestamp_loggers_(event_loop_),
       server_(max_channels() + kControlStreams(), "",
               event_loop->node()->port()),
-      server_status_(event_loop,
-                     [this](const Context &context) {
-                       timestamp_state_->SendData(&server_, &allocator_,
-                                                  context);
-                     }),
+      server_status_(event_loop, [this]() { timestamp_state_->SendData(); }),
       config_sha256_(std::move(config_sha256)),
       allocator_(0) {
   CHECK_EQ(config_sha256_.size(), 64u) << ": Wrong length sha256sum";
@@ -333,12 +466,10 @@
 
     if (configuration::ChannelIsForwardedFromNode(channel,
                                                   event_loop_->node())) {
-      bool any_reliable = false;
       for (const Connection *connection : *channel->destination_nodes()) {
         if (connection->time_to_live() == 0) {
           reliable_buffer_size +=
               static_cast<size_t>(channel->max_size() + kHeaderSizeOverhead());
-          any_reliable = true;
         }
       }
 
@@ -351,8 +482,7 @@
           max_channel_buffer_size);
 
       std::unique_ptr<ChannelState> state(new ChannelState{
-          channel, channel_index,
-          any_reliable ? event_loop_->MakeRawFetcher(channel) : nullptr});
+          event_loop, channel, channel_index, &server_, &allocator_});
 
       for (const Connection *connection : *channel->destination_nodes()) {
         const Node *other_node = configuration::GetNode(
@@ -380,11 +510,8 @@
       if (channel != timestamp_channel) {
         // Call SendData for every message.
         ChannelState *state_ptr = state.get();
-        event_loop_->MakeRawWatcher(
-            channel, [this, state_ptr](const Context &context,
-                                       const void * /*message*/) {
-              state_ptr->SendData(&server_, &allocator_, context);
-            });
+        event_loop_->MakeRawNoArgWatcher(
+            channel, [state_ptr](const Context &) { state_ptr->SendData(); });
       } else {
         for (const Connection *connection : *channel->destination_nodes()) {
           CHECK_GE(connection->time_to_live(), 1000u);
@@ -394,8 +521,8 @@
       }
       channels_.emplace_back(std::move(state));
     } else if (channel == timestamp_channel) {
-      std::unique_ptr<ChannelState> state(
-          new ChannelState{channel, channel_index, nullptr});
+      std::unique_ptr<ChannelState> state(new ChannelState{
+          event_loop_, channel, channel_index, &server_, &allocator_});
       for (const Connection *connection : *channel->destination_nodes()) {
         CHECK_GE(connection->time_to_live(), 1000u);
       }
@@ -415,8 +542,12 @@
   LOG(INFO) << "Reliable buffer size for all clients is "
             << reliable_buffer_size;
   server_.SetMaxReadSize(max_size);
-  server_.SetMaxWriteSize(
-      std::max(max_channel_buffer_size, reliable_buffer_size));
+  if (FLAGS_force_wmem_max >= 0) {
+    server_.SetMaxWriteSize(FLAGS_force_wmem_max);
+  } else {
+    server_.SetMaxWriteSize(
+        std::max(max_channel_buffer_size, reliable_buffer_size));
+  }
 
   // Since we are doing interleaving mode 1, we will see at most 1 message being
   // delivered at a time for an association.  That means, if a message is
@@ -624,8 +755,7 @@
         if (channel_state->Matches(channel)) {
           node_index = channel_state->NodeConnected(
               connect->node(), message->header.rcvinfo.rcv_assoc_id,
-              channel_index, &server_, &allocator_, monotonic_now,
-              &reconnected_);
+              channel_index, monotonic_now, &reconnected_);
           CHECK_NE(node_index, -1)
               << ": Failed to find node "
               << aos::FlatbufferToJson(connect->node()) << " for connection "
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index d74a847..6a3fdd8 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -28,11 +28,9 @@
 // new message from the event loop.
 class ChannelState {
  public:
-  ChannelState(const Channel *channel, int channel_index,
-               std::unique_ptr<aos::RawFetcher> last_message_fetcher)
-      : channel_index_(channel_index),
-        channel_(channel),
-        last_message_fetcher_(std::move(last_message_fetcher)) {}
+  ChannelState(aos::EventLoop *event_loop, const Channel *channel,
+               int channel_index, SctpServer *server,
+               FixedAllocator *allocator);
 
   // Class to encapsulate all the state per client on a channel.  A client may
   // be subscribed to multiple channels.
@@ -61,6 +59,12 @@
     // If true, this message will be logged on a receiving node.  We need to
     // keep it around to log it locally if that fails.
     bool logged_remotely = false;
+
+    // Last "successfully" sent message for this connection. For reliable
+    // connections, this being set to a value will indicate that the message was
+    // truly successfully sent. For unreliable connections, this will get set as
+    // soon as we've attempted to send it.
+    std::optional<size_t> last_sent_index = std::nullopt;
   };
 
   // Needs to be called when a node (might have) disconnected.
@@ -70,7 +74,6 @@
   // reconnects.
   int NodeDisconnected(sctp_assoc_t assoc_id);
   int NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
-                    SctpServer *server, FixedAllocator *allocator,
                     aos::monotonic_clock::time_point monotonic_now,
                     std::vector<sctp_assoc_t> *reconnected);
 
@@ -84,13 +87,12 @@
   // channel.
   bool Matches(const Channel *other_channel);
 
-  // Sends the data in context using the provided server.
-  void SendData(SctpServer *server, FixedAllocator *allocator,
-                const Context &context);
+  // Sends as much data on this channel as is possible using the internal
+  // fetcher.
+  void SendData();
 
   // Packs a context into a size prefixed message header for transmission.
-  flatbuffers::FlatBufferBuilder PackContext(FixedAllocator *allocator,
-                                             const Context &context);
+  flatbuffers::FlatBufferBuilder PackContext(const Context &context);
 
   // Handles reception of delivery times.
   void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
@@ -99,14 +101,66 @@
                       MessageBridgeServerStatus *server_status);
 
  private:
+  // When sending a message, we must guarantee that reliable messages make it to
+  // their destinations. Unfortunately, we cannot purely rely on the kernel to
+  // provide this guarantee, as the internal send buffer can fill up, resulting
+  // in Send() calls failing. To guarantee that we end up sending reliable
+  // messages, we do the following:
+  // * For channels with no reliable connections, we send the message and do not
+  //   retry if the kernel rejects it.
+  // * For channels with at least one reliable connection:
+  //   * We will always attempt to retry failed sends on reliable connections
+  //     (if a channel has mixed reliable/unreliable connections, the unreliable
+  //     connections are not retried).
+  //   * Until we have successfully sent message X on every single reliable
+  //     connection, we will not progress to sending X+1 on *any* connection.
+  //     This reduces the number of Fetchers that we must maintain for each
+  //     channel.
+  //   * If a given client node is not connected (or becomes disconnected), then
+  //     it will be ignored and will not block the progression of sending of
+  //     reliable messages to other nodes (connection state is tracked through
+  //     Peer::sac_assoc_id).
+  //   * Retries will be performed with an additive backoff up to a set
+  //     maximum. The backoff duration resets once the retry succeeds.
+  //   * If we fall so far behind that the Fetcher drops off the end of the
+  //     queue, then we kill the message bridge.
+
+  // Returns false if a retry will be required for the message in question, and
+  // true if it was sent "successfully" (note that for unreliable messages, we
+  // may drop the message but still return true here).
+  bool TrySendData(const Context &context);
+
+  // Returns true if all of the peer connections are in a state where we are
+  // permitted to progress to sending the next message. As described above, this
+  // will never block on any unreliable connections, but will not return true
+  // until every reliable connection has successfully sent the currently fetched
+  // message.
+  bool ReadyToFetchNext() const;
+  // Returns true if the given peer can move to the next message (used by
+  // ReadyToFetchNext()).
+  bool PeerReadyToFetchNext(const Peer &peer, const Context &context) const;
+
+  bool AnyNodeConnected() const;
+
+  aos::EventLoop *const event_loop_;
   const int channel_index_;
   const Channel *const channel_;
 
+  SctpServer *server_;
+  FixedAllocator *allocator_;
+
   std::vector<Peer> peers_;
 
-  // A fetcher to use to send the last message when a node connects and is
-  // reliable.
+  // A fetcher to use to send the message. For reliable channels this is
+  // used both on startup to fetch the latest message as well as to
+  // support retries of messages. For unreliable channels, we use the
+  // Fetcher to minimize the diff with the reliable codepath, but it
+  // provides no utility over just using a Watcher directly.
   std::unique_ptr<aos::RawFetcher> last_message_fetcher_;
+  // For reliable channels, the timer to use to retry sends on said channel.
+  aos::TimerHandler *retry_timer_;
+  // Current retry period.
+  std::chrono::milliseconds retry_period_;
 };
 
 // This encapsulates the state required to talk to *all* the clients from this
@@ -115,6 +169,13 @@
  public:
   MessageBridgeServer(aos::ShmEventLoop *event_loop, std::string config_sha256);
 
+  // Explicitly delete the copy/move constructors and assignment
+  // operators--we internally pass around pointers to internal state.
+  MessageBridgeServer(MessageBridgeServer &&) = delete;
+  MessageBridgeServer(const MessageBridgeServer &) = delete;
+  MessageBridgeServer &operator=(MessageBridgeServer &&) = delete;
+  MessageBridgeServer &operator=(const MessageBridgeServer &) = delete;
+
   ~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
 
  private:
diff --git a/aos/network/message_bridge_server_status.cc b/aos/network/message_bridge_server_status.cc
index 5607d4a..22270d9 100644
--- a/aos/network/message_bridge_server_status.cc
+++ b/aos/network/message_bridge_server_status.cc
@@ -49,6 +49,7 @@
     connection_builder.add_node(node_offset);
     connection_builder.add_state(State::DISCONNECTED);
     connection_builder.add_dropped_packets(0);
+    connection_builder.add_retry_count(0);
     connection_builder.add_sent_packets(0);
     connection_builder.add_monotonic_offset(0);
     connection_builder.add_partial_deliveries(0);
@@ -90,7 +91,7 @@
 }  // namespace
 
 MessageBridgeServerStatus::MessageBridgeServerStatus(
-    aos::EventLoop *event_loop, std::function<void(const Context &)> send_data)
+    aos::EventLoop *event_loop, std::function<void()> send_data)
     : event_loop_(event_loop),
       sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
       statistics_(MakeServerStatistics(
@@ -137,6 +138,7 @@
           configuration::ChannelIndex(event_loop_->configuration(), channel);
       initial_statistics.sent_packets = 0;
       initial_statistics.dropped_packets = 0;
+      initial_statistics.retry_count = 0;
       channel_statistics[channel] = initial_statistics;
     }
 
@@ -182,6 +184,7 @@
   connection->mutate_sent_packets(connection->sent_packets() + 1);
   node.channel_statistics[channel].sent_packets++;
 }
+
 void MessageBridgeServerStatus::AddDroppedPacket(int node_index,
                                                  const aos::Channel *channel) {
   CHECK(nodes_[node_index].has_value());
@@ -191,6 +194,14 @@
   node.channel_statistics[channel].dropped_packets++;
 }
 
+void MessageBridgeServerStatus::AddPacketRetry(int node_index,
+                                               const aos::Channel *channel) {
+  NodeState &node = nodes_[node_index].value();
+  ServerConnection *connection = node.server_connection;
+  connection->mutate_retry_count(connection->retry_count() + 1);
+  node.channel_statistics[channel].retry_count++;
+}
+
 void MessageBridgeServerStatus::SetBootUUID(int node_index,
                                             const UUID &boot_uuid) {
   nodes_[node_index].value().boot_uuid = boot_uuid;
@@ -281,6 +292,7 @@
     server_connection_builder.add_dropped_packets(
         connection->dropped_packets());
     server_connection_builder.add_sent_packets(connection->sent_packets());
+    server_connection_builder.add_retry_count(connection->retry_count());
     server_connection_builder.add_partial_deliveries(
         PartialDeliveries(node_index));
     server_connection_builder.add_channels(channels_offset);
@@ -465,18 +477,10 @@
   timestamp_failure_counter_.Count(err);
   // Reply only if we successfully sent the timestamp
   if (err == RawSender::Error::kOk) {
-    Context context;
-    context.monotonic_event_time = timestamp_sender_.monotonic_sent_time();
-    context.realtime_event_time = timestamp_sender_.realtime_sent_time();
-    context.queue_index = timestamp_sender_.sent_queue_index();
-    context.size = timestamp_copy.span().size();
-    context.source_boot_uuid = event_loop_->boot_uuid();
-    context.data = timestamp_copy.span().data();
-
     // Since we are building up the timestamp to send here, we need to trigger
     // the SendData call ourselves.
     if (send_data_) {
-      send_data_(context);
+      send_data_();
     }
   }
 }
diff --git a/aos/network/message_bridge_server_status.h b/aos/network/message_bridge_server_status.h
index d6edb89..3945d57 100644
--- a/aos/network/message_bridge_server_status.h
+++ b/aos/network/message_bridge_server_status.h
@@ -51,9 +51,9 @@
     uint32_t partial_deliveries = 0;
   };
 
-  MessageBridgeServerStatus(aos::EventLoop *event_loop,
-                            std::function<void(const Context &)> send_data =
-                                std::function<void(const Context &)>());
+  MessageBridgeServerStatus(
+      aos::EventLoop *event_loop,
+      std::function<void()> send_data = std::function<void()>());
 
   MessageBridgeServerStatus(const MessageBridgeServerStatus &) = delete;
   MessageBridgeServerStatus(MessageBridgeServerStatus &&) = delete;
@@ -61,7 +61,7 @@
       delete;
   MessageBridgeServerStatus &operator=(MessageBridgeServerStatus &&) = delete;
 
-  void set_send_data(std::function<void(const Context &)> send_data) {
+  void set_send_data(std::function<void()> send_data) {
     send_data_ = send_data;
   }
 
@@ -97,6 +97,7 @@
   // node_index must be a valid client node.
   void AddSentPacket(int node_index, const aos::Channel *channel);
   void AddDroppedPacket(int node_index, const aos::Channel *channel);
+  void AddPacketRetry(int node_index, const aos::Channel *channel);
 
   // Returns the ServerConnection message which is updated by the server.
   ServerConnection *FindServerConnection(std::string_view node_name);
@@ -148,7 +149,7 @@
   aos::monotonic_clock::time_point last_statistics_send_time_ =
       aos::monotonic_clock::min_time;
 
-  std::function<void(const Context &)> send_data_;
+  std::function<void()> send_data_;
 
   bool send_ = true;
 
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 991d618..4eb5e8b 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -10,323 +10,21 @@
 #include "aos/network/message_bridge_client_lib.h"
 #include "aos/network/message_bridge_protocol.h"
 #include "aos/network/message_bridge_server_lib.h"
+#include "aos/network/message_bridge_test_lib.h"
 #include "aos/network/team_number.h"
 #include "aos/sha256.h"
 #include "aos/testing/path.h"
 #include "aos/util/file.h"
 
-DECLARE_string(boot_uuid);
-
 namespace aos {
-void SetShmBase(const std::string_view base);
 
 namespace message_bridge {
 namespace testing {
 
-using aos::testing::ArtifactPath;
-
-namespace chrono = std::chrono;
-
-std::string ShmBase(const std::string_view node) {
-  const char *tmpdir_c_str = getenv("TEST_TMPDIR");
-  if (tmpdir_c_str != nullptr) {
-    return absl::StrCat(tmpdir_c_str, "/", node);
-  } else {
-    return absl::StrCat("/dev/shm/", node);
-  }
-}
-
-void DoSetShmBase(const std::string_view node) {
-  aos::SetShmBase(ShmBase(node));
-}
-
-// Class to manage starting and stopping a thread with an event loop in it.  The
-// thread is guarenteed to be running before the constructor exits.
-class ThreadedEventLoopRunner {
- public:
-  ThreadedEventLoopRunner(aos::ShmEventLoop *event_loop)
-      : event_loop_(event_loop), my_thread_([this]() {
-          LOG(INFO) << "Started " << event_loop_->name();
-          event_loop_->OnRun([this]() { event_.Set(); });
-          event_loop_->Run();
-        }) {
-    event_.Wait();
-  }
-
-  ~ThreadedEventLoopRunner() { Exit(); }
-
-  void Exit() {
-    if (my_thread_.joinable()) {
-      event_loop_->Exit();
-      my_thread_.join();
-      my_thread_ = std::thread();
-    }
-  }
-
- private:
-  aos::Event event_;
-  aos::ShmEventLoop *event_loop_;
-  std::thread my_thread_;
-};
-
-// Parameters to run all the tests with.
-struct Param {
-  // The config file to use.
-  std::string config;
-  // If true, the RemoteMessage channel should be shared between all the remote
-  // channels.  If false, there will be 1 RemoteMessage channel per remote
-  // channel.
-  bool shared;
-};
-
-class MessageBridgeParameterizedTest
-    : public ::testing::TestWithParam<struct Param> {
- public:
-  MessageBridgeParameterizedTest()
-      : config(aos::configuration::ReadConfig(
-            ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
-        config_sha256(Sha256(config.span())),
-        pi1_boot_uuid_(UUID::Random()),
-        pi2_boot_uuid_(UUID::Random()) {
-    util::UnlinkRecursive(ShmBase("pi1"));
-    util::UnlinkRecursive(ShmBase("pi2"));
-  }
-
-  bool shared() const { return GetParam().shared; }
-
-  void OnPi1() {
-    DoSetShmBase("pi1");
-    FLAGS_override_hostname = "raspberrypi";
-    FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
-  }
-
-  void OnPi2() {
-    DoSetShmBase("pi2");
-    FLAGS_override_hostname = "raspberrypi2";
-    FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
-  }
-
-  void MakePi1Server(std::string server_config_sha256 = "") {
-    OnPi1();
-    FLAGS_application_name = "pi1_message_bridge_server";
-    pi1_server_event_loop =
-        std::make_unique<aos::ShmEventLoop>(&config.message());
-    pi1_server_event_loop->SetRuntimeRealtimePriority(1);
-    pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
-        pi1_server_event_loop.get(), server_config_sha256.size() == 0
-                                         ? config_sha256
-                                         : server_config_sha256);
-  }
-
-  void RunPi1Server(chrono::nanoseconds duration) {
-    // Set up a shutdown callback.
-    aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
-        [this]() { pi1_server_event_loop->Exit(); });
-    pi1_server_event_loop->OnRun([this, quit, duration]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Schedule(pi1_server_event_loop->monotonic_now() + duration);
-    });
-
-    pi1_server_event_loop->Run();
-  }
-
-  void StartPi1Server() {
-    pi1_server_thread =
-        std::make_unique<ThreadedEventLoopRunner>(pi1_server_event_loop.get());
-  }
-
-  void StopPi1Server() {
-    pi1_server_thread.reset();
-    pi1_message_bridge_server.reset();
-    pi1_server_event_loop.reset();
-  }
-
-  void MakePi1Client() {
-    OnPi1();
-    FLAGS_application_name = "pi1_message_bridge_client";
-    pi1_client_event_loop =
-        std::make_unique<aos::ShmEventLoop>(&config.message());
-    pi1_client_event_loop->SetRuntimeRealtimePriority(1);
-    pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
-        pi1_client_event_loop.get(), config_sha256);
-  }
-
-  void StartPi1Client() {
-    pi1_client_thread =
-        std::make_unique<ThreadedEventLoopRunner>(pi1_client_event_loop.get());
-  }
-
-  void StopPi1Client() {
-    pi1_client_thread.reset();
-    pi1_message_bridge_client.reset();
-    pi1_client_event_loop.reset();
-  }
-
-  void MakePi1Test() {
-    OnPi1();
-    FLAGS_application_name = "test1";
-    pi1_test_event_loop =
-        std::make_unique<aos::ShmEventLoop>(&config.message());
-
-    pi1_test_event_loop->MakeWatcher(
-        "/pi1/aos", [](const ServerStatistics &stats) {
-          VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
-        });
-
-    pi1_test_event_loop->MakeWatcher(
-        "/pi1/aos", [](const ClientStatistics &stats) {
-          VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
-        });
-
-    pi1_test_event_loop->MakeWatcher(
-        "/pi1/aos", [](const Timestamp &timestamp) {
-          VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
-        });
-    pi1_test_event_loop->MakeWatcher(
-        "/pi2/aos", [this](const Timestamp &timestamp) {
-          VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
-          EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
-                    pi2_boot_uuid_);
-        });
-  }
-
-  void StartPi1Test() {
-    pi1_test_thread =
-        std::make_unique<ThreadedEventLoopRunner>(pi1_test_event_loop.get());
-  }
-
-  void StopPi1Test() { pi1_test_thread.reset(); }
-
-  void MakePi2Server() {
-    OnPi2();
-    FLAGS_application_name = "pi2_message_bridge_server";
-    pi2_server_event_loop =
-        std::make_unique<aos::ShmEventLoop>(&config.message());
-    pi2_server_event_loop->SetRuntimeRealtimePriority(1);
-    pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
-        pi2_server_event_loop.get(), config_sha256);
-  }
-
-  void RunPi2Server(chrono::nanoseconds duration) {
-    // Set up a shutdown callback.
-    aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
-        [this]() { pi2_server_event_loop->Exit(); });
-    pi2_server_event_loop->OnRun([this, quit, duration]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Schedule(pi2_server_event_loop->monotonic_now() + duration);
-    });
-
-    pi2_server_event_loop->Run();
-  }
-
-  void StartPi2Server() {
-    pi2_server_thread =
-        std::make_unique<ThreadedEventLoopRunner>(pi2_server_event_loop.get());
-  }
-
-  void StopPi2Server() {
-    pi2_server_thread.reset();
-    pi2_message_bridge_server.reset();
-    pi2_server_event_loop.reset();
-  }
-
-  void MakePi2Client() {
-    OnPi2();
-    FLAGS_application_name = "pi2_message_bridge_client";
-    pi2_client_event_loop =
-        std::make_unique<aos::ShmEventLoop>(&config.message());
-    pi2_client_event_loop->SetRuntimeRealtimePriority(1);
-    pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
-        pi2_client_event_loop.get(), config_sha256);
-  }
-
-  void RunPi2Client(chrono::nanoseconds duration) {
-    // Run for 5 seconds to make sure we have time to estimate the offset.
-    aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
-        [this]() { pi2_client_event_loop->Exit(); });
-    pi2_client_event_loop->OnRun([this, quit, duration]() {
-      // Stop between timestamps, not exactly on them.
-      quit->Schedule(pi2_client_event_loop->monotonic_now() + duration);
-    });
-
-    // And go!
-    pi2_client_event_loop->Run();
-  }
-
-  void StartPi2Client() {
-    pi2_client_thread =
-        std::make_unique<ThreadedEventLoopRunner>(pi2_client_event_loop.get());
-  }
-
-  void StopPi2Client() {
-    pi2_client_thread.reset();
-    pi2_message_bridge_client.reset();
-    pi2_client_event_loop.reset();
-  }
-
-  void MakePi2Test() {
-    OnPi2();
-    FLAGS_application_name = "test2";
-    pi2_test_event_loop =
-        std::make_unique<aos::ShmEventLoop>(&config.message());
-
-    pi2_test_event_loop->MakeWatcher(
-        "/pi2/aos", [](const ServerStatistics &stats) {
-          VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
-        });
-
-    pi2_test_event_loop->MakeWatcher(
-        "/pi2/aos", [](const ClientStatistics &stats) {
-          VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
-        });
-
-    pi2_test_event_loop->MakeWatcher(
-        "/pi1/aos", [this](const Timestamp &timestamp) {
-          VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
-          EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
-                    pi1_boot_uuid_);
-        });
-    pi2_test_event_loop->MakeWatcher(
-        "/pi2/aos", [](const Timestamp &timestamp) {
-          VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
-        });
-  }
-
-  void StartPi2Test() {
-    pi2_test_thread =
-        std::make_unique<ThreadedEventLoopRunner>(pi2_test_event_loop.get());
-  }
-
-  void StopPi2Test() { pi2_test_thread.reset(); }
-
-  aos::FlatbufferDetachedBuffer<aos::Configuration> config;
-  std::string config_sha256;
-
-  const UUID pi1_boot_uuid_;
-  const UUID pi2_boot_uuid_;
-
-  std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
-  std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
-  std::unique_ptr<ThreadedEventLoopRunner> pi1_server_thread;
-
-  std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
-  std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
-  std::unique_ptr<ThreadedEventLoopRunner> pi1_client_thread;
-
-  std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
-  std::unique_ptr<ThreadedEventLoopRunner> pi1_test_thread;
-
-  std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
-  std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
-  std::unique_ptr<ThreadedEventLoopRunner> pi2_server_thread;
-
-  std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
-  std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
-  std::unique_ptr<ThreadedEventLoopRunner> pi2_client_thread;
-
-  std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
-  std::unique_ptr<ThreadedEventLoopRunner> pi2_test_thread;
-};
+// Note: All of these tests spin up ShmEventLoops in separate threads so that
+// we can run the "real" message bridge. This requires extra threading and
+// timing coordination, which accounts for some of the extra complexity in
+// these tests.
 
 // Test that we can send a ping message over sctp and receive it.
 TEST_P(MessageBridgeParameterizedTest, PingPong) {
@@ -913,7 +611,7 @@
     StopPi2Client();
   }
 
-  // Shut everyone else down
+  // Shut everyone else down.
   StopPi1Server();
   StopPi1Client();
   StopPi2Server();
@@ -1073,7 +771,7 @@
     StopPi2Server();
   }
 
-  // Shut everyone else down
+  // Shut everyone else down.
   StopPi1Server();
   StopPi1Client();
   StopPi2Client();
@@ -1129,6 +827,8 @@
   const size_t ping_channel_index = configuration::ChannelIndex(
       receive_event_loop.configuration(), ping_fetcher.channel());
 
+  // ping_timestamp_count is accessed from multiple threads (the Watcher that
+  // triggers it is in a separate thread), so make it atomic.
   std::atomic<int> ping_timestamp_count{0};
   const std::string channel_name =
       shared() ? "/pi1/aos/remote_timestamps/pi2"
@@ -1150,7 +850,7 @@
   EXPECT_FALSE(ping_fetcher.Fetch());
   EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
 
-  // Spin up the persistant pieces.
+  // Spin up the persistent pieces.
   StartPi1Server();
   StartPi1Client();
   StartPi2Server();
@@ -1161,7 +861,7 @@
           &pi1_remote_timestamp_event_loop);
 
   {
-    // Now, spin up a client for 2 seconds.
+    // Now spin up a client for 2 seconds.
     MakePi2Client();
 
     RunPi2Client(chrono::milliseconds(2050));
@@ -1210,7 +910,7 @@
     StopPi2Client();
   }
 
-  // Shut everyone else down
+  // Shut everyone else down.
   StopPi1Client();
   StopPi2Server();
   pi1_remote_timestamp_thread.reset();
@@ -1258,6 +958,8 @@
   const size_t ping_channel_index = configuration::ChannelIndex(
       receive_event_loop.configuration(), ping_fetcher.channel());
 
+  // ping_timestamp_count is accessed from multiple threads (the Watcher that
+  // triggers it is in a separate thread), so make it atomic.
   std::atomic<int> ping_timestamp_count{0};
   const std::string channel_name =
       shared() ? "/pi1/aos/remote_timestamps/pi2"
@@ -1279,7 +981,7 @@
   EXPECT_FALSE(ping_fetcher.Fetch());
   EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
 
-  // Spin up the persistant pieces.
+  // Spin up the persistent pieces.
   StartPi1Client();
   StartPi2Server();
   StartPi2Client();
@@ -1339,13 +1041,144 @@
     StopPi1Server();
   }
 
-  // Shut everyone else down
+  // Shut everyone else down.
   StopPi1Client();
   StopPi2Server();
   StopPi2Client();
   pi1_remote_timestamp_thread.reset();
 }
 
+// Tests that when multiple reliable messages are sent while the client is
+// restarting, only the final message makes it to the client. This ensures
+// that the server's reliable-connection retry logic correctly handles a
+// disconnecting & reconnecting client.
+TEST_P(MessageBridgeParameterizedTest, ReliableSentDuringClientReboot) {
+  OnPi1();
+
+  FLAGS_application_name = "sender";
+  aos::ShmEventLoop send_event_loop(&config.message());
+  aos::Sender<examples::Ping> ping_sender =
+      send_event_loop.MakeSender<examples::Ping>("/test");
+  size_t ping_index = 0;
+  SendPing(&ping_sender, ++ping_index);
+
+  MakePi1Server();
+  MakePi1Client();
+
+  FLAGS_application_name = "pi1_timestamp";
+  aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
+
+  // Now do it for "raspberrypi2", the client.
+  OnPi2();
+
+  MakePi2Server();
+
+  aos::ShmEventLoop receive_event_loop(&config.message());
+  aos::Fetcher<examples::Ping> ping_fetcher =
+      receive_event_loop.MakeFetcher<examples::Ping>("/test");
+  aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+      receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+  const size_t ping_channel_index = configuration::ChannelIndex(
+      receive_event_loop.configuration(), ping_fetcher.channel());
+
+  // ping_timestamp_count is accessed from multiple threads (the Watcher that
+  // triggers it is in a separate thread), so make it atomic.
+  std::atomic<int> ping_timestamp_count{0};
+  const std::string channel_name =
+      shared() ? "/pi1/aos/remote_timestamps/pi2"
+               : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
+  pi1_remote_timestamp_event_loop.MakeWatcher(
+      channel_name, [this, channel_name, ping_channel_index,
+                     &ping_timestamp_count](const RemoteMessage &header) {
+        VLOG(1) << channel_name << " RemoteMessage "
+                << aos::FlatbufferToJson(&header);
+        EXPECT_TRUE(header.has_boot_uuid());
+        if (shared() && header.channel_index() != ping_channel_index) {
+          return;
+        }
+        CHECK_EQ(header.channel_index(), ping_channel_index);
+        ++ping_timestamp_count;
+      });
+
+  // Before everything starts up, confirm there is no message.
+  EXPECT_FALSE(ping_fetcher.Fetch());
+
+  // Spin up the persistent pieces.
+  StartPi1Server();
+  StartPi1Client();
+  StartPi2Server();
+
+  // Event used to wait for the timestamp counting thread to start.
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
+      std::make_unique<ThreadedEventLoopRunner>(
+          &pi1_remote_timestamp_event_loop);
+
+  {
+    // Now, spin up a client for 2 seconds.
+    MakePi2Client();
+
+    RunPi2Client(chrono::milliseconds(2050));
+
+    // Confirm there is no detected duplicate packet.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              0u);
+
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->partial_deliveries(),
+              0u);
+
+    EXPECT_TRUE(ping_fetcher.Fetch());
+    EXPECT_EQ(ping_timestamp_count, 1);
+
+    StopPi2Client();
+  }
+
+  // Send some reliable messages while the client is dead. Only the final one
+  // should make it through.
+  while (ping_index < 10) {
+    SendPing(&ping_sender, ++ping_index);
+  }
+
+  {
+    // Now, spin the client back up for 5 seconds.
+    MakePi2Client();
+
+    RunPi2Client(chrono::milliseconds(5050));
+
+    // No duplicate packets should have appeared.
+    EXPECT_TRUE(pi2_client_statistics_fetcher.Fetch());
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->duplicate_packets(),
+              0u);
+
+    EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+                  ->Get(0)
+                  ->partial_deliveries(),
+              0u);
+
+    EXPECT_EQ(ping_timestamp_count, 2);
+    // We should have gotten exactly one more ping message: the latest one sent
+    // should have made it through, but none of the earlier ones.
+    EXPECT_TRUE(ping_fetcher.FetchNext());
+    EXPECT_EQ(ping_index, ping_fetcher->value());
+    EXPECT_FALSE(ping_fetcher.FetchNext());
+
+    StopPi2Client();
+  }
+
+  // Shut everyone else down.
+  StopPi1Client();
+  StopPi2Server();
+  pi1_remote_timestamp_thread.reset();
+  StopPi1Server();
+}
+
 // Test that differing config sha256's result in no connection.
 TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
   // This is rather annoying to set up.  We need to start up a client and
@@ -1441,7 +1274,7 @@
     StopPi2Client();
   }
 
-  // Shut everyone else down
+  // Shut everyone else down.
   StopPi1Server();
   StopPi1Client();
   StopPi2Server();
@@ -1590,7 +1423,7 @@
     StopPi2Client();
   }
 
-  // Shut everyone else down
+  // Shut everyone else down.
   StopPi1Server();
   StopPi1Client();
   StopPi2Server();
diff --git a/aos/network/message_bridge_test_lib.cc b/aos/network/message_bridge_test_lib.cc
new file mode 100644
index 0000000..e818075
--- /dev/null
+++ b/aos/network/message_bridge_test_lib.cc
@@ -0,0 +1,265 @@
+#include "aos/network/message_bridge_test_lib.h"
+
+DECLARE_string(boot_uuid);
+
+namespace aos {
+void SetShmBase(const std::string_view base);
+
+namespace message_bridge::testing {
+
+namespace chrono = std::chrono;
+using aos::testing::ArtifactPath;
+
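+// Returns the shared memory base path to use for the given node, preferring
+// the bazel-provided TEST_TMPDIR when it is set and falling back to /dev/shm
+// otherwise.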
+std::string ShmBase(const std::string_view node) {
+  const char *const tmpdir_c_str = getenv("TEST_TMPDIR");
+  if (tmpdir_c_str != nullptr) {
+    return absl::StrCat(tmpdir_c_str, "/", node);
+  } else {
+    return absl::StrCat("/dev/shm/", node);
+  }
+}
+
+void DoSetShmBase(const std::string_view node) {
+  aos::SetShmBase(ShmBase(node));
+}
+
+ThreadedEventLoopRunner::ThreadedEventLoopRunner(aos::ShmEventLoop *event_loop)
+    : event_loop_(event_loop), my_thread_([this]() {
+        LOG(INFO) << "Started " << event_loop_->name();
+        event_loop_->OnRun([this]() { event_.Set(); });
+        event_loop_->Run();
+      }) {
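+  // Block until the OnRun callback fires so that the event loop is guaranteed
+  // to be running before the constructor returns.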
+  event_.Wait();
+}
+
+ThreadedEventLoopRunner::~ThreadedEventLoopRunner() { Exit(); }
+
+void ThreadedEventLoopRunner::Exit() {
+  if (my_thread_.joinable()) {
+    event_loop_->Exit();
+    my_thread_.join();
+    my_thread_ = std::thread();
+  }
+}
+
+MessageBridgeParameterizedTest::MessageBridgeParameterizedTest()
+    : config(aos::configuration::ReadConfig(
+          ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
+      config_sha256(Sha256(config.span())),
+      pi1_boot_uuid_(UUID::Random()),
+      pi2_boot_uuid_(UUID::Random()) {
+  // Make sure that we clean up all the shared memory queues so that we cannot
+  // inadvertently be influenced by other tests or by previously run AOS
+  // applications (in a fully sharded test running inside the bazel sandbox,
+  // this should not matter).
+  util::UnlinkRecursive(ShmBase("pi1"));
+  util::UnlinkRecursive(ShmBase("pi2"));
+}
+
+bool MessageBridgeParameterizedTest::shared() const {
+  return GetParam().shared;
+}
+
+void MessageBridgeParameterizedTest::OnPi1() {
+  DoSetShmBase("pi1");
+  FLAGS_override_hostname = "raspberrypi";
+  FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
+}
+
+void MessageBridgeParameterizedTest::OnPi2() {
+  DoSetShmBase("pi2");
+  FLAGS_override_hostname = "raspberrypi2";
+  FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
+}
+
+void MessageBridgeParameterizedTest::MakePi1Server(
+    std::string server_config_sha256) {
+  OnPi1();
+  FLAGS_application_name = "pi1_message_bridge_server";
+  pi1_server_event_loop =
+      std::make_unique<aos::ShmEventLoop>(&config.message());
+  pi1_server_event_loop->SetRuntimeRealtimePriority(1);
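+  // An empty server_config_sha256 means "use this test's own config hash";
+  // passing a different hash lets tests such as MismatchedSha256 exercise a
+  // server whose config hash does not match the client's.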
+  pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
+      pi1_server_event_loop.get(),
+      server_config_sha256.size() == 0 ? config_sha256 : server_config_sha256);
+}
+
+void MessageBridgeParameterizedTest::RunPi1Server(
+    chrono::nanoseconds duration) {
+  // Set up a shutdown callback.
+  aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
+      [this]() { pi1_server_event_loop->Exit(); });
+  pi1_server_event_loop->OnRun([this, quit, duration]() {
+    // Stop between timestamps, not exactly on them.
+    quit->Schedule(pi1_server_event_loop->monotonic_now() + duration);
+  });
+
+  pi1_server_event_loop->Run();
+}
+
+void MessageBridgeParameterizedTest::StartPi1Server() {
+  pi1_server_thread =
+      std::make_unique<ThreadedEventLoopRunner>(pi1_server_event_loop.get());
+}
+
+void MessageBridgeParameterizedTest::StopPi1Server() {
+  pi1_server_thread.reset();
+  pi1_message_bridge_server.reset();
+  pi1_server_event_loop.reset();
+}
+
+void MessageBridgeParameterizedTest::MakePi1Client() {
+  OnPi1();
+  FLAGS_application_name = "pi1_message_bridge_client";
+  pi1_client_event_loop =
+      std::make_unique<aos::ShmEventLoop>(&config.message());
+  pi1_client_event_loop->SetRuntimeRealtimePriority(1);
+  pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
+      pi1_client_event_loop.get(), config_sha256);
+}
+
+void MessageBridgeParameterizedTest::StartPi1Client() {
+  pi1_client_thread =
+      std::make_unique<ThreadedEventLoopRunner>(pi1_client_event_loop.get());
+}
+
+void MessageBridgeParameterizedTest::StopPi1Client() {
+  pi1_client_thread.reset();
+  pi1_message_bridge_client.reset();
+  pi1_client_event_loop.reset();
+}
+
+void MessageBridgeParameterizedTest::MakePi1Test() {
+  OnPi1();
+  FLAGS_application_name = "test1";
+  pi1_test_event_loop = std::make_unique<aos::ShmEventLoop>(&config.message());
+
+  pi1_test_event_loop->MakeWatcher(
+      "/pi1/aos", [](const ServerStatistics &stats) {
+        VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop->MakeWatcher(
+      "/pi1/aos", [](const ClientStatistics &stats) {
+        VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi1_test_event_loop->MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+    VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
+  });
+  pi1_test_event_loop->MakeWatcher("/pi2/aos", [this](
+                                                   const Timestamp &timestamp) {
+    VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
+    EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid, pi2_boot_uuid_);
+  });
+}
+
+void MessageBridgeParameterizedTest::StartPi1Test() {
+  pi1_test_thread =
+      std::make_unique<ThreadedEventLoopRunner>(pi1_test_event_loop.get());
+}
+
+void MessageBridgeParameterizedTest::StopPi1Test() { pi1_test_thread.reset(); }
+
+void MessageBridgeParameterizedTest::MakePi2Server() {
+  OnPi2();
+  FLAGS_application_name = "pi2_message_bridge_server";
+  pi2_server_event_loop =
+      std::make_unique<aos::ShmEventLoop>(&config.message());
+  pi2_server_event_loop->SetRuntimeRealtimePriority(1);
+  pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
+      pi2_server_event_loop.get(), config_sha256);
+}
+
+void MessageBridgeParameterizedTest::RunPi2Server(
+    chrono::nanoseconds duration) {
+  // Schedule a shutdown callback.
+  aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
+      [this]() { pi2_server_event_loop->Exit(); });
+  pi2_server_event_loop->OnRun([this, quit, duration]() {
+    // Stop between timestamps, not exactly on them.
+    quit->Schedule(pi2_server_event_loop->monotonic_now() + duration);
+  });
+
+  pi2_server_event_loop->Run();
+}
+
+void MessageBridgeParameterizedTest::StartPi2Server() {
+  pi2_server_thread =
+      std::make_unique<ThreadedEventLoopRunner>(pi2_server_event_loop.get());
+}
+
+void MessageBridgeParameterizedTest::StopPi2Server() {
+  pi2_server_thread.reset();
+  pi2_message_bridge_server.reset();
+  pi2_server_event_loop.reset();
+}
+
+void MessageBridgeParameterizedTest::MakePi2Client() {
+  OnPi2();
+  FLAGS_application_name = "pi2_message_bridge_client";
+  pi2_client_event_loop =
+      std::make_unique<aos::ShmEventLoop>(&config.message());
+  pi2_client_event_loop->SetRuntimeRealtimePriority(1);
+  pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
+      pi2_client_event_loop.get(), config_sha256);
+}
+
+void MessageBridgeParameterizedTest::RunPi2Client(
+    chrono::nanoseconds duration) {
+  // Set up a shutdown callback so that the event loop runs for the requested
+  // duration.
+  aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
+      [this]() { pi2_client_event_loop->Exit(); });
+  pi2_client_event_loop->OnRun([this, quit, duration]() {
+    // Stop between timestamps, not exactly on them.
+    quit->Schedule(pi2_client_event_loop->monotonic_now() + duration);
+  });
+
+  // And go!
+  pi2_client_event_loop->Run();
+}
+
+void MessageBridgeParameterizedTest::StartPi2Client() {
+  pi2_client_thread =
+      std::make_unique<ThreadedEventLoopRunner>(pi2_client_event_loop.get());
+}
+
+void MessageBridgeParameterizedTest::StopPi2Client() {
+  pi2_client_thread.reset();
+  pi2_message_bridge_client.reset();
+  pi2_client_event_loop.reset();
+}
+
+void MessageBridgeParameterizedTest::MakePi2Test() {
+  OnPi2();
+  FLAGS_application_name = "test2";
+  pi2_test_event_loop = std::make_unique<aos::ShmEventLoop>(&config.message());
+
+  pi2_test_event_loop->MakeWatcher(
+      "/pi2/aos", [](const ServerStatistics &stats) {
+        VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop->MakeWatcher(
+      "/pi2/aos", [](const ClientStatistics &stats) {
+        VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
+      });
+
+  pi2_test_event_loop->MakeWatcher("/pi1/aos", [this](
+                                                   const Timestamp &timestamp) {
+    VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
+    EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid, pi1_boot_uuid_);
+  });
+  pi2_test_event_loop->MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+    VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
+  });
+}
+
+void MessageBridgeParameterizedTest::StartPi2Test() {
+  pi2_test_thread =
+      std::make_unique<ThreadedEventLoopRunner>(pi2_test_event_loop.get());
+}
+
+void MessageBridgeParameterizedTest::StopPi2Test() { pi2_test_thread.reset(); }
+}  // namespace message_bridge::testing
+}  // namespace aos
diff --git a/aos/network/message_bridge_test_lib.h b/aos/network/message_bridge_test_lib.h
new file mode 100644
index 0000000..047cff5
--- /dev/null
+++ b/aos/network/message_bridge_test_lib.h
@@ -0,0 +1,137 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_TEST_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_TEST_LIB_H_
+#include <chrono>
+#include <thread>
+
+#include "absl/strings/str_cat.h"
+#include "gtest/gtest.h"
+
+#include "aos/events/ping_generated.h"
+#include "aos/events/pong_generated.h"
+#include "aos/ipc_lib/event.h"
+#include "aos/network/message_bridge_client_lib.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/message_bridge_server_lib.h"
+#include "aos/network/team_number.h"
+#include "aos/sha256.h"
+#include "aos/testing/path.h"
+#include "aos/util/file.h"
+
+namespace aos::message_bridge::testing {
+
+namespace chrono = std::chrono;
+
+// Class to manage starting and stopping a thread with an event loop in it.
+// The event loop is guaranteed to be running before the constructor returns.
+class ThreadedEventLoopRunner {
+ public:
+  ThreadedEventLoopRunner(aos::ShmEventLoop *event_loop);
+
+  ~ThreadedEventLoopRunner();
+
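+  // Stops the event loop and joins the thread.  Safe to call more than once;
+  // the destructor calls it as well.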
+  void Exit();
+
+ private:
+  aos::Event event_;
+  aos::ShmEventLoop *event_loop_;
+  std::thread my_thread_;
+};
+
+// Parameters to run all the tests with.
+struct Param {
+  // The config file to use.
+  std::string config;
+  // If true, the RemoteMessage channel should be shared between all the remote
+  // channels.  If false, there will be 1 RemoteMessage channel per remote
+  // channel.
+  bool shared;
+};
+
+class MessageBridgeParameterizedTest
+    : public ::testing::TestWithParam<struct Param> {
+ protected:
+  MessageBridgeParameterizedTest();
+
+  bool shared() const;
+
+  // OnPi* sets the global state necessary to pretend that a ShmEventLoop is on
+  // the requisite system.
+  void OnPi1();
+
+  void OnPi2();
+
+  void MakePi1Server(std::string server_config_sha256 = "");
+
+  void RunPi1Server(chrono::nanoseconds duration);
+
+  void StartPi1Server();
+
+  void StopPi1Server();
+
+  void MakePi1Client();
+
+  void StartPi1Client();
+
+  void StopPi1Client();
+
+  void MakePi1Test();
+
+  void StartPi1Test();
+
+  void StopPi1Test();
+
+  void MakePi2Server();
+
+  void RunPi2Server(chrono::nanoseconds duration);
+
+  void StartPi2Server();
+
+  void StopPi2Server();
+
+  void MakePi2Client();
+
+  void RunPi2Client(chrono::nanoseconds duration);
+
+  void StartPi2Client();
+
+  void StopPi2Client();
+
+  void MakePi2Test();
+
+  void StartPi2Test();
+
+  void StopPi2Test();
+
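+  // Restores any command-line flags modified by the tests (application name,
+  // hostname override, boot UUID) when each test finishes.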
+  gflags::FlagSaver flag_saver_;
+
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config;
+  std::string config_sha256;
+
+  const UUID pi1_boot_uuid_;
+  const UUID pi2_boot_uuid_;
+
+  std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
+  std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_server_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
+  std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_client_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
+  std::unique_ptr<ThreadedEventLoopRunner> pi1_test_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
+  std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
+  std::unique_ptr<ThreadedEventLoopRunner> pi2_server_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
+  std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
+  std::unique_ptr<ThreadedEventLoopRunner> pi2_client_thread;
+
+  std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
+  std::unique_ptr<ThreadedEventLoopRunner> pi2_test_thread;
+};
+
+}  // namespace aos::message_bridge::testing
+
+#endif  // AOS_NETWORK_MESSAGE_BRIDGE_TEST_LIB_H_