Track file ending more carefully

The old method was failing on long compressed logs with unclean
shutdowns: the end times of the individual log parts differed by large
amounts, which made the CHECKs on them fire. Instead, track when we
stop seeing data on a channel, and when that happens, consume the rest
of the log file and confirm that data on that channel never comes back.
That way we no longer try to decide whether we reached the end of the
log; we just confirm that there are no gaps in the data, which is a
more direct measurement of what we actually care about.
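
For reference, the replay-side check boils down to a per-channel "seen
the last message" flag plus a drain loop over the remaining sorted
messages. Here is a minimal standalone sketch of that idea; Message,
ConfirmNoGaps, and the sample data are illustrative stand-ins rather
than the logger's real types (the real logic is in the logger.cc hunk
in this change):

#include <cstddef>
#include <iostream>
#include <vector>

// Stand-in for a sorted log entry: a channel plus how much data it carries.
struct Message {
  size_t channel_index;
  size_t data_size;  // 0 means a timestamp-only entry with no data.
};

// Walks the remaining (already sorted) messages and verifies that once a
// channel has produced an empty entry, it never produces data again.
bool ConfirmNoGaps(const std::vector<Message> &remaining,
                   size_t num_channels) {
  std::vector<bool> last_message(num_channels, false);
  for (const Message &m : remaining) {
    if (m.data_size == 0u) {
      last_message[m.channel_index] = true;  // This channel has ended.
    } else if (last_message[m.channel_index]) {
      // Data reappeared after the channel ended: a gap in the middle of
      // the log, which is exactly what the new code CHECKs against.
      std::cerr << "Missing data in the middle of the log on channel "
                << m.channel_index << "\n";
      return false;
    }
  }
  return true;
}

int main() {
  // Channel 1 ends (empty entry) and then produces data again -> gap.
  std::vector<Message> rest = {{0, 32}, {1, 0}, {0, 16}, {1, 24}};
  std::cout << (ConfirmNoGaps(rest, 2) ? "no gaps" : "gap found") << "\n";
}

A single bool per channel is enough because the messages are already
globally sorted by time, so "data after the first empty entry" is
exactly "a gap".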
Change-Id: I20debdf5623788f66f6c1f01dcb066c8a7ba7ef6
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 45bda51..2dd4701 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -981,6 +981,7 @@
}
}
}
+
MakeRemappedConfig();
// Remap all existing remote timestamp channels. They will be recreated, and
@@ -1237,13 +1238,16 @@
// While we are starting the system up, we might be relying on matching data
// to timestamps on log files where the timestamp log file starts before the
// data. In this case, it is reasonable to expect missing data.
- ignore_missing_data_ = true;
- VLOG(1) << "Running until " << start_time << " in Register";
- event_loop_factory_->RunFor(start_time.time_since_epoch());
- VLOG(1) << "At start time";
- // Now that we are running for real, missing data means that the log file is
- // corrupted or went wrong.
- ignore_missing_data_ = false;
+ {
+ const bool prior_ignore_missing_data = ignore_missing_data_;
+ ignore_missing_data_ = true;
+ VLOG(1) << "Running until " << start_time << " in Register";
+ event_loop_factory_->RunFor(start_time.time_since_epoch());
+ VLOG(1) << "At start time";
+ // Now that we are running for real, missing data means that the log file is
+ // corrupted or went wrong.
+ ignore_missing_data_ = prior_ignore_missing_data;
+ }
for (std::unique_ptr<State> &state : states_) {
// Make the RT clock be correct before handing it to the user.
@@ -1356,8 +1360,7 @@
return;
}
- bool update_time;
- TimestampedMessage timestamped_message = state->PopOldest(&update_time);
+ TimestampedMessage timestamped_message = state->PopOldest();
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
@@ -1381,18 +1384,9 @@
if (timestamped_message.monotonic_event_time >
state->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
- if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
- !state->at_end()) ||
- timestamped_message.data.span().size() != 0u) {
- CHECK_NE(timestamped_message.data.span().size(), 0u)
- << ": Got a message without data on channel "
- << configuration::CleanedChannelToString(
- logged_configuration()->channels()->Get(
- timestamped_message.channel_index))
- << ". Forwarding entry which was not matched? Use "
- "--skip_missing_forwarding_entries to ignore this.";
-
- if (update_time) {
+ if (timestamped_message.data.span().size() != 0u) {
+ if (timestamped_message.monotonic_remote_time !=
+ monotonic_clock::min_time) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
@@ -1451,16 +1445,44 @@
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
state->Send(std::move(timestamped_message));
- } else if (state->at_end() && !ignore_missing_data_) {
- // We are at the end of the log file and found missing data. Finish
- // reading the rest of the log file and call it quits. We don't want
- // to replay partial data.
+ } else if (!ignore_missing_data_ &&
+ !FLAGS_skip_missing_forwarding_entries) {
+ // We've found a timestamp without data. This likely means that we are
+ // at the end of the log file. Record it and CHECK that in the rest of
+ // the log file, we don't find any more data on that channel. Not all
+ // channels will end at the same point in time since they can be in
+ // different files.
+ VLOG(1) << "Found the last message on channel "
+ << timestamped_message.channel_index;
+
+ // Vector storing if we've seen a nullptr message or not per channel.
+ std::vector<bool> last_message;
+ last_message.resize(logged_configuration()->channels()->size(), false);
+
+ last_message[timestamped_message.channel_index] = true;
+
+ // Now that we found the end of one channel, artificially stop the
+ // rest. It is confusing when part of your data gets replayed but not
+ // all.
while (state->OldestMessageTime() != monotonic_clock::max_time) {
- bool update_time_dummy;
- state->PopOldest(&update_time_dummy);
+ TimestampedMessage next = state->PopOldest();
+ // Make sure that once we have seen the last message on a channel,
+ // data doesn't start back up again. If the user wants to play
+ // through events like this, they can set
+ // --skip_missing_forwarding_entries or ignore_missing_data_.
+ CHECK_LT(next.channel_index, last_message.size());
+ if (next.data.span().size() == 0u) {
+ last_message[next.channel_index] = true;
+ } else {
+ if (last_message[next.channel_index]) {
+ LOG(FATAL)
+ << "Found missing data in the middle of the log file on "
+ "channel "
+ << next.channel_index << " Last "
+ << last_message[next.channel_index] << state->DebugString();
+ }
+ }
}
- } else {
- CHECK(timestamped_message.data.span().data() == nullptr) << ": Nullptr";
}
} else {
LOG(WARNING)
@@ -1491,70 +1513,6 @@
}
}
- // Once we make this call, the current time changes. So do everything
- // which involves time before changing it. That especially includes
- // sending the message.
- if (update_time) {
- VLOG(1) << MaybeNodeName(state->event_loop()->node())
- << "updating offsets";
-
- std::vector<aos::monotonic_clock::time_point> before_times;
- before_times.resize(states_.size());
- std::transform(states_.begin(), states_.end(), before_times.begin(),
- [](const std::unique_ptr<State> &state) {
- return state->monotonic_now();
- });
-
- VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Now is now "
- << state->monotonic_now();
-
- // TODO(austin): We should be perfect.
- const std::chrono::nanoseconds kTolerance{3};
- if (!FLAGS_skip_order_validation) {
- CHECK_GE(next_time, state->monotonic_now())
- << ": Time skipped the next event, just sent "
- << timestamped_message << ", sending next " << state->PeekOldest();
-
- for (size_t i = 0; i < states_.size(); ++i) {
- CHECK_GE(states_[i]->monotonic_now(), before_times[i] - kTolerance)
- << ": Time changed too much on node "
- << MaybeNodeName(states_[i]->event_loop()->node());
- CHECK_LE(states_[i]->monotonic_now(), before_times[i] + kTolerance)
- << ": Time changed too much on node "
- << MaybeNodeName(states_[i]->event_loop()->node());
- }
- } else {
- if (next_time < state->monotonic_now()) {
- LOG(WARNING) << "Check failed: next_time >= "
- "state->monotonic_now() ("
- << next_time << " vs. " << state->monotonic_now()
- << "): Time skipped the next event, just sent "
- << timestamped_message << ", sending next "
- << state->PeekOldest();
- }
- for (size_t i = 0; i < states_.size(); ++i) {
- if (states_[i]->monotonic_now() < before_times[i] - kTolerance) {
- LOG(WARNING) << "Check failed: "
- "states_[i]->monotonic_now() "
- ">= before_times[i] - kTolerance ("
- << states_[i]->monotonic_now() << " vs. "
- << before_times[i] - kTolerance
- << ") : Time changed too much on node "
- << MaybeNodeName(states_[i]->event_loop()->node());
- }
- if (states_[i]->monotonic_now() > before_times[i] + kTolerance) {
- LOG(WARNING) << "Check failed: "
- "states_[i]->monotonic_now() "
- "<= before_times[i] + kTolerance ("
- << states_[i]->monotonic_now() << " vs. "
- << before_times[i] + kTolerance
- << ") : Time changed too much on node "
- << MaybeNodeName(states_[i]->event_loop()->node());
- }
- }
- }
- }
-
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Done sending at "
<< state->event_loop()->context().monotonic_event_time << " now "
<< state->monotonic_now();
@@ -2129,11 +2087,7 @@
return sender->second.get();
}
-const TimestampedMessage &LogReader::State::PeekOldest() {
- return std::get<0>(sorted_messages_.front());
-}
-
-TimestampedMessage LogReader::State::PopOldest(bool *update_time) {
+TimestampedMessage LogReader::State::PopOldest() {
CHECK_GT(sorted_messages_.size(), 0u);
std::tuple<TimestampedMessage, message_bridge::NoncausalOffsetEstimator *>
@@ -2144,10 +2098,8 @@
SeedSortedMessages();
if (std::get<1>(result) != nullptr) {
- *update_time = std::get<1>(result)->Pop(
+ std::get<1>(result)->Pop(
event_loop_->node(), std::get<0>(result).monotonic_event_time);
- } else {
- *update_time = false;
}
return std::move(std::get<0>(result));
}
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index 4aa45a1..948b3a0 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -552,13 +552,9 @@
// Connects up the timestamp mappers.
void AddPeer(State *peer);
- // Returns the timestamps, channel_index, and message from a channel.
- // update_time (will be) set to true when popping this message causes the
- // filter to change the time offset estimation function.
- TimestampedMessage PopOldest(bool *update_time);
-
- // Returns the oldest message (if it exists) non destructively.
- const TimestampedMessage &PeekOldest();
+ // Returns the next sorted message with all the timestamps extracted and
+ // matched.
+ TimestampedMessage PopOldest();
// Returns the monotonic time of the oldest message.
monotonic_clock::time_point OldestMessageTime() const;
@@ -639,11 +635,6 @@
RemoteMessageSender *remote_timestamp_sender,
State *source_state);
- // Returns if we have read all the messages from all the logs.
- bool at_end() const {
- return timestamp_mapper_ ? timestamp_mapper_->Front() == nullptr : true;
- }
-
// Unregisters everything so we can destroy the event loop.
void Deregister();
@@ -671,6 +662,8 @@
<< configuration::StrippedChannelToString(
event_loop_->configuration()->channels()->Get(
std::get<0>(message).channel_index))
+ << (std::get<0>(message).data.span().size() == 0 ? " null"
+ : " data")
<< "\n";
} else if (i == 7) {
messages << "...\n";