Fix a log-reading bug

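I haven't managed to reproduce it in a test, but the new DCHECK_EQ that
compares the popped message's time against the front of channel_heap_
should catch it robustly (in builds where DCHECKs are enabled).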

Change-Id: I4e8baab929326bd3de12865d37703796d5cf2417
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 8e5e27c..7125bda 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -461,7 +461,7 @@
       }
     } else {
       if (!NextLogFile()) {
-        VLOG(1) << "End of log file " << filenames_.back();
+        VLOG(1) << "No more files, last was " << filenames_.back();
         at_end_ = true;
         for (MessageHeaderQueue *queue : channels_to_write_) {
           if (queue == nullptr || queue->timestamp_merger == nullptr) {
@@ -541,7 +541,8 @@
       std::move(channels_[channel_index].data.front());
   channels_[channel_index].data.pop_front();
 
-  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp) << " for "
+          << channel_index;
 
   QueueMessages(std::get<0>(timestamp));
 
@@ -559,7 +560,8 @@
       std::move(channels_[channel].timestamps[node_index].front());
   channels_[channel].timestamps[node_index].pop_front();
 
-  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp);
+  VLOG(1) << "Popped " << this << " " << std::get<0>(timestamp) << " for "
+          << channel << " on " << node_index;
 
   QueueMessages(std::get<0>(timestamp));
 
@@ -693,7 +695,7 @@
 
   // If we are just a data merger, don't wait for timestamps.
   if (!has_timestamps_) {
-    channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+    channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
     pushed_ = true;
   }
 }
@@ -737,7 +739,7 @@
   // If we are a timestamp merger, don't wait for data.  Missing data will be
   // caught at read time.
   if (has_timestamps_) {
-    channel_merger_->Update(std::get<0>(timestamp), channel_index_);
+    channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
     pushed_ = true;
   }
 }
@@ -766,7 +768,7 @@
   CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
 
   // Now, keep reading until we have found all duplicates.
-  while (message_heap_.size() > 0u) {
+  while (!message_heap_.empty()) {
     // See if it is a duplicate.
     std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
         next_oldest_message_reader = message_heap_.front();
@@ -888,7 +890,7 @@
             std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
 
     // See if we have any data.  If not, pass the problem up the chain.
-    if (message_heap_.size() == 0u) {
+    if (message_heap_.empty()) {
       VLOG(1) << "No data to match timestamp on "
               << configuration::CleanedChannelToString(
                      configuration_->channels()->Get(channel_index_));
@@ -1072,14 +1074,14 @@
 }
 
 monotonic_clock::time_point ChannelMerger::OldestMessage() const {
-  if (channel_heap_.size() == 0u) {
+  if (channel_heap_.empty()) {
     return monotonic_clock::max_time;
   }
   return channel_heap_.front().first;
 }
 
 TimestampMerger::DeliveryTimestamp ChannelMerger::OldestTimestamp() const {
-  if (timestamp_heap_.size() == 0u) {
+  if (timestamp_heap_.empty()) {
     return TimestampMerger::DeliveryTimestamp{};
   }
   return timestamp_mergers_[timestamp_heap_.front().second].OldestTimestamp();
@@ -1101,20 +1103,34 @@
   // Pop and recreate the heap if it has already been pushed.  And since we are
   // pushing again, we don't need to clear pushed.
   if (timestamp_mergers_[channel_index].pushed()) {
-    channel_heap_.erase(std::find_if(
+    const auto channel_iterator = std::find_if(
         channel_heap_.begin(), channel_heap_.end(),
         [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
           return x.second == channel_index;
-        }));
+        });
+    DCHECK(channel_iterator != channel_heap_.end());
+    if (std::get<0>(*channel_iterator) == timestamp) {
+      // It's already in the heap, in the correct spot, so nothing
+      // more for us to do here.
+      return;
+    }
+    channel_heap_.erase(channel_iterator);
     std::make_heap(channel_heap_.begin(), channel_heap_.end(),
                    ChannelHeapCompare);
 
     if (timestamp_mergers_[channel_index].has_timestamps()) {
-      timestamp_heap_.erase(std::find_if(
+      const auto timestamp_iterator = std::find_if(
           timestamp_heap_.begin(), timestamp_heap_.end(),
           [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
             return x.second == channel_index;
-          }));
+          });
+      DCHECK(timestamp_iterator != timestamp_heap_.end());
+      if (std::get<0>(*timestamp_iterator) == timestamp) {
+        // It's already in the heap, in the correct spot, so nothing
+        // more for us to do here.
+        return;
+      }
+      timestamp_heap_.erase(timestamp_iterator);
       std::make_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
                      ChannelHeapCompare);
     }
@@ -1164,6 +1180,10 @@
   std::tuple<TimestampMerger::DeliveryTimestamp,
              FlatbufferVector<MessageHeader>>
       message = merger->PopOldest();
+  DCHECK_EQ(std::get<0>(message).monotonic_event_time,
+            oldest_channel_data.first)
+      << ": channel_heap_ was corrupted for " << channel_index << ": "
+      << DebugString();
 
   return std::make_tuple(std::get<0>(message), channel_index,
                          std::move(std::get<1>(message)));
@@ -1247,7 +1267,7 @@
     std::vector<
         std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
         message_heap = message_heap_;
-    while (message_heap.size() > 0u) {
+    while (!message_heap.empty()) {
       std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
           oldest_message_reader = message_heap.front();
 
@@ -1276,7 +1296,7 @@
   ss << "channel_heap {\n";
   std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
       channel_heap_;
-  while (channel_heap.size() > 0u) {
+  while (!channel_heap.empty()) {
     std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
     ss << "  " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
        << configuration::CleanedChannelToString(