Run buffering in separate thread for realtime replay

This substantially improves replay timing accuracy by buffering log
messages in a separate thread. It also adds look-ahead buffering (see
--threaded_look_ahead_seconds) beyond the channel_storage_duration
required for strict correctness, so that the replay has a higher chance
of keeping up.

The threading model feels a bit tenuous: I didn't see an obviously
clean way to make this safe in a multi-node world, so threaded
buffering is left disabled for simulated event loops.

Change-Id: I471fefd96a4d043766b54dd4488726e24926a95f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index a4d6cd0..bdee44f 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -53,6 +53,13 @@
             "in timing with the original logfile, but means that you lose "
             "access to fetched low-frequency messages.");
 
+DEFINE_double(
+    threaded_look_ahead_seconds, 2.0,
+    "Time, in seconds, to add to look-ahead when using multi-threaded replay. "
+    "Can validly be zero, but higher values are encouraged for realtime replay "
+    "in order to prevent the replay from ever having to block on waiting for "
+    "the reader to find the next message.");
+
 namespace aos {
 namespace configuration {
 // We don't really want to expose this publicly, but log reader doesn't really
@@ -417,6 +424,46 @@
   state->OnStart(std::move(fn));
 }
 
+void LogReader::State::QueueThreadUntil(BootTimestamp time) {
+  if (threading_ == ThreadedBuffering::kYes) {
+    CHECK(!message_queuer_.has_value()) << "Can't start thread twice.";
+    message_queuer_.emplace(
+        [this](const BootTimestamp queue_until) {
+          // This will be called whenever anything prompts us for any state
+          // change; there may be wakeups that result in us not having any new
+          // data to push (even if we aren't done), in which case we will return
+          // nullopt but not done().
+          if (last_queued_message_.has_value() &&
+              queue_until < last_queued_message_) {
+            return util::ThreadedQueue<TimestampedMessage,
+                                       BootTimestamp>::PushResult{
+                std::nullopt, false,
+                last_queued_message_ == BootTimestamp::max_time()};
+          }
+          TimestampedMessage *message = timestamp_mapper_->Front();
+          // Upon reaching the end of the log, exit.
+          if (message == nullptr) {
+            last_queued_message_ = BootTimestamp::max_time();
+            return util::ThreadedQueue<TimestampedMessage,
+                                       BootTimestamp>::PushResult{std::nullopt,
+                                                                  false, true};
+          }
+          last_queued_message_ = message->monotonic_event_time;
+          const util::ThreadedQueue<TimestampedMessage,
+                                    BootTimestamp>::PushResult result{
+              *message, queue_until >= last_queued_message_, false};
+          timestamp_mapper_->PopFront();
+          SeedSortedMessages();
+          return result;
+        },
+        time);
+    // Spin until the first few seconds of messages are queued up so that we
+    // don't end up with delays/inconsistent timing during the first few seconds
+    // of replay.
+    message_queuer_->WaitForNoMoreWork();
+  }
+}
+
 void LogReader::State::OnStart(std::function<void()> fn) {
   on_starts_.emplace_back(std::move(fn));
 }
@@ -476,6 +523,9 @@
 
   stopped_ = true;
   started_ = true;
+  if (message_queuer_.has_value()) {
+    message_queuer_->StopPushing();
+  }
 }
 
 void LogReader::Register() {
@@ -502,11 +552,15 @@
     std::vector<LogParts> filtered_parts = FilterPartsForNode(
         log_files_, node != nullptr ? node->name()->string_view() : "");
 
+    // We don't run with threading on the buffering for simulated event loops
+    // because we haven't attempted to validate how the interactions between
+    // the buffering and the timestamp mapper work when running multiple nodes
+    // concurrently.
     states_[node_index] = std::make_unique<State>(
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        filters_.get(), node);
+        filters_.get(), node, State::ThreadedBuffering::kNo);
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -544,7 +598,7 @@
 
     // If we didn't find any log files with data in them, we won't ever get a
     // callback or be live.  So skip the rest of the setup.
-    if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+    if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
       continue;
     }
     ++live_nodes_;
@@ -695,7 +749,7 @@
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        filters_.get(), node);
+        filters_.get(), node, State::ThreadedBuffering::kYes);
     State *state = states_[node_index].get();
 
     state->SetChannelCount(logged_configuration()->channels()->size());
@@ -733,7 +787,7 @@
 
   // If we didn't find any log files with data in them, we won't ever get a
   // callback or be live.  So skip the rest of the setup.
