Recreate remote timestamp logging in LogReader
It is useful to be able to log data, replay it into a simulation, and
then recreate a log again. To do this, we need remote timestamps to
work correctly.
When LogReader replays a forwarded message, it now creates the
corresponding MessageHeader and publishes it. It also tracks the queue
indices so that the message is valid and can be logged.
Logger also translates channel indices when the logging config
is not the event loop config.
Change-Id: Iff6175a204b191c6f43a1d73ffce5b542925860c
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 108d4de..51994e3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -5,6 +5,7 @@
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/network/timestamp_generated.h"
#include "aos/util/file.h"
#include "glog/logging.h"
#include "gmock/gmock.h"
@@ -412,16 +413,24 @@
std::unique_ptr<Logger> logger;
};
- LoggerState MakeLogger(const Node *node) {
- return {event_loop_factory_.MakeEventLoop("logger", node), {}};
+ LoggerState MakeLogger(const Node *node,
+ SimulatedEventLoopFactory *factory = nullptr) {  // nullptr selects the fixture's event_loop_factory_.
+ if (factory == nullptr) {
+ factory = &event_loop_factory_;
+ }
+ return {factory->MakeEventLoop("logger", node), {}};  // Only the "logger" event loop is created; the remaining member is value-initialized.
}
- void StartLogger(LoggerState *logger) {
+ void StartLogger(LoggerState *logger, std::string logfile_base = "") {  // An empty logfile_base falls back to logfile_base_.
+ if (logfile_base.empty()) {
+ logfile_base = logfile_base_;
+ }
+
logger->logger = std::make_unique<Logger>(logger->event_loop.get());
logger->logger->set_polling_period(std::chrono::milliseconds(100));
- logger->event_loop->OnRun([this, logger]() {
+ logger->event_loop->OnRun([logger, logfile_base]() {  // logfile_base is captured by value, so `this` is no longer captured.
logger->logger->StartLogging(std::make_unique<MultiNodeLogNamer>(
- logfile_base_, logger->event_loop->configuration(),
+ logfile_base, logger->event_loop->configuration(),
logger->event_loop->node()));
});
}
@@ -656,7 +665,7 @@
LogReader reader(structured_logfiles_);
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
// This sends out the fetched messages and advances time to the start of the
@@ -731,7 +740,7 @@
++pi2_ping_count;
});
- constexpr ssize_t kQueueIndexOffset = 0;
+ constexpr ssize_t kQueueIndexOffset = -9;
// Confirm that the ping and pong counts both match, and the value also
// matches.
pi1_event_loop->MakeWatcher(
@@ -771,7 +780,7 @@
<< pi2_event_loop->context().monotonic_event_time;
EXPECT_EQ(pi2_event_loop->context().remote_queue_index,
- pi2_pong_count + kQueueIndexOffset - 9);
+ pi2_pong_count + kQueueIndexOffset);
EXPECT_EQ(pi2_event_loop->context().monotonic_remote_time,
chrono::microseconds(200) +
@@ -854,7 +863,7 @@
LogReader reader(structured_logfiles_);
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
// This sends out the fetched messages and advances time to the start of the
@@ -994,7 +1003,7 @@
LogReader reader(structured_logfiles_);
- SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
const Node *pi1 =
@@ -1216,6 +1225,213 @@
reader.Deregister();
}
+// Tests that replaying a log recreates the remote timestamp MessageHeaders for
+// forwarded messages, and that a logger can then be re-run on top of the
+// replay to write a new log.
+TEST_F(MultinodeLoggerTest, MessageHeader) {
+ {  // Phase 1: record the original log on both nodes.
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(95));  // Run for a while before logging starts.
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));  // Log 20 seconds worth of events.
+ }  // Both loggers go out of scope here, before the log is read back.
+
+ LogReader reader(structured_logfiles_);  // Phase 2: replay the log that was just written.
+
+ SimulatedEventLoopFactory log_reader_factory(reader.configuration());
+ log_reader_factory.set_send_delay(chrono::microseconds(0));
+
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ const Node *pi1 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi1");
+ const Node *pi2 =
+ configuration::GetNode(log_reader_factory.configuration(), "pi2");
+
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
+ EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
+
+ reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
+
+ std::unique_ptr<EventLoop> pi1_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi1);
+ std::unique_ptr<EventLoop> pi2_event_loop =
+ log_reader_factory.MakeEventLoop("test", pi2);
+
+ MessageCounter<MessageHeader> pi1_original_message_header_counter(  // Checked to be zero at the end of the test.
+ pi1_event_loop.get(), "/original/aos/remote_timestamps/pi2");
+ MessageCounter<MessageHeader> pi2_original_message_header_counter(  // Checked to be zero at the end of the test.
+ pi2_event_loop.get(), "/original/aos/remote_timestamps/pi1");
+
+ aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi1_fetcher =  // Each channel is fetched on both nodes' event loops.
+ pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
+ aos::Fetcher<message_bridge::Timestamp> pi1_timestamp_on_pi2_fetcher =
+ pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi1/aos");
+
+ aos::Fetcher<examples::Ping> ping_on_pi1_fetcher =
+ pi1_event_loop->MakeFetcher<examples::Ping>("/test");
+ aos::Fetcher<examples::Ping> ping_on_pi2_fetcher =
+ pi2_event_loop->MakeFetcher<examples::Ping>("/test");
+
+ aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi2_fetcher =
+ pi2_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
+ aos::Fetcher<message_bridge::Timestamp> pi2_timestamp_on_pi1_fetcher =
+ pi1_event_loop->MakeFetcher<message_bridge::Timestamp>("/pi2/aos");
+
+ aos::Fetcher<examples::Pong> pong_on_pi2_fetcher =
+ pi2_event_loop->MakeFetcher<examples::Pong>("/test");
+ aos::Fetcher<examples::Pong> pong_on_pi1_fetcher =
+ pi1_event_loop->MakeFetcher<examples::Pong>("/test");
+
+ const size_t pi1_timestamp_channel = configuration::ChannelIndex(  // Compared against header.channel_index() in the pi1 watcher.
+ pi1_event_loop->configuration(), pi1_timestamp_on_pi1_fetcher.channel());
+ const size_t ping_timestamp_channel = configuration::ChannelIndex(
+ pi2_event_loop->configuration(), ping_on_pi2_fetcher.channel());
+
+ const size_t pi2_timestamp_channel = configuration::ChannelIndex(  // Compared against header.channel_index() in the pi2 watcher.
+ pi2_event_loop->configuration(), pi2_timestamp_on_pi2_fetcher.channel());
+ const size_t pong_timestamp_channel = configuration::ChannelIndex(
+ pi1_event_loop->configuration(), pong_on_pi1_fetcher.channel());
+
+ pi1_event_loop->MakeWatcher(  // Cross-checks every MessageHeader seen on pi1 against the message it describes.
+ "/aos/remote_timestamps/pi2",
+ [&pi1_event_loop, pi1_timestamp_channel, ping_timestamp_channel,
+ &pi1_timestamp_on_pi1_fetcher, &pi1_timestamp_on_pi2_fetcher,
+ &ping_on_pi1_fetcher,
+ &ping_on_pi2_fetcher](const MessageHeader &header) {
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(  // Header times are converted from nanosecond counts.
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi1_context = nullptr;  // Context of the matching message as fetched on pi1.
+ const Context *pi2_context = nullptr;  // Context of the matching message as fetched on pi2.
+
+ if (header.channel_index() == pi1_timestamp_channel) {  // Pick the fetcher pair for the channel this header refers to.
+ ASSERT_TRUE(pi1_timestamp_on_pi1_fetcher.FetchNext());
+ ASSERT_TRUE(pi1_timestamp_on_pi2_fetcher.FetchNext());
+ pi1_context = &pi1_timestamp_on_pi1_fetcher.context();
+ pi2_context = &pi1_timestamp_on_pi2_fetcher.context();
+ } else if (header.channel_index() == ping_timestamp_channel) {
+ ASSERT_TRUE(ping_on_pi1_fetcher.FetchNext());
+ ASSERT_TRUE(ping_on_pi2_fetcher.FetchNext());
+ pi1_context = &ping_on_pi1_fetcher.context();
+ pi2_context = &ping_on_pi2_fetcher.context();
+ } else {  // Any other channel index is treated as a fatal error.
+ LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
+ << configuration::CleanedChannelToString(
+ pi1_event_loop->configuration()->channels()->Get(
+ header.channel_index()));
+ }
+
+ EXPECT_EQ(pi1_context->queue_index, header.remote_queue_index());  // Remote fields of the header match the pi1 side.
+ EXPECT_EQ(pi2_context->remote_queue_index, header.remote_queue_index());
+ EXPECT_EQ(pi2_context->queue_index, header.queue_index());  // Local fields of the header match the pi2 side.
+
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi2_context->realtime_event_time, header_realtime_sent_time);
+ EXPECT_EQ(pi2_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_remote_time,
+ header_monotonic_remote_time);
+
+ EXPECT_EQ(pi1_context->realtime_event_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ });
+ pi2_event_loop->MakeWatcher(  // Same checks as the pi1 watcher with the two nodes' roles swapped.
+ "/aos/remote_timestamps/pi1",
+ [&pi2_event_loop, pi2_timestamp_channel, pong_timestamp_channel,
+ &pi2_timestamp_on_pi2_fetcher, &pi2_timestamp_on_pi1_fetcher,
+ &pong_on_pi2_fetcher,
+ &pong_on_pi1_fetcher](const MessageHeader &header) {
+ const aos::monotonic_clock::time_point header_monotonic_sent_time(
+ chrono::nanoseconds(header.monotonic_sent_time()));
+ const aos::realtime_clock::time_point header_realtime_sent_time(
+ chrono::nanoseconds(header.realtime_sent_time()));
+ const aos::monotonic_clock::time_point header_monotonic_remote_time(
+ chrono::nanoseconds(header.monotonic_remote_time()));
+ const aos::realtime_clock::time_point header_realtime_remote_time(
+ chrono::nanoseconds(header.realtime_remote_time()));
+
+ const Context *pi2_context = nullptr;  // Context of the matching message as fetched on pi2.
+ const Context *pi1_context = nullptr;  // Context of the matching message as fetched on pi1.
+
+ if (header.channel_index() == pi2_timestamp_channel) {
+ ASSERT_TRUE(pi2_timestamp_on_pi2_fetcher.FetchNext());
+ ASSERT_TRUE(pi2_timestamp_on_pi1_fetcher.FetchNext());
+ pi2_context = &pi2_timestamp_on_pi2_fetcher.context();
+ pi1_context = &pi2_timestamp_on_pi1_fetcher.context();
+ } else if (header.channel_index() == pong_timestamp_channel) {
+ ASSERT_TRUE(pong_on_pi2_fetcher.FetchNext());
+ ASSERT_TRUE(pong_on_pi1_fetcher.FetchNext());
+ pi2_context = &pong_on_pi2_fetcher.context();
+ pi1_context = &pong_on_pi1_fetcher.context();
+ } else {  // Any other channel index is treated as a fatal error.
+ LOG(FATAL) << "Unknown channel " << FlatbufferToJson(&header) << " "
+ << configuration::CleanedChannelToString(
+ pi2_event_loop->configuration()->channels()->Get(
+ header.channel_index()));
+ }
+
+ EXPECT_EQ(pi2_context->queue_index, header.remote_queue_index());  // Remote fields of the header match the pi2 side.
+ EXPECT_EQ(pi1_context->remote_queue_index, header.remote_queue_index());
+ EXPECT_EQ(pi1_context->queue_index, header.queue_index());  // Local fields of the header match the pi1 side.
+
+ EXPECT_EQ(pi1_context->monotonic_event_time,
+ header_monotonic_sent_time);
+ EXPECT_EQ(pi1_context->realtime_event_time, header_realtime_sent_time);
+ EXPECT_EQ(pi1_context->realtime_remote_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi1_context->monotonic_remote_time,
+ header_monotonic_remote_time);
+
+ EXPECT_EQ(pi2_context->realtime_event_time,
+ header_realtime_remote_time);
+ EXPECT_EQ(pi2_context->monotonic_event_time,
+ header_monotonic_remote_time);
+ });
+
+ // And confirm we can re-create a log again, while checking the contents.
+ {
+ LoggerState pi1_logger = MakeLogger(  // These loggers are created on the replay factory, not event_loop_factory_.
+ configuration::GetNode(log_reader_factory.configuration(), pi1_),
+ &log_reader_factory);
+ LoggerState pi2_logger = MakeLogger(
+ configuration::GetNode(log_reader_factory.configuration(), pi2_),
+ &log_reader_factory);
+
+ StartLogger(&pi1_logger, "relogged");  // "relogged" is passed as the log file base instead of the default.
+ StartLogger(&pi2_logger, "relogged");
+
+ log_reader_factory.Run();  // Runs the replay with the watchers above and the new loggers attached.
+ }
+
+ EXPECT_EQ(pi2_original_message_header_counter.count(), 0u);  // Nothing is expected on the /original/... channels.
+ EXPECT_EQ(pi1_original_message_header_counter.count(), 0u);
+
+ reader.Deregister();
+}
+
// TODO(austin): We can write a test which recreates a logfile and confirms that
// we get it back. That is the ultimate test.