Add monotonic_offset to the ServerStatistics message
Send a base level of traffic between all nodes which should be
forwarding messages to each other. This helps replay, and also
gives us enough information to estimate the monotonic clock offset
between nodes, which in turn lets us sort out latencies between nodes.
Change-Id: I9b10243aca2444e201d0d8c0a551e29560d0e147
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
index 383c1c4..c1dd1ca 100644
--- a/aos/network/message_bridge_test.cc
+++ b/aos/network/message_bridge_test.cc
@@ -49,8 +49,12 @@
FLAGS_application_name = "pi1_message_bridge_server";
// Force ourselves to be "raspberrypi" and allocate everything.
FLAGS_override_hostname = "raspberrypi";
- aos::ShmEventLoop server_event_loop(&server_config.message());
- MessageBridgeServer message_bridge_server(&server_event_loop);
+ aos::ShmEventLoop pi1_server_event_loop(&server_config.message());
+ MessageBridgeServer pi1_message_bridge_server(&pi1_server_event_loop);
+
+ FLAGS_application_name = "pi1_message_bridge_client";
+ aos::ShmEventLoop pi1_client_event_loop(&server_config.message());
+ MessageBridgeClient pi1_message_bridge_client(&pi1_client_event_loop);
// And build the app which sends the pings.
FLAGS_application_name = "ping";
@@ -61,42 +65,41 @@
// Now do it for "raspberrypi2", the client.
FLAGS_application_name = "pi2_message_bridge_client";
FLAGS_override_hostname = "raspberrypi2";
- aos::ShmEventLoop client_event_loop(&client_config.message());
- MessageBridgeClient message_bridge_client(&client_event_loop);
+ aos::ShmEventLoop pi2_client_event_loop(&client_config.message());
+ MessageBridgeClient pi2_message_bridge_client(&pi2_client_event_loop);
+
+ FLAGS_application_name = "pi2_message_bridge_server";
+ aos::ShmEventLoop pi2_server_event_loop(&client_config.message());
+ MessageBridgeServer pi2_message_bridge_server(&pi2_server_event_loop);
// And build the app which sends the pongs.
FLAGS_application_name = "pong";
aos::ShmEventLoop pong_event_loop(&client_config.message());
+ // And build the app for testing.
+ FLAGS_application_name = "test";
+ aos::ShmEventLoop test_event_loop(&client_config.message());
+
+ aos::Fetcher<ClientStatistics> client_statistics_fetcher =
+ test_event_loop.MakeFetcher<ClientStatistics>("/aos");
+
// Count the pongs.
int pong_count = 0;
pong_event_loop.MakeWatcher(
- "/test2", [&pong_count, &ping_event_loop](const examples::Ping &ping) {
+ "/test2", [&pong_count](const examples::Ping &ping) {
++pong_count;
LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
- if (pong_count >= 2) {
- LOG(INFO) << "That's enough bailing early.";
- // And Exit is async safe, so thread safe is easy.
- ping_event_loop.Exit();
- }
});
FLAGS_override_hostname = "";
- // Start everything up. Pong is the only thing we don't know how to wait on,
- // so start it first.
- std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
-
- std::thread server_thread(
- [&server_event_loop]() { server_event_loop.Run(); });
- std::thread client_thread(
- [&client_event_loop]() { client_event_loop.Run(); });
-
// Wait until we are connected, then send.
int ping_count = 0;
+ int pi1_server_statistics_count = 0;
ping_event_loop.MakeWatcher(
- "/aos/pi1", [&ping_count, &client_event_loop,
- &ping_sender](const ServerStatistics &stats) {
+ "/aos/pi1",
+ [&ping_count, &pi2_client_event_loop, &ping_sender,
+ &pi1_server_statistics_count](const ServerStatistics &stats) {
LOG(INFO) << FlatbufferToJson(&stats);
ASSERT_TRUE(stats.has_connections());
@@ -104,12 +107,21 @@
bool connected = false;
for (const ServerConnection *connection : *stats.connections()) {
+ // Confirm that we are estimating the server time offset correctly. It
+ // should be about 0 since we are on the same machine here.
+ if (connection->has_monotonic_offset()) {
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+ ++pi1_server_statistics_count;
+ }
+
if (connection->node()->name()->string_view() ==
- client_event_loop.node()->name()->string_view()) {
+ pi2_client_event_loop.node()->name()->string_view()) {
if (connection->state() == State::CONNECTED) {
connected = true;
}
- break;
}
}
@@ -124,29 +136,136 @@
}
});
- // Time ourselves out after a while if Pong doesn't do it for us.
+ // Confirm both client and server statistics messages have decent offsets in
+ // them.
+ int pi2_server_statistics_count = 0;
+ pong_event_loop.MakeWatcher("/aos/pi2", [&pi2_server_statistics_count](
+ const ServerStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
+ for (const ServerConnection *connection : *stats.connections()) {
+ if (connection->has_monotonic_offset()) {
+ ++pi2_server_statistics_count;
+ // Confirm that we are estimating the server time offset correctly. It
+ // should be about 0 since we are on the same machine here.
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(1));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(-1));
+ }
+ }
+ });
+
+ int pi1_client_statistics_count = 0;
+ ping_event_loop.MakeWatcher(
+ "/aos/pi1", [&pi1_client_statistics_count](const ClientStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
+
+ for (const ClientConnection *connection : *stats.connections()) {
+ if (connection->has_monotonic_offset()) {
+ ++pi1_client_statistics_count;
+ // It takes at least 10 microseconds to send a message between the
+ // client and server. The min (filtered) time shouldn't be over 10
+ // milliseconds on localhost. This bound might have to be bumped up if
+ // the check proves flaky.
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(10));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::microseconds(10));
+ }
+ }
+ });
+
+ int pi2_client_statistics_count = 0;
+ pong_event_loop.MakeWatcher("/aos/pi2", [&pi2_client_statistics_count](
+ const ClientStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
+
+ for (const ClientConnection *connection : *stats.connections()) {
+ if (connection->has_monotonic_offset()) {
+ ++pi2_client_statistics_count;
+ EXPECT_LT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::milliseconds(10));
+ EXPECT_GT(chrono::nanoseconds(connection->monotonic_offset()),
+ chrono::microseconds(10));
+ }
+ }
+ });
+
+ ping_event_loop.MakeWatcher("/aos/pi1", [](const Timestamp &timestamp) {
+ EXPECT_TRUE(timestamp.has_offsets());
+ LOG(INFO) << FlatbufferToJson(&timestamp);
+ });
+ pong_event_loop.MakeWatcher("/aos/pi2", [](const Timestamp &timestamp) {
+ EXPECT_TRUE(timestamp.has_offsets());
+ LOG(INFO) << FlatbufferToJson(&timestamp);
+ });
+
+ // Run for 5 seconds to make sure we have time to estimate the offset.
aos::TimerHandler *quit = ping_event_loop.AddTimer(
[&ping_event_loop]() { ping_event_loop.Exit(); });
ping_event_loop.OnRun([quit, &ping_event_loop]() {
- quit->Setup(ping_event_loop.monotonic_now() + chrono::seconds(10));
+ // Stop between timestamps, not exactly on them.
+ quit->Setup(ping_event_loop.monotonic_now() + chrono::milliseconds(5050));
});
+ // Start everything up. Pong is the only thing we don't know how to wait on,
+ // so start it first.
+ std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+
+ std::thread pi1_server_thread(
+ [&pi1_server_event_loop]() { pi1_server_event_loop.Run(); });
+ std::thread pi1_client_thread(
+ [&pi1_client_event_loop]() { pi1_client_event_loop.Run(); });
+ std::thread pi2_client_thread(
+ [&pi2_client_event_loop]() { pi2_client_event_loop.Run(); });
+ std::thread pi2_server_thread(
+ [&pi2_server_event_loop]() { pi2_server_event_loop.Run(); });
// And go!
ping_event_loop.Run();
// Shut everyone else down
- server_event_loop.Exit();
- client_event_loop.Exit();
+ pi1_server_event_loop.Exit();
+ pi1_client_event_loop.Exit();
+ pi2_client_event_loop.Exit();
+ pi2_server_event_loop.Exit();
pong_event_loop.Exit();
- server_thread.join();
- client_thread.join();
+ pi1_server_thread.join();
+ pi1_client_thread.join();
+ pi2_client_thread.join();
+ pi2_server_thread.join();
pong_thread.join();
// Make sure we sent something.
EXPECT_GE(ping_count, 1);
// And got something back.
EXPECT_GE(pong_count, 1);
+
+ // Confirm that we are estimating a monotonic offset on the client.
+ ASSERT_TRUE(client_statistics_fetcher.Fetch());
+
+ EXPECT_EQ(client_statistics_fetcher->connections()->size(), 1u);
+ EXPECT_EQ(client_statistics_fetcher->connections()
+ ->Get(0)
+ ->node()
+ ->name()
+ ->string_view(),
+ "pi1");
+
+ // Make sure the offset in one direction is less than a second.
+ EXPECT_GT(
+ client_statistics_fetcher->connections()->Get(0)->monotonic_offset(), 0);
+ EXPECT_LT(
+ client_statistics_fetcher->connections()->Get(0)->monotonic_offset(),
+ 1000000000);
+
+ EXPECT_GE(pi1_server_statistics_count, 2);
+ EXPECT_GE(pi2_server_statistics_count, 2);
+ EXPECT_GE(pi1_client_statistics_count, 2);
+ EXPECT_GE(pi2_client_statistics_count, 2);
+
+ // TODO(austin): Need 2 servers going so we can do the round trip offset
+ // estimation.
}
} // namespace testing