Fix a log-reading bug
I can't manage to reproduce it in a test, but the new CHECK should catch
it robustly.
Change-Id: I4e8baab929326bd3de12865d37703796d5cf2417
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 8e5e27c..7125bda 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -461,7 +461,7 @@
}
} else {
if (!NextLogFile()) {
- VLOG(1) << "End of log file " << filenames_.back();
+ VLOG(1) << "No more files, last was " << filenames_.back();
at_end_ = true;
for (MessageHeaderQueue *queue : channels_to_write_) {
if (queue == nullptr || queue->timestamp_merger == nullptr) {
@@ -541,7 +541,8 @@
std::move(channels_[channel_index].data.front());
channels_[channel_index].data.pop_front();
- VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+ VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp) << " for "
+ << channel_index;
QueueMessages(std::get<0>(timestamp));
@@ -559,7 +560,8 @@
std::move(channels_[channel].timestamps[node_index].front());
channels_[channel].timestamps[node_index].pop_front();
- VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+ VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp) << " for "
+ << channel << " on " << node_index;
QueueMessages(std::get<0>(timestamp));
@@ -693,7 +695,7 @@
// If we are just a data merger, don't wait for timestamps.
if (!has_timestamps_) {
- channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+ channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
pushed_ = true;
}
}
@@ -737,7 +739,7 @@
// If we are a timestamp merger, don't wait for data. Missing data will be
// caught at read time.
if (has_timestamps_) {
- channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+ channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
pushed_ = true;
}
}
@@ -766,7 +768,7 @@
CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
// Now, keep reading until we have found all duplicates.
- while (message_heap_.size() > 0u) {
+ while (!message_heap_.empty()) {
// See if it is a duplicate.
std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
next_oldest_message_reader = message_heap_.front();
@@ -888,7 +890,7 @@
std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
// See if we have any data. If not, pass the problem up the chain.
- if (message_heap_.size() == 0u) {
+ if (message_heap_.empty()) {
VLOG(1) << "No data to match timestamp on "
<< configuration::CleanedChannelToString(
configuration_->channels()->Get(channel_index_));
@@ -1072,14 +1074,14 @@
}
monotonic_clock::time_point ChannelMerger::OldestMessage() const {
- if (channel_heap_.size() == 0u) {
+ if (channel_heap_.empty()) {
return monotonic_clock::max_time;
}
return channel_heap_.front().first;
}
TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
- if (timestamp_heap_.size() == 0u) {
+ if (timestamp_heap_.empty()) {
return TimestampMerger::DeliveryTimestamp{};
}
return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
@@ -1101,20 +1103,34 @@
// Pop and recreate the heap if it has already been pushed. And since we are
// pushing again, we don't need to clear pushed.
if (timestamp_mergers_[channel_index].pushed()) {
- channel_heap_.erase(std::find_if(
+ const auto channel_iterator = std::find_if(
channel_heap_.begin(), channel_heap_.end(),
[channel_index](const std::pair<monotonic_clock::time_point, int> x) {
return x.second == channel_index;
- }));
+ });
+ DCHECK(channel_iterator != channel_heap_.end());
+ if (std::get<0>(*channel_iterator) == timestamp) {
+ // It's already in the heap, in the correct spot, so nothing
+ // more for us to do here.
+ return;
+ }
+ channel_heap_.erase(channel_iterator);
std::make_heap(channel_heap_.begin(), channel_heap_.end(),
ChannelHeapCompare);
if (timestamp_mergers_[channel_index].has_timestamps()) {
- timestamp_heap_.erase(std::find_if(
+ const auto timestamp_iterator = std::find_if(
timestamp_heap_.begin(), timestamp_heap_.end(),
[channel_index](const std::pair<monotonic_clock::time_point, int> x) {
return x.second == channel_index;
- }));
+ });
+ DCHECK(timestamp_iterator != timestamp_heap_.end());
+ if (std::get<0>(*timestamp_iterator) == timestamp) {
+ // It's already in the heap, in the correct spot, so nothing
+ // more for us to do here.
+ return;
+ }
+ timestamp_heap_.erase(timestamp_iterator);
std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
ChannelHeapCompare);
}
@@ -1164,6 +1180,10 @@
std::tuple<TimestampMerger::DeliveryTimestamp,
FlatbufferVector<MessageHeader>>
message = merger->PopOldest();
+ DCHECK_EQ(std::get<0>(message).monotonic_event_time,
+ oldest_channel_data.first)
+ << ": channel_heap_ was corrupted for " << channel_index << ": "
+ << DebugString();
return std::make_tuple(std::get<0>(message), channel_index,
std::move(std::get<1>(message)));
@@ -1247,7 +1267,7 @@
std::vector<
std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
message_heap = message_heap_;
- while (message_heap.size() > 0u) {
+ while (!message_heap.empty()) {
std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
oldest_message_reader = message_heap.front();
@@ -1276,7 +1296,7 @@
ss << "channel_heap {\n";
std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
channel_heap_;
- while (channel_heap.size() > 0u) {
+ while (!channel_heap.empty()) {
std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
ss << " " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
<< configuration::CleanedChannelToString(
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index e767427..8969ee2 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -599,7 +599,9 @@
// to timestamps on log files where the timestamp log file starts before the
// data. In this case, it is reasonable to expect missing data.
ignore_missing_data_ = true;
+ VLOG(1) << "Running until start time: " << start_time;
event_loop_factory_->RunFor(start_time.time_since_epoch());
+ VLOG(1) << "At start time";
// Now that we are running for real, missing data means that the log file is
// corrupted or went wrong.
ignore_missing_data_ = false;