Handle log files not starting at the same time.
The monotonic clocks were assumed to be in sync. That isn't realistic.
This assumption leaked into how we kept the queues primed and how the
event loop was initialized.
This change isn't enough to actually replay in sync: we still assume that
the realtime clocks are in sync and that the monotonic clocks don't drift
relative to each other. That'll be good enough to get started, but not for long.
Change-Id: Ic18e31598f1a76edee0b0d5a2d7936deee1fbfec
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 84e1f43..c7f64ae 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -96,7 +96,8 @@
realtime_start_time_ = event_loop_->realtime_now();
last_synchronized_time_ = monotonic_start_time_;
- LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());
+ LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
+ << " start_time " << monotonic_start_time_;
WriteHeader();
@@ -105,6 +106,10 @@
});
}
+// TODO(austin): Set the remote start time to the first time we see a remote
+// message when we are logging those messages separately? Need to signal what
+// to do, or how to get a good timestamp.
+
void Logger::WriteHeader() {
for (const Node *node : log_namer_->nodes()) {
WriteHeader(node);
@@ -224,12 +229,11 @@
fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
f.channel_index, f.log_type));
- VLOG(1) << "Writing data as node "
+ VLOG(2) << "Writing data as node "
<< FlatbufferToJson(event_loop_->node()) << " for channel "
<< configuration::CleanedChannelToString(
f.fetcher->channel())
- << " to " << f.writer->filename()
- << " data "
+ << " to " << f.writer->filename() << " data "
<< FlatbufferToJson(
flatbuffers::GetSizePrefixedRoot<MessageHeader>(
fbb.GetBufferPointer()));
@@ -248,12 +252,11 @@
f.channel_index,
LogType::kLogDeliveryTimeOnly));
- VLOG(1) << "Writing timestamps as node "
+ VLOG(2) << "Writing timestamps as node "
<< FlatbufferToJson(event_loop_->node()) << " for channel "
<< configuration::CleanedChannelToString(
f.fetcher->channel())
- << " to " << f.timestamp_writer->filename()
- << " timestamp "
+ << " to " << f.timestamp_writer->filename() << " timestamp "
<< FlatbufferToJson(
flatbuffers::GetSizePrefixedRoot<MessageHeader>(
fbb.GetBufferPointer()));
@@ -342,10 +345,6 @@
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
- // We want to start the log file at the last start time of the log files from
- // all the nodes. Compute how long each node's simulation needs to run to
- // move time to this point.
- monotonic_clock::duration run_time = monotonic_clock::duration(0);
for (const Node *node : configuration::GetNodes(configuration())) {
auto it = channel_mergers_.insert(std::make_pair(node, State{}));
@@ -360,15 +359,50 @@
event_loop_factory->MakeEventLoop("log_reader", node);
Register(state->event_loop_unique_ptr.get());
+ }
- const monotonic_clock::duration startup_time =
- state->channel_merger->monotonic_start_time() -
- state->event_loop->monotonic_now();
- if (startup_time > run_time) {
- run_time = startup_time;
+ // Basic idea is that we want to
+ // 1) Find the node which booted first.
+ // 2) Setup the clocks so that each clock is at the time it would be at when
+ // the first node booted.
+
+ realtime_clock::time_point earliest_boot_time = realtime_clock::max_time;
+ for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
+ State *state = &(state_pair.second);
+
+ const realtime_clock::time_point boot_time =
+ state->channel_merger->realtime_start_time() -
+ state->channel_merger->monotonic_start_time().time_since_epoch();
+
+ if (boot_time < earliest_boot_time) {
+ earliest_boot_time = boot_time;
}
}
+ // We want to start the log file at the last start time of the log files from
+ // all the nodes. Compute how long each node's simulation needs to run to
+ // move time to this point.
+ monotonic_clock::duration run_time = monotonic_clock::duration(0);
+
+ for (std::pair<const Node *const, State> &state_pair : channel_mergers_) {
+ State *state = &(state_pair.second);
+
+ const realtime_clock::time_point boot_time =
+ state->channel_merger->realtime_start_time() -
+ state->channel_merger->monotonic_start_time().time_since_epoch();
+
+ // And start each node's clocks so the realtime clocks line up for the start
+ // times. This will let us start using it, but isn't good enough.
+ state->node_event_loop_factory->SetMonotonicNow(
+ monotonic_clock::time_point(earliest_boot_time - boot_time));
+ state->node_event_loop_factory->SetRealtimeOffset(
+ state->channel_merger->monotonic_start_time(),
+ state->channel_merger->realtime_start_time());
+ run_time =
+ std::max(run_time, state->channel_merger->monotonic_start_time() -
+ state->node_event_loop_factory->monotonic_now());
+ }
+
// Forwarding is tracked per channel. If it is enabled, we want to turn it
// off. Otherwise messages replayed will get forwarded across to the other
// nodes, and also replayed on the other nodes. This may not satisfy all our
@@ -390,7 +424,14 @@
}
}
+ // While we are starting the system up, we might be relying on matching data
+ // to timestamps on log files where the timestamp log file starts before the
+ // data. In this case, it is reasonable to expect missing data.
+ ignore_missing_data_ = true;
event_loop_factory_->RunFor(run_time);
+ // Now that we are running for real, missing data means that the log file is
+ // corrupted or went wrong.
+ ignore_missing_data_ = false;
}
void LogReader::Register(EventLoop *event_loop) {
@@ -443,7 +484,7 @@
if (channel_timestamp.monotonic_event_time >
state->channel_merger->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
- if (!FLAGS_skip_missing_forwarding_entries ||
+ if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries) ||
channel_data.message().data() != nullptr) {
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "