Track matching forwarded times for log file sorting
Previously, the only ordering information we had was a loose ordering
based on part index, which isn't very conclusive. This change records
corresponding timestamps for when messages cross the network boundary, in
a way that we think should tell us definitively what came first.
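There are two main problem cases. Say we have two channels, /a which is
reliable and /b which isn't, both sent from the same remote node. Below,
"/a -> boot X received on boot Y" means a message on /a sent during remote
boot X was received during local (logging node) boot Y.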
case 1: /a -> boot 0 received on boot 0.
/b -> boot 1 received on boot 0.
We start logging after both messages arrive.
case 2: /a -> boot 0 received on boot 0.
/b -> boot 0 received on boot 0.
We log for a bit, then the logging node reboots. More messages show up when we reconnect.
/a -> boot 0 received on boot 1.
/b -> boot 0 received on boot 1.
In case 1, we only have a reliable timestamp from boot 0, but that
reliable timestamp makes it clear that /a was sent before /b, so remote
boot 0 was before remote boot 1.
In case 2, both local boots see the same reliable timestamp, so it tells
us nothing. The unreliable timestamps tell a different story, though: /b
will be after /a, since any /b messages generated before the reboot won't
get delivered. So we get an ordering constraint saying that any /b
messages received on the second local boot were sent after the /b
messages received on the first local boot.
We believe all other cases are covered by the same mechanism, but we
won't know for certain until the sorting code is fully implemented and
debugged. Sanjay and I have been unable to break the logic so far.
Change-Id: I990bf249b18bf43072997cdb099ac66c2fa8fc57
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_namer.cc b/aos/events/logging/log_namer.cc
index a4b15e8..93dd02d 100644
--- a/aos/events/logging/log_namer.cc
+++ b/aos/events/logging/log_namer.cc
@@ -25,9 +25,8 @@
log_namer_(log_namer),
reopen_(std::move(reopen)),
close_(std::move(close)) {
- boot_uuids_.resize(configuration::NodesCount(log_namer->configuration_),
- UUID::Zero());
- CHECK_LT(node_index_, boot_uuids_.size());
+ state_.resize(configuration::NodesCount(log_namer->configuration_));
+ CHECK_LT(node_index_, state_.size());
reopen_(this);
}
@@ -54,14 +53,44 @@
header_written_ = false;
}
-void NewDataWriter::UpdateRemote(size_t remote_node_index,
- const UUID &remote_node_boot_uuid) {
- CHECK_LT(remote_node_index, boot_uuids_.size());
- if (boot_uuids_[remote_node_index] != remote_node_boot_uuid) {
+void NewDataWriter::UpdateRemote(  // Records a message forwarded from remote_node_index; rotates when any header-visible state changes.
+ const size_t remote_node_index, const UUID &remote_node_boot_uuid,
+ const monotonic_clock::time_point monotonic_remote_time,
+ const monotonic_clock::time_point monotonic_event_time,
+ const bool reliable) {  // Only messages with reliable == false feed the *_unreliable_* fields.
+ bool rotate = false;  // Set whenever a value that MakeHeader() writes into the header changes.
+ CHECK_LT(remote_node_index, state_.size());
+ State &state = state_[remote_node_index];
+ if (state.boot_uuid != remote_node_boot_uuid) {  // New remote boot UUID: reset all oldest timestamps to max_time so the new boot's first message replaces them.
VLOG(1) << filename() << " Remote " << remote_node_index << " updated to "
- << remote_node_boot_uuid << " from "
- << boot_uuids_[remote_node_index];
- boot_uuids_[remote_node_index] = remote_node_boot_uuid;
+ << remote_node_boot_uuid << " from " << state.boot_uuid;
+ state.boot_uuid = remote_node_boot_uuid;
+ state.oldest_remote_monotonic_timestamp = monotonic_clock::max_time;
+ state.oldest_local_monotonic_timestamp = monotonic_clock::max_time;
+ state.oldest_remote_unreliable_monotonic_timestamp =
+ monotonic_clock::max_time;
+ state.oldest_local_unreliable_monotonic_timestamp =
+ monotonic_clock::max_time;
+ rotate = true;
+ }
+ // Unreliable messages: keep the earliest remote send time and its matching local event time, tracked separately from the overall oldest pair below.
+ if (!reliable) {
+ if (state.oldest_remote_unreliable_monotonic_timestamp >
+ monotonic_remote_time) {
+ state.oldest_remote_unreliable_monotonic_timestamp =
+ monotonic_remote_time;
+ state.oldest_local_unreliable_monotonic_timestamp = monotonic_event_time;
+ rotate = true;
+ }
+ }
+ // All messages (reliable or not): keep the earliest remote send time and its matching local event time.
+ if (state.oldest_remote_monotonic_timestamp > monotonic_remote_time) {
+ state.oldest_remote_monotonic_timestamp = monotonic_remote_time;
+ state.oldest_local_monotonic_timestamp = monotonic_event_time;
+ rotate = true;
+ }
+ // NOTE(review): Rotate() is presumably what gets a new header (with the updated state_) written — confirm against its definition.
+ if (rotate) {
Rotate();
}
}
@@ -70,15 +99,15 @@
const UUID &source_node_boot_uuid,
aos::monotonic_clock::time_point now) {
// TODO(austin): Handle remote nodes changing too, not just the source node.
- if (boot_uuids_[node_index_] != source_node_boot_uuid) {
- boot_uuids_[node_index_] = source_node_boot_uuid;
+ if (state_[node_index_].boot_uuid != source_node_boot_uuid) {
+ state_[node_index_].boot_uuid = source_node_boot_uuid;
if (header_written_) {
Reboot();
}
QueueHeader(MakeHeader());
}
- CHECK_EQ(boot_uuids_[node_index_], source_node_boot_uuid);
+ CHECK_EQ(state_[node_index_].boot_uuid, source_node_boot_uuid);
CHECK(header_written_) << ": Attempting to write message before header to "
<< writer->filename();
writer->QueueSizedFlatbuffer(fbb, now);
@@ -88,14 +117,14 @@
NewDataWriter::MakeHeader() {
const size_t logger_node_index = log_namer_->logger_node_index();
const UUID &logger_node_boot_uuid = log_namer_->logger_node_boot_uuid();
- if (boot_uuids_[logger_node_index] == UUID::Zero()) {
+ if (state_[logger_node_index].boot_uuid == UUID::Zero()) {
VLOG(1) << filename() << " Logger node is " << logger_node_index
<< " and uuid is " << logger_node_boot_uuid;
- boot_uuids_[logger_node_index] = logger_node_boot_uuid;
+ state_[logger_node_index].boot_uuid = logger_node_boot_uuid;
} else {
- CHECK_EQ(boot_uuids_[logger_node_index], logger_node_boot_uuid);
+ CHECK_EQ(state_[logger_node_index].boot_uuid, logger_node_boot_uuid);
}
- return log_namer_->MakeHeader(node_index_, boot_uuids_, parts_uuid(),
+ return log_namer_->MakeHeader(node_index_, state_, parts_uuid(),
parts_index_);
}
@@ -104,7 +133,7 @@
CHECK(!header_written_) << ": Attempting to write duplicate header to "
<< writer->filename();
CHECK(header.message().has_source_node_boot_uuid());
- CHECK_EQ(boot_uuids_[node_index_],
+ CHECK_EQ(state_[node_index_].boot_uuid,
UUID::FromString(header.message().source_node_boot_uuid()));
// TODO(austin): This triggers a dummy allocation that we don't need as part
// of releasing. Can we skip it?
@@ -120,12 +149,12 @@
}
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> LogNamer::MakeHeader(
- size_t node_index, const std::vector<UUID> &boot_uuids,
+ size_t node_index, const std::vector<NewDataWriter::State> &state,
const UUID &parts_uuid, int parts_index) const {
- const UUID &source_node_boot_uuid = boot_uuids[node_index];
+ const UUID &source_node_boot_uuid = state[node_index].boot_uuid;
const Node *const source_node =
configuration::GetNode(configuration_, node_index);
- CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 20u);
+ CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 24u);
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
@@ -181,10 +210,10 @@
}
std::vector<flatbuffers::Offset<flatbuffers::String>> boot_uuid_offsets;
- boot_uuid_offsets.reserve(boot_uuids.size());
- for (const UUID &uuid : boot_uuids) {
- if (uuid != UUID::Zero()) {
- boot_uuid_offsets.emplace_back(uuid.PackString(&fbb));
+ boot_uuid_offsets.reserve(state.size());
+ for (const NewDataWriter::State &state : state) {
+ if (state.boot_uuid != UUID::Zero()) {
+ boot_uuid_offsets.emplace_back(state.boot_uuid.PackString(&fbb));
} else {
boot_uuid_offsets.emplace_back(fbb.CreateString(""));
}
@@ -194,6 +223,43 @@
flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
boot_uuids_offset = fbb.CreateVector(boot_uuid_offsets);
+ int64_t *oldest_remote_monotonic_timestamps;
+ flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+ oldest_remote_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
+ state.size(), &oldest_remote_monotonic_timestamps);
+
+ int64_t *oldest_local_monotonic_timestamps;
+ flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+ oldest_local_monotonic_timestamps_offset = fbb.CreateUninitializedVector(
+ state.size(), &oldest_local_monotonic_timestamps);
+
+ int64_t *oldest_remote_unreliable_monotonic_timestamps;
+ flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+ oldest_remote_unreliable_monotonic_timestamps_offset =
+ fbb.CreateUninitializedVector(
+ state.size(), &oldest_remote_unreliable_monotonic_timestamps);
+
+ int64_t *oldest_local_unreliable_monotonic_timestamps;
+ flatbuffers::Offset<flatbuffers::Vector<int64_t>>
+ oldest_local_unreliable_monotonic_timestamps_offset =
+ fbb.CreateUninitializedVector(
+ state.size(), &oldest_local_unreliable_monotonic_timestamps);
+
+ for (size_t i = 0; i < state.size(); ++i) {
+ oldest_remote_monotonic_timestamps[i] =
+ state[i].oldest_remote_monotonic_timestamp.time_since_epoch().count();
+ oldest_local_monotonic_timestamps[i] =
+ state[i].oldest_local_monotonic_timestamp.time_since_epoch().count();
+ oldest_remote_unreliable_monotonic_timestamps[i] =
+ state[i]
+ .oldest_remote_unreliable_monotonic_timestamp.time_since_epoch()
+ .count();
+ oldest_local_unreliable_monotonic_timestamps[i] =
+ state[i]
+ .oldest_local_unreliable_monotonic_timestamp.time_since_epoch()
+ .count();
+ }
+
aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
log_file_header_builder.add_name(name_offset);
@@ -263,6 +329,14 @@
std::chrono::duration_cast<std::chrono::nanoseconds>(
event_loop_->realtime_now().time_since_epoch())
.count());
+ log_file_header_builder.add_oldest_remote_monotonic_timestamps(
+ oldest_remote_monotonic_timestamps_offset);
+ log_file_header_builder.add_oldest_local_monotonic_timestamps(
+ oldest_local_monotonic_timestamps_offset);
+ log_file_header_builder.add_oldest_remote_unreliable_monotonic_timestamps(
+ oldest_remote_unreliable_monotonic_timestamps_offset);
+ log_file_header_builder.add_oldest_local_unreliable_monotonic_timestamps(
+ oldest_local_unreliable_monotonic_timestamps_offset);
fbb.FinishSizePrefixed(log_file_header_builder.Finish());
aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> result(
fbb.Release());