Push managing the message queue into TimestampMapper

We want to be able to have multinode_timestamp_filter trigger more data
to be queued.  This will let us manage startup much better when Logger
tries to compute what is happening at the start time (and we aren't
guaranteed to have queued that far ahead).  We can then have
multinode_timestamp_filter grow the queue as timestamps are being
pulled.

Move feeding timestamps into the noncausal filter from LogReader into the
TimestampMapper callback.  This is now registered from inside the
multinode timestamp filter directly.  This also removes the need for
the queue inside LogReader to hold the buffered times.

Change-Id: I054518d9be41d2a6560c45bc3a68a23c8e31347a
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 3850b44..8f0e938 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -884,6 +884,30 @@
   }
 }
 
+void TimestampMapper::QueueFor(chrono::nanoseconds time_estimation_buffer) {
+  // Make sure we have something queued first.  This makes the end time
+  // calculation simpler, and is typically what folks want regardless.
+  if (matched_messages_.empty()) {
+    if (!QueueMatched()) {
+      return;
+    }
+  }
+
+  const aos::monotonic_clock::time_point end_queue_time =
+      std::max(monotonic_start_time(),
+               matched_messages_.front().monotonic_event_time) +
+      time_estimation_buffer;
+
+  // Keep queueing until last_message_time_ passes end_queue_time, i.e.
+  // time_estimation_buffer past the later of the log start and the first
+  // queued message.  Stop early if QueueMatched() returns false.
+  while (end_queue_time >= last_message_time_) {
+    if (!QueueMatched()) {
+      return;
+    }
+  }
+}
+
 void TimestampMapper::PopFront() {
   CHECK(first_message_ != FirstMessage::kNeedsUpdate);
   first_message_ = FirstMessage::kNeedsUpdate;
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 070a2de..632b1df 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -517,6 +517,8 @@
 
   // Queues data the provided time.
   void QueueUntil(monotonic_clock::time_point queue_time);
+  // Queues time_estimation_buffer past max(log start, first queued message).
+  void QueueFor(std::chrono::nanoseconds time_estimation_buffer);
 
   // Sets a callback to be called whenever a full message is queued.
   void set_timestamp_callback(std::function<void(TimestampedMessage *)> fn) {
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 6f722bc..eb7221d 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1108,6 +1108,7 @@
           chrono::duration_cast<chrono::nanoseconds>(
               chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 
+  std::vector<TimestampMapper *> timestamp_mappers;
   for (const Node *node : configuration::GetNodes(configuration())) {
     const size_t node_index =
         configuration::GetNodeIndex(configuration(), node);
@@ -1138,7 +1139,12 @@
         event_loop_factory_->GetNodeEventLoopFactory(node)));
 
     state->SetChannelCount(logged_configuration()->channels()->size());
+    timestamp_mappers.emplace_back(state->timestamp_mapper());
   }
+  filters_->SetTimestampMappers(std::move(timestamp_mappers));
+
+  // Note: this needs to be set before any times are pulled, or we won't observe
+  // the timestamps.
   event_loop_factory_->SetTimeConverter(filters_.get());
 
   for (const Node *node : configuration::GetNodes(configuration())) {
@@ -2072,104 +2078,58 @@
 }

+// Moves the front message out of the timestamp mapper, pops it, re-seeds the
+// queue, and, for forwarded messages, calls Pop() on the channel's filter.
 TimestampedMessage LogReader::State::PopOldest() {
-  CHECK_GT(sorted_messages_.size(), 0u);
+  CHECK(timestamp_mapper_ != nullptr);
+  TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+  CHECK(result_ptr != nullptr);

-  std::tuple<TimestampedMessage, message_bridge::NoncausalOffsetEstimator *>
-      result = std::move(sorted_messages_.front());
+  // Take the message out before PopFront(), which presumably invalidates
+  // result_ptr.
+  TimestampedMessage result = std::move(*result_ptr);
+
   VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
-          << std::get<0>(result).monotonic_event_time;
-  sorted_messages_.pop_front();
+          << result.monotonic_event_time;
+  timestamp_mapper_->PopFront();
   SeedSortedMessages();

-  if (std::get<1>(result) != nullptr) {
-    std::get<1>(result)->Pop(event_loop_->node(),
-                             std::get<0>(result).monotonic_event_time);
+  // Only forwarded messages (those with a remote time) were sampled into a
+  // filter, so only they need to be popped from one.
+  if (result.monotonic_remote_time != monotonic_clock::min_time) {
+    message_bridge::NoncausalOffsetEstimator *filter =
+        filters_[result.channel_index];
+    CHECK(filter != nullptr);
+
+    // TODO(austin): We probably want to push this down into the timestamp
+    // mapper directly.
+    filter->Pop(event_loop_->node(), result.monotonic_event_time);
   }
-  return std::move(std::get<0>(result));
+  return result;
 }

+// Returns the event time of the timestamp mapper's front message, or max_time
+// if there is no mapper or it has no front message.
 monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
-  if (sorted_messages_.size() > 0) {
-    VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
-            << std::get<0>(sorted_messages_.front()).monotonic_event_time;
-    return std::get<0>(sorted_messages_.front()).monotonic_event_time;
-  }
-
-  TimestampedMessage *m =
-      timestamp_mapper_ ? timestamp_mapper_->Front() : nullptr;
-  if (m == nullptr) {
+  if (timestamp_mapper_ == nullptr) {
     return monotonic_clock::max_time;
   }
-  return m->monotonic_event_time;
+  TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+  if (result_ptr == nullptr) {
+    return monotonic_clock::max_time;
+  }
+  VLOG(2) << MaybeNodeName(event_loop_->node()) << "oldest message at "
+          << result_ptr->monotonic_event_time;
+  return result_ptr->monotonic_event_time;
 }

+// Has the timestamp mapper queue --time_estimation_buffer_seconds of data.
 void LogReader::State::SeedSortedMessages() {
   if (!timestamp_mapper_) return;
-  aos::monotonic_clock::time_point end_queue_time =
-      (sorted_messages_.empty()
-           ? timestamp_mapper_->monotonic_start_time()
-           : std::get<0>(sorted_messages_.front()).monotonic_event_time) +
-      chrono::duration_cast<chrono::seconds>(
-          chrono::duration<double>(FLAGS_time_estimation_buffer_seconds));

-  while (true) {
-    TimestampedMessage *m = timestamp_mapper_->Front();
-    if (m == nullptr) {
-      return;
-    }
-    if (!sorted_messages_.empty()) {
-      // Stop placing sorted messages on the list once we have
-      // --time_estimation_buffer_seconds seconds queued up (but queue at least
-      // until the log starts).  Only break if the queue isn't empty to make
-      // sure something is always queued.
-      if (end_queue_time <
-          std::get<0>(sorted_messages_.back()).monotonic_event_time) {
-        return;
-      }
-    } else {
-      // If we were empty, there's a chance the start time was
-      // monotonic_clock::min_time if the log file had no idea of the start.  In
-      // that case, we want to queue --time_estimation_buffer_seconds from the
-      // first message.  The most conservative thing to do is to take the max of
-      // that duration and the one computed using the start time.
-      end_queue_time = std::max(
-          end_queue_time,
-          m->monotonic_event_time +
-              chrono::duration_cast<chrono::seconds>(chrono::duration<double>(
-                  FLAGS_time_estimation_buffer_seconds)));
-    }
-
-    message_bridge::NoncausalOffsetEstimator *filter = nullptr;
-
-    TimestampedMessage timestamped_message = std::move(*m);
-    timestamp_mapper_->PopFront();
-
-    // Skip any messages without forwarding information.
-    if (timestamped_message.monotonic_remote_time !=
-        monotonic_clock::min_time) {
-      // Got a forwarding timestamp!
-      filter = filters_[timestamped_message.channel_index];
-
-      CHECK(filter != nullptr);
-
-      // Call the correct method depending on if we are the forward or
-      // reverse direction here.
-      filter->Sample(event_loop_->node(),
-                     timestamped_message.monotonic_event_time,
-                     timestamped_message.monotonic_remote_time);
-
-      if (timestamped_message.monotonic_timestamp_time !=
-          monotonic_clock::min_time) {
-        // TODO(austin): This assumes that this timestamp is only logged on the
-        // node which sent the data.  That is correct for now, but should be
-        // explicitly checked somewhere.
-        filter->ReverseSample(event_loop_->node(),
-                              timestamped_message.monotonic_event_time,
-                              timestamped_message.monotonic_timestamp_time);
-      }
-    }
-    sorted_messages_.emplace_back(std::move(timestamped_message), filter);
-  }
+  // Cast to nanoseconds rather than seconds so fractional values of
+  // --time_estimation_buffer_seconds (e.g. 0.5) aren't truncated.
+  timestamp_mapper_->QueueFor(chrono::duration_cast<chrono::nanoseconds>(
+      chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
 }

 void LogReader::State::Deregister() {
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e221695..3cb44f6 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -546,6 +546,10 @@
     // Connects up the timestamp mappers.
     void AddPeer(State *peer);

+    // Returns the timestamp mapper, or nullptr if this State has none.
+    // Ownership stays with this State.
+    TimestampMapper *timestamp_mapper() { return timestamp_mapper_.get(); }
+
     // Returns the next sorted message with all the timestamps extracted and
     // matched.
     TimestampedMessage PopOldest();
@@ -647,37 +649,16 @@
 
-    // Returns a debug string for the channel merger.
+    // Returns the timestamp mapper's debug string, or "" if there is none.
     std::string DebugString() const {
-      std::stringstream messages;
-      size_t i = 0;
-      for (const auto &message : sorted_messages_) {
-        if (i < 7 || i + 7 > sorted_messages_.size()) {
-          messages << "sorted_messages[" << i
-                   << "]: " << std::get<0>(message).monotonic_event_time << " "
-                   << configuration::StrippedChannelToString(
-                          event_loop_->configuration()->channels()->Get(
-                              std::get<0>(message).channel_index))
-                   << (std::get<0>(message).data.span().size() == 0 ? " null"
-                                                                    : " data")
-                   << "\n";
-        } else if (i == 7) {
-          messages << "...\n";
-        }
-        ++i;
-      }
       if (!timestamp_mapper_) {
-        return messages.str();
+        return "";
       }
-      return messages.str() + timestamp_mapper_->DebugString();
+      return timestamp_mapper_->DebugString();
     }
 
    private:
     // Log file.
     std::unique_ptr<TimestampMapper> timestamp_mapper_;
 
-    std::deque<std::tuple<TimestampedMessage,
-                          message_bridge::NoncausalOffsetEstimator *>>
-        sorted_messages_;
-
     // Senders.
     std::vector<std::unique_ptr<RawSender>> channels_;
     std::vector<RemoteMessageSender *> remote_timestamp_senders_;
diff --git a/aos/events/logging/timestamp_extractor.cc b/aos/events/logging/timestamp_extractor.cc
index 5ec7025..2b84975 100644
--- a/aos/events/logging/timestamp_extractor.cc
+++ b/aos/events/logging/timestamp_extractor.cc
@@ -72,6 +72,14 @@
   message_bridge::MultiNodeNoncausalOffsetEstimator multinode_estimator(
       config, config, FLAGS_skip_order_validation, chrono::seconds(0));
 
+  {
+    std::vector<TimestampMapper *> timestamp_mappers;
+    for (std::unique_ptr<TimestampMapper> &mapper : mappers) {
+      timestamp_mappers.emplace_back(mapper.get());
+    }
+    multinode_estimator.SetTimestampMappers(std::move(timestamp_mappers));
+  }
+
   // To make things more like the logger and faster, cache the node + channel ->
   // filter mapping in a set of vectors.
   std::vector<std::vector<message_bridge::NoncausalOffsetEstimator *>> filters;
@@ -95,6 +103,8 @@
     }
   }
 
+  multinode_estimator.CheckGraph();
+
   // Now, read all the timestamps for each node.  This is simpler than the
   // logger on purpose.  It loads in *all* the timestamps in 1 go per node,
   // ignoring memory usage.
@@ -110,25 +120,6 @@
       if (m == nullptr) {
         break;
       }
-
-      if (m->monotonic_remote_time != monotonic_clock::min_time) {
-        // Got a forwarding timestamp!
-        message_bridge::NoncausalOffsetEstimator *filter =
-            filters[node_index][m->channel_index];
-        CHECK(filter != nullptr);
-
-        filter->Sample(node, m->monotonic_event_time, m->monotonic_remote_time);
-
-        // Call the correct method depending on if we are the forward or
-        // reverse direction here.
-        if (m->monotonic_timestamp_time != monotonic_clock::min_time) {
-          // TODO(austin): This assumes that this timestamp is only logged on
-          // the node which sent the data.  That is correct for now, but should
-          // be explicitly checked somewhere.
-          filter->ReverseSample(node, m->monotonic_event_time,
-                                m->monotonic_timestamp_time);
-        }
-      }
       timestamp_mapper->PopFront();
     }
   }
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 4ceab73..d8ea2d3 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -473,6 +473,7 @@
         ":timestamp_filter",
         "//aos:configuration",
         "//aos/events:simulated_event_loop",
+        "//aos/events/logging:logfile_utils",
         "//aos/time",
         "@com_github_stevengj_nlopt//:nlopt",
         "@org_tuxfamily_eigen//:eigen",
diff --git a/aos/network/multinode_timestamp_filter.cc b/aos/network/multinode_timestamp_filter.cc
index b1a1600..b3c1110 100644
--- a/aos/network/multinode_timestamp_filter.cc
+++ b/aos/network/multinode_timestamp_filter.cc
@@ -571,6 +571,76 @@
   }
 }

