Enforce max out of order duration contract when writing logs.
Actively enforce this contract on the writer side by tracking the newest
message time received and the fetcher context of the message currently
being logged.
If the max out of order duration is violated, rotate the part file and
double the max out of order duration value for the next part file.
Eventually we'll reach a satisfactory value and continue logging.
On the log reader side, choose the highest max out of order duration
value among all part files with the same part uuid. This ensures enough
messages are queued while reading to guarantee that the max out of order
duration contract is satisfied.
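
A minimal standalone sketch of the writer-side policy follows (an assumed
simplification for illustration only; names such as CheckOutOfOrder and
RotationDecision are hypothetical, the real logic lives in
NewDataWriter::CopyMessage):

  #include <algorithm>
  #include <chrono>
  #include <iostream>

  using namespace std::chrono_literals;

  struct RotationDecision {
    bool rotate;                                // Start a new part file?
    std::chrono::nanoseconds new_max_duration;  // Duration for the next header.
  };

  // newest: newest message time logged so far by this writer.
  // message: time of the message about to be logged.
  // start: monotonic start time; messages before start don't count.
  RotationDecision CheckOutOfOrder(std::chrono::nanoseconds newest,
                                   std::chrono::nanoseconds message,
                                   std::chrono::nanoseconds start,
                                   std::chrono::nanoseconds max_duration) {
    newest = std::max(newest, message);
    const std::chrono::nanoseconds violation = newest - std::max(start, message);
    if (violation <= max_duration) {
      return {false, max_duration};
    }
    // Doubling alone may not be enough (newest=10, message=5, max=2 doubles to
    // 4, but the actual spread is 5), so take the max before doubling.
    return {true, 2 * std::max(max_duration, newest - message)};
  }

  int main() {
    const RotationDecision d = CheckOutOfOrder(10s, 5s, 0s, 2s);
    std::cout << d.rotate << " " << d.new_max_duration.count() << "\n";  // 1 10000000000
  }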
Change-Id: I1666ed54acfcbc59cb9aedbe2f0dee979ceca095
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index 28223b2..bdcb454 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -32,7 +32,8 @@
log_namer_(log_namer),
reopen_(std::move(reopen)),
close_(std::move(close)),
- max_message_size_(max_message_size) {
+ max_message_size_(max_message_size),
+ max_out_of_order_duration_(log_namer_->base_max_out_of_order_duration()) {
state_.resize(configuration::NodesCount(log_namer->configuration_));
CHECK_LT(node_index_, state_.size());
}
@@ -79,6 +80,11 @@
state_[node_index_].boot_uuid = source_node_boot_uuid;
VLOG(1) << "Rebooted " << name();
+ newest_message_time_ = monotonic_clock::min_time;
+ // When a node reboots, parts_uuid changes but the same writer continues to
+ // write the data, so we can reset the max out of order duration. If we don't
+ // do this, the max out of order duration can grow to an unreasonable value.
+ max_out_of_order_duration_ = log_namer_->base_max_out_of_order_duration();
}
void NewDataWriter::UpdateBoot(const UUID &source_node_boot_uuid) {
@@ -186,7 +192,8 @@
void NewDataWriter::CopyMessage(DataEncoder::Copier *coppier,
const UUID &source_node_boot_uuid,
- aos::monotonic_clock::time_point now) {
+ aos::monotonic_clock::time_point now,
+ aos::monotonic_clock::time_point message_time) {
// Trigger a reboot if we detect the boot UUID change.
UpdateBoot(source_node_boot_uuid);
@@ -194,9 +201,65 @@
QueueHeader(MakeHeader());
}
+ bool max_out_of_order_duration_exceeded = false;
+ // Enforce the max out of order duration contract.
+ //
+ // Update the newest message time, then rotate the part file if the current
+ // message is more than max_out_of_order_duration behind the newest message
+ // we've logged so far.
+ if (message_time > newest_message_time_) {
+ newest_message_time_ = message_time;
+ }
+
+ // Don't consider messages from before startup when checking for the max out
+ // of order duration.
+ monotonic_clock::time_point monotonic_start_time =
+ log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid);
+
+ if (std::chrono::nanoseconds((newest_message_time_ -
+ std::max(monotonic_start_time, message_time))) >
+ max_out_of_order_duration_) {
+ // If the new message is more than 2 * max_out_of_order_duration older than
+ // the newest message, doubling the duration won't be sufficient.
+ //
+ // Example: newest_message_time = 10, logged_message_time = 5,
+ // max_ooo_duration = 2
+ //
+ // In this case the actual max_ooo_duration = 10 - 5 = 5, but if we double the
+ // existing max_ooo_duration we only get 4, which is not sufficient.
+ //
+ // So take the max of the two values before doubling.
+ max_out_of_order_duration_ =
+ 2 * std::max(max_out_of_order_duration_,
+ std::chrono::nanoseconds(
+ (newest_message_time_ - message_time)));
+ max_out_of_order_duration_exceeded = true;
+ }
+
// If the start time has changed for this node, trigger a rotation.
- if (log_namer_->monotonic_start_time(node_index_, source_node_boot_uuid) !=
- monotonic_start_time_) {
+ if ((monotonic_start_time != monotonic_start_time_) ||
+ max_out_of_order_duration_exceeded) {
+ // If we just received a start time now, we will rotate parts shortly. Use a
+ // reasonable max out of order duration in the new header based on the start
+ // time information available now.
+ if ((monotonic_start_time_ == monotonic_clock::min_time) &&
+ (monotonic_start_time != monotonic_clock::min_time)) {
+ // If we're writing current messages but receive an older start time, we can
+ // pick a reasonable max ooo duration for the next part.
+ //
+ // For example: our current max ooo duration is 0.3 seconds, we're writing
+ // messages at 20 seconds, and we receive a start time of 1 second. We don't
+ // need the max ooo duration to be (20 - 1) = 19 seconds, although that would
+ // still work.
+ //
+ // Pick the minimum max out of order duration value that satisfies the
+ // requirement, but bound it below by the base value we started with.
+ max_out_of_order_duration_ =
+ std::max(log_namer_->base_max_out_of_order_duration(),
+ std::min(max_out_of_order_duration_,
+ std::chrono::nanoseconds(newest_message_time_ -
+ monotonic_start_time)));
+ }
CHECK(header_written_);
Rotate();
}
@@ -221,8 +284,8 @@
} else {
CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
}
- return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
- parts_index_);
+ return log_namer_->MakeHeader(node_index_, state_, parts_uuid(), parts_index_,
+ max_out_of_order_duration_);
}
void NewDataWriter::QueueHeader(
@@ -278,7 +341,8 @@
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
size_t node_index, const std::vector<NewDataWriter::State> &state,
- const UUID &parts_uuid, int parts_index) {
+ const UUID &parts_uuid, int parts_index,
+ std::chrono::nanoseconds max_out_of_order_duration) {
const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
const Node *const source_node =
configuration::GetNode(configuration_, node_index);
@@ -479,8 +543,9 @@
if (!configuration_offset.IsNull()) {
log_file_header_builder.add_configuration(configuration_offset);
}
+
log_file_header_builder.add_max_out_of_order_duration(
- header_.message().max_out_of_order_duration());
+ max_out_of_order_duration.count());
NodeState *node_state = GetNodeState(node_index, source_node_boot_uuid);
log_file_header_builder.add_monotonic_start_time(
diff --git a/aos/events/logging/log_namer.h b/aos/events/logging/log_namer.h
index 8d69fbb..6408aea 100644
--- a/aos/events/logging/log_namer.h
+++ b/aos/events/logging/log_namer.h
@@ -52,6 +52,10 @@
}
size_t max_message_size() const { return max_message_size_; }
+ std::chrono::nanoseconds max_out_of_order_duration() const {
+ return max_out_of_order_duration_;
+ }
+
NewDataWriter(NewDataWriter &&other) = default;
aos::logger::NewDataWriter &operator=(NewDataWriter &&other) = default;
NewDataWriter(const NewDataWriter &) = delete;
@@ -73,7 +77,8 @@
// Coppies a message with the provided boot UUID.
void CopyMessage(DataEncoder::Copier *coppier,
const UUID &source_node_boot_uuid,
- aos::monotonic_clock::time_point now);
+ aos::monotonic_clock::time_point now,
+ aos::monotonic_clock::time_point message_time);
// Updates the current boot for the source node. This is useful when you want
// to queue a message that may trigger a reboot rotation, but then need to
@@ -160,6 +165,23 @@
std::vector<State> state_;
size_t max_message_size_;
+
+ // Each data writer logs the channels for that node, i.e. each data writer
+ // writes one file. We may encounter messages which violate the max out of
+ // order duration specified in the header of that file; when that happens we
+ // rotate the data writer and start a new part for that particular file.
+ // This shouldn't affect the headers of other data writers, so make this a
+ // property of the individual data writer instead of the overall log.
+ std::chrono::nanoseconds max_out_of_order_duration_;
+
+ // Monotonic time point of the newest message we've logged so far, e.g.:
+ // Message X - time Z
+ // Message Y - time Z + 1
+ // newest_message_time_ = Z + 1 (even if X was logged after Y)
+ //
+ // Since messages can be logged out of order, this helps determine whether the
+ // max out of order duration was violated.
+ monotonic_clock::time_point newest_message_time_ = monotonic_clock::min_time;
};
// Interface describing how to name, track, and add headers to log file parts.
@@ -252,6 +274,14 @@
return node_state->monotonic_start_time;
}
+ // This returns the initial max out of order duration set in the header
+ // template by the logger based on the polling period. It may be different
+ // from the actual duration used by the data writer.
+ std::chrono::nanoseconds base_max_out_of_order_duration() const {
+ return std::chrono::nanoseconds(
+ header_.message().max_out_of_order_duration());
+ }
+
protected:
// Structure with state per node about times and such.
struct NodeState {
@@ -271,7 +301,8 @@
// them with the arguments provided.
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
size_t node_index, const std::vector<NewDataWriter::State> &state,
- const UUID &parts_uuid, int parts_index);
+ const UUID &parts_uuid, int parts_index,
+ std::chrono::nanoseconds max_out_of_order_duration);
EventLoop *event_loop_;
const Configuration *const configuration_;
diff --git a/aos/events/logging/log_writer.cc b/aos/events/logging/log_writer.cc
index 56d0c4f..1e0f7c8 100644
--- a/aos/events/logging/log_writer.cc
+++ b/aos/events/logging/log_writer.cc
@@ -269,6 +269,7 @@
log_event_uuid_ = UUID::Random();
log_start_uuid_ = log_start_uuid;
+ log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
// We want to do as much work as possible before the initial Fetch. Time
// between that and actually starting to log opens up the possibility of
@@ -287,8 +288,6 @@
}
}
- log_namer_->SetHeaderTemplate(MakeHeader(config_sha256));
-
const aos::monotonic_clock::time_point beginning_time =
event_loop_->monotonic_now();
@@ -705,7 +704,11 @@
ContextDataCopier coppier(f.fetcher->context(), f.channel_index, f.log_type,
event_loop_);
- writer->CopyMessage(&coppier, source_node_boot_uuid, start);
+ aos::monotonic_clock::time_point message_time =
+ static_cast<int>(node_index_) != f.data_node_index
+ ? f.fetcher->context().monotonic_remote_time
+ : f.fetcher->context().monotonic_event_time;
+ writer->CopyMessage(&coppier, source_node_boot_uuid, start, message_time);
RecordCreateMessageTime(start, coppier.end_time(), f);
VLOG(2) << "Wrote data as node " << FlatbufferToJson(node_)
@@ -730,7 +733,8 @@
ContextDataCopier coppier(f.fetcher->context(), f.channel_index,
LogType::kLogDeliveryTimeOnly, event_loop_);
- timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start);
+ timestamp_writer->CopyMessage(&coppier, event_loop_->boot_uuid(), start,
+ f.fetcher->context().monotonic_event_time);
RecordCreateMessageTime(start, coppier.end_time(), f);
VLOG(2) << "Wrote timestamps as node " << FlatbufferToJson(node_)
@@ -780,8 +784,10 @@
RemoteMessageCopier coppier(msg, channel_index, monotonic_timestamp_time,
event_loop_);
- contents_writer->CopyMessage(&coppier, UUID::FromVector(msg->boot_uuid()),
- start);
+ contents_writer->CopyMessage(
+ &coppier, UUID::FromVector(msg->boot_uuid()), start,
+ monotonic_clock::time_point(
+ chrono::nanoseconds(msg->monotonic_sent_time())));
RecordCreateMessageTime(start, coppier.end_time(), f);
}
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 59d18d0..d27c301 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -138,7 +138,7 @@
// Start by grouping all parts by UUID, and extracting the part index.
// Datastructure to hold all the info extracted from a set of parts which go
-// together so we can sort them afterwords.
+// together so we can sort them afterwards.
struct UnsortedLogParts {
// Start times.
aos::monotonic_clock::time_point monotonic_start_time;
@@ -165,6 +165,15 @@
std::vector<std::pair<std::string, int>> parts;
std::string config_sha256;
+
+ // Largest max out of order duration among parts with a monotonic start time
+ // greater than min_time.
+ std::optional<std::chrono::nanoseconds>
+ max_out_of_order_duration_valid_start_time = std::nullopt;
+ // Largest max out of order duration among parts with monotonic start time
+ // equal to min_time.
+ std::optional<std::chrono::nanoseconds>
+ max_out_of_order_duration_min_start_time = std::nullopt;
};
// Struct to hold both the node, and the parts associated with it.
@@ -370,6 +379,9 @@
const realtime_clock::time_point logger_realtime_start_time(
chrono::nanoseconds(
log_header->message().logger_realtime_start_time()));
+ const std::chrono::nanoseconds max_out_of_order_duration =
+ std::chrono::nanoseconds(
+ log_header->message().max_out_of_order_duration());
const std::string_view node =
log_header->message().has_node()
@@ -505,6 +517,8 @@
old_parts.back().parts.config_sha256 = configuration_sha256;
old_parts.back().unsorted_parts.emplace_back(
std::make_pair(first_message_time, part.name));
+ old_parts.back().parts.max_out_of_order_duration =
+ max_out_of_order_duration;
old_parts.back().name = name;
} else {
result->unsorted_parts.emplace_back(
@@ -583,6 +597,24 @@
} else {
CHECK_EQ(it->second.config_sha256, configuration_sha256);
}
+ // Keep track of the largest max out of order durations based on whether the
+ // monotonic start time is available. We'll decide which value to use later
+ // when creating the log files.
+ if (monotonic_start_time == monotonic_clock::min_time) {
+ it->second.max_out_of_order_duration_min_start_time =
+ it->second.max_out_of_order_duration_min_start_time.has_value()
+ ? std::max(
+ it->second.max_out_of_order_duration_min_start_time.value(),
+ max_out_of_order_duration)
+ : max_out_of_order_duration;
+ } else {
+ it->second.max_out_of_order_duration_valid_start_time =
+ it->second.max_out_of_order_duration_valid_start_time.has_value()
+ ? std::max(it->second.max_out_of_order_duration_valid_start_time
+ .value(),
+ max_out_of_order_duration)
+ : max_out_of_order_duration;
+ }
// We've got a newer log with boot_uuids, and oldest timestamps. Fill in
// this->boot_times with the info we have found.
@@ -1885,7 +1917,14 @@
new_parts.parts_uuid = parts.first.first;
new_parts.node = std::move(parts.second.node);
new_parts.boots = boot_counts;
-
+ // If there are no part files with a monotonic start time greater than
+ // min_time, use the max of whatever we have; otherwise choose the max out of
+ // order duration of the parts with a monotonic start time greater than
+ // min_time.
+ new_parts.max_out_of_order_duration =
+ parts.second.max_out_of_order_duration_valid_start_time.has_value()
+ ? parts.second.max_out_of_order_duration_valid_start_time.value()
+ : parts.second.max_out_of_order_duration_min_start_time.value();
{
auto boot_count_it =
boot_counts->boot_count_map.find(new_parts.source_boot_uuid);
@@ -2096,6 +2135,9 @@
stream << ",\n \"logger_realtime_start_time\": \""
<< parts.logger_realtime_start_time << "\"";
}
+ stream << ",\n \"max_out_of_order_duration\": \""
+ << parts.max_out_of_order_duration.count() << "\"";
+
stream << ",\n \"monotonic_start_time\": \"" << parts.monotonic_start_time
<< "\",\n \"realtime_start_time\": \"" << parts.realtime_start_time
<< "\",\n \"parts\": [";
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index b7f8297..375a102 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -84,6 +84,9 @@
// Information about all the boots that the system has observed.
std::shared_ptr<const Boots> boots;
+
+ // Highest max out of order duration among all parts.
+ std::chrono::nanoseconds max_out_of_order_duration;
};
// Datastructure to hold parts from the same run of the logger which have no
@@ -177,6 +180,10 @@
std::optional<const LogSource *> log_source() const { return log_source_; }
+ std::chrono::nanoseconds max_out_of_order_duration() const {
+ return log_parts_.max_out_of_order_duration;
+ }
+
std::string GetPartAt(size_t index) const {
CHECK_LT(index, log_parts_.parts.size());
return log_parts_.parts[index];
@@ -241,7 +248,7 @@
explicit LogFilesContainer(const LogSource *log_source)
: LogFilesContainer(log_source, SortParts(*log_source)) {}
- // Returns true when at least on of the log files associated with node.
+ // Returns true when at least one of the log files is associated with the node.
bool ContainsPartsForNode(std::string_view node_name) const {
// TODO (Alexei): Implement
// https://en.cppreference.com/w/cpp/container/unordered_map/find with C++20
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index ec6bff3..133bb79 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -1598,7 +1598,9 @@
PartsMessageReader::PartsMessageReader(LogPartsAccess log_parts_access)
: log_parts_access_(std::move(log_parts_access)),
- message_reader_(MakeSpanReader(log_parts_access_, 0)) {
+ message_reader_(MakeSpanReader(log_parts_access_, 0)),
+ max_out_of_order_duration_(
+ log_parts_access_.max_out_of_order_duration()) {
if (log_parts_access_.size() >= 2) {
next_message_reader_.emplace(MakeSpanReader(log_parts_access_, 1));
}
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 18902d3..c8d2e70 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -288,8 +288,8 @@
return raw_log_file_header_;
}
- // Returns the minimum maount of data needed to queue up for sorting before
- // ware guarenteed to not see data out of order.
+ // Returns the minimum amount of data needed to queue up for sorting before
+ // we're guaranteed to not see data out of order.
std::chrono::nanoseconds max_out_of_order_duration() const {
return max_out_of_order_duration_;
}
@@ -376,7 +376,7 @@
// Returns the minimum amount of data needed to queue up for sorting before
// we are guarenteed to not see data out of order.
std::chrono::nanoseconds max_out_of_order_duration() const {
- return message_reader_.max_out_of_order_duration();
+ return max_out_of_order_duration_;
}
// Returns the newest timestamp read out of the log file.
@@ -432,6 +432,8 @@
// Per node boot counts.
std::vector<std::optional<size_t>> boot_counts_;
+
+ const std::chrono::nanoseconds max_out_of_order_duration_;
};
// Stores MessageHeader as a flat header and inline, aligned block of data.
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index f4071e4..a0abcd8 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -216,8 +216,13 @@
writer.QueueSpan(m2.span());
}
+ // When parts are sorted, we choose the highest max out of order duration
+ // among all parts with the same part uuid.
const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
+ EXPECT_EQ(parts.size(), 1);
+ EXPECT_EQ(parts[0].parts.size(), 1);
+
PartsMessageReader reader(parts[0].parts[0]);
EXPECT_EQ(reader.filename(), logfile0);
@@ -225,16 +230,18 @@
// Confirm that the timestamps track, and the filename also updates.
// Read the first message.
EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+ // Since config1 has a higher max out of order duration, that will be used to
+ // read part files with the same part uuid, i.e. logfile0 and logfile1.
EXPECT_EQ(
reader.max_out_of_order_duration(),
- std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
+ std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
EXPECT_TRUE(reader.ReadMessage());
EXPECT_EQ(reader.filename(), logfile0);
EXPECT_EQ(reader.newest_timestamp(),
monotonic_clock::time_point(chrono::nanoseconds(1)));
EXPECT_EQ(
reader.max_out_of_order_duration(),
- std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
+ std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
// Read the second message.
EXPECT_TRUE(reader.ReadMessage());
@@ -252,6 +259,11 @@
reader.max_out_of_order_duration(),
std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
+
+ // Verify that the parts metadata has the correct max out of order duration.
+ EXPECT_EQ(
+ parts[0].parts[0].max_out_of_order_duration,
+ std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
}
// Tests that Message's operator < works as expected.
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index ec1a450..5bda225 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -32,6 +32,10 @@
// at most this duration (in nanoseconds). If the log reader buffers until
// it finds messages this much newer than it's simulation time, it will never
// find a message out of order.
+ //
+ // By definition this is the time for the log reader to buffer messages after
+ // startup, i.e. messages sent before startup are not taken into account when
+ // checking for the max out of order duration.
max_out_of_order_duration:long (id: 2);
// The configuration of the channels. It is valid to have a log file with
diff --git a/aos/events/logging/multinode_logger_test.cc b/aos/events/logging/multinode_logger_test.cc
index f010466..01f4d2f 100644
--- a/aos/events/logging/multinode_logger_test.cc
+++ b/aos/events/logging/multinode_logger_test.cc
@@ -158,11 +158,11 @@
}
}
- const std::vector<LogFile> sorted_log_files = SortParts(logfiles_);
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
{
using ::testing::UnorderedElementsAre;
- std::shared_ptr<const aos::Configuration> config =
- sorted_log_files[0].config;
+ std::shared_ptr<const aos::Configuration> config = sorted_parts[0].config;
// Timing reports, pings
EXPECT_THAT(CountChannelsData(config, logfiles_[2]),
@@ -401,7 +401,7 @@
}
}
- LogReader reader(sorted_log_files);
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -572,6 +572,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
LogReader reader(sorted_parts, &config_.message());
// Remap just on pi1.
@@ -645,6 +646,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
LogReader reader(sorted_parts, &config_.message());
@@ -710,7 +712,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(SortParts(logfiles_));
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
int ping_count = 0;
// Adds a callback which mutates the value of the pong message before the
@@ -784,6 +788,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
LogReader reader(sorted_parts, &config_.message());
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
@@ -827,6 +832,7 @@
)");
const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
"Log file and replay config need to have matching nodes lists.");
}
@@ -856,7 +862,9 @@
// Since we delay starting pi2, it already knows about all the timestamps so
// we don't end up with extra parts.
- LogReader reader(SortParts(actual_filenames));
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1015,8 +1023,10 @@
event_loop_factory_.RunFor(logger_run3);
}
- LogReader reader(
- SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3)));
+ const std::vector<LogFile> sorted_parts =
+ SortParts(MakeLogFiles(logfile_base1_, logfile_base2_, 3, 3));
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1217,7 +1227,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(SortParts(logfiles_));
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
// Remap just on pi1.
reader.RemapLoggedChannel<aos::timing::Report>(
@@ -1300,7 +1312,9 @@
pi2_logger.AppendAllFilenames(&actual_filenames);
}
- LogReader reader(SortParts(actual_filenames));
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
// Rename just on pi2. Add some global maps just to verify they get added in
// the config and used correctly.
@@ -1407,7 +1421,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(SortParts(logfiles_));
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
reader.RemapLoggedChannel<examples::Ping>("/test");
@@ -1488,7 +1504,9 @@
pi2_logger.AppendAllFilenames(&actual_filenames);
}
- LogReader reader(SortParts(actual_filenames));
+ const std::vector<LogFile> sorted_parts = SortParts(actual_filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
std::vector<MapT> maps;
{
@@ -2409,7 +2427,9 @@
// Confirm that we refuse to replay logs with missing boot uuids.
{
- LogReader reader(SortParts(pi1_reboot_logfiles_));
+ auto sorted_parts = SortParts(pi1_reboot_logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -2916,7 +2936,9 @@
// Confirm that we can actually sort the resulting log and read it.
{
- LogReader reader(SortParts(filenames));
+ auto sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -3078,7 +3100,9 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(SortParts(logfiles_));
+ auto sorted_parts = SortParts(logfiles_);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader reader(sorted_parts);
reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
@@ -3135,7 +3159,9 @@
// And verify that we can run the LogReader over the relogged files without
// hitting any fatal errors.
{
- LogReader relogged_reader(SortParts(log_files));
+ auto sorted_parts = SortParts(log_files);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
+ LogReader relogged_reader(sorted_parts);
relogged_reader.Register();
relogged_reader.event_loop_factory()->Run();
@@ -3281,6 +3307,7 @@
// Confirm that we can parse the result. LogReader has enough internal CHECKs
// to confirm the right thing happened.
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
auto result = ConfirmReadable(filenames);
EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch() +
chrono::seconds(1)));
@@ -3442,6 +3469,7 @@
// Confirm that we can parse the result. LogReader has enough internal CHECKs
// to confirm the right thing happened.
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
auto result = ConfirmReadable(filenames);
EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
@@ -3595,6 +3623,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
auto result = ConfirmReadable(filenames);
EXPECT_THAT(result[0].first, ::testing::ElementsAre(realtime_clock::epoch()));
@@ -3713,6 +3742,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -3781,6 +3811,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -3849,6 +3880,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -3919,6 +3951,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -3989,6 +4022,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -4041,6 +4075,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
}
@@ -4086,6 +4121,7 @@
}
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
ConfirmReadable(filenames);
{
@@ -4198,6 +4234,7 @@
// Make sure we can read this.
const std::vector<LogFile> sorted_parts = SortParts(filenames);
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
auto result = ConfirmReadable(filenames);
}
diff --git a/aos/events/logging/multinode_logger_test_lib.cc b/aos/events/logging/multinode_logger_test_lib.cc
index fdee4d8..2e13814 100644
--- a/aos/events/logging/multinode_logger_test_lib.cc
+++ b/aos/events/logging/multinode_logger_test_lib.cc
@@ -446,6 +446,8 @@
EXPECT_THAT(sorted_parts[0].corrupted, ::testing::Eq(corrupted_parts));
EXPECT_THAT(sorted_parts[1].corrupted, ::testing::Eq(corrupted_parts));
+
+ EXPECT_TRUE(AllPartsMatchOutOfOrderDuration(sorted_parts));
}
void MultinodeLoggerTest::AddExtension(std::string_view extension) {
@@ -654,6 +656,23 @@
});
}
+bool AllPartsMatchOutOfOrderDuration(
+ const std::vector<LogFile> &files,
+ std::chrono::nanoseconds max_out_of_order_duration) {
+ for (const LogFile &file : files) {
+ for (const LogParts &parts : file.parts) {
+ if (parts.max_out_of_order_duration != max_out_of_order_duration) {
+ LOG(ERROR) << "Found an out of order duration of "
+ << parts.max_out_of_order_duration.count()
+ << "ns instead of " << max_out_of_order_duration.count()
+ << "ns for " << parts;
+ return false;
+ }
+ }
+ }
+ return true;
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/multinode_logger_test_lib.h b/aos/events/logging/multinode_logger_test_lib.h
index e94f626..cf05727 100644
--- a/aos/events/logging/multinode_logger_test_lib.h
+++ b/aos/events/logging/multinode_logger_test_lib.h
@@ -99,6 +99,13 @@
std::shared_ptr<const aos::Configuration> config,
std::string_view filename);
+// Returns true if all of the parts' max_out_of_order_duration values match the
+// provided max_out_of_order_duration.
+bool AllPartsMatchOutOfOrderDuration(
+ const std::vector<LogFile> &files,
+ std::chrono::nanoseconds max_out_of_order_duration =
+ std::chrono::milliseconds(300));
+
class MultinodeLoggerTest : public ::testing::TestWithParam<
std::tuple<ConfigParams, CompressionParams>> {
public: