Push managing the message queue into TimestampMapper

We want to be able to have multinode_timestamp_filter trigger more data
to be queued.  This will let us manage startup much better when Logger
tries to compute what is happening at the start time (and we aren't
guaranteed to have queued that far ahead).  We can then have
multinode_timestamp_filter grow the queue as timestamps are being
pulled.

Move feeding timestamps into the noncausal filter from LogReader into the
TimestampMapper callback.  This is now registered from inside the
multinode timestamp filter directly.  This also removes the need for
the queue inside LogReader to hold the buffered times.

Change-Id: I054518d9be41d2a6560c45bc3a68a23c8e31347a
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 6f722bc..eb7221d 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1108,6 +1108,7 @@
           chrono::duration_cast<chrono::nanoseconds>(
               chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 
+  std::vector<TimestampMapper *> timestamp_mappers;
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
         configuration::GetNodeIndex(configuration(), node);
@@ -1138,7 +1139,12 @@
         event_loop_factory_->GetNodeEventLoopFactory(node)));
 
     state->SetChannelCount(logged_configuration()->channels()->size());
+    timestamp_mappers.emplace_back(state->timestamp_mapper());
   }
+  filters_->SetTimestampMappers(std::move(timestamp_mappers));
+
+  // Note: this needs to be set before any times are pulled, or we won't observe
+  // the timestamps.
   event_loop_factory_->SetTimeConverter(filters_.get());
 
   for (const Node *node : configuration::GetNodes(configuration())) {
@@ -2072,104 +2078,47 @@
 }
 
 TimestampedMessage LogReader::State::PopOldest() {
-  CHECK_GT(sorted_messages_.size(), 0u);
+  CHECK(timestamp_mapper_ != nullptr);
+  TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+  CHECK(result_ptr != nullptr);
 
-  std::tuple<TimestampedMessage, message_bridge::NoncausalOffsetEstimator *>
-      result = std::move(sorted_messages_.front());
+  TimestampedMessage result = std::move(*result_ptr);
+
   VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
-          << std::get<0>(result).monotonic_event_time;
-  sorted_messages_.pop_front();
+          << result.monotonic_event_time;
+  timestamp_mapper_->PopFront();
   SeedSortedMessages();
 
-  if (std::get<1>(result) != nullptr) {
-    std::get<1>(result)->Pop(event_loop_->node(),
-                             std::get<0>(result).monotonic_event_time);
+  if (result.monotonic_remote_time != monotonic_clock::min_time) {
+    message_bridge::NoncausalOffsetEstimator *filter =
+        filters_[result.channel_index];
+    CHECK(filter != nullptr);
+
+    // TODO(austin): We probably want to push this down into the timestamp
+    // mapper directly.
+    filter->Pop(event_loop_->node(), result.monotonic_event_time);
   }
-  return std::move(std::get<0>(result));
+  return result;
 }
 
 monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
-  if (sorted_messages_.size() > 0) {
-    VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
-            << std::get<0>(sorted_messages_.front()).monotonic_event_time;
-    return std::get<0>(sorted_messages_.front()).monotonic_event_time;
-  }
-
-  TimestampedMessage *m =
-      timestamp_mapper_ ? timestamp_mapper_->Front() : nullptr;
-  if (m == nullptr) {
+  if (timestamp_mapper_ == nullptr) {
     return monotonic_clock::max_time;
   }
-  return m->monotonic_event_time;
+  TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+  if (result_ptr == nullptr) {
+    return monotonic_clock::max_time;
+  }
+  VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+          << result_ptr->monotonic_event_time;
+  return result_ptr->monotonic_event_time;
 }
 
 void LogReader::State::SeedSortedMessages() {
   if (!timestamp_mapper_) return;
-  aos::monotonic_clock::time_point end_queue_time =
-      (sorted_messages_.empty()
-           ? timestamp_mapper_->monotonic_start_time()
-           : std::get<0>(sorted_messages_.front()).monotonic_event_time) +
-      chrono::duration_cast<chrono::seconds>(
-          chrono::duration<double>(FLAGS_time_estimation_buffer_seconds));
 
-  while (true) {
-    TimestampedMessage *m = timestamp_mapper_->Front();
-    if (m == nullptr) {
-      return;
-    }
-    if (!sorted_messages_.empty()) {
-      // Stop placing sorted messages on the list once we have
-      // --time_estimation_buffer_seconds seconds queued up (but queue at least
-      // until the log starts).  Only break if the queue isn't empty to make
-      // sure something is always queued.
-      if (end_queue_time <
-          std::get<0>(sorted_messages_.back()).monotonic_event_time) {
-        return;
-      }
-    } else {
-      // If we were empty, there's a chance the start time was
-      // monotonic_clock::min_time if the log file had no idea of the start.  In
-      // that case, we want to queue --time_estimation_buffer_seconds from the
-      // first message.  The most conservative thing to do is to take the max of
-      // that duration and the one computed using the start time.
-      end_queue_time = std::max(
-          end_queue_time,
-          m->monotonic_event_time +
-              chrono::duration_cast<chrono::seconds>(chrono::duration<double>(
-                  FLAGS_time_estimation_buffer_seconds)));
-    }
-
-    message_bridge::NoncausalOffsetEstimator *filter = nullptr;
-
-    TimestampedMessage timestamped_message = std::move(*m);
-    timestamp_mapper_->PopFront();
-
-    // Skip any messages without forwarding information.
-    if (timestamped_message.monotonic_remote_time !=
-        monotonic_clock::min_time) {
-      // Got a forwarding timestamp!
-      filter = filters_[timestamped_message.channel_index];
-
-      CHECK(filter != nullptr);
-
-      // Call the correct method depending on if we are the forward or
-      // reverse direction here.
-      filter->Sample(event_loop_->node(),
-                     timestamped_message.monotonic_event_time,
-                     timestamped_message.monotonic_remote_time);
-
-      if (timestamped_message.monotonic_timestamp_time !=
-          monotonic_clock::min_time) {
-        // TODO(austin): This assumes that this timestamp is only logged on the
-        // node which sent the data.  That is correct for now, but should be
-        // explicitly checked somewhere.
-        filter->ReverseSample(event_loop_->node(),
-                              timestamped_message.monotonic_event_time,
-                              timestamped_message.monotonic_timestamp_time);
-      }
-    }
-    sorted_messages_.emplace_back(std::move(timestamped_message), filter);
-  }
+  timestamp_mapper_->QueueFor(chrono::duration_cast<chrono::seconds>(
+      chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 }
 
 void LogReader::State::Deregister() {