Sort messages between nodes properly
We used to assume the realtime clocks were in sync. This isn't
realistic. Use the timestamps on forwarded messages in each
direction to observe the network latency and the offset between nodes.
Since time is no longer exactly linear once all these adjustments are
applied, we need to redo how events are scheduled. Events can't be
converted to the distributed_clock just once; they now need to be
converted every time they are compared between nodes.
Change-Id: I1888c1e6a12f475c321a73aa020b0dc0bab107b3
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 9f2969e..9e69ae4 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -325,10 +325,10 @@
event_loop_factory_.RunFor(chrono::milliseconds(95));
Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ chrono::milliseconds(100));
Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
@@ -547,12 +547,12 @@
event_loop_factory_.RunFor(chrono::milliseconds(95));
Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(200));
Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
@@ -638,12 +638,11 @@
reader.Deregister();
}
-// TODO(austin): We can write a test which recreates a logfile and confirms that
-// we get it back. That is the ultimate test.
-// Tests that we can read log files where the monotonic clocks don't match
-// correctly.
-TEST_F(MultinodeLoggerTest, MissmatchingTimeStart) {
+// Tests that we can read log files where the monotonic clocks drift and don't
+// match correctly. While we are here, also test that log files with different
+// ending times are readable.
+TEST_F(MultinodeLoggerTest, MismatchedClocks) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
const ::std::string logfile_base = tmpdir + "/multi_logfile";
const ::std::string logfile1 = logfile_base + "_pi1_data.bfbs";
@@ -659,47 +658,76 @@
LOG(INFO) << "Logging data to " << logfile1 << " and " << logfile3;
{
- NodeEventLoopFactory *pi2 = event_loop_factory_.GetNodeEventLoopFactory(pi2_);
- LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
- << pi2->realtime_now() << " distributed "
- << pi2->ToDistributedClock(pi2->monotonic_now());
+ NodeEventLoopFactory *pi2 =
+ event_loop_factory_.GetNodeEventLoopFactory(pi2_);
+ LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
+ << pi2->realtime_now() << " distributed "
+ << pi2->ToDistributedClock(pi2->monotonic_now());
- pi2->SetMonotonicNow(pi2->monotonic_now() + std::chrono::seconds(1000));
- LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
- << pi2->realtime_now() << " distributed "
- << pi2->ToDistributedClock(pi2->monotonic_now());
+ const chrono::nanoseconds initial_pi2_offset = -chrono::seconds(1000);
+ chrono::nanoseconds pi2_offset = initial_pi2_offset;
- std::unique_ptr<EventLoop> ping_event_loop =
- event_loop_factory_.MakeEventLoop("ping", pi1_);
- Ping ping(ping_event_loop.get());
- std::unique_ptr<EventLoop> pong_event_loop =
- event_loop_factory_.MakeEventLoop("pong", pi2_);
- Pong pong(pong_event_loop.get());
+ pi2->SetDistributedOffset(pi2_offset);
+ LOG(INFO) << "pi2 times: " << pi2->monotonic_now() << " "
+ << pi2->realtime_now() << " distributed "
+ << pi2->ToDistributedClock(pi2->monotonic_now());
- std::unique_ptr<EventLoop> pi1_logger_event_loop =
- event_loop_factory_.MakeEventLoop("logger", pi1_);
- std::unique_ptr<LogNamer> pi1_log_namer =
- std::make_unique<MultiNodeLogNamer>(
- logfile_base, pi1_logger_event_loop->configuration(),
- pi1_logger_event_loop->node());
+ std::unique_ptr<EventLoop> ping_event_loop =
+ event_loop_factory_.MakeEventLoop("ping", pi1_);
+ Ping ping(ping_event_loop.get());
+ std::unique_ptr<EventLoop> pong_event_loop =
+ event_loop_factory_.MakeEventLoop("pong", pi2_);
+ Pong pong(pong_event_loop.get());
- std::unique_ptr<EventLoop> pi2_logger_event_loop =
- event_loop_factory_.MakeEventLoop("logger", pi2_);
- std::unique_ptr<LogNamer> pi2_log_namer =
- std::make_unique<MultiNodeLogNamer>(
- logfile_base, pi2_logger_event_loop->configuration(),
- pi2_logger_event_loop->node());
+ std::unique_ptr<EventLoop> pi2_logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger", pi2_);
+ std::unique_ptr<LogNamer> pi2_log_namer =
+ std::make_unique<MultiNodeLogNamer>(
+ logfile_base, pi2_logger_event_loop->configuration(),
+ pi2_logger_event_loop->node());
- event_loop_factory_.RunFor(chrono::milliseconds(95));
+ for (int i = 0; i < 95; ++i) {
+ pi2_offset += chrono::nanoseconds(200);
+ pi2->SetDistributedOffset(pi2_offset);
+ event_loop_factory_.RunFor(chrono::milliseconds(1));
+ }
- Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
+ chrono::milliseconds(100));
- event_loop_factory_.RunFor(chrono::milliseconds(200));
+ event_loop_factory_.RunFor(chrono::milliseconds(200));
- Logger pi2_logger(std::move(pi2_log_namer), pi2_logger_event_loop.get(),
- std::chrono::milliseconds(100));
- event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ {
+ // Run pi1's logger for only part of the time.
+ std::unique_ptr<EventLoop> pi1_logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger", pi1_);
+ std::unique_ptr<LogNamer> pi1_log_namer =
+ std::make_unique<MultiNodeLogNamer>(
+ logfile_base, pi1_logger_event_loop->configuration(),
+ pi1_logger_event_loop->node());
+
+ Logger pi1_logger(std::move(pi1_log_namer), pi1_logger_event_loop.get(),
+ chrono::milliseconds(100));
+
+ for (int i = 0; i < 20000; ++i) {
+ pi2_offset += chrono::nanoseconds(200);
+ pi2->SetDistributedOffset(pi2_offset);
+ event_loop_factory_.RunFor(chrono::milliseconds(1));
+ }
+
+ EXPECT_GT(pi2_offset - initial_pi2_offset,
+ event_loop_factory_.send_delay() +
+ event_loop_factory_.network_delay());
+
+ for (int i = 0; i < 40000; ++i) {
+ pi2_offset -= chrono::nanoseconds(200);
+ pi2->SetDistributedOffset(pi2_offset);
+ event_loop_factory_.RunFor(chrono::milliseconds(1));
+ }
+ }
+
+ // And log a bit more on pi2.
+ event_loop_factory_.RunFor(chrono::milliseconds(400));
}
LogReader reader(
@@ -785,14 +813,17 @@
});
log_reader_factory.Run();
- EXPECT_EQ(pi1_ping_count, 2030);
- EXPECT_EQ(pi2_ping_count, 2030);
- EXPECT_EQ(pi1_pong_count, 2030);
- EXPECT_EQ(pi2_pong_count, 2030);
+ EXPECT_EQ(pi1_ping_count, 6030);
+ EXPECT_EQ(pi2_ping_count, 6030);
+ EXPECT_EQ(pi1_pong_count, 6030);
+ EXPECT_EQ(pi2_pong_count, 6030);
reader.Deregister();
}
+// TODO(austin): We can write a test which recreates a logfile and confirms that
+// we get it back. That is the ultimate test.
+
} // namespace testing
} // namespace logger
} // namespace aos