Push managing the message queue into TimestampMapper

We want to be able to have multinode_timestamp_filter trigger more data
to be queued.  This will let us manage startup much better when Logger
tries to compute what is happening at the start time (and we aren't
guaranteed to have queued that far ahead).  We can then have
multinode_timestamp_filter grow the queue as timestamps are being
pulled.

Move feeding timetamps into the noncausal filter from LogReader into the
TimestampMapper callback.  This is now registered from inside the
multinode timestamp filter directly.  This also removes the need for
the queue inside LogReader to hold the buffered times.

Change-Id: I054518d9be41d2a6560c45bc3a68a23c8e31347a
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index 5ec7025..2b84975 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -72,6 +72,14 @@
   message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
       config, config, FLAGS_skip_order_validation, chrono::seconds(0));
 
+  {
+    std::vector<TimestampMapper *> timestamp_mappers;
+    for (std::unique_ptr<TimestampMapper> &mapper : mappers) {
+      timestamp_mappers.emplace_back(mapper.get());
+    }
+    multinode_estimator.SetTimestampMappers(std::move(timestamp_mappers));
+  }
+
   // To make things more like the logger and faster, cache the node + channel ->
   // filter mapping in a set of vectors.
   std::vector<std::vector<message_bridge::NoncausalOffsetEstimator *>> filters;
@@ -95,6 +103,8 @@
     }
   }
 
+  multinode_estimator.CheckGraph();
+
   // Now, read all the timestamps for each node.  This is simpler than the
   // logger on purpose.  It loads in *all* the timestamps in 1 go per node,
   // ignoring memory usage.
@@ -110,25 +120,6 @@
       if (m == nullptr) {
         break;
       }
-
-      if (m->monotonic_remote_time != monotonic_clock::min_time) {
-        // Got a forwarding timestamp!
-        message_bridge::NoncausalOffsetEstimator *filter =
-            filters[node_index][m->channel_index];
-        CHECK(filter != nullptr);
-
-        filter->Sample(node, m->monotonic_event_time, m->monotonic_remote_time);
-
-        // Call the correct method depending on if we are the forward or
-        // reverse direction here.
-        if (m->monotonic_timestamp_time != monotonic_clock::min_time) {
-          // TODO(austin): This assumes that this timestamp is only logged on
-          // the node which sent the data.  That is correct for now, but should
-          // be explicitly checked somewhere.
-          filter->ReverseSample(node, m->monotonic_event_time,
-                                m->monotonic_timestamp_time);
-        }
-      }
       timestamp_mapper->PopFront();
     }
   }