Enforce max out of order duration contract when writing logs.
Actively enforce this contract on the writer side by tracking the newest
message received and the fetcher data for the current message being
logged.
If the max out of order duration is violated, rotate the part file and
double the max out of order duration value for the next part file.
Eventually we'll reach a satisfactory value and continue logging.
On the log reader side, choose the highest max out of order duration
value among all part files with the same part uuid. This ensures we have
sufficient messages in the queue while reading to guarantee that the max
out of order duration contract is satisfied.
Change-Id: I1666ed54acfcbc59cb9aedbe2f0dee979ceca095
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index f010466..01f4d2f 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -158,11 +158,11 @@
}
}
- const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
{
using ::testing::UnorderedElementsAre;
- std::shared_ptr<const aos::Configuration> config =
- sorted_log_files[0].config;
+ std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
// Timing reports, pings
EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
@@ -401,7 +401,7 @@
}
}
- LogReader reader(sorted_log_files);
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -572,6 +572,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
LogReader reader(sorted_parts, &config_.message());
// Remap just on pi1.
@@ -645,6 +646,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
LogReader reader(sorted_parts, &config_.message());
@@ -710,7 +712,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(SortParts(logfiles_));
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
int ping_count = 0;
// Adds a callback which mutates the value of the pong message before the
@@ -784,6 +788,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
LogReader reader(sorted_parts, &config_.message());
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
@@ -827,6 +832,7 @@
)");
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
"Log file and replay config need to have matching nodes lists.");
}
@@ -856,7 +862,9 @@
// Since we delay starting pi2, it already knows about all the timestamps so
// we don't end up with extra parts.
- LogReader reader(SortParts(actual_filenames));
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1015,8 +1023,10 @@
event_loop_factory_.RunFor(logger_run3);
}
- LogReader reader(
- SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3)));
+ const std::vector<LogFile> sorted_parts =
+ SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1217,7 +1227,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(SortParts(logfiles_));
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
// Remap just on pi1.
reader.RemapLoggedChannel<aos::timing::Report>(
@@ -1300,7 +1312,9 @@
pi2_logger.AppendAllFilenames(&actual_filenames);
}
- LogReader reader(SortParts(actual_filenames));
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
// Rename just on pi2. Add some global maps just to verify they get added in
// the config and used correctly.
@@ -1407,7 +1421,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(SortParts(logfiles_));
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
reader.RemapLoggedChannel<examples::Ping>("/test");
@@ -1488,7 +1504,9 @@
pi2_logger.AppendAllFilenames(&actual_filenames);
}
- LogReader reader(SortParts(actual_filenames));
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
std::vector<MapT> maps;
{
@@ -2409,7 +2427,9 @@
// Confirm that we refuse to replay logs with missing boot uuids.
{
- LogReader reader(SortParts(pi1_reboot_logfiles_));
+ auto sorted_parts = SortParts(pi1_reboot_logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -2916,7 +2936,9 @@
// Confirm that we can actually sort the resulting log and read it.
{
- LogReader reader(SortParts(filenames));
+ auto sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -3078,7 +3100,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(SortParts(logfiles_));
+ auto sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
@@ -3135,7 +3159,9 @@
// And verify that we can run the LogReader over the relogged files without
// hitting any fatal errors.
{
- LogReader relogged_reader(SortParts(log_files));
+ auto sorted_parts = SortParts(log_files);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader relogged_reader(sorted_parts);
relogged_reader.Register();
relogged_reader.event_loop_factory()->Run();
@@ -3281,6 +3307,7 @@
// Confirm that we can parse the result. LogReader has enough internal CHECKs
// to confirm the right thing happened.
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
auto result = ConfirmReadable(filenames);
EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
chrono::seconds(1)));
@@ -3442,6 +3469,7 @@
// Confirm that we can parse the result. LogReader has enough internal CHECKs
// to confirm the right thing happened.
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
auto result = ConfirmReadable(filenames);
EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
@@ -3595,6 +3623,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
auto result = ConfirmReadable(filenames);
EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
@@ -3713,6 +3742,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -3781,6 +3811,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -3849,6 +3880,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -3919,6 +3951,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -3989,6 +4022,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -4041,6 +4075,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -4086,6 +4121,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
{
@@ -4198,6 +4234,7 @@
// Make sure we can read this.
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
auto result = ConfirmReadable(filenames);
}