Merge "Add node_boots parsing from log file headers"
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 86d3930..118ddf7 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -33,7 +33,7 @@
}
bool ConfigOnly(const LogFileHeader *header) {
- CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 17u);
+ CHECK_EQ(LogFileHeader::MiniReflectTypeTable()->num_elems, 18u);
if (header->has_monotonic_start_time()) return false;
if (header->has_realtime_start_time()) return false;
if (header->has_max_out_of_order_duration()) return false;
@@ -50,6 +50,7 @@
if (header->has_parts_uuid()) return false;
if (header->has_parts_index()) return false;
if (header->has_logger_node()) return false;
+ if (header->has_boot_uuids()) return false;
return header->has_configuration();
}
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 7600c7e..cd2ceb8 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -516,7 +516,52 @@
}
PartsMessageReader::PartsMessageReader(LogParts log_parts)
- : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
+ : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {
+ ComputeBootCounts();
+}
+
+void PartsMessageReader::ComputeBootCounts() {
+ boot_counts_.assign(configuration::NodesCount(parts_.config.get()),
+ std::nullopt);
+
+ // We have 3 vintages of log files with different amounts of information.
+ if (log_file_header()->has_boot_uuids()) {
+ // The new hotness with the boots explicitly listed out. We can use the log
+ // file header to compute the boot count of all relevant nodes.
+ CHECK_EQ(log_file_header()->boot_uuids()->size(), boot_counts_.size());
+ size_t node_index = 0;
+ for (const flatbuffers::String *boot_uuid :
+ *log_file_header()->boot_uuids()) {
+ CHECK(parts_.boots);
+ if (boot_uuid->size() != 0) {
+ auto it = parts_.boots->boot_count_map.find(boot_uuid->str());
+ if (it != parts_.boots->boot_count_map.end()) {
+ boot_counts_[node_index] = it->second;
+ }
+ } else if (parts().boots->boots[node_index].size() == 1u) {
+ boot_counts_[node_index] = 0;
+ }
+ ++node_index;
+ }
+ } else {
+ // Older multi-node logs which are guarenteed to have UUIDs logged, or
+ // single node log files with boot UUIDs in the header. We only know how to
+ // order certain boots in certain circumstances.
+ if (configuration::MultiNode(parts_.config.get()) || parts_.boots) {
+ for (size_t node_index = 0; node_index < boot_counts_.size();
+ ++node_index) {
+ CHECK(parts_.boots);
+ if (parts().boots->boots[node_index].size() == 1u) {
+ boot_counts_[node_index] = 0;
+ }
+ }
+ } else {
+ // Really old single node logs without any UUIDs. They can't reboot.
+ CHECK_EQ(boot_counts_.size(), 1u);
+ boot_counts_[0] = 0u;
+ }
+ }
+}
std::optional<SizePrefixedFlatbufferVector<MessageHeader>>
PartsMessageReader::ReadMessage() {
@@ -557,6 +602,7 @@
return;
}
message_reader_ = MessageReader(parts_.parts[next_part_index_]);
+ ComputeBootCounts();
++next_part_index_;
}
@@ -625,7 +671,9 @@
}
LogPartsSorter::LogPartsSorter(LogParts log_parts)
- : parts_message_reader_(log_parts) {}
+ : parts_message_reader_(log_parts),
+ source_node_index_(configuration::SourceNodeIndex(parts().config.get())) {
+}
Message *LogPartsSorter::Front() {
// Queue up data until enough data has been queued that the front message is
@@ -633,7 +681,8 @@
// sure the nothing path is checked quickly.
if (sorted_until() != monotonic_clock::max_time) {
while (true) {
- if (!messages_.empty() && messages_.begin()->timestamp.time < sorted_until() &&
+ if (!messages_.empty() &&
+ messages_.begin()->timestamp.time < sorted_until() &&
sorted_until() >= monotonic_start_time()) {
break;
}
@@ -646,6 +695,20 @@
break;
}
+ size_t monotonic_timestamp_boot = 0;
+ if (m.value().message().has_monotonic_timestamp_time()) {
+ monotonic_timestamp_boot = parts().logger_boot_count;
+ }
+ size_t monotonic_remote_boot = 0xffffff;
+
+ if (m.value().message().has_monotonic_remote_time()) {
+ std::optional<size_t> boot = parts_message_reader_.boot_count(
+ source_node_index_[m->message().channel_index()]);
+ CHECK(boot) << ": Failed to find boot for node "
+ << source_node_index_[m->message().channel_index()];
+ monotonic_remote_boot = *boot;
+ }
+
messages_.insert(Message{
.channel_index = m.value().message().channel_index(),
.queue_index = m.value().message().queue_index(),
@@ -654,6 +717,8 @@
.boot = parts().boot_count,
.time = monotonic_clock::time_point(std::chrono::nanoseconds(
m.value().message().monotonic_sent_time()))},
+ .monotonic_remote_boot = monotonic_remote_boot,
+ .monotonic_timestamp_boot = monotonic_timestamp_boot,
.data = std::move(m.value())});
// Now, update sorted_until_ to match the new message.
@@ -802,9 +867,7 @@
// Now, we need to split things out by boot.
for (size_t i = 0; i < files.size(); ++i) {
- LOG(INFO) << "Trying file " << i;
const size_t boot_count = files[i].boot_count;
- LOG(INFO) << "Boot count " << boot_count;
if (boot_count + 1 > boots.size()) {
boots.resize(boot_count + 1);
}
@@ -813,9 +876,9 @@
node_mergers_.reserve(boots.size());
for (size_t i = 0; i < boots.size(); ++i) {
- LOG(INFO) << "Boot " << i;
+ VLOG(2) << "Boot " << i;
for (auto &p : boots[i]) {
- LOG(INFO) << "Part " << p;
+ VLOG(2) << "Part " << p;
}
node_mergers_.emplace_back(
std::make_unique<NodeMerger>(std::move(boots[i])));
@@ -1007,14 +1070,15 @@
std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
.remote_queue_index = m->data.message().remote_queue_index(),
.monotonic_remote_time =
- // TODO(austin): 0 is wrong...
- {0, monotonic_clock::time_point(std::chrono::nanoseconds(
- m->data.message().monotonic_remote_time()))},
+ {m->monotonic_remote_boot,
+ monotonic_clock::time_point(std::chrono::nanoseconds(
+ m->data.message().monotonic_remote_time()))},
.realtime_remote_time = realtime_clock::time_point(
std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
.monotonic_timestamp_time =
- {0, monotonic_clock::time_point(std::chrono::nanoseconds(
- m->data.message().monotonic_timestamp_time()))},
+ {m->monotonic_timestamp_boot,
+ monotonic_clock::time_point(std::chrono::nanoseconds(
+ m->data.message().monotonic_timestamp_time()))},
.data = std::move(data.data)});
CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
last_message_time_ = matched_messages_.back().monotonic_event_time;
@@ -1080,7 +1144,7 @@
CHECK(message.data.message().has_realtime_remote_time());
const BootTimestamp monotonic_remote_time{
- .boot = 0,
+ .boot = message.monotonic_remote_boot,
.time = monotonic_clock::time_point(std::chrono::nanoseconds(
message.data.message().monotonic_remote_time()))};
const realtime_clock::time_point realtime_remote_time(
@@ -1098,6 +1162,8 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1112,6 +1178,8 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1121,6 +1189,8 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1155,6 +1225,8 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 93d9e8b..56b582f 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -334,10 +334,19 @@
// Note: reading the next message may change the max_out_of_order_duration().
std::optional<SizePrefixedFlatbufferVector<MessageHeader>> ReadMessage();
+ // Returns the boot count for the requested node, or std::nullopt if we don't
+ // know.
+ std::optional<size_t> boot_count(size_t node_index) const {
+ CHECK_GE(node_index, 0u);
+ CHECK_LT(node_index, boot_counts_.size());
+ return boot_counts_[node_index];
+ }
+
private:
// Opens the next log and updates message_reader_. Sets done_ if there is
// nothing more to do.
void NextLog();
+ void ComputeBootCounts();
const LogParts parts_;
size_t next_part_index_ = 1u;
@@ -354,6 +363,9 @@
bool after_start_ = false;
monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+
+ // Per node boot counts.
+ std::vector<std::optional<size_t>> boot_counts_;
};
// Struct to hold a message as it gets sorted on a single node.
@@ -365,6 +377,11 @@
// The local timestamp.
BootTimestamp timestamp;
+ // Remote boot when this is a timestamp.
+ size_t monotonic_remote_boot = 0xffffff;
+
+ size_t monotonic_timestamp_boot = 0xffffff;
+
// The data (either a timestamp header, or a data header).
SizePrefixedFlatbufferVector<MessageHeader> data;
@@ -437,6 +454,10 @@
// Set used for efficient sorting of messages. We can benchmark and evaluate
// other data structures if this proves to be the bottleneck.
absl::btree_set<Message> messages_;
+
+ // Mapping from channel to source node.
+ // TODO(austin): Should we put this in Boots so it can be cached for everyone?
+ std::vector<size_t> source_node_index_;
};
// Class to run merge sort on the messages from multiple LogPartsSorter
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 4973d02..e99db94 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -228,11 +228,15 @@
.queue_index = 0,
.timestamp =
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(1)},
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
Message m2{.channel_index = 0,
.queue_index = 0,
.timestamp =
BootTimestamp{.boot = 0, .time = e + chrono::milliseconds(2)},
+ .monotonic_remote_boot = 0xffffff,
+ .monotonic_timestamp_boot = 0xffffff,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
EXPECT_LT(m1, m2);
@@ -306,6 +310,16 @@
"name": "/c",
"type": "aos.logger.testing.TestMessage",
"source_node": "pi1"
+ },
+ {
+ "name": "/d",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1"
+ }
+ ]
}
],
"nodes": [
@@ -333,6 +347,13 @@
"monotonic_start_time": 1000000,
"realtime_start_time": 1000000000000,
"log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "boot_uuids": [
+ "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "",
+ ""
+ ],
"parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
"parts_index": 0
})")),
@@ -349,6 +370,13 @@
"monotonic_start_time": 1000000,
"realtime_start_time": 1000000000000,
"log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "boot_uuids": [
+ "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "",
+ ""
+ ],
"parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
"parts_index": 0
})")),
@@ -364,6 +392,13 @@
},
"monotonic_start_time": 0,
"realtime_start_time": 1000000000000,
+ "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ "logger_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ "boot_uuids": [
+ "",
+ "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ ""
+ ],
"log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
"parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
"parts_index": 0
@@ -381,6 +416,13 @@
"monotonic_start_time": 2000000,
"realtime_start_time": 1000000000,
"log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+ "source_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "boot_uuids": [
+ "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "",
+ ""
+ ],
"parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
"parts_index": 0
})")),
@@ -398,11 +440,19 @@
"realtime_start_time": 1000000000,
"log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
"parts_uuid": "4e560a47-e2a6-4ce3-a925-490bebc947c5",
+ "source_node_boot_uuid": "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ "logger_node_boot_uuid": "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "boot_uuids": [
+ "1d782c63-b3c7-466e-bea9-a01308b43333",
+ "6f4269ec-547f-4a1a-8281-37aca7fe5dc2",
+ ""
+ ],
"parts_index": 0
})")) {
unlink(logfile0_.c_str());
unlink(logfile1_.c_str());
unlink(logfile2_.c_str());
+ unlink(logfile3_.c_str());
queue_index_.resize(kChannels);
}
@@ -487,6 +537,7 @@
const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
+ const std::string logfile3_ = aos::testing::TestTmpDir() + "/log3.bfbs";
const aos::FlatbufferDetachedBuffer<Configuration> config_;
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
@@ -1817,7 +1868,12 @@
"parts_index": 0,
"logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
"logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
- "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464"
+ "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ ""
+ ]
})")),
boot1_(MakeHeader(config_, R"({
/* 100ms */
@@ -1837,7 +1893,12 @@
"parts_index": 1,
"logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
"logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
- "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2"
+ "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ ""
+ ]
})")) {}
protected:
@@ -1921,6 +1982,491 @@
EXPECT_EQ(output[3].timestamp.time, e + chrono::milliseconds(200));
}
+class RebootTimestampMapperTest : public SortingElementTest {
+ public:
+ RebootTimestampMapperTest()
+ : SortingElementTest(),
+ boot0a_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "logger_monotonic_start_time": 1000000,
+ "logger_realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+ "parts_index": 0,
+ "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+ "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ ""
+ ]
+})")),
+ boot0b_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "logger_monotonic_start_time": 1000000,
+ "logger_realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "ee4f5a98-77d0-4e01-af2f-bbb29e098ede",
+ "parts_index": 1,
+ "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+ "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "source_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ ""
+ ]
+})")),
+ boot1a_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi2"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "logger_monotonic_start_time": 1000000,
+ "logger_realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "f6ab0cdc-a654-456d-bfd9-2bbc09098edf",
+ "parts_index": 0,
+ "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+ "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "source_node_boot_uuid": "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "6ba4f28d-21a2-4d7f-83f4-ee365cf86464",
+ ""
+ ]
+})")),
+ boot1b_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi2"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "logger_monotonic_start_time": 1000000,
+ "logger_realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "3a9d0445-f520-43ca-93f5-e2cc7f54d40a",
+ "parts_index": 1,
+ "logger_instance_uuid": "1c3142ad-10a5-408d-a760-b63b73d3b904",
+ "logger_node_boot_uuid": "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "source_node_boot_uuid": "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ "boot_uuids": [
+ "a570df8b-5cc2-4dbe-89bd-286f9ddd02b7",
+ "b728d27a-9181-4eac-bfc1-5d09b80469d2",
+ ""
+ ]
+})")) {}
+
+ protected:
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0a_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot0b_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1a_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> boot1b_;
+};
+
+
+// Tests that we can match timestamps on delivered messages in the presence of
+// reboots on the node receiving timestamps.
+TEST_F(RebootTimestampMapperTest, ReadNode0First) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+ writer0a.QueueSpan(boot0a_.span());
+ DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+ writer0b.QueueSpan(boot0b_.span());
+ DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+ writer1a.QueueSpan(boot1a_.span());
+ DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+ writer1b.QueueSpan(boot1b_.span());
+
+ writer0a.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1a.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100),
+ e + chrono::milliseconds(1001)));
+
+ writer0b.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(20),
+ e + chrono::milliseconds(2001)));
+
+ writer0b.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(20),
+ e + chrono::milliseconds(3001)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+ for (const auto &x : parts) {
+ LOG(INFO) << x;
+ }
+ ASSERT_EQ(parts.size(), 1u);
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+
+ size_t mapper0_count = 0;
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 0u);
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 2u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ LOG(INFO) << output0[0];
+ LOG(INFO) << output0[1];
+ LOG(INFO) << output0[2];
+
+ EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_event_time.time,
+ e + chrono::milliseconds(1000));
+ EXPECT_EQ(output0[0].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output0[0].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output0[0].data.Verify());
+
+ EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[1].monotonic_event_time.time,
+ e + chrono::milliseconds(2000));
+ EXPECT_EQ(output0[1].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output0[1].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output0[1].data.Verify());
+
+ EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[2].monotonic_event_time.time,
+ e + chrono::milliseconds(3000));
+ EXPECT_EQ(output0[2].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output0[2].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 2u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_event_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_EQ(output1[0].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_remote_time.time,
+ e + chrono::milliseconds(1000));
+ EXPECT_EQ(output1[0].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_timestamp_time.time,
+ e + chrono::milliseconds(1001));
+ EXPECT_TRUE(output1[0].data.Verify());
+
+ EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[1].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[1].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output1[1].monotonic_remote_time.time,
+ e + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[1].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output1[1].monotonic_timestamp_time.time,
+ e + chrono::milliseconds(2001));
+ EXPECT_TRUE(output1[1].data.Verify());
+
+ EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[2].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[2].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output1[2].monotonic_remote_time.time,
+ e + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[2].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output1[2].monotonic_timestamp_time.time,
+ e + chrono::milliseconds(3001));
+ EXPECT_TRUE(output1[2].data.Verify());
+
+ LOG(INFO) << output1[0];
+ LOG(INFO) << output1[1];
+ LOG(INFO) << output1[2];
+ }
+}
+
+TEST_F(RebootTimestampMapperTest, Node2Reboot) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0a(logfile0_, std::make_unique<DummyEncoder>());
+ writer0a.QueueSpan(boot0a_.span());
+ DetachedBufferWriter writer0b(logfile1_, std::make_unique<DummyEncoder>());
+ writer0b.QueueSpan(boot0b_.span());
+ DetachedBufferWriter writer1a(logfile2_, std::make_unique<DummyEncoder>());
+ writer1a.QueueSpan(boot1a_.span());
+ DetachedBufferWriter writer1b(logfile3_, std::make_unique<DummyEncoder>());
+ writer1b.QueueSpan(boot1b_.span());
+
+ writer1a.QueueSizedFlatbuffer(MakeLogMessage(
+ e + chrono::seconds(100) + chrono::milliseconds(1000), 3, 0x005));
+ writer0a.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::seconds(100) + chrono::milliseconds(1000), 3,
+ chrono::seconds(-100),
+ e + chrono::seconds(100) + chrono::milliseconds(1001)));
+
+ writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+ e + chrono::seconds(20) + chrono::milliseconds(2000), 3, 0x006));
+ writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::seconds(20) + chrono::milliseconds(2000), 3,
+ chrono::seconds(-20),
+ e + chrono::seconds(20) + chrono::milliseconds(2001)));
+
+ writer1b.QueueSizedFlatbuffer(MakeLogMessage(
+ e + chrono::seconds(20) + chrono::milliseconds(3000), 3, 0x007));
+ writer0b.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::seconds(20) + chrono::milliseconds(3000), 3,
+ chrono::seconds(-20),
+ e + chrono::seconds(20) + chrono::milliseconds(3001)));
+ }
+
+ const std::vector<LogFile> parts =
+ SortParts({logfile0_, logfile1_, logfile2_, logfile3_});
+
+ for (const auto &x : parts) {
+ LOG(INFO) << x;
+ }
+ ASSERT_EQ(parts.size(), 1u);
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+
+ size_t mapper0_count = 0;
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ mapper0.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper0_count; });
+ size_t mapper1_count = 0;
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+ mapper1.set_timestamp_callback(
+ [&](TimestampedMessage *) { ++mapper1_count; });
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ EXPECT_EQ(mapper0_count, 0u);
+ EXPECT_EQ(mapper1_count, 0u);
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+ EXPECT_EQ(mapper0_count, 1u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 2u);
+ EXPECT_EQ(mapper1_count, 0u);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_TRUE(mapper0.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ LOG(INFO) << output0[0];
+ LOG(INFO) << output0[1];
+ LOG(INFO) << output0[2];
+
+ EXPECT_EQ(output0[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_event_time.time,
+ e + chrono::milliseconds(1000));
+ EXPECT_EQ(output0[0].monotonic_remote_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_remote_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_EQ(output0[0].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output0[0].monotonic_timestamp_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1001));
+ EXPECT_TRUE(output0[0].data.Verify());
+
+ EXPECT_EQ(output0[1].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[1].monotonic_event_time.time,
+ e + chrono::milliseconds(2000));
+ EXPECT_EQ(output0[1].monotonic_remote_time.boot, 1u);
+ EXPECT_EQ(output0[1].monotonic_remote_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(2000));
+ EXPECT_EQ(output0[1].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output0[1].monotonic_timestamp_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(2001));
+ EXPECT_TRUE(output0[1].data.Verify());
+
+ EXPECT_EQ(output0[2].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output0[2].monotonic_event_time.time,
+ e + chrono::milliseconds(3000));
+ EXPECT_EQ(output0[2].monotonic_remote_time.boot, 1u);
+ EXPECT_EQ(output0[2].monotonic_remote_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3000));
+ EXPECT_EQ(output0[2].monotonic_timestamp_time.boot, 0u);
+ EXPECT_EQ(output0[2].monotonic_timestamp_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3001));
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 0u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 1u);
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 2u);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_TRUE(mapper1.started());
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(mapper0_count, 3u);
+ EXPECT_EQ(mapper1_count, 3u);
+
+ EXPECT_EQ(output1[0].monotonic_event_time.boot, 0u);
+ EXPECT_EQ(output1[0].monotonic_event_time.time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_EQ(output1[0].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output1[0].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output1[0].data.Verify());
+
+ EXPECT_EQ(output1[1].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[1].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(2000));
+ EXPECT_EQ(output1[1].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output1[1].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output1[1].data.Verify());
+
+ EXPECT_EQ(output1[2].monotonic_event_time.boot, 1u);
+ EXPECT_EQ(output1[2].monotonic_event_time.time,
+ e + chrono::seconds(20) + chrono::milliseconds(3000));
+ EXPECT_EQ(output1[2].monotonic_remote_time, BootTimestamp::min_time());
+ EXPECT_EQ(output1[2].monotonic_timestamp_time, BootTimestamp::min_time());
+ EXPECT_TRUE(output1[2].data.Verify());
+
+ LOG(INFO) << output1[0];
+ LOG(INFO) << output1[1];
+ LOG(INFO) << output1[2];
+ }
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logger.fbs b/aos/events/logging/logger.fbs
index 2896963..d5f1248 100644
--- a/aos/events/logging/logger.fbs
+++ b/aos/events/logging/logger.fbs
@@ -93,6 +93,10 @@
// The node the data was logged on, if known and running in a multi-node configuration.
logger_node:Node (id: 9);
+
+ // The boot UUIDs for all nodes we know them for, or "" for the ones we don't.
+ // TODO(austin): Actually add this in the log writer.
+ boot_uuids:[string] (id: 17);
}
// Table holding a message.