Handle node reboots
Reset all the filter states when the node disappears. We don't handle
that very well, and don't need to.
Change-Id: Ia92b42f38a030a945120b6c404acae76abe4a0af
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 6b85830..c12c675 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -258,6 +258,7 @@
":message_bridge_test_client_config.json",
":message_bridge_test_server_config.json",
],
+ shard_count = 3,
deps = [
":message_bridge_client_lib",
":message_bridge_server_lib",
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
index efb3dd0..3963485 100644
--- a/aos/network/message_bridge_client_lib.cc
+++ b/aos/network/message_bridge_client_lib.cc
@@ -230,6 +230,7 @@
remote_assoc_id_ = assoc_id;
connection_->mutate_state(State::CONNECTED);
+ filter_.Reset();
}
void SctpClientConnection::NodeDisconnected() {
@@ -238,6 +239,8 @@
chrono::milliseconds(100));
remote_assoc_id_ = 0;
connection_->mutate_state(State::DISCONNECTED);
+ connection_->mutate_monotonic_offset(0);
+ filter_.Reset();
}
void SctpClientConnection::HandleData(const Message *message) {
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
index 3607449..1b8d36f 100644
--- a/aos/network/message_bridge_server_lib.cc
+++ b/aos/network/message_bridge_server_lib.cc
@@ -177,14 +177,14 @@
// time out eventually. Need to sort that out.
}
-void ChannelState::AddPeer(const Connection *connection,
- ServerConnection *server_connection_statistics,
- bool logged_remotely) {
- peers_.emplace_back(0, 0, connection, server_connection_statistics,
+void ChannelState::AddPeer(
+ const Connection *connection, int node_index,
+ ServerConnection *server_connection_statistics, bool logged_remotely) {
+ peers_.emplace_back(connection, node_index, server_connection_statistics,
logged_remotely);
}
-void ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
+int ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
for (ChannelState::Peer &peer : peers_) {
if (peer.sac_assoc_id == assoc_id) {
// TODO(austin): This will not handle multiple clients from
@@ -192,12 +192,13 @@
peer.server_connection_statistics->mutate_state(State::DISCONNECTED);
peer.sac_assoc_id = 0;
peer.stream = 0;
- break;
+ return peer.node_index;
}
}
+ return -1;
}
-void ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
+int ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
int stream, SctpServer *server) {
for (ChannelState::Peer &peer : peers_) {
if (peer.connection->name()->string_view() == node->name()->string_view()) {
@@ -205,10 +206,10 @@
peer.stream = stream;
peer.server_connection_statistics->mutate_state(State::CONNECTED);
server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
-
- break;
+ return peer.node_index;
}
}
+ return -1;
}
MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
@@ -301,6 +302,8 @@
event_loop_->configuration(), connection->name()->string_view());
state->AddPeer(
connection,
+ configuration::GetNodeIndex(event_loop_->configuration(),
+ connection->name()->string_view()),
FindServerConnection(statistics_.mutable_message(),
connection->name()->string_view()),
configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
@@ -344,13 +347,30 @@
void MessageBridgeServer::NodeDisconnected(sctp_assoc_t assoc_id) {
// Find any matching peers and remove them.
+ // Index of the node which was using |assoc_id|. Stays -1 if no channel had
+ // a peer on that association.
+ int node_index = -1;
for (std::unique_ptr<ChannelState> &channel_state : channels_) {
if (channel_state.get() == nullptr) {
continue;
}
-channel_state->NodeDisconnected(assoc_id);
+ // ChannelState::NodeDisconnected() returns -1 when none of this channel's
+ // peers matched |assoc_id|, which its declaration documents as a normal
+ // result. Only remember real matches so that a channel without a matching
+ // peer neither aborts the server nor overwrites an index found earlier.
+ const int channel_node_index = channel_state->NodeDisconnected(assoc_id);
+ if (channel_node_index != -1) {
+ node_index = channel_node_index;
+ }
}
+
+ // Only reset the filter if some channel actually had this association.
+ if (node_index != -1) {
+ VLOG(1) << "Resetting filters for " << node_index << " "
+ << event_loop_->configuration()
+ ->nodes()
+ ->Get(node_index)
+ ->name()
+ ->string_view();
+ ResetFilter(node_index);
+ }
+}
+
+// Resets the offset filter for |node_index| and zeroes the monotonic offset
+// reported for that node in the server statistics.
+void MessageBridgeServer::ResetFilter(int node_index) {
+ // Callers use -1 to mean "no node matched". Indexing with it would read and
+ // write out of bounds, so fail loudly instead.
+ CHECK_GE(node_index, 0);
+ filters_[node_index].Reset();
+ server_connection_[node_index]->mutate_monotonic_offset(0);
}
void MessageBridgeServer::MessageReceived() {
@@ -399,6 +419,7 @@
// Account for the control channel and delivery times channel.
size_t channel_index = kControlStreams();
+ int node_index = -1;
for (const Channel *channel : *connect->channels_to_transfer()) {
bool matched = false;
for (std::unique_ptr<ChannelState> &channel_state : channels_) {
@@ -406,9 +427,10 @@
continue;
}
if (channel_state->Matches(channel)) {
- channel_state->NodeConnected(connect->node(),
- message->header.rcvinfo.rcv_assoc_id,
- channel_index, &server_);
+ node_index = channel_state->NodeConnected(
+ connect->node(), message->header.rcvinfo.rcv_assoc_id,
+ channel_index, &server_);
+ CHECK_NE(node_index, -1);
matched = true;
break;
@@ -421,6 +443,13 @@
++channel_index;
}
}
+ ResetFilter(node_index);
+ VLOG(1) << "Resetting filters for " << node_index << " "
+ << event_loop_->configuration()
+ ->nodes()
+ ->Get(node_index)
+ ->name()
+ ->string_view();
} else if (message->header.rcvinfo.rcv_sid == kTimestampStream()) {
// Message delivery
const logger::MessageHeader *message_header =
@@ -507,15 +536,23 @@
// Iterate through the connections this node has made.
for (const ClientConnection *connection :
*client_statistics_fetcher_->connections()) {
- // Filter out the ones which aren't connected.
- if (connection->state() != State::CONNECTED) continue;
- // And the ones without monotonic offsets.
- if (!connection->has_monotonic_offset()) continue;
-
const int node_index = configuration::GetNodeIndex(
event_loop_->configuration(),
connection->node()->name()->string_view());
+ // Filter out the ones which aren't connected.
+ // And the ones without monotonic offsets.
+ if (connection->state() != State::CONNECTED ||
+ !connection->has_monotonic_offset() ||
+ client_statistics_fetcher_.context().monotonic_event_time +
+ kClientStatisticsStaleTimeout <
+ event_loop_->context().monotonic_event_time) {
+ VLOG(1) << "Disconnected, no offset, or client message too old for "
+ << connection->node()->name()->string_view();
+ ResetFilter(node_index);
+ continue;
+ }
+
timestamp_fetchers_[node_index].Fetch();
// Find the offset computed on their node for this client connection
@@ -527,17 +564,28 @@
*timestamp_fetchers_[node_index]->offsets()) {
if (client_offset->node()->name()->string_view() ==
event_loop_->node()->name()->string_view()) {
+ // Make sure it has an offset and the message isn't stale.
if (client_offset->has_monotonic_offset()) {
- their_offset =
- std::chrono::nanoseconds(client_offset->monotonic_offset());
- has_their_offset = true;
+ if (timestamp_fetchers_[node_index]
+ .context()
+ .monotonic_event_time +
+ kTimestampStaleTimeout >
+ event_loop_->context().monotonic_event_time) {
+ their_offset =
+ std::chrono::nanoseconds(client_offset->monotonic_offset());
+ has_their_offset = true;
+ } else {
+ ResetFilter(node_index);
+ VLOG(1) << "Timestamp old, resetting.";
+ }
}
break;
}
}
}
- if (has_their_offset) {
+ if (has_their_offset &&
+ server_connection_[node_index]->state() == State::CONNECTED) {
// Update the filters.
if (filters_[node_index].MissingSamples()) {
// Update the offset the first time. This should be representative.
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
index d271c8c..8e8ec64 100644
--- a/aos/network/message_bridge_server_lib.h
+++ b/aos/network/message_bridge_server_lib.h
@@ -22,42 +22,42 @@
// Class to encapsulate all the state per channel. This is the dispatcher for a
// new message from the event loop.
class ChannelState {
- public:
- ChannelState(const Channel *channel, int channel_index)
- : channel_index_(channel_index), channel_(channel) {}
+ public:
+ ChannelState(const Channel *channel, int channel_index)
+ : channel_index_(channel_index), channel_(channel) {}
- // Class to encapsulate all the state per client on a channel. A client may
- // be subscribed to multiple channels.
- struct Peer {
- Peer(sctp_assoc_t new_sac_assoc_id, size_t new_stream,
- const Connection *new_connection,
- ServerConnection *new_server_connection_statistics,
- bool new_logged_remotely)
- : sac_assoc_id(new_sac_assoc_id),
- stream(new_stream),
- connection(new_connection),
- server_connection_statistics(new_server_connection_statistics),
- logged_remotely(new_logged_remotely) {}
+ // Class to encapsulate all the state per client on a channel. A client may
+ // be subscribed to multiple channels.
+ struct Peer {
+ Peer(const Connection *new_connection, int new_node_index,
+ ServerConnection *new_server_connection_statistics,
+ bool new_logged_remotely)
+ : connection(new_connection),
+ node_index(new_node_index),
+ server_connection_statistics(new_server_connection_statistics),
+ logged_remotely(new_logged_remotely) {}
- // Valid if != 0.
- sctp_assoc_t sac_assoc_id = 0;
+ // Valid if != 0.
+ sctp_assoc_t sac_assoc_id = 0;
- size_t stream;
- const aos::Connection *connection;
- ServerConnection *server_connection_statistics;
+ size_t stream = 0;
+ const aos::Connection *connection;
+ const int node_index;
+ ServerConnection *server_connection_statistics;
- // If true, this message will be logged on a receiving node. We need to
- // keep it around to log it locally if that fails.
- bool logged_remotely = false;
+ // If true, this message will be logged on a receiving node. We need to
+ // keep it around to log it locally if that fails.
+ bool logged_remotely = false;
};
// Needs to be called when a node (might have) disconnected.
- void NodeDisconnected(sctp_assoc_t assoc_id);
- void NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
- SctpServer *server);
+ // Returns the node index which [dis]connected, or -1 if it didn't match.
+ int NodeDisconnected(sctp_assoc_t assoc_id);
+ int NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
+ SctpServer *server);
// Adds a new peer.
- void AddPeer(const Connection *connection,
+ void AddPeer(const Connection *connection, int node_index,
ServerConnection *server_connection_statistics,
bool logged_remotely);
@@ -94,6 +94,12 @@
~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
+ // Time after which we consider the client statistics message stale, and reset
+ // the filter.
+ static constexpr std::chrono::seconds kClientStatisticsStaleTimeout{1};
+ // Time after which we consider the timestamp stale, and reset the filter.
+ static constexpr std::chrono::milliseconds kTimestampStaleTimeout{250};
+
private:
// Reads a message from the socket. Could be a notification.
void MessageReceived();
@@ -103,6 +109,9 @@
// Called when the server connection disconnects.
void NodeDisconnected(sctp_assoc_t assoc_id);
+ // Resets the filter and clears the entry from the server statistics.
+ void ResetFilter(int node_index);
+
// Called when data (either a connection request or delivery timestamps) is
// received.
void HandleData(const Message *message);
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 27bed82..96003f7 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -42,7 +42,7 @@
aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
aos::configuration::ReadConfig(
"aos/network/message_bridge_test_server_config.json");
- aos::FlatbufferDetachedBuffer<aos::Configuration> client_config =
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
aos::configuration::ReadConfig(
"aos/network/message_bridge_test_client_config.json");
@@ -65,20 +65,20 @@
// Now do it for "raspberrypi2", the client.
FLAGS_application_name = "pi2_message_bridge_client";
FLAGS_override_hostname = "raspberrypi2";
- aos::ShmEventLoop pi2_client_event_loop(&client_config.message());
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
FLAGS_application_name = "pi2_message_bridge_server";
- aos::ShmEventLoop pi2_server_event_loop(&client_config.message());
+ aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
// And build the app which sends the pongs.
FLAGS_application_name = "pong";
- aos::ShmEventLoop pong_event_loop(&client_config.message());
+ aos::ShmEventLoop pong_event_loop(&pi2_config.message());
// And build the app for testing.
FLAGS_application_name = "test";
- aos::ShmEventLoop test_event_loop(&client_config.message());
+ aos::ShmEventLoop test_event_loop(&pi2_config.message());
aos::Fetcher<ClientStatistics> client_statistics_fetcher =
test_event_loop.MakeFetcher<ClientStatistics>("/aos");
@@ -156,24 +156,24 @@
});
int pi1_client_statistics_count = 0;
- ping_event_loop.MakeWatcher(
- "/pi1/aos", [&pi1_client_statistics_count](const ClientStatistics &stats) {
- LOG(INFO) << FlatbufferToJson(&stats);
+ ping_event_loop.MakeWatcher("/pi1/aos", [&pi1_client_statistics_count](
+ const ClientStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
- for (const ClientConnection *connection : *stats.connections()) {
- if (connection->has_monotonic_offset()) {
- ++pi1_client_statistics_count;
- // It takes at least 10 microseconds to send a message between the
- // client and server. The min (filtered) time shouldn't be over 10
- // milliseconds on localhost. This might have to bump up if this is
- // proving flaky.
- EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
- chrono::milliseconds(10));
- EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
- chrono::microseconds(10));
- }
- }
- });
+ for (const ClientConnection *connection : *stats.connections()) {
+ if (connection->has_monotonic_offset()) {
+ ++pi1_client_statistics_count;
+ // It takes at least 10 microseconds to send a message between the
+ // client and server. The min (filtered) time shouldn't be over 10
+ // milliseconds on localhost. This might have to bump up if this is
+ // proving flaky.
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(10));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::microseconds(10));
+ }
+ }
+ });
int pi2_client_statistics_count = 0;
pong_event_loop.MakeWatcher("/pi2/aos", [&pi2_client_statistics_count](
@@ -263,11 +263,430 @@
EXPECT_GE(pi2_server_statistics_count, 2);
EXPECT_GE(pi1_client_statistics_count, 2);
EXPECT_GE(pi2_client_statistics_count, 2);
-
- // TODO(austin): Need 2 servers going so we can do the round trip offset
- // estimation.
}
+// Test that the client disconnecting triggers the server offsets on both sides
+// to clear.
+TEST(MessageBridgeTest, ClientRestart) {
+ // This is rather annoying to set up. We need to start up a client and
+ // server, on the same node, but get them to think that they are on different
+ // nodes.
+ //
+ // We need the client to not post directly to "/test" like it would in a
+ // real system, otherwise we will re-send the ping message... So, use an
+ // application specific map to have the client post somewhere else.
+ //
+ // To top this all off, each of these needs to be done with a ShmEventLoop,
+ // which needs to run in a separate thread... And it is really hard to get
+ // everything started up reliably. So just be super generous on timeouts and
+ // hope for the best. We can be more generous in the future if we need to.
+ //
+ // We are faking the application names by passing in --application_name=foo
+ aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+ aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_server_config.json");
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
+ aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_client_config.json");
+
+ FLAGS_application_name = "pi1_message_bridge_server";
+ // Force ourselves to be "raspberrypi" and allocate everything.
+ FLAGS_override_hostname = "raspberrypi";
+ aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+ MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+ FLAGS_application_name = "pi1_message_bridge_client";
+ aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+ MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+ // And build the app for testing.
+ FLAGS_application_name = "test1";
+ aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+ aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+ pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+
+ // Now do it for "raspberrypi2", the client.
+ FLAGS_override_hostname = "raspberrypi2";
+ FLAGS_application_name = "pi2_message_bridge_server";
+ aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+ MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+ // And build the app for testing.
+ FLAGS_application_name = "test2";
+ aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+ aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
+ pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+
+ // Log every statistics and timestamp message seen on either node.
+ pi1_test_event_loop.MakeWatcher(
+ "/pi1/aos", [](const ServerStatistics &stats) {
+ LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+ });
+
+ pi2_test_event_loop.MakeWatcher(
+ "/pi2/aos", [](const ServerStatistics &stats) {
+ LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+ });
+
+ pi1_test_event_loop.MakeWatcher(
+ "/pi1/aos", [](const ClientStatistics &stats) {
+ LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+ });
+
+ pi2_test_event_loop.MakeWatcher(
+ "/pi2/aos", [](const ClientStatistics &stats) {
+ LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+ });
+
+ pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+ LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
+ });
+ pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+ LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
+ });
+
+ // Start everything up. The test event loops go first, followed by the
+ // bridges which stay up for the whole test.
+ std::thread pi1_test_thread(
+ [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
+ std::thread pi2_test_thread(
+ [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+
+ std::thread pi1_server_thread(
+ [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+ std::thread pi1_client_thread(
+ [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+ std::thread pi2_server_thread(
+ [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
+
+ {
+ FLAGS_application_name = "pi2_message_bridge_client";
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ // Run for about 3 seconds to make sure we have time to estimate the offset.
+ aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+ pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi2_client_event_loop.monotonic_now() +
+ chrono::milliseconds(3050));
+ });
+
+ // And go!
+ pi2_client_event_loop.Run();
+
+ // Now confirm we are synchronized.
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+ const ServerConnection *const pi1_connection =
+ pi1_server_statistics_fetcher->connections()->Get(0);
+ const ServerConnection *const pi2_connection =
+ pi2_server_statistics_fetcher->connections()->Get(0);
+
+ EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+ EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+ EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+
+ EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+ EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+ EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+ }
+
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ {
+ // Now confirm we are un-synchronized.
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+ const ServerConnection *const pi1_connection =
+ pi1_server_statistics_fetcher->connections()->Get(0);
+ const ServerConnection *const pi2_connection =
+ pi2_server_statistics_fetcher->connections()->Get(0);
+
+ EXPECT_EQ(pi1_connection->state(), State::DISCONNECTED);
+ EXPECT_FALSE(pi1_connection->has_monotonic_offset());
+ EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+ EXPECT_FALSE(pi2_connection->has_monotonic_offset());
+ }
+
+ {
+ FLAGS_application_name = "pi2_message_bridge_client";
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ // Run for about 3 seconds to make sure we have time to estimate the offset.
+ aos::TimerHandler *const quit = pi2_client_event_loop.AddTimer(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Exit(); });
+ pi2_client_event_loop.OnRun([quit, &pi2_client_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi2_client_event_loop.monotonic_now() +
+ chrono::milliseconds(3050));
+ });
+
+ // And go!
+ pi2_client_event_loop.Run();
+
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+ // Now confirm we are synchronized again.
+ const ServerConnection *const pi1_connection =
+ pi1_server_statistics_fetcher->connections()->Get(0);
+ const ServerConnection *const pi2_connection =
+ pi2_server_statistics_fetcher->connections()->Get(0);
+
+ EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+ EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+ EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+
+ EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+ EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+ EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+ }
+
+ // Shut everyone else down
+ pi1_server_event_loop.Exit();
+ pi1_client_event_loop.Exit();
+ pi2_server_event_loop.Exit();
+ pi1_test_event_loop.Exit();
+ pi2_test_event_loop.Exit();
+ pi1_server_thread.join();
+ pi1_client_thread.join();
+ pi2_server_thread.join();
+ pi1_test_thread.join();
+ pi2_test_thread.join();
+}
+
+// Test that the server disconnecting triggers the server offsets on the other
+// side to clear, along with the other client.
+TEST(MessageBridgeTest, ServerRestart) {
+ // This is rather annoying to set up. We need to start up a client and
+ // server, on the same node, but get them to think that they are on different
+ // nodes.
+ //
+ // We need the client to not post directly to "/test" like it would in a
+ // real system, otherwise we will re-send the ping message... So, use an
+ // application specific map to have the client post somewhere else.
+ //
+ // To top this all off, each of these needs to be done with a ShmEventLoop,
+ // which needs to run in a separate thread... And it is really hard to get
+ // everything started up reliably. So just be super generous on timeouts and
+ // hope for the best. We can be more generous in the future if we need to.
+ //
+ // We are faking the application names by passing in --application_name=foo
+ aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+ aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_server_config.json");
+ aos::FlatbufferDetachedBuffer<aos::Configuration> pi2_config =
+ aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_client_config.json");
+
+ FLAGS_application_name = "pi1_message_bridge_server";
+ // Force ourselves to be "raspberrypi" and allocate everything.
+ FLAGS_override_hostname = "raspberrypi";
+ aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+ MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+ FLAGS_application_name = "pi1_message_bridge_client";
+ aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+ MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
+
+ // And build the app for testing.
+ FLAGS_application_name = "test1";
+ aos::ShmEventLoop pi1_test_event_loop(&server_config.message());
+ aos::Fetcher<ServerStatistics> pi1_server_statistics_fetcher =
+ pi1_test_event_loop.MakeFetcher<ServerStatistics>("/pi1/aos");
+ aos::Fetcher<ClientStatistics> pi1_client_statistics_fetcher =
+ pi1_test_event_loop.MakeFetcher<ClientStatistics>("/pi1/aos");
+
+ // Now do it for "raspberrypi2", the client.
+ FLAGS_override_hostname = "raspberrypi2";
+ FLAGS_application_name = "pi2_message_bridge_client";
+ aos::ShmEventLoop pi2_client_event_loop(&pi2_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ // And build the app for testing.
+ FLAGS_application_name = "test2";
+ aos::ShmEventLoop pi2_test_event_loop(&pi2_config.message());
+ aos::Fetcher<ServerStatistics> pi2_server_statistics_fetcher =
+ pi2_test_event_loop.MakeFetcher<ServerStatistics>("/pi2/aos");
+
+ // Log every statistics and timestamp message seen on either node.
+ pi1_test_event_loop.MakeWatcher(
+ "/pi1/aos", [](const ServerStatistics &stats) {
+ LOG(INFO) << "pi1 ServerStatistics " << FlatbufferToJson(&stats);
+ });
+
+ // Log the rest of the statistics and timestamp messages too; the offsets are
+ // checked below with the fetchers once each phase finishes.
+ pi2_test_event_loop.MakeWatcher(
+ "/pi2/aos", [](const ServerStatistics &stats) {
+ LOG(INFO) << "pi2 ServerStatistics " << FlatbufferToJson(&stats);
+ });
+
+ pi1_test_event_loop.MakeWatcher(
+ "/pi1/aos", [](const ClientStatistics &stats) {
+ LOG(INFO) << "pi1 ClientStatistics " << FlatbufferToJson(&stats);
+ });
+
+ pi2_test_event_loop.MakeWatcher(
+ "/pi2/aos", [](const ClientStatistics &stats) {
+ LOG(INFO) << "pi2 ClientStatistics " << FlatbufferToJson(&stats);
+ });
+
+ pi1_test_event_loop.MakeWatcher("/pi1/aos", [](const Timestamp &timestamp) {
+ LOG(INFO) << "pi1 Timestamp " << FlatbufferToJson(&timestamp);
+ });
+ pi2_test_event_loop.MakeWatcher("/pi2/aos", [](const Timestamp &timestamp) {
+ LOG(INFO) << "pi2 Timestamp " << FlatbufferToJson(&timestamp);
+ });
+
+ // Start everything up. The test event loops go first, followed by the
+ // bridges which stay up for the whole test.
+ std::thread pi1_test_thread(
+ [&pi1_test_event_loop]() { pi1_test_event_loop.Run(); });
+ std::thread pi2_test_thread(
+ [&pi2_test_event_loop]() { pi2_test_event_loop.Run(); });
+
+ std::thread pi1_server_thread(
+ [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+ std::thread pi1_client_thread(
+ [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+ std::thread pi2_client_thread(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+
+ {
+ FLAGS_application_name = "pi2_message_bridge_server";
+ aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+ MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+ // Run for about 3 seconds to make sure we have time to estimate the offset.
+ aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
+ [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
+ pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi2_server_event_loop.monotonic_now() +
+ chrono::milliseconds(3050));
+ });
+
+ // And go!
+ pi2_server_event_loop.Run();
+
+ // Now confirm we are synchronized.
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+ const ServerConnection *const pi1_connection =
+ pi1_server_statistics_fetcher->connections()->Get(0);
+ const ServerConnection *const pi2_connection =
+ pi2_server_statistics_fetcher->connections()->Get(0);
+
+ EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+ EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+ EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+
+ EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+ EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+ EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+ }
+
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ {
+ // And confirm we are unsynchronized.
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(pi1_client_statistics_fetcher.Fetch());
+
+ const ServerConnection *const pi1_server_connection =
+ pi1_server_statistics_fetcher->connections()->Get(0);
+ const ClientConnection *const pi1_client_connection =
+ pi1_client_statistics_fetcher->connections()->Get(0);
+
+ EXPECT_EQ(pi1_server_connection->state(), State::CONNECTED);
+ EXPECT_FALSE(pi1_server_connection->has_monotonic_offset());
+ EXPECT_EQ(pi1_client_connection->state(), State::DISCONNECTED);
+ EXPECT_FALSE(pi1_client_connection->has_monotonic_offset());
+ }
+
+ {
+ FLAGS_application_name = "pi2_message_bridge_server";
+ aos::ShmEventLoop pi2_server_event_loop(&pi2_config.message());
+ MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
+
+ // Run for about 3 seconds to make sure we have time to estimate the offset.
+ aos::TimerHandler *const quit = pi2_server_event_loop.AddTimer(
+ [&pi2_server_event_loop]() { pi2_server_event_loop.Exit(); });
+ pi2_server_event_loop.OnRun([quit, &pi2_server_event_loop]() {
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(pi2_server_event_loop.monotonic_now() +
+ chrono::milliseconds(3050));
+ });
+
+ // And go!
+ pi2_server_event_loop.Run();
+
+ // And confirm we are synchronized again.
+ EXPECT_TRUE(pi1_server_statistics_fetcher.Fetch());
+ EXPECT_TRUE(pi2_server_statistics_fetcher.Fetch());
+
+ const ServerConnection *const pi1_connection =
+ pi1_server_statistics_fetcher->connections()->Get(0);
+ const ServerConnection *const pi2_connection =
+ pi2_server_statistics_fetcher->connections()->Get(0);
+
+ EXPECT_EQ(pi1_connection->state(), State::CONNECTED);
+ EXPECT_TRUE(pi1_connection->has_monotonic_offset());
+ EXPECT_LT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(pi1_connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+
+ EXPECT_EQ(pi2_connection->state(), State::CONNECTED);
+ EXPECT_TRUE(pi2_connection->has_monotonic_offset());
+ EXPECT_LT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(pi2_connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+ }
+
+ // Shut everyone else down
+ pi1_server_event_loop.Exit();
+ pi1_client_event_loop.Exit();
+ pi2_client_event_loop.Exit();
+ pi1_test_event_loop.Exit();
+ pi2_test_event_loop.Exit();
+ pi1_server_thread.join();
+ pi1_client_thread.join();
+ pi2_client_thread.join();
+ pi1_test_thread.join();
+ pi2_test_thread.join();
+}
+
+// TODO(austin): This test confirms that the external state does the right
+// thing, but doesn't confirm that the internal state does. We either need to
+// expose a way to check the state in a thread-safe way, or need a way to jump
+// time for one node to do that.
+
} // namespace testing
} // namespace message_bridge
} // namespace aos
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
index 1092e4b..a8475b1 100644
--- a/aos/network/message_bridge_test_common.json
+++ b/aos/network/message_bridge_test_common.json
@@ -133,7 +133,7 @@
"maps": [
{
"match": {
- "name": "/aos",
+ "name": "/aos*",
"source_node": "pi1"
},
"rename": {
@@ -142,7 +142,7 @@
},
{
"match": {
- "name": "/aos",
+ "name": "/aos*",
"source_node": "pi2"
},
"rename": {
@@ -161,7 +161,7 @@
},
{
"match": {
- "name": "/pi1/aos",
+ "name": "/pi1/aos*",
"source_node": "pi2"
},
"rename": {
@@ -170,7 +170,7 @@
},
{
"match": {
- "name": "/pi2/aos",
+ "name": "/pi2/aos*",
"source_node": "pi1"
},
"rename": {
diff --git a/aos/network/timestamp_filter.h b/aos/network/timestamp_filter.h
index 6f2baca..88561b9 100644
--- a/aos/network/timestamp_filter.h
+++ b/aos/network/timestamp_filter.h
@@ -120,6 +120,13 @@
return last_time_ != aos::monotonic_clock::min_time;
}
+ // Zeroes the offset estimate and the base offset, and rewinds last_time_ to
+ // min_time.
+ void Reset() {
+ base_offset_ = std::chrono::nanoseconds(0);
+ last_time_ = aos::monotonic_clock::min_time;
+
+ offset_ = 0;
+ }
+
private:
double offset_ = 0;
@@ -245,6 +252,19 @@
(last_rev_time_ == aos::monotonic_clock::min_time);
}
+ // Zeroes the offset estimate and the base offset, rewinds the first/last
+ // forward and reverse times to min_time, and then resets fwd_ and rev_.
+ void Reset() {
+ offset_ = 0;
+ base_offset_ = std::chrono::nanoseconds(0);
+
+ // Forward direction.
+ first_fwd_time_ = aos::monotonic_clock::min_time;
+ last_fwd_time_ = aos::monotonic_clock::min_time;
+ // Reverse direction.
+ first_rev_time_ = aos::monotonic_clock::min_time;
+ last_rev_time_ = aos::monotonic_clock::min_time;
+
+ fwd_.Reset();
+ rev_.Reset();
+ }
+
private:
// Updates the offset estimate given the current time, and a pointer to the
// variable holding the last time.
@@ -283,7 +303,14 @@
*last_time = monotonic_now;
if (sample_pointer_ != nullptr) {
- *sample_pointer_ = offset_;
+ // TODO(austin): Probably shouldn't do the update if we don't have fwd and
+ // reverse samples.
+ if (!MissingSamples()) {
+ *sample_pointer_ = offset_;
+ VLOG(1) << "Updating sample to " << offset_;
+ } else {
+ LOG(WARNING) << "Don't have both samples.";
+ }
}
}