+// Pre-builds the filter for every channel forwarded to each node, then hooks
+// a callback into each non-null mapper which feeds forwarded timestamps into
+// the matching filter as messages are queued.  The callbacks capture `this`;
+// presumably the mappers keep them, so this estimator must outlive any further
+// queueing on those mappers.
+void MultiNodeNoncausalOffsetEstimator::SetTimestampMappers(
+    std::vector<logger::TimestampMapper *> timestamp_mappers) {
+  CHECK_EQ(timestamp_mappers.size(), NodesCount());
+  filters_per_channel_.resize(timestamp_mappers.size());
+
+  // Pre-build the filter for every channel which is forwarded to (readable
+  // but not sendable on) each node so the callbacks only need a lookup.
+  for (const Node *node : configuration::GetNodes(logged_configuration())) {
+    const size_t node_index =
+        configuration::GetNodeIndex(configuration(), node);
+    filters_per_channel_[node_index].resize(
+        logged_configuration()->channels()->size(), nullptr);
+    for (size_t channel_index = 0;
+         channel_index < logged_configuration()->channels()->size();
+         ++channel_index) {
+      const Channel *channel =
+          logged_configuration()->channels()->Get(channel_index);
+
+      if (!configuration::ChannelIsSendableOnNode(channel, node) &&
+          configuration::ChannelIsReadableOnNode(channel, node)) {
+        // We've got a message which is being forwarded to this node.
+        const Node *source_node = configuration::GetNode(
+            configuration(), channel->source_node()->string_view());
+        filters_per_channel_[node_index][channel_index] =
+            GetFilter(configuration()->nodes()->Get(node_index), source_node);
+      }
+    }
+  }
+
+  size_t node_index = 0;
+  for (logger::TimestampMapper *timestamp_mapper : timestamp_mappers) {
+    if (timestamp_mapper != nullptr) {
+      // Anything sorted before the callback is registered would never reach
+      // the filters, so require that nothing has been sorted yet.
+      CHECK_EQ(timestamp_mapper->sorted_until(), monotonic_clock::min_time)
+          << ": Timestamps queued before we registered the timestamp hooks.";
+      timestamp_mapper->set_timestamp_callback(
+          [this, node_index](logger::TimestampedMessage *msg) {
+            if (msg->monotonic_remote_time != monotonic_clock::min_time) {
+              // Got a forwarding timestamp!
+              NoncausalOffsetEstimator *filter =
+                  filters_per_channel_[node_index][msg->channel_index];
+              CHECK(filter != nullptr)
+                  << ": No filter for channel " << msg->channel_index;
+              const Node *node = configuration()->nodes()->Get(node_index);
+
+              // Call the correct method depending on if we are the forward or
+              // reverse direction here.
+              filter->Sample(node, msg->monotonic_event_time,
+                             msg->monotonic_remote_time);
+
+              if (msg->monotonic_timestamp_time != monotonic_clock::min_time) {
+                // TODO(austin): This assumes that this timestamp is only logged
+                // on the node which sent the data.  That is correct for now,
+                // but should be explicitly checked somewhere.
+                filter->ReverseSample(node, msg->monotonic_event_time,
+                                      msg->monotonic_timestamp_time);
+              }
+            }
+          });
+    }
+    ++node_index;
+  }
+}
+
 TimeComparison CompareTimes(const std::vector<monotonic_clock::time_point> &ta,
                             const std::vector<monotonic_clock::time_point> &tb,
                             bool ignore_min_time) {
diff --git a/aos/network/multinode_timestamp_filter.h b/aos/network/multinode_timestamp_filter.h
index d3bba2c..523b384 100644
--- a/aos/network/multinode_timestamp_filter.h
+++ b/aos/network/multinode_timestamp_filter.h
@@ -7,6 +7,7 @@
 
 #include "Eigen/Dense"
 #include "aos/configuration.h"
+#include "aos/events/logging/logfile_utils.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/timestamp_filter.h"
 #include "aos/time/time.h"
@@ -297,6 +298,13 @@

   ~MultiNodeNoncausalOffsetEstimator() override;

+  // Sets the timestamp mappers for all the nodes (one entry per node; nullptr
+  // entries are skipped).  This registers a timestamp callback on each mapper
+  // which adds forwarded message timestamps to the filters.  Must be called
+  // before any of the mappers have queued data.
+  void SetTimestampMappers(
+      std::vector<logger::TimestampMapper *> timestamp_mappers);
+
   std::optional<std::tuple<distributed_clock::time_point,
                            std::vector<monotonic_clock::time_point>>>
   NextTimestamp() override;
@@ -360,6 +366,9 @@
   distributed_clock::time_point last_distributed_ = distributed_clock::epoch();
   std::vector<aos::monotonic_clock::time_point> last_monotonics_;
 
+  // A mapping from node and channel to the relevant estimator.
+  std::vector<std::vector<NoncausalOffsetEstimator *>> filters_per_channel_;
+
   bool first_solution_ = true;
   bool all_done_ = false;