Fix log sorting for good
Unfortunately, this is hard to split up, so a number of changes
happen at once.
When a message is sent from node A to node B, add support for sending
its arrival timestamp back from node B to node A for logging. This is
configured per destination node in the channel config, for example:
{
"name": "/pi1/aos",
"type": "aos.message_bridge.Timestamp",
"source_node": "pi1",
"frequency": 10,
"max_size": 200,
"destination_nodes": [
{
"name": "pi2",
"priority": 1,
"timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
"timestamp_logger_nodes": ["pi1"]
}
]
},
This lets us log enough information on node A for the log to be
self-contained: we know every message we sent to B and when it arrived,
so we can reconstruct the time offset between the nodes and replay
the node.
This data is then published on
{ "name": "/aos/remote_timestamps/pi2", "type": ".aos.logger.MessageHeader"}
The logger treats that channel specially and logs its contents
directly, as though the message contents had been received on the
remote node.
This (among other things) exposes log sorting problems. Use our new
infinite-precision noncausal filter to estimate time precisely enough
to actually order events. This gets the remaining error down to
2-3 ns, which is limited by integer precision.
Change-Id: Ia843c5176a2c4efc227e669c07d7bb4c7cbe7c91
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 3b10f9d..ab5f02c 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -4,6 +4,7 @@
#include "aos/events/ping_lib.h"
#include "aos/events/pong_lib.h"
#include "aos/events/simulated_event_loop.h"
+#include "aos/util/file.h"
#include "glog/logging.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@@ -42,20 +43,20 @@
// the config.
TEST_F(LoggerTest, Starts) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile = tmpdir + "/logfile.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile = base_name + ".part0.bfbs";
// Remove it.
unlink(logfile.c_str());
LOG(INFO) << "Logging data to " << logfile;
{
- DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(&writer, logger_event_loop.get(),
+ Logger logger(base_name, logger_event_loop.get(),
std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
@@ -101,8 +102,9 @@
// Tests that we can read and write rotated log files.
TEST_F(LoggerTest, RotatedLogFile) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile0 = tmpdir + "/logfile0.bfbs";
- const ::std::string logfile1 = tmpdir + "/logfile1.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile0 = base_name + ".part0.bfbs";
+ const ::std::string logfile1 = base_name + ".part1.bfbs";
// Remove it.
unlink(logfile0.c_str());
unlink(logfile1.c_str());
@@ -110,17 +112,16 @@
LOG(INFO) << "Logging data to " << logfile0 << " and " << logfile1;
{
- DetachedBufferWriter writer0(logfile0);
- DetachedBufferWriter writer1(logfile1);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
event_loop_factory_.RunFor(chrono::milliseconds(95));
- Logger logger(&writer0, logger_event_loop.get(),
- std::chrono::milliseconds(100));
+ Logger logger(
+ std::make_unique<LocalLogNamer>(base_name, logger_event_loop->node()),
+ logger_event_loop.get(), std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(10000));
- logger.Rotate(&writer1);
+ logger.Rotate();
event_loop_factory_.RunFor(chrono::milliseconds(10000));
}
@@ -166,7 +167,8 @@
// Tests that a large number of messages per second doesn't overwhelm writev.
TEST_F(LoggerTest, ManyMessages) {
const ::std::string tmpdir(getenv("TEST_TMPDIR"));
- const ::std::string logfile = tmpdir + "/logfile.bfbs";
+ const ::std::string base_name = tmpdir + "/logfile";
+ const ::std::string logfile = base_name + ".part0.bfbs";
// Remove the log file.
unlink(logfile.c_str());
@@ -174,7 +176,6 @@
ping_.set_quiet(true);
{
- DetachedBufferWriter writer(logfile);
std::unique_ptr<EventLoop> logger_event_loop =
event_loop_factory_.MakeEventLoop("logger");
@@ -198,7 +199,7 @@
chrono::microseconds(50));
});
- Logger logger(&writer, logger_event_loop.get(),
+ Logger logger(base_name, logger_event_loop.get(),
std::chrono::milliseconds(100));
event_loop_factory_.RunFor(chrono::milliseconds(1000));
@@ -217,9 +218,35 @@
configuration::GetNode(event_loop_factory_.configuration(), "pi2")),
tmp_dir_(getenv("TEST_TMPDIR")),
logfile_base_(tmp_dir_ + "/multi_logfile"),
- logfiles_({logfile_base_ + "_pi1_data.bfbs",
- logfile_base_ + "_pi2_data/test/aos.examples.Pong.bfbs",
- logfile_base_ + "_pi2_data.bfbs"}),
+ logfiles_(
+ {logfile_base_ + "_pi1_data.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part0.bfbs",
+ logfile_base_ + "_pi2_data/test/aos.examples.Pong.part1.bfbs",
+ logfile_base_ + "_pi2_data.part0.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.logger.MessageHeader.part0.bfbs",
+ logfile_base_ + "_timestamps/pi1/aos/remote_timestamps/pi2/"
+ "aos.logger.MessageHeader.part1.bfbs",
+ logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.logger.MessageHeader.part0.bfbs",
+ logfile_base_ + "_timestamps/pi2/aos/remote_timestamps/pi1/"
+ "aos.logger.MessageHeader.part1.bfbs",
+ logfile_base_ +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base_ +
+ "_pi1_data/pi1/aos/aos.message_bridge.Timestamp.part1.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part0.bfbs",
+ logfile_base_ +
+ "_pi2_data/pi2/aos/aos.message_bridge.Timestamp.part1.bfbs"}),
+ structured_logfiles_{
+ std::vector<std::string>{logfiles_[0]},
+ std::vector<std::string>{logfiles_[1], logfiles_[2]},
+ std::vector<std::string>{logfiles_[3]},
+ std::vector<std::string>{logfiles_[4], logfiles_[5]},
+ std::vector<std::string>{logfiles_[6], logfiles_[7]},
+ std::vector<std::string>{logfiles_[8], logfiles_[9]},
+ std::vector<std::string>{logfiles_[10], logfiles_[11]}},
ping_event_loop_(event_loop_factory_.MakeEventLoop("ping", pi1_)),
ping_(ping_event_loop_.get()),
pong_event_loop_(event_loop_factory_.MakeEventLoop("pong", pi2_)),
@@ -259,7 +286,9 @@
std::string tmp_dir_;
std::string logfile_base_;
- std::array<std::string, 3> logfiles_;
+ std::vector<std::string> logfiles_;
+
+ std::vector<std::vector<std::string>> structured_logfiles_;
std::unique_ptr<EventLoop> ping_event_loop_;
Ping ping_;
@@ -353,40 +382,96 @@
EXPECT_EQ(logheader2.message().node()->name()->string_view(), "pi2");
FlatbufferVector<LogFileHeader> logheader3 = ReadHeader(logfiles_[2]);
EXPECT_EQ(logheader3.message().node()->name()->string_view(), "pi2");
+ FlatbufferVector<LogFileHeader> logheader4 = ReadHeader(logfiles_[3]);
+ EXPECT_EQ(logheader4.message().node()->name()->string_view(), "pi2");
using ::testing::UnorderedElementsAre;
// Timing reports, pings
- EXPECT_THAT(CountChannelsData(logfiles_[0]),
- UnorderedElementsAre(
- std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Ping", 2001)));
+ EXPECT_THAT(
+ CountChannelsData(logfiles_[0]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi1/aos", "aos.timing.Report", 40),
+ std::make_tuple("/test", "aos.examples.Ping", 2001)));
// Timestamps for pong
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[0]),
- UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[0]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Pong", 2001),
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200)));
// Pong data.
EXPECT_THAT(CountChannelsData(logfiles_[1]),
UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ std::make_tuple("/test", "aos.examples.Pong", 101)));
+ EXPECT_THAT(CountChannelsData(logfiles_[2]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Pong", 1900)));
// No timestamps
EXPECT_THAT(CountChannelsTimestamp(logfiles_[1]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]), UnorderedElementsAre());
// Timing reports and pongs.
- EXPECT_THAT(CountChannelsData(logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
- std::make_tuple("/test", "aos.examples.Pong", 2001)));
+ EXPECT_THAT(
+ CountChannelsData(logfiles_[3]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi2/aos", "aos.message_bridge.Timestamp", 200),
+ std::make_tuple("/pi2/aos", "aos.timing.Report", 40),
+ std::make_tuple("/test", "aos.examples.Pong", 2001)));
// And ping timestamps.
- EXPECT_THAT(CountChannelsTimestamp(logfiles_[2]),
- UnorderedElementsAre(
- std::make_tuple("/test", "aos.examples.Ping", 2001)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[3]),
+ UnorderedElementsAre(
+ std::make_tuple("/test", "aos.examples.Ping", 2001),
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 200)));
+
+ // Timestamps from pi2 on pi1, and the other way.
+ EXPECT_THAT(CountChannelsData(logfiles_[4]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[5]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[6]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsData(logfiles_[7]), UnorderedElementsAre());
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[4]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 10),
+ std::make_tuple("/test", "aos.examples.Ping", 101)));
+ EXPECT_THAT(
+ CountChannelsTimestamp(logfiles_[5]),
+ UnorderedElementsAre(
+ std::make_tuple("/pi1/aos", "aos.message_bridge.Timestamp", 190),
+ std::make_tuple("/test", "aos.examples.Ping", 1900)));
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[6]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[7]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
+
+ // And then test that the remotely logged timestamp data files only have
+ // timestamps in them.
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[8]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[9]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[10]), UnorderedElementsAre());
+ EXPECT_THAT(CountChannelsTimestamp(logfiles_[11]), UnorderedElementsAre());
+
+ EXPECT_THAT(CountChannelsData(logfiles_[8]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi1/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsData(logfiles_[9]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi1/aos", "aos.message_bridge.Timestamp", 190)));
+
+ EXPECT_THAT(CountChannelsData(logfiles_[10]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 10)));
+ EXPECT_THAT(CountChannelsData(logfiles_[11]),
+ UnorderedElementsAre(std::make_tuple(
+ "/pi2/aos", "aos.message_bridge.Timestamp", 190)));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -400,6 +485,13 @@
const Node *pi2 =
configuration::GetNode(log_reader_factory.configuration(), "pi2");
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
EXPECT_THAT(reader.Nodes(), ::testing::ElementsAre(pi1, pi2));
reader.event_loop_factory()->set_send_delay(chrono::microseconds(0));
@@ -417,7 +509,7 @@
// Confirm that the ping value matches.
pi1_event_loop->MakeWatcher(
"/test", [&pi1_ping_count, &pi1_event_loop](const examples::Ping &ping) {
- VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping)
+ VLOG(1) << "Pi1 ping " << FlatbufferToJson(&ping) << " at "
<< pi1_event_loop->context().monotonic_remote_time << " -> "
<< pi1_event_loop->context().monotonic_event_time;
EXPECT_EQ(ping.value(), pi1_ping_count + 1);
@@ -436,7 +528,7 @@
});
pi2_event_loop->MakeWatcher(
"/test", [&pi2_ping_count, &pi2_event_loop](const examples::Ping &ping) {
- VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping)
+ VLOG(1) << "Pi2 ping " << FlatbufferToJson(&ping) << " at "
<< pi2_event_loop->context().monotonic_remote_time << " -> "
<< pi2_event_loop->context().monotonic_event_time;
EXPECT_EQ(ping.value(), pi2_ping_count + 1);
@@ -555,11 +647,8 @@
}
)");
- EXPECT_DEATH(LogReader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}},
- &extra_nodes_config.message()),
+ EXPECT_DEATH(LogReader(structured_logfiles_, &extra_nodes_config.message()),
"Log file and replay config need to have matching nodes lists.");
- ;
}
// Tests that we can read log files where they don't start at the same monotonic
@@ -580,8 +669,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -721,21 +809,27 @@
event_loop_factory_.RunFor(chrono::milliseconds(400));
}
- LogReader reader({std::vector<std::string>{logfiles_[0]},
- std::vector<std::string>{logfiles_[2]}});
+ LogReader reader(structured_logfiles_);
SimulatedEventLoopFactory log_reader_factory(reader.logged_configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
- // This sends out the fetched messages and advances time to the start of the
- // log file.
- reader.Register(&log_reader_factory);
-
const Node *pi1 =
configuration::GetNode(log_reader_factory.configuration(), "pi1");
const Node *pi2 =
configuration::GetNode(log_reader_factory.configuration(), "pi2");
+ // This sends out the fetched messages and advances time to the start of the
+ // log file.
+ reader.Register(&log_reader_factory);
+
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi1) << " pi1";
+ LOG(INFO) << "Start time " << reader.monotonic_start_time(pi2) << " pi2";
+ LOG(INFO) << "now pi1 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now();
+ LOG(INFO) << "now pi2 "
+ << log_reader_factory.GetNodeEventLoopFactory(pi2)->monotonic_now();
+
LOG(INFO) << "Done registering (pi1) "
<< log_reader_factory.GetNodeEventLoopFactory(pi1)->monotonic_now()
<< " "