-  if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+  if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
     return;
   }
 
@@ -845,10 +899,11 @@
   }
 
   state->set_timer_handler(event_loop->AddTimer([this, state]() {
-    if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+    if (state->MultiThreadedOldestMessageTime() == BootTimestamp::max_time()) {
       --live_nodes_;
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
-      if (exit_on_finish_ && live_nodes_ == 0) {
+      if (exit_on_finish_ && live_nodes_ == 0 &&
+          event_loop_factory_ != nullptr) {
         CHECK_NOTNULL(event_loop_factory_)->Exit();
       }
       return;
@@ -1064,7 +1119,7 @@
           << " timestamped_message.data is null";
     }
 
-    const BootTimestamp next_time = state->OldestMessageTime();
+    const BootTimestamp next_time = state->MultiThreadedOldestMessageTime();
     if (next_time != BootTimestamp::max_time()) {
       if (next_time.boot != state->boot_count()) {
         VLOG(1) << "Next message for "
@@ -1085,6 +1140,8 @@
                 << "wakeup for " << next_time.time << ", now is "
                 << state->monotonic_now();
       }
+      // TODO(james): This can result in negative times getting passed-through
+      // in realtime replay.
       state->Setup(next_time.time);
     } else {
       VLOG(1) << MaybeNodeName(state->event_loop()->node())
@@ -1106,7 +1163,9 @@
             << state->monotonic_now();
   }));
 
-  if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+  state->SeedSortedMessages();
+
+  if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
     state->set_startup_timer(
         event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
     if (start_time_ != realtime_clock::min_time) {
@@ -1116,8 +1175,15 @@
       state->SetEndTimeFlag(end_time_);
     }
     event_loop->OnRun([state]() {
-      BootTimestamp next_time = state->OldestMessageTime();
+      BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
       CHECK_EQ(next_time.boot, state->boot_count());
+      // Queue up messages and then set clock offsets (we don't want to set
+      // clock offsets before we've done the work of getting the first messages
+      // primed).
+      state->QueueThreadUntil(
+          next_time + std::chrono::duration_cast<std::chrono::nanoseconds>(
+                          std::chrono::duration<double>(
+                              FLAGS_threaded_look_ahead_seconds)));
       state->SetClockOffset();
       state->Setup(next_time.time);
       state->SetupStartupTimer();
@@ -1536,10 +1602,11 @@
 LogReader::State::State(
     std::unique_ptr<TimestampMapper> timestamp_mapper,
     message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
-    const Node *node)
+    const Node *node, LogReader::State::ThreadedBuffering threading)
     : timestamp_mapper_(std::move(timestamp_mapper)),
       node_(node),
-      multinode_filters_(multinode_filters) {}
+      multinode_filters_(multinode_filters),
+      threading_(threading) {}
 
 void LogReader::State::AddPeer(State *peer) {
   if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1585,6 +1652,52 @@
   factory_channel_index_[logged_channel_index] = factory_channel_index;
 }
 
+void LogReader::State::TrackMessageSendTiming(
+    const RawSender &sender, monotonic_clock::time_point expected_send_time) {
+  if (event_loop_ == nullptr || !timing_statistics_sender_.valid()) {
+    return;
+  }
+
+  timing::MessageTimingT sample;
+  sample.channel = configuration::ChannelIndex(event_loop_->configuration(),
+                                               sender.channel());
+  sample.expected_send_time = expected_send_time.time_since_epoch().count();
+  sample.actual_send_time =
+      sender.monotonic_sent_time().time_since_epoch().count();
+  sample.send_time_error = aos::time::DurationInSeconds(
+      expected_send_time - sender.monotonic_sent_time());
+  send_timings_.push_back(sample);
+
+  // Somewhat arbitrarily send out timing information in batches of 100. No need
+  // to create excessive overhead in regenerated logfiles.
+  // TODO(james): The overhead may be fine.
+  constexpr size_t kMaxTimesPerStatisticsMessage = 100;
+  CHECK(timing_statistics_sender_.valid());
+  if (send_timings_.size() == kMaxTimesPerStatisticsMessage) {
+    SendMessageTimings();
+  }
+}
+
+void LogReader::State::SendMessageTimings() {
+  if (send_timings_.empty() || !timing_statistics_sender_.valid()) {
+    return;
+  }
+  auto builder = timing_statistics_sender_.MakeBuilder();
+  std::vector<flatbuffers::Offset<timing::MessageTiming>> timing_offsets;
+  for (const auto &timing : send_timings_) {
+    timing_offsets.push_back(
+        timing::MessageTiming::Pack(*builder.fbb(), &timing));
+  }
+  send_timings_.clear();
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<timing::MessageTiming>>>
+      timings_offset = builder.fbb()->CreateVector(timing_offsets);
+  timing::ReplayTiming::Builder timing_builder =
+      builder.MakeBuilder<timing::ReplayTiming>();
+  timing_builder.add_messages(timings_offset);
+  timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
+}
+
 bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
   aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
   CHECK(sender);
@@ -1692,6 +1805,13 @@
                              timestamped_message.monotonic_remote_time.boot)
            : event_loop_->boot_uuid()));
   if (err != RawSender::Error::kOk) return false;
+  if (monotonic_start_time(timestamped_message.monotonic_event_time.boot) <=
+      timestamped_message.monotonic_event_time.time) {
+    // Only track errors for non-fetched messages.
+    TrackMessageSendTiming(
+        *sender,
+        timestamped_message.monotonic_event_time.time + clock_offset());
+  }
 
   if (queue_index_map_[timestamped_message.channel_index]) {
     CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
@@ -1914,27 +2034,55 @@
 }
 
 TimestampedMessage LogReader::State::PopOldest() {
-  CHECK(timestamp_mapper_ != nullptr);
-  TimestampedMessage *result_ptr = timestamp_mapper_->Front();
-  CHECK(result_ptr != nullptr);
+  if (message_queuer_.has_value()) {
+    std::optional<TimestampedMessage> message = message_queuer_->Pop();
+    CHECK(message.has_value()) << ": Unexpectedly ran out of messages.";
+    message_queuer_->SetState(
+        message.value().monotonic_event_time +
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
+    return message.value();
+  } else {
+    CHECK(timestamp_mapper_ != nullptr);
+    TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+    CHECK(result_ptr != nullptr);
 
-  TimestampedMessage result = std::move(*result_ptr);
+    TimestampedMessage result = std::move(*result_ptr);
 
-  VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
-          << result.monotonic_event_time;
-  timestamp_mapper_->PopFront();
-  SeedSortedMessages();
+    VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+            << result.monotonic_event_time;
+    timestamp_mapper_->PopFront();
+    SeedSortedMessages();
 
-  CHECK_EQ(result.monotonic_event_time.boot, boot_count());
+    CHECK_EQ(result.monotonic_event_time.boot, boot_count());
 
-  VLOG(1) << "Popped " << result
-          << configuration::CleanedChannelToString(
-                 event_loop_->configuration()->channels()->Get(
-                     factory_channel_index_[result.channel_index]));
-  return result;
+    VLOG(1) << "Popped " << result
+            << configuration::CleanedChannelToString(
+                   event_loop_->configuration()->channels()->Get(
+                       factory_channel_index_[result.channel_index]));
+    return result;
+  }
 }
 
-BootTimestamp LogReader::State::OldestMessageTime() {
+BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
+  if (!message_queuer_.has_value()) {
+    return SingleThreadedOldestMessageTime();
+  }
+  std::optional<TimestampedMessage> message = message_queuer_->Peek();
+  if (!message.has_value()) {
+    return BootTimestamp::max_time();
+  }
+  if (message.value().monotonic_event_time.boot == boot_count()) {
+    ObserveNextMessage(message.value().monotonic_event_time.time,
+                       message.value().realtime_event_time);
+  }
+  return message.value().monotonic_event_time;
+}
+
+BootTimestamp LogReader::State::SingleThreadedOldestMessageTime() {
+  CHECK(!message_queuer_.has_value())
+      << "Cannot use SingleThreadedOldestMessageTime() once the queuer thread "
+         "is created.";
   if (timestamp_mapper_ == nullptr) {
     return BootTimestamp::max_time();
   }
@@ -1944,12 +2092,10 @@
   }
   VLOG(2) << MaybeNodeName(node()) << "oldest message at "
           << result_ptr->monotonic_event_time.time;
-
   if (result_ptr->monotonic_event_time.boot == boot_count()) {
     ObserveNextMessage(result_ptr->monotonic_event_time.time,
                        result_ptr->realtime_event_time);
   }
-
   return result_ptr->monotonic_event_time;
 }
 
@@ -1974,6 +2120,7 @@
   event_loop_ = nullptr;
   timer_handler_ = nullptr;
   node_event_loop_factory_ = nullptr;
+  timing_statistics_sender_ = Sender<timing::ReplayTiming>();
 }
 
 void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {