Add a BootTimestamp class and move some things over
This lets us compare timestamps across boots again. Move most things
over to using it, instead of a bare monotonic_clock::time_point plus a
separate boot_count, so we can more naturally track what happens in
what order.
Note: this is not complete and has no tests. Anywhere I'm not
comfortable that the code will handle multiple boots today should now
CHECK that the boot count is 0 so we explode. Tests and tolerance of
reboots will come in future patches, starting with TimestampMapper,
then the time recovery code, and then LogReader.
Change-Id: I0fdbbe1860de4fe5e48d1a6cd516e7118e5942dc
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index f4d4501..f008ac6 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -527,11 +527,11 @@
}
bool Message::operator<(const Message &m2) const {
- CHECK_EQ(this->boot_count, m2.boot_count);
+ CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
- if (this->timestamp < m2.timestamp) {
+ if (this->timestamp.time < m2.timestamp.time) {
return true;
- } else if (this->timestamp > m2.timestamp) {
+ } else if (this->timestamp.time > m2.timestamp.time) {
return false;
}
@@ -546,16 +546,15 @@
bool Message::operator>=(const Message &m2) const { return !(*this < m2); }
bool Message::operator==(const Message &m2) const {
- CHECK_EQ(this->boot_count, m2.boot_count);
+ CHECK_EQ(this->timestamp.boot, m2.timestamp.boot);
- return timestamp == m2.timestamp && channel_index == m2.channel_index &&
- queue_index == m2.queue_index;
+ return timestamp.time == m2.timestamp.time &&
+ channel_index == m2.channel_index && queue_index == m2.queue_index;
}
std::ostream &operator<<(std::ostream &os, const Message &m) {
os << "{.channel_index=" << m.channel_index
- << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp
- << ", .boot_count=" << m.boot_count;
+ << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
if (m.data.Verify()) {
os << ", .data="
<< aos::FlatbufferToJson(m.data,
@@ -573,13 +572,13 @@
if (m.remote_queue_index != 0xffffffff) {
os << ", .remote_queue_index=" << m.remote_queue_index;
}
- if (m.monotonic_remote_time != monotonic_clock::min_time) {
+ if (m.monotonic_remote_time != BootTimestamp::min_time()) {
os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
}
if (m.realtime_remote_time != realtime_clock::min_time) {
os << ", .realtime_remote_time=" << m.realtime_remote_time;
}
- if (m.monotonic_timestamp_time != monotonic_clock::min_time) {
+ if (m.monotonic_timestamp_time != BootTimestamp::min_time()) {
os << ", .monotonic_timestamp_time=" << m.monotonic_timestamp_time;
}
if (m.data.Verify()) {
@@ -600,7 +599,7 @@
// sure the nothing path is checked quickly.
if (sorted_until() != monotonic_clock::max_time) {
while (true) {
- if (!messages_.empty() && messages_.begin()->timestamp < sorted_until() &&
+ if (!messages_.empty() && messages_.begin()->timestamp.time < sorted_until() &&
sorted_until() >= monotonic_start_time()) {
break;
}
@@ -616,9 +615,11 @@
messages_.insert(Message{
.channel_index = m.value().message().channel_index(),
.queue_index = m.value().message().queue_index(),
- .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
- m.value().message().monotonic_sent_time())),
- .boot_count = parts().boot_count,
+ .timestamp =
+ BootTimestamp{
+ .boot = parts().boot_count,
+ .time = monotonic_clock::time_point(std::chrono::nanoseconds(
+ m.value().message().monotonic_sent_time()))},
.data = std::move(m.value())});
// Now, update sorted_until_ to match the new message.
@@ -640,9 +641,9 @@
return nullptr;
}
- CHECK_GE(messages_.begin()->timestamp, last_message_time_)
+ CHECK_GE(messages_.begin()->timestamp.time, last_message_time_)
<< DebugString() << " reading " << parts_message_reader_.filename();
- last_message_time_ = messages_.begin()->timestamp;
+ last_message_time_ = messages_.begin()->timestamp.time;
return &(*messages_.begin());
}
@@ -707,7 +708,7 @@
// Return the current Front if we have one, otherwise go compute one.
if (current_ != nullptr) {
Message *result = current_->Front();
- CHECK_GE(result->timestamp, last_message_time_);
+ CHECK_GE(result->timestamp.time, last_message_time_);
return result;
}
@@ -745,8 +746,8 @@
}
if (oldest) {
- CHECK_GE(oldest->timestamp, last_message_time_);
- last_message_time_ = oldest->timestamp;
+ CHECK_GE(oldest->timestamp.time, last_message_time_);
+ last_message_time_ = oldest->timestamp.time;
} else {
last_message_time_ = monotonic_clock::max_time;
}
@@ -805,10 +806,22 @@
void BootMerger::PopFront() { node_mergers_[index_]->PopFront(); }
+std::vector<const LogParts *> BootMerger::Parts() const {
+ std::vector<const LogParts *> results;
+ for (const std::unique_ptr<NodeMerger> &node_merger : node_mergers_) {
+ std::vector<const LogParts *> node_parts = node_merger->Parts();
+
+ results.insert(results.end(), std::make_move_iterator(node_parts.begin()),
+ std::make_move_iterator(node_parts.end()));
+ }
+
+ return results;
+}
+
TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
- : node_merger_(std::move(parts)),
+ : boot_merger_(std::move(parts)),
timestamp_callback_([](TimestampedMessage *) {}) {
- for (const LogParts *part : node_merger_.Parts()) {
+ for (const LogParts *part : boot_merger_.Parts()) {
if (!configuration_) {
configuration_ = part->config;
} else {
@@ -873,9 +886,9 @@
.realtime_event_time = aos::realtime_clock::time_point(
std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
.remote_queue_index = 0xffffffff,
- .monotonic_remote_time = monotonic_clock::min_time,
+ .monotonic_remote_time = BootTimestamp::min_time(),
.realtime_remote_time = realtime_clock::min_time,
- .monotonic_timestamp_time = monotonic_clock::min_time,
+ .monotonic_timestamp_time = BootTimestamp::min_time(),
.data = std::move(m->data)});
}
@@ -904,7 +917,7 @@
if (nodes_data_.empty()) {
// Simple path. We are single node, so there are no timestamps to match!
CHECK_EQ(messages_.size(), 0u);
- Message *m = node_merger_.Front();
+ Message *m = boot_merger_.Front();
if (!m) {
return false;
}
@@ -916,7 +929,7 @@
last_message_time_ = matched_messages_.back().monotonic_event_time;
// We are thin wrapper around node_merger. Call it directly.
- node_merger_.PopFront();
+ boot_merger_.PopFront();
timestamp_callback_(&matched_messages_.back());
return true;
}
@@ -931,7 +944,7 @@
}
// Now that it has been added (and cannibalized), forget about it upstream.
- node_merger_.PopFront();
+ boot_merger_.PopFront();
}
Message *m = &(messages_.front());
@@ -958,13 +971,14 @@
std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
.remote_queue_index = m->data.message().remote_queue_index(),
.monotonic_remote_time =
- monotonic_clock::time_point(std::chrono::nanoseconds(
- m->data.message().monotonic_remote_time())),
+ // TODO(austin): 0 is wrong...
+ {0, monotonic_clock::time_point(std::chrono::nanoseconds(
+ m->data.message().monotonic_remote_time()))},
.realtime_remote_time = realtime_clock::time_point(
std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
.monotonic_timestamp_time =
- monotonic_clock::time_point(std::chrono::nanoseconds(
- m->data.message().monotonic_timestamp_time())),
+ {0, monotonic_clock::time_point(std::chrono::nanoseconds(
+ m->data.message().monotonic_timestamp_time()))},
.data = std::move(data.data)});
CHECK_GE(matched_messages_.back().monotonic_event_time, last_message_time_);
last_message_time_ = matched_messages_.back().monotonic_event_time;
@@ -975,7 +989,7 @@
}
}
-void TimestampMapper::QueueUntil(monotonic_clock::time_point queue_time) {
+void TimestampMapper::QueueUntil(BootTimestamp queue_time) {
while (last_message_time_ <= queue_time) {
if (!QueueMatched()) {
return;
@@ -984,6 +998,11 @@
}
void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
+ // Note: queueing for time doesn't really work well across boots. So we just
+ // assume that if you are using this, you only care about the current boot.
+ //
+ // TODO(austin): Is that the right concept?
+ //
// Make sure we have something queued first. This makes the end time
// calculation simpler, and is typically what folks want regardless.
if (matched_messages_.empty()) {
@@ -993,14 +1012,15 @@
}
const aos::monotonic_clock::time_point end_queue_time =
- std::max(monotonic_start_time(),
- matched_messages_.front().monotonic_event_time) +
+ std::max(monotonic_start_time(
+ matched_messages_.front().monotonic_event_time.boot),
+ matched_messages_.front().monotonic_event_time.time) +
time_estimation_buffer;
// Place sorted messages on the list until we have
// --time_estimation_buffer_seconds seconds queued up (but queue at least
// until the log starts).
- while (end_queue_time >= last_message_time_) {
+ while (end_queue_time >= last_message_time_.time) {
if (!QueueMatched()) {
return;
}
@@ -1023,8 +1043,10 @@
CHECK(message.data.message().has_monotonic_remote_time());
CHECK(message.data.message().has_realtime_remote_time());
- const monotonic_clock::time_point monotonic_remote_time(
- std::chrono::nanoseconds(message.data.message().monotonic_remote_time()));
+ const BootTimestamp monotonic_remote_time{
+ .boot = 0,
+ .time = monotonic_clock::time_point(std::chrono::nanoseconds(
+ message.data.message().monotonic_remote_time()))};
const realtime_clock::time_point realtime_remote_time(
std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
@@ -1034,11 +1056,12 @@
// asked to pull a timestamp from a peer which doesn't exist, return an empty
// message.
if (peer == nullptr) {
+ // TODO(austin): Make sure the tests hit all these paths with a boot count
+ // of 1...
return Message{
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
- .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1053,7 +1076,6 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
- .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1063,7 +1085,6 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
- .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1098,7 +1119,6 @@
.channel_index = message.channel_index,
.queue_index = remote_queue_index,
.timestamp = monotonic_remote_time,
- .boot_count = 0,
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
@@ -1117,7 +1137,7 @@
}
}
-void TimestampMapper::QueueUnmatchedUntil(monotonic_clock::time_point t) {
+void TimestampMapper::QueueUnmatchedUntil(BootTimestamp t) {
if (queued_until_ > t) {
return;
}
@@ -1129,17 +1149,17 @@
if (!Queue()) {
// Found nothing to add, we are out of data!
- queued_until_ = monotonic_clock::max_time;
+ queued_until_ = BootTimestamp::max_time();
return;
}
// Now that it has been added (and cannibalized), forget about it upstream.
- node_merger_.PopFront();
+ boot_merger_.PopFront();
}
}
bool TimestampMapper::Queue() {
- Message *m = node_merger_.Front();
+ Message *m = boot_merger_.Front();
if (m == nullptr) {
return false;
}