Add retry logic to message bridge server
This makes the message bridge server attempt to retry failed sends on
any reliable connection, which strengthens our guarantees around
reliable message channels. This is particularly relevant during startup,
when there may be large bursts of reliable messages being sent.
I wrote a manual test that triggers the retry logic on my laptop. I have
not attempted to run it on any other platform, and I am assuming it will
be absurdly flaky, so it should only be run by engineers who want to
exercise this logic themselves.
Change-Id: Ic4bdfe4799b902883e1626542dff77fcbf3fb913
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 991d618..4eb5e8b 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -10,323 +10,21 @@
#include "aos/network/message_bridge_client_lib.h"
#include "aos/network/message_bridge_protocol.h"
#include "aos/network/message_bridge_server_lib.h"
+#include "aos/network/message_bridge_test_lib.h"
#include "aos/network/team_number.h"
#include "aos/sha256.h"
#include "aos/testing/path.h"
#include "aos/util/file.h"
-DECLARE_string(boot_uuid);
-
namespace aos {
-void SetShmBase(const std::string_view base);
namespace message_bridge {
namespace testing {
-using aos::testing::ArtifactPath;
-
-namespace chrono = std::chrono;
-
-std::string ShmBase(const std::string_view node) {
- const char *tmpdir_c_str = getenv("TEST_TMPDIR");
- if (tmpdir_c_str != nullptr) {
- return absl::StrCat(tmpdir_c_str, "/", node);
- } else {
- return absl::StrCat("/dev/shm/", node);
- }
-}
-
-void DoSetShmBase(const std::string_view node) {
- aos::SetShmBase(ShmBase(node));
-}
-
-// Class to manage starting and stopping a thread with an event loop in it. The
-// thread is guarenteed to be running before the constructor exits.
-class ThreadedEventLoopRunner {
- public:
- ThreadedEventLoopRunner(aos::ShmEventLoop *event_loop)
- : event_loop_(event_loop), my_thread_([this]() {
- LOG(INFO) << "Started " << event_loop_->name();
- event_loop_->OnRun([this]() { event_.Set(); });
- event_loop_->Run();
- }) {
- event_.Wait();
- }
-
- ~ThreadedEventLoopRunner() { Exit(); }
-
- void Exit() {
- if (my_thread_.joinable()) {
- event_loop_->Exit();
- my_thread_.join();
- my_thread_ = std::thread();
- }
- }
-
- private:
- aos::Event event_;
- aos::ShmEventLoop *event_loop_;
- std::thread my_thread_;
-};
-
-// Parameters to run all the tests with.
-struct Param {
- // The config file to use.
- std::string config;
- // If true, the RemoteMessage channel should be shared between all the remote
- // channels. If false, there will be 1 RemoteMessage channel per remote
- // channel.
- bool shared;
-};
-
-class MessageBridgeParameterizedTest
- : public ::testing::TestWithParam<struct Param> {
- public:
- MessageBridgeParameterizedTest()
- : config(aos::configuration::ReadConfig(
- ArtifactPath(absl::StrCat("aos/network/", GetParam().config)))),
- config_sha256(Sha256(config.span())),
- pi1_boot_uuid_(UUID::Random()),
- pi2_boot_uuid_(UUID::Random()) {
- util::UnlinkRecursive(ShmBase("pi1"));
- util::UnlinkRecursive(ShmBase("pi2"));
- }
-
- bool shared() const { return GetParam().shared; }
-
- void OnPi1() {
- DoSetShmBase("pi1");
- FLAGS_override_hostname = "raspberrypi";
- FLAGS_boot_uuid = pi1_boot_uuid_.ToString();
- }
-
- void OnPi2() {
- DoSetShmBase("pi2");
- FLAGS_override_hostname = "raspberrypi2";
- FLAGS_boot_uuid = pi2_boot_uuid_.ToString();
- }
-
- void MakePi1Server(std::string server_config_sha256 = "") {
- OnPi1();
- FLAGS_application_name = "pi1_message_bridge_server";
- pi1_server_event_loop =
- std::make_unique<aos::ShmEventLoop>(&config.message());
- pi1_server_event_loop->SetRuntimeRealtimePriority(1);
- pi1_message_bridge_server = std::make_unique<MessageBridgeServer>(
- pi1_server_event_loop.get(), server_config_sha256.size() == 0
- ? config_sha256
- : server_config_sha256);
- }
-
- void RunPi1Server(chrono::nanoseconds duration) {
- // Set up a shutdown callback.
- aos::TimerHandler *const quit = pi1_server_event_loop->AddTimer(
- [this]() { pi1_server_event_loop->Exit(); });
- pi1_server_event_loop->OnRun([this, quit, duration]() {
- // Stop between timestamps, not exactly on them.
- quit->Schedule(pi1_server_event_loop->monotonic_now() + duration);
- });
-
- pi1_server_event_loop->Run();
- }
-
- void StartPi1Server() {
- pi1_server_thread =
- std::make_unique<ThreadedEventLoopRunner>(pi1_server_event_loop.get());
- }
-
- void StopPi1Server() {
- pi1_server_thread.reset();
- pi1_message_bridge_server.reset();
- pi1_server_event_loop.reset();
- }
-
- void MakePi1Client() {
- OnPi1();
- FLAGS_application_name = "pi1_message_bridge_client";
- pi1_client_event_loop =
- std::make_unique<aos::ShmEventLoop>(&config.message());
- pi1_client_event_loop->SetRuntimeRealtimePriority(1);
- pi1_message_bridge_client = std::make_unique<MessageBridgeClient>(
- pi1_client_event_loop.get(), config_sha256);
- }
-
- void StartPi1Client() {
- pi1_client_thread =
- std::make_unique<ThreadedEventLoopRunner>(pi1_client_event_loop.get());
- }
-
- void StopPi1Client() {
- pi1_client_thread.reset();
- pi1_message_bridge_client.reset();
- pi1_client_event_loop.reset();
- }
-
- void MakePi1Test() {
- OnPi1();
- FLAGS_application_name = "test1";
- pi1_test_event_loop =
- std::make_unique<aos::ShmEventLoop>(&config.message());
-
- pi1_test_event_loop->MakeWatcher(
- "/pi1/aos", [](const ServerStatistics &stats) {
- VLOG(1) << "/pi1/aos ServerStatistics " << FlatbufferToJson(&stats);
- });
-
- pi1_test_event_loop->MakeWatcher(
- "/pi1/aos", [](const ClientStatistics &stats) {
- VLOG(1) << "/pi1/aos ClientStatistics " << FlatbufferToJson(&stats);
- });
-
- pi1_test_event_loop->MakeWatcher(
- "/pi1/aos", [](const Timestamp &timestamp) {
- VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
- });
- pi1_test_event_loop->MakeWatcher(
- "/pi2/aos", [this](const Timestamp &timestamp) {
- VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
- EXPECT_EQ(pi1_test_event_loop->context().source_boot_uuid,
- pi2_boot_uuid_);
- });
- }
-
- void StartPi1Test() {
- pi1_test_thread =
- std::make_unique<ThreadedEventLoopRunner>(pi1_test_event_loop.get());
- }
-
- void StopPi1Test() { pi1_test_thread.reset(); }
-
- void MakePi2Server() {
- OnPi2();
- FLAGS_application_name = "pi2_message_bridge_server";
- pi2_server_event_loop =
- std::make_unique<aos::ShmEventLoop>(&config.message());
- pi2_server_event_loop->SetRuntimeRealtimePriority(1);
- pi2_message_bridge_server = std::make_unique<MessageBridgeServer>(
- pi2_server_event_loop.get(), config_sha256);
- }
-
- void RunPi2Server(chrono::nanoseconds duration) {
- // Set up a shutdown callback.
- aos::TimerHandler *const quit = pi2_server_event_loop->AddTimer(
- [this]() { pi2_server_event_loop->Exit(); });
- pi2_server_event_loop->OnRun([this, quit, duration]() {
- // Stop between timestamps, not exactly on them.
- quit->Schedule(pi2_server_event_loop->monotonic_now() + duration);
- });
-
- pi2_server_event_loop->Run();
- }
-
- void StartPi2Server() {
- pi2_server_thread =
- std::make_unique<ThreadedEventLoopRunner>(pi2_server_event_loop.get());
- }
-
- void StopPi2Server() {
- pi2_server_thread.reset();
- pi2_message_bridge_server.reset();
- pi2_server_event_loop.reset();
- }
-
- void MakePi2Client() {
- OnPi2();
- FLAGS_application_name = "pi2_message_bridge_client";
- pi2_client_event_loop =
- std::make_unique<aos::ShmEventLoop>(&config.message());
- pi2_client_event_loop->SetRuntimeRealtimePriority(1);
- pi2_message_bridge_client = std::make_unique<MessageBridgeClient>(
- pi2_client_event_loop.get(), config_sha256);
- }
-
- void RunPi2Client(chrono::nanoseconds duration) {
- // Run for 5 seconds to make sure we have time to estimate the offset.
- aos::TimerHandler *const quit = pi2_client_event_loop->AddTimer(
- [this]() { pi2_client_event_loop->Exit(); });
- pi2_client_event_loop->OnRun([this, quit, duration]() {
- // Stop between timestamps, not exactly on them.
- quit->Schedule(pi2_client_event_loop->monotonic_now() + duration);
- });
-
- // And go!
- pi2_client_event_loop->Run();
- }
-
- void StartPi2Client() {
- pi2_client_thread =
- std::make_unique<ThreadedEventLoopRunner>(pi2_client_event_loop.get());
- }
-
- void StopPi2Client() {
- pi2_client_thread.reset();
- pi2_message_bridge_client.reset();
- pi2_client_event_loop.reset();
- }
-
- void MakePi2Test() {
- OnPi2();
- FLAGS_application_name = "test2";
- pi2_test_event_loop =
- std::make_unique<aos::ShmEventLoop>(&config.message());
-
- pi2_test_event_loop->MakeWatcher(
- "/pi2/aos", [](const ServerStatistics &stats) {
- VLOG(1) << "/pi2/aos ServerStatistics " << FlatbufferToJson(&stats);
- });
-
- pi2_test_event_loop->MakeWatcher(
- "/pi2/aos", [](const ClientStatistics &stats) {
- VLOG(1) << "/pi2/aos ClientStatistics " << FlatbufferToJson(&stats);
- });
-
- pi2_test_event_loop->MakeWatcher(
- "/pi1/aos", [this](const Timestamp &timestamp) {
- VLOG(1) << "/pi1/aos Timestamp " << FlatbufferToJson(&timestamp);
- EXPECT_EQ(pi2_test_event_loop->context().source_boot_uuid,
- pi1_boot_uuid_);
- });
- pi2_test_event_loop->MakeWatcher(
- "/pi2/aos", [](const Timestamp &timestamp) {
- VLOG(1) << "/pi2/aos Timestamp " << FlatbufferToJson(&timestamp);
- });
- }
-
- void StartPi2Test() {
- pi2_test_thread =
- std::make_unique<ThreadedEventLoopRunner>(pi2_test_event_loop.get());
- }
-
- void StopPi2Test() { pi2_test_thread.reset(); }
-
- aos::FlatbufferDetachedBuffer<aos::Configuration> config;
- std::string config_sha256;
-
- const UUID pi1_boot_uuid_;
- const UUID pi2_boot_uuid_;
-
- std::unique_ptr<aos::ShmEventLoop> pi1_server_event_loop;
- std::unique_ptr<MessageBridgeServer> pi1_message_bridge_server;
- std::unique_ptr<ThreadedEventLoopRunner> pi1_server_thread;
-
- std::unique_ptr<aos::ShmEventLoop> pi1_client_event_loop;
- std::unique_ptr<MessageBridgeClient> pi1_message_bridge_client;
- std::unique_ptr<ThreadedEventLoopRunner> pi1_client_thread;
-
- std::unique_ptr<aos::ShmEventLoop> pi1_test_event_loop;
- std::unique_ptr<ThreadedEventLoopRunner> pi1_test_thread;
-
- std::unique_ptr<aos::ShmEventLoop> pi2_server_event_loop;
- std::unique_ptr<MessageBridgeServer> pi2_message_bridge_server;
- std::unique_ptr<ThreadedEventLoopRunner> pi2_server_thread;
-
- std::unique_ptr<aos::ShmEventLoop> pi2_client_event_loop;
- std::unique_ptr<MessageBridgeClient> pi2_message_bridge_client;
- std::unique_ptr<ThreadedEventLoopRunner> pi2_client_thread;
-
- std::unique_ptr<aos::ShmEventLoop> pi2_test_event_loop;
- std::unique_ptr<ThreadedEventLoopRunner> pi2_test_thread;
-};
+// Note: All of these tests spin up ShmEventLoops in separate threads so that
+// we can run the "real" message bridge. This requires extra threading and
+// timing coordination, which is the reason for some of the extra complexity in
+// these tests.
// Test that we can send a ping message over sctp and receive it.
TEST_P(MessageBridgeParameterizedTest, PingPong) {
@@ -913,7 +611,7 @@
StopPi2Client();
}
- // Shut everyone else down
+ // Shut everyone else down.
StopPi1Server();
StopPi1Client();
StopPi2Server();
@@ -1073,7 +771,7 @@
StopPi2Server();
}
- // Shut everyone else down
+ // Shut everyone else down.
StopPi1Server();
StopPi1Client();
StopPi2Client();
@@ -1129,6 +827,8 @@
const size_t ping_channel_index = configuration::ChannelIndex(
receive_event_loop.configuration(), ping_fetcher.channel());
+ // ping_timestamp_count is accessed from multiple threads (the Watcher that
+ // triggers it is in a separate thread), so make it atomic.
std::atomic<int> ping_timestamp_count{0};
const std::string channel_name =
shared() ? "/pi1/aos/remote_timestamps/pi2"
@@ -1150,7 +850,7 @@
EXPECT_FALSE(ping_fetcher.Fetch());
EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
- // Spin up the persistant pieces.
+ // Spin up the persistent pieces.
StartPi1Server();
StartPi1Client();
StartPi2Server();
@@ -1161,7 +861,7 @@
&pi1_remote_timestamp_event_loop);
{
- // Now, spin up a client for 2 seconds.
+ // Now spin up a client for 2 seconds.
MakePi2Client();
RunPi2Client(chrono::milliseconds(2050));
@@ -1210,7 +910,7 @@
StopPi2Client();
}
- // Shut everyone else down
+ // Shut everyone else down.
StopPi1Client();
StopPi2Server();
pi1_remote_timestamp_thread.reset();
@@ -1258,6 +958,8 @@
const size_t ping_channel_index = configuration::ChannelIndex(
receive_event_loop.configuration(), ping_fetcher.channel());
+ // ping_timestamp_count is accessed from multiple threads (the Watcher that
+ // triggers it is in a separate thread), so make it atomic.
std::atomic<int> ping_timestamp_count{0};
const std::string channel_name =
shared() ? "/pi1/aos/remote_timestamps/pi2"
@@ -1279,7 +981,7 @@
EXPECT_FALSE(ping_fetcher.Fetch());
EXPECT_FALSE(unreliable_ping_fetcher.Fetch());
- // Spin up the persistant pieces.
+ // Spin up the persistent pieces.
StartPi1Client();
StartPi2Server();
StartPi2Client();
@@ -1339,13 +1041,144 @@
StopPi1Server();
}
- // Shut everyone else down
+ // Shut everyone else down.
StopPi1Client();
StopPi2Server();
StopPi2Client();
pi1_remote_timestamp_thread.reset();
}
+// Tests that when multiple reliable messages are sent while the client is
+// restarting, only the last of those messages makes it to the client. This
+// ensures that we handle a disconnecting & reconnecting client correctly in
+// the server's reliable-connection retry logic.
+TEST_P(MessageBridgeParameterizedTest, ReliableSentDuringClientReboot) {
+ OnPi1();
+
+ FLAGS_application_name = "sender";
+ aos::ShmEventLoop send_event_loop(&config.message());
+ aos::Sender<examples::Ping> ping_sender =
+ send_event_loop.MakeSender<examples::Ping>("/test");
+ size_t ping_index = 0;
+ SendPing(&ping_sender, ++ping_index);
+
+ MakePi1Server();
+ MakePi1Client();
+
+ FLAGS_application_name = "pi1_timestamp";
+ aos::ShmEventLoop pi1_remote_timestamp_event_loop(&config.message());
+
+ // Now do it for "raspberrypi2", the client.
+ OnPi2();
+
+ MakePi2Server();
+
+ aos::ShmEventLoop receive_event_loop(&config.message());
+ aos::Fetcher<examples::Ping> ping_fetcher =
+ receive_event_loop.MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<ClientStatistics> pi2_client_statistics_fetcher =
+ receive_event_loop.MakeFetcher<ClientStatistics>("/pi2/aos");
+
+ const size_t ping_channel_index = configuration::ChannelIndex(
+ receive_event_loop.configuration(), ping_fetcher.channel());
+
+ // ping_timestamp_count is accessed from multiple threads (the Watcher that
+ // triggers it is in a separate thread), so make it atomic.
+ std::atomic<int> ping_timestamp_count{0};
+ const std::string channel_name =
+ shared() ? "/pi1/aos/remote_timestamps/pi2"
+ : "/pi1/aos/remote_timestamps/pi2/test/aos-examples-Ping";
+ pi1_remote_timestamp_event_loop.MakeWatcher(
+ channel_name, [this, channel_name, ping_channel_index,
+ &ping_timestamp_count](const RemoteMessage &header) {
+ VLOG(1) << channel_name << " RemoteMessage "
+ << aos::FlatbufferToJson(&header);
+ EXPECT_TRUE(header.has_boot_uuid());
+ if (shared() && header.channel_index() != ping_channel_index) {
+ return;
+ }
+ CHECK_EQ(header.channel_index(), ping_channel_index);
+ ++ping_timestamp_count;
+ });
+
+ // Before everything starts up, confirm there is no message.
+ EXPECT_FALSE(ping_fetcher.Fetch());
+
+ // Spin up the persistent pieces.
+ StartPi1Server();
+ StartPi1Client();
+ StartPi2Server();
+
+ // Run the timestamp-counting event loop on its own thread.
+ std::unique_ptr<ThreadedEventLoopRunner> pi1_remote_timestamp_thread =
+ std::make_unique<ThreadedEventLoopRunner>(
+ &pi1_remote_timestamp_event_loop);
+
+ {
+ // Now spin up a client for 2 seconds.
+ MakePi2Client();
+
+ RunPi2Client(chrono::milliseconds(2050));
+
+ // Confirm no duplicate packets; ASSERT the fetch since we dereference it.
+ ASSERT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->duplicate_packets(),
+ 0u);
+
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->partial_deliveries(),
+ 0u);
+
+ EXPECT_TRUE(ping_fetcher.Fetch());
+ EXPECT_EQ(ping_timestamp_count, 1);
+
+ StopPi2Client();
+ }
+
+ // Send some reliable messages while the client is dead. Only the final one
+ // should make it through.
+ while (ping_index < 10) {
+ SendPing(&ping_sender, ++ping_index);
+ }
+
+ {
+ // Now spin the client back up, this time for 5 seconds.
+ MakePi2Client();
+
+ RunPi2Client(chrono::milliseconds(5050));
+
+ // No duplicate packets should appear; ASSERT the fetch since we deref it.
+ ASSERT_TRUE(pi2_client_statistics_fetcher.Fetch());
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->duplicate_packets(),
+ 0u);
+
+ EXPECT_EQ(pi2_client_statistics_fetcher->connections()
+ ->Get(0)
+ ->partial_deliveries(),
+ 0u);
+
+ EXPECT_EQ(ping_timestamp_count, 2);
+ // We should have gotten precisely one more ping message--the latest one
+ // sent should've made it, but no previous ones.
+ EXPECT_TRUE(ping_fetcher.FetchNext());
+ EXPECT_EQ(ping_index, ping_fetcher->value());
+ EXPECT_FALSE(ping_fetcher.FetchNext());
+
+ StopPi2Client();
+ }
+
+ // Shut everyone else down.
+ StopPi1Client();
+ StopPi2Server();
+ pi1_remote_timestamp_thread.reset();
+ StopPi1Server();
+}
+
// Test that differing config sha256's result in no connection.
TEST_P(MessageBridgeParameterizedTest, MismatchedSha256) {
// This is rather annoying to set up. We need to start up a client and
@@ -1441,7 +1274,7 @@
StopPi2Client();
}
- // Shut everyone else down
+ // Shut everyone else down.
StopPi1Server();
StopPi1Client();
StopPi2Server();
@@ -1590,7 +1423,7 @@
StopPi2Client();
}
- // Shut everyone else down
+ // Shut everyone else down.
StopPi1Server();
StopPi1Client();
StopPi2Server();