Add a BootTimestamp class and move some things over
This lets us compare timestamps across boots again. Move most things
over to using it so we can more naturally track what happens in what
order.
Note: this is not complete and has no tests. Anywhere I'm not
comfortable that the code handles multiple boots today should now CHECK
that the boot count is 0 so we explode. Tests and tolerance of reboots
will come in future patches, starting with TimestampMapper, then the
time recovery code, and then LogReader.
Change-Id: I0fdbbe1860de4fe5e48d1a6cd516e7118e5942dc
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index e295fc1..3cf7fd5 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -11,6 +11,7 @@
#include "absl/strings/escaping.h"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
+#include "aos/events/logging/boot_timestamp.h"
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
@@ -280,7 +281,8 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
- return state->monotonic_start_time();
+ // TODO(austin): Un-hard-code the 0 boot count.
+ return state->monotonic_start_time(0);
}
realtime_clock::time_point LogReader::realtime_start_time(
@@ -289,7 +291,8 @@
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
- return state->realtime_start_time();
+ // TODO(austin): Un-hard-code the 0 boot count.
+ return state->realtime_start_time(0);
}
void LogReader::Register() {
@@ -393,16 +396,16 @@
// running until the last node.
for (std::unique_ptr<State> &state : states_) {
- VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+ VLOG(1) << "Start time is " << state->monotonic_start_time(0) << " for node "
<< MaybeNodeName(state->event_loop()->node()) << "now "
<< state->monotonic_now();
- if (state->monotonic_start_time() == monotonic_clock::min_time) {
+ if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
continue;
}
// And start computing the start time on the distributed clock now that
// that works.
start_time = std::max(
- start_time, state->ToDistributedClock(state->monotonic_start_time()));
+ start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
}
// TODO(austin): If a node doesn't have a start time, we might not queue
@@ -452,11 +455,11 @@
for (std::unique_ptr<State> &state : states_) {
// Make the RT clock be correct before handing it to the user.
- if (state->realtime_start_time() != realtime_clock::min_time) {
- state->SetRealtimeOffset(state->monotonic_start_time(),
- state->realtime_start_time());
+ if (state->realtime_start_time(0) != realtime_clock::min_time) {
+ state->SetRealtimeOffset(state->monotonic_start_time(0),
+ state->realtime_start_time(0));
}
- VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
+ VLOG(1) << "Start time is " << state->monotonic_start_time(0) << " for node "
<< MaybeNodeName(state->event_loop()->node()) << "now "
<< state->monotonic_now();
}
@@ -555,16 +558,20 @@
}
TimestampedMessage timestamped_message = state->PopOldest();
+ CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+ CHECK_EQ(timestamped_message.monotonic_timestamp_time.boot, 0u);
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
if (!FLAGS_skip_order_validation) {
- CHECK(monotonic_now == timestamped_message.monotonic_event_time)
+ CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
<< ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
<< monotonic_now << " trying to send "
<< timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
- } else if (monotonic_now != timestamped_message.monotonic_event_time) {
+ } else if (BootTimestamp{.boot = 0u, .time = monotonic_now} !=
+ timestamped_message.monotonic_event_time) {
LOG(WARNING) << "Check failed: monotonic_now == "
"timestamped_message.monotonic_event_time) ("
<< monotonic_now << " vs. "
@@ -575,12 +582,13 @@
<< state->DebugString();
}
- if (timestamped_message.monotonic_event_time >
- state->monotonic_start_time() ||
+ if (timestamped_message.monotonic_event_time.time >
+ state->monotonic_start_time(
+ timestamped_message.monotonic_event_time.boot) ||
event_loop_factory_ != nullptr) {
if (timestamped_message.data.span().size() != 0u) {
if (timestamped_message.monotonic_remote_time !=
- monotonic_clock::min_time) {
+ BootTimestamp::min_time()) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
@@ -591,7 +599,7 @@
// fix that.
if (!FLAGS_skip_order_validation) {
CHECK_LE(
- timestamped_message.monotonic_remote_time,
+ timestamped_message.monotonic_remote_time.time,
state->monotonic_remote_now(timestamped_message.channel_index))
<< state->event_loop()->node()->name()->string_view() << " to "
<< state->remote_node(timestamped_message.channel_index)
@@ -602,7 +610,7 @@
logged_configuration()->channels()->Get(
timestamped_message.channel_index))
<< " " << state->DebugString();
- } else if (timestamped_message.monotonic_remote_time >
+ } else if (timestamped_message.monotonic_remote_time.time >
state->monotonic_remote_now(
timestamped_message.channel_index)) {
LOG(WARNING)
@@ -620,18 +628,18 @@
<< " currently " << timestamped_message.monotonic_event_time
<< " ("
<< state->ToDistributedClock(
- timestamped_message.monotonic_event_time)
+ timestamped_message.monotonic_event_time.time)
<< ") remote event time "
<< timestamped_message.monotonic_remote_time << " ("
<< state->RemoteToDistributedClock(
timestamped_message.channel_index,
- timestamped_message.monotonic_remote_time)
+ timestamped_message.monotonic_remote_time.time)
<< ") " << state->DebugString();
}
}
// If we have access to the factory, use it to fix the realtime time.
- state->SetRealtimeOffset(timestamped_message.monotonic_event_time,
+ state->SetRealtimeOffset(timestamped_message.monotonic_event_time.time,
timestamped_message.realtime_event_time);
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
@@ -643,8 +651,9 @@
// When starting up, we can have data which was sent before the
// log starts, but the timestamp was after the log starts. This
// is unreasonable to avoid, so ignore the missing data.
- timestamped_message.monotonic_remote_time >=
+ timestamped_message.monotonic_remote_time.time >=
state->monotonic_remote_start_time(
+ timestamped_message.monotonic_remote_time.boot,
timestamped_message.channel_index) &&
!FLAGS_skip_missing_forwarding_entries) {
// We've found a timestamp without data that we expect to have data for.
@@ -725,13 +734,15 @@
}
}
} else {
- LOG(WARNING)
- << "Not sending data from before the start of the log file. "
- << timestamped_message.monotonic_event_time.time_since_epoch().count()
- << " start " << monotonic_start_time().time_since_epoch().count()
- << " "
- << FlatbufferToJson(timestamped_message.data,
- {.multi_line = false, .max_vector_size = 100});
+ LOG(WARNING) << "Not sending data from before the start of the log file. "
+ << timestamped_message.monotonic_event_time.time
+ .time_since_epoch()
+ .count()
+ << " start "
+ << monotonic_start_time().time_since_epoch().count() << " "
+ << FlatbufferToJson(
+ timestamped_message.data,
+ {.multi_line = false, .max_vector_size = 100});
}
const monotonic_clock::time_point next_time = state->OldestMessageTime();
@@ -1176,7 +1187,8 @@
uint32_t queue_index;
} search;
- search.monotonic_event_time = timestamped_message.monotonic_remote_time;
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+ search.monotonic_event_time = timestamped_message.monotonic_remote_time.time;
search.queue_index = timestamped_message.remote_queue_index;
// Find the sent time if available.
@@ -1207,9 +1219,11 @@
// other node isn't done yet. So there is no send time, but there is a
// receive time.
if (element != queue_index_map->end()) {
- CHECK_GE(timestamped_message.monotonic_remote_time,
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
+
+ CHECK_GE(timestamped_message.monotonic_remote_time.time,
element->starting_monotonic_event_time);
- CHECK_LE(timestamped_message.monotonic_remote_time,
+ CHECK_LE(timestamped_message.monotonic_remote_time.time,
element->ending_monotonic_event_time);
CHECK_GE(timestamped_message.remote_queue_index,
element->starting_queue_index);
@@ -1226,10 +1240,11 @@
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
const bool sent = sender->Send(
timestamped_message.data.message().data()->Data(),
timestamped_message.data.message().data()->size(),
- timestamped_message.monotonic_remote_time,
+ timestamped_message.monotonic_remote_time.time,
timestamped_message.realtime_remote_time, remote_queue_index,
(channel_source_state_[timestamped_message.channel_index] != nullptr
? CHECK_NOTNULL(
@@ -1242,9 +1257,10 @@
if (queue_index_map_[timestamped_message.channel_index]->empty()) {
// Nothing here, start a range with 0 length.
ContiguousSentTimestamp timestamp;
+ CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
timestamp.starting_monotonic_event_time =
timestamp.ending_monotonic_event_time =
- timestamped_message.monotonic_event_time;
+ timestamped_message.monotonic_event_time.time;
timestamp.starting_queue_index = timestamp.ending_queue_index =
timestamped_message.queue_index;
timestamp.actual_queue_index = sender->sent_queue_index();
@@ -1258,14 +1274,16 @@
if ((back->starting_queue_index - back->actual_queue_index) ==
(timestamped_message.queue_index - sender->sent_queue_index())) {
back->ending_queue_index = timestamped_message.queue_index;
+ CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
back->ending_monotonic_event_time =
- timestamped_message.monotonic_event_time;
+ timestamped_message.monotonic_event_time.time;
} else {
// Otherwise, make a new one.
+ CHECK_EQ(timestamped_message.monotonic_event_time.boot, 0u);
ContiguousSentTimestamp timestamp;
timestamp.starting_monotonic_event_time =
timestamp.ending_monotonic_event_time =
- timestamped_message.monotonic_event_time;
+ timestamped_message.monotonic_event_time.time;
timestamp.starting_queue_index = timestamp.ending_queue_index =
timestamped_message.queue_index;
timestamp.actual_queue_index = sender->sent_queue_index();
@@ -1297,8 +1315,10 @@
sender->realtime_sent_time().time_since_epoch().count());
message_header_builder.add_queue_index(sender->sent_queue_index());
+ CHECK_EQ(timestamped_message.monotonic_remote_time.boot, 0u);
message_header_builder.add_monotonic_remote_time(
- timestamped_message.monotonic_remote_time.time_since_epoch().count());
+ timestamped_message.monotonic_remote_time.time.time_since_epoch()
+ .count());
message_header_builder.add_realtime_remote_time(
timestamped_message.realtime_remote_time.time_since_epoch().count());
@@ -1307,9 +1327,10 @@
fbb.Finish(message_header_builder.Finish());
+ CHECK_EQ(timestamped_message.monotonic_timestamp_time.boot, 0u);
remote_timestamp_senders_[timestamped_message.channel_index]->Send(
FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
- timestamped_message.monotonic_timestamp_time);
+ timestamped_message.monotonic_timestamp_time.time);
}
return true;
@@ -1435,7 +1456,9 @@
timestamp_mapper_->PopFront();
SeedSortedMessages();
- if (result.monotonic_remote_time != monotonic_clock::min_time) {
+ CHECK_EQ(result.monotonic_remote_time.boot, 0u);
+
+ if (result.monotonic_remote_time.time != monotonic_clock::min_time) {
message_bridge::NoncausalOffsetEstimator *filter =
filters_[result.channel_index];
CHECK(filter != nullptr);
@@ -1459,9 +1482,10 @@
if (result_ptr == nullptr) {
return monotonic_clock::max_time;
}
+ CHECK_EQ(result_ptr->monotonic_event_time.boot, 0u);
VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
- << result_ptr->monotonic_event_time;
- return result_ptr->monotonic_event_time;
+ << result_ptr->monotonic_event_time.time;
+ return result_ptr->monotonic_event_time.time;
}
void LogReader::State::SeedSortedMessages() {