Run buffering in separate thread for realtime replay

This substantially improves timing accuracy in replay by doing all of
the buffering of log messages in a separate thread. It also buffers an
additional window past the channel_storage_duration required for strict
correctness, so that the replay has a better chance of keeping up.

The threading model feels a bit tenuous, since I didn't see an
obviously clean way to do this that would also have been safe in a
multi-node world.
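
For intuition, here is a minimal standalone sketch of the intended
producer/consumer split. This is illustrative only and is not the
aos::util::ThreadedQueue API; Message, LookaheadBuffer, reader, and the
*_ns parameters are made-up names for the example. A queuer thread
keeps a buffer filled out to a look-ahead horizon, and the replay
thread pops messages and pushes that horizon forward as it sends them.

  #include <condition_variable>
  #include <cstdint>
  #include <deque>
  #include <functional>
  #include <mutex>
  #include <optional>
  #include <thread>
  #include <utility>

  // Stand-in for a log message; only the timestamp matters for buffering.
  struct Message {
    int64_t monotonic_time_ns;
  };

  class LookaheadBuffer {
   public:
    // reader() returns the next message in time order, or nullopt at the
    // end of the log.  initial_horizon_ns mirrors priming the queue before
    // replay starts.
    LookaheadBuffer(std::function<std::optional<Message>()> reader,
                    int64_t lookahead_ns, int64_t initial_horizon_ns)
        : reader_(std::move(reader)),
          lookahead_ns_(lookahead_ns),
          horizon_ns_(initial_horizon_ns),
          thread_([this]() { Run(); }) {}

    ~LookaheadBuffer() {
      {
        std::unique_lock<std::mutex> lock(mutex_);
        shutdown_ = true;
      }
      wakeup_.notify_all();
      thread_.join();
    }

    // Pops the oldest buffered message and pushes the buffering horizon out
    // to lookahead_ns_ past it, so the producer stays ahead of replay.
    std::optional<Message> Pop() {
      std::unique_lock<std::mutex> lock(mutex_);
      wakeup_.wait(lock, [this]() { return !queue_.empty() || done_; });
      if (queue_.empty()) {
        return std::nullopt;  // Reached the end of the log.
      }
      Message result = queue_.front();
      queue_.pop_front();
      horizon_ns_ = result.monotonic_time_ns + lookahead_ns_;
      wakeup_.notify_all();  // Let the producer refill to the new horizon.
      return result;
    }

   private:
    void Run() {
      std::unique_lock<std::mutex> lock(mutex_);
      while (!shutdown_ && !done_) {
        // Keep reading (the slow part: disk + sorting) until we are
        // buffered past the current horizon.
        while (!shutdown_ &&
               (queue_.empty() ||
                queue_.back().monotonic_time_ns < horizon_ns_)) {
          lock.unlock();
          std::optional<Message> next = reader_();
          lock.lock();
          if (!next.has_value()) {
            done_ = true;
            break;
          }
          queue_.push_back(*next);
          wakeup_.notify_all();
        }
        if (shutdown_ || done_) {
          break;
        }
        // Fully buffered; sleep until the consumer advances the horizon.
        wakeup_.wait(lock);
      }
      wakeup_.notify_all();  // Wake any consumer waiting on end-of-log.
    }

    std::function<std::optional<Message>()> reader_;
    const int64_t lookahead_ns_;
    std::mutex mutex_;
    std::condition_variable wakeup_;
    std::deque<Message> queue_;
    int64_t horizon_ns_;
    bool done_ = false;
    bool shutdown_ = false;
    std::thread thread_;
  };

The real implementation keys the horizon on BootTimestamp instead of
raw nanoseconds, feeds the buffer from TimestampMapper, and primes it
in QueueThreadUntil() before the clock offsets are set.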

Change-Id: I471fefd96a4d043766b54dd4488726e24926a95f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index f7a5551..b045e14 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,10 +1,11 @@
 load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
 load("//aos:config.bzl", "aos_config")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
 
 flatbuffer_cc_library(
     name = "logger_fbs",
     srcs = ["logger.fbs"],
-    gen_reflections = 1,
+    gen_reflections = True,
     includes = [
         "//aos:configuration_fbs_includes",
     ],
@@ -12,6 +13,20 @@
     visibility = ["//visibility:public"],
 )
 
+flatbuffer_cc_library(
+    name = "replay_timing_fbs",
+    srcs = ["replay_timing.fbs"],
+    gen_reflections = True,
+    target_compatible_with = ["@platforms//os:linux"],
+)
+
+cc_static_flatbuffer(
+    name = "replay_timing_schema",
+    function = "aos::timing::ReplayTimingSchema",
+    target = ":replay_timing_fbs_reflection_out",
+    visibility = ["//visibility:public"],
+)
+
 cc_library(
     name = "boot_timestamp",
     srcs = ["boot_timestamp.cc"],
@@ -265,10 +280,13 @@
         ":log_writer",
         ":logfile_utils",
         ":logger_fbs",
+        ":replay_timing_fbs",
+        "//aos:condition",
         "//aos:uuid",
         "//aos/events:event_loop",
         "//aos/events:shm_event_loop",
         "//aos/events:simulated_event_loop",
+        "//aos/mutex",
         "//aos/network:message_bridge_server_fbs",
         "//aos/network:multinode_timestamp_filter",
         "//aos/network:remote_message_fbs",
@@ -277,6 +295,7 @@
         "//aos/network:timestamp_filter",
         "//aos/time",
         "//aos/util:file",
+        "//aos/util:threaded_queue",
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_google_absl//absl/strings",
     ],
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index a4d6cd0..bdee44f 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -53,6 +53,13 @@
             "in timing with the original logfile, but means that you lose "
             "access to fetched low-frequency messages.");
 
+DEFINE_double(
+    threaded_look_ahead_seconds, 2.0,
+    "Time, in seconds, to add to look-ahead when using multi-threaded replay. "
+    "Can validly be zero, but higher values are encouraged for realtime replay "
+    "in order to prevent the replay from ever having to block on waiting for "
+    "the reader to find the next message.");
+
 namespace aos {
 namespace configuration {
 // We don't really want to expose this publicly, but log reader doesn't really
@@ -417,6 +424,46 @@
   state->OnStart(std::move(fn));
 }
 
+void LogReader::State::QueueThreadUntil(BootTimestamp time) {
+  if (threading_ == ThreadedBuffering::kYes) {
+    CHECK(!message_queuer_.has_value()) << "Can't start thread twice.";
+    message_queuer_.emplace(
+        [this](const BootTimestamp queue_until) {
+          // This will be called whenever anything prompts us for any state
+          // change; there may be wakeups that result in us not having any new
+          // data to push (even if we aren't done), in which case we will return
+          // nullopt but not done().
+          if (last_queued_message_.has_value() &&
+              queue_until < last_queued_message_) {
+            return util::ThreadedQueue<TimestampedMessage,
+                                       BootTimestamp>::PushResult{
+                std::nullopt, false,
+                last_queued_message_ == BootTimestamp::max_time()};
+          }
+          TimestampedMessage *message = timestamp_mapper_->Front();
+          // Upon reaching the end of the log, exit.
+          if (message == nullptr) {
+            last_queued_message_ = BootTimestamp::max_time();
+            return util::ThreadedQueue<TimestampedMessage,
+                                       BootTimestamp>::PushResult{std::nullopt,
+                                                                  false, true};
+          }
+          last_queued_message_ = message->monotonic_event_time;
+          const util::ThreadedQueue<TimestampedMessage,
+                                    BootTimestamp>::PushResult result{
+              *message, queue_until >= last_queued_message_, false};
+          timestamp_mapper_->PopFront();
+          SeedSortedMessages();
+          return result;
+        },
+        time);
+    // Spin until the first few seconds of messages are queued up so that we
+    // don't end up with delays/inconsistent timing during the first few seconds
+    // of replay.
+    message_queuer_->WaitForNoMoreWork();
+  }
+}
+
 void LogReader::State::OnStart(std::function<void()> fn) {
   on_starts_.emplace_back(std::move(fn));
 }
@@ -476,6 +523,9 @@
 
   stopped_ = true;
   started_ = true;
+  if (message_queuer_.has_value()) {
+    message_queuer_->StopPushing();
+  }
 }
 
 void LogReader::Register() {
@@ -502,11 +552,15 @@
     std::vector<LogParts> filtered_parts = FilterPartsForNode(
         log_files_, node != nullptr ? node->name()->string_view() : "");
 
+    // We don't run with threading on the buffering for simulated event loops
+    // because we haven't attempted to validate how the interactions between
+    // the buffering and the timestamp mapper work when running multiple nodes
+    // concurrently.
     states_[node_index] = std::make_unique<State>(
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        filters_.get(), node);
+        filters_.get(), node, State::ThreadedBuffering::kNo);
     State *state = states_[node_index].get();
     state->SetNodeEventLoopFactory(
         event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -544,7 +598,7 @@
 
     // If we didn't find any log files with data in them, we won't ever get a
     // callback or be live.  So skip the rest of the setup.
-    if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+    if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
       continue;
     }
     ++live_nodes_;
@@ -695,7 +749,7 @@
         filtered_parts.size() == 0u
             ? nullptr
             : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
-        filters_.get(), node);
+        filters_.get(), node, State::ThreadedBuffering::kYes);
     State *state = states_[node_index].get();
 
     state->SetChannelCount(logged_configuration()->channels()->size());
@@ -733,7 +787,7 @@
 
   // If we didn't find any log files with data in them, we won't ever get a
   // callback or be live.  So skip the rest of the setup.
-  if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+  if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
     return;
   }
 
@@ -845,10 +899,11 @@
   }
 
   state->set_timer_handler(event_loop->AddTimer([this, state]() {
-    if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+    if (state->MultiThreadedOldestMessageTime() == BootTimestamp::max_time()) {
       --live_nodes_;
       VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
-      if (exit_on_finish_ && live_nodes_ == 0) {
+      if (exit_on_finish_ && live_nodes_ == 0 &&
+          event_loop_factory_ != nullptr) {
         CHECK_NOTNULL(event_loop_factory_)->Exit();
       }
       return;
@@ -1064,7 +1119,7 @@
           << " timestamped_message.data is null";
     }
 
-    const BootTimestamp next_time = state->OldestMessageTime();
+    const BootTimestamp next_time = state->MultiThreadedOldestMessageTime();
     if (next_time != BootTimestamp::max_time()) {
       if (next_time.boot != state->boot_count()) {
         VLOG(1) << "Next message for "
@@ -1085,6 +1140,8 @@
                 << "wakeup for " << next_time.time << ", now is "
                 << state->monotonic_now();
       }
+      // TODO(james): This can result in negative times getting passed-through
+      // in realtime replay.
       state->Setup(next_time.time);
     } else {
       VLOG(1) << MaybeNodeName(state->event_loop()->node())
@@ -1106,7 +1163,9 @@
             << state->monotonic_now();
   }));
 
-  if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+  state->SeedSortedMessages();
+
+  if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
     state->set_startup_timer(
         event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
     if (start_time_ != realtime_clock::min_time) {
@@ -1116,8 +1175,15 @@
       state->SetEndTimeFlag(end_time_);
     }
     event_loop->OnRun([state]() {
-      BootTimestamp next_time = state->OldestMessageTime();
+      BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
       CHECK_EQ(next_time.boot, state->boot_count());
+      // Queue up messages and then set clock offsets (we don't want to set
+      // clock offsets before we've done the work of getting the first messages
+      // primed).
+      state->QueueThreadUntil(
+          next_time + std::chrono::duration_cast<std::chrono::nanoseconds>(
+                          std::chrono::duration<double>(
+                              FLAGS_threaded_look_ahead_seconds)));
       state->SetClockOffset();
       state->Setup(next_time.time);
       state->SetupStartupTimer();
@@ -1536,10 +1602,11 @@
 LogReader::State::State(
     std::unique_ptr<TimestampMapper> timestamp_mapper,
     message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
-    const Node *node)
+    const Node *node, LogReader::State::ThreadedBuffering threading)
     : timestamp_mapper_(std::move(timestamp_mapper)),
       node_(node),
-      multinode_filters_(multinode_filters) {}
+      multinode_filters_(multinode_filters),
+      threading_(threading) {}
 
 void LogReader::State::AddPeer(State *peer) {
   if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1585,6 +1652,52 @@
   factory_channel_index_[logged_channel_index] = factory_channel_index;
 }
 
+void LogReader::State::TrackMessageSendTiming(
+    const RawSender &sender, monotonic_clock::time_point expected_send_time) {
+  if (event_loop_ == nullptr || !timing_statistics_sender_.valid()) {
+    return;
+  }
+
+  timing::MessageTimingT sample;
+  sample.channel = configuration::ChannelIndex(event_loop_->configuration(),
+                                               sender.channel());
+  sample.expected_send_time = expected_send_time.time_since_epoch().count();
+  sample.actual_send_time =
+      sender.monotonic_sent_time().time_since_epoch().count();
+  sample.send_time_error = aos::time::DurationInSeconds(
+      expected_send_time - sender.monotonic_sent_time());
+  send_timings_.push_back(sample);
+
+  // Somewhat arbitrarily send out timing information in batches of 100. No need
+  // to create excessive overhead in regenerated logfiles.
+  // TODO(james): The overhead may be fine.
+  constexpr size_t kMaxTimesPerStatisticsMessage = 100;
+  CHECK(timing_statistics_sender_.valid());
+  if (send_timings_.size() == kMaxTimesPerStatisticsMessage) {
+    SendMessageTimings();
+  }
+}
+
+void LogReader::State::SendMessageTimings() {
+  if (send_timings_.empty() || !timing_statistics_sender_.valid()) {
+    return;
+  }
+  auto builder = timing_statistics_sender_.MakeBuilder();
+  std::vector<flatbuffers::Offset<timing::MessageTiming>> timing_offsets;
+  for (const auto &timing : send_timings_) {
+    timing_offsets.push_back(
+        timing::MessageTiming::Pack(*builder.fbb(), &timing));
+  }
+  send_timings_.clear();
+  flatbuffers::Offset<
+      flatbuffers::Vector<flatbuffers::Offset<timing::MessageTiming>>>
+      timings_offset = builder.fbb()->CreateVector(timing_offsets);
+  timing::ReplayTiming::Builder timing_builder =
+      builder.MakeBuilder<timing::ReplayTiming>();
+  timing_builder.add_messages(timings_offset);
+  timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
+}
+
 bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
   aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
   CHECK(sender);
@@ -1692,6 +1805,13 @@
                              timestamped_message.monotonic_remote_time.boot)
            : event_loop_->boot_uuid()));
   if (err != RawSender::Error::kOk) return false;
+  if (monotonic_start_time(timestamped_message.monotonic_event_time.boot) <=
+      timestamped_message.monotonic_event_time.time) {
+    // Only track errors for non-fetched messages.
+    TrackMessageSendTiming(
+        *sender,
+        timestamped_message.monotonic_event_time.time + clock_offset());
+  }
 
   if (queue_index_map_[timestamped_message.channel_index]) {
     CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
@@ -1914,27 +2034,55 @@
 }
 
 TimestampedMessage LogReader::State::PopOldest() {
-  CHECK(timestamp_mapper_ != nullptr);
-  TimestampedMessage *result_ptr = timestamp_mapper_->Front();
-  CHECK(result_ptr != nullptr);
+  if (message_queuer_.has_value()) {
+    std::optional<TimestampedMessage> message = message_queuer_->Pop();
+    CHECK(message.has_value()) << ": Unexpectedly ran out of messages.";
+    message_queuer_->SetState(
+        message.value().monotonic_event_time +
+        std::chrono::duration_cast<std::chrono::nanoseconds>(
+            std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
+    return message.value();
+  } else {
+    CHECK(timestamp_mapper_ != nullptr);
+    TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+    CHECK(result_ptr != nullptr);
 
-  TimestampedMessage result = std::move(*result_ptr);
+    TimestampedMessage result = std::move(*result_ptr);
 
-  VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
-          << result.monotonic_event_time;
-  timestamp_mapper_->PopFront();
-  SeedSortedMessages();
+    VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+            << result.monotonic_event_time;
+    timestamp_mapper_->PopFront();
+    SeedSortedMessages();
 
-  CHECK_EQ(result.monotonic_event_time.boot, boot_count());
+    CHECK_EQ(result.monotonic_event_time.boot, boot_count());
 
-  VLOG(1) << "Popped " << result
-          << configuration::CleanedChannelToString(
-                 event_loop_->configuration()->channels()->Get(
-                     factory_channel_index_[result.channel_index]));
-  return result;
+    VLOG(1) << "Popped " << result
+            << configuration::CleanedChannelToString(
+                   event_loop_->configuration()->channels()->Get(
+                       factory_channel_index_[result.channel_index]));
+    return result;
+  }
 }
 
-BootTimestamp LogReader::State::OldestMessageTime() {
+BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
+  if (!message_queuer_.has_value()) {
+    return SingleThreadedOldestMessageTime();
+  }
+  std::optional<TimestampedMessage> message = message_queuer_->Peek();
+  if (!message.has_value()) {
+    return BootTimestamp::max_time();
+  }
+  if (message.value().monotonic_event_time.boot == boot_count()) {
+    ObserveNextMessage(message.value().monotonic_event_time.time,
+                       message.value().realtime_event_time);
+  }
+  return message.value().monotonic_event_time;
+}
+
+BootTimestamp LogReader::State::SingleThreadedOldestMessageTime() {
+  CHECK(!message_queuer_.has_value())
+      << "Cannot use SingleThreadedOldestMessageTime() once the queuer thread "
+         "is created.";
   if (timestamp_mapper_ == nullptr) {
     return BootTimestamp::max_time();
   }
@@ -1944,12 +2092,10 @@
   }
   VLOG(2) << MaybeNodeName(node()) << "oldest message at "
           << result_ptr->monotonic_event_time.time;
-
   if (result_ptr->monotonic_event_time.boot == boot_count()) {
     ObserveNextMessage(result_ptr->monotonic_event_time.time,
                        result_ptr->realtime_event_time);
   }
-
   return result_ptr->monotonic_event_time;
 }
 
@@ -1974,6 +2120,7 @@
   event_loop_ = nullptr;
   timer_handler_ = nullptr;
   node_event_loop_factory_ = nullptr;
+  timing_statistics_sender_ = Sender<timing::ReplayTiming>();
 }
 
 void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 1b87e9d..6ea3193 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -4,6 +4,7 @@
 #include <chrono>
 #include <deque>
 #include <string_view>
+#include <queue>
 #include <tuple>
 #include <vector>
 
@@ -11,6 +12,7 @@
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/replay_timing_generated.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/message_bridge_server_generated.h"
@@ -19,6 +21,9 @@
 #include "aos/network/timestamp_filter.h"
 #include "aos/time/time.h"
 #include "aos/uuid.h"
+#include "aos/util/threaded_queue.h"
+#include "aos/mutex/mutex.h"
+#include "aos/condition.h"
 #include "flatbuffers/flatbuffers.h"
 
 namespace aos {
@@ -98,6 +103,14 @@
   // only useful when replaying live.
   void Register(EventLoop *event_loop);
 
+  // Sets a sender that should be used for tracking timing statistics. If not
+  // set, no statistics will be recorded.
+  void set_timing_accuracy_sender(
+      const Node *node, aos::Sender<timing::ReplayTiming> timing_sender) {
+    states_[configuration::GetNodeIndex(configuration(), node)]
+        ->set_timing_accuracy_sender(std::move(timing_sender));
+  }
+
   // Called whenever a log file starts for a node.
   void OnStart(std::function<void()> fn);
   void OnStart(const Node *node, std::function<void()> fn);
@@ -287,9 +300,13 @@
   // State per node.
   class State {
    public:
+    // Whether we should spin up a separate thread for buffering up messages.
+    // Only allowed in realtime replay--see comments on threading_ member for
+    // details.
+    enum class ThreadedBuffering { kYes, kNo };
     State(std::unique_ptr<TimestampMapper> timestamp_mapper,
           message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
-          const Node *node);
+          const Node *node, ThreadedBuffering threading);
 
     // Connects up the timestamp mappers.
     void AddPeer(State *peer);
@@ -301,7 +318,10 @@
     TimestampedMessage PopOldest();
 
     // Returns the monotonic time of the oldest message.
-    BootTimestamp OldestMessageTime();
+    BootTimestamp SingleThreadedOldestMessageTime();
+    // Returns the monotonic time of the oldest message, querying the separate
+    // thread if ThreadedBuffering was set.
+    BootTimestamp MultiThreadedOldestMessageTime();
 
     size_t boot_count() const {
       // If we are replaying directly into an event loop, we can't reboot.  So
@@ -544,7 +564,22 @@
       return last_message_[channel_index];
     }
 
+    void set_timing_accuracy_sender(
+        aos::Sender<timing::ReplayTiming> timing_sender) {
+      timing_statistics_sender_ = std::move(timing_sender);
+      OnEnd([this]() { SendMessageTimings(); });
+    }
+
+    // If running with ThreadedBuffering::kYes, will start the processing thread
+    // and queue up messages until the specified time. No-op if
+    // ThreadedBuffering::kNo is set. Should only be called once.
+    void QueueThreadUntil(BootTimestamp time);
+
    private:
+    void TrackMessageSendTiming(
+        const RawSender &sender,
+        monotonic_clock::time_point expected_send_time);
+    void SendMessageTimings();
     // Log file.
     std::unique_ptr<TimestampMapper> timestamp_mapper_;
 
@@ -629,11 +664,38 @@
     std::vector<std::function<void()>> on_starts_;
     std::vector<std::function<void()>> on_ends_;
 
-    bool stopped_ = false;
-    bool started_ = false;
+    std::atomic<bool> stopped_ = false;
+    std::atomic<bool> started_ = false;
 
     bool found_last_message_ = false;
     std::vector<bool> last_message_;
+
+    std::vector<timing::MessageTimingT> send_timings_;
+    aos::Sender<timing::ReplayTiming> timing_statistics_sender_;
+
+    // Threading design for protecting internal state once replay is running.
+    // Designed assuming that only one node is actually executing in replay.
+    // Threading design:
+    // * The worker passed to message_queuer_ has full ownership over all
+    //   the log-reading code, timestamp filters, last_queued_message_, etc.
+    // * The main thread should only have exclusive access to the replay
+    //   event loop and associated features (mainly senders).
+    //   It will pop an item out of the queue (which does maintain a shared_ptr
+    //   reference that may also be in use by the message_queuer_ thread, but
+    //   having shared_ptrs access the same memory from separate threads is
+    //   permissible).
+    // Enabling this in simulation is currently infeasible due to a lack of
+    // synchronization in the MultiNodeNoncausalOffsetEstimator. Essentially,
+    // when the message_queuer_ thread attempts to read/pop messages from the
+    // timestamp_mapper_, it will end up calling callbacks that update the
+    // internal state of the MultiNodeNoncausalOffsetEstimator. Simultaneously,
+    // the event scheduler that is running in the main thread to orchestrate the
+    // simulation will be querying the estimator to know what the clocks on the
+    // various nodes are at, leading to potential issues.
+    ThreadedBuffering threading_;
+    std::optional<BootTimestamp> last_queued_message_;
+    std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
+        message_queuer_;
   };
 
   // Node index -> State.
diff --git a/aos/events/logging/replay_timing.fbs b/aos/events/logging/replay_timing.fbs
new file mode 100644
index 0000000..dbe7159
--- /dev/null
+++ b/aos/events/logging/replay_timing.fbs
@@ -0,0 +1,16 @@
+namespace aos.timing;
+
+table MessageTiming {
+  channel:uint (id: 0);
+  // Expected and actual monotonic send times, in nanoseconds.
+  expected_send_time:int64 (id: 1);
+  actual_send_time:int64 (id: 2);
+  // expected - actual, in seconds (provides no additional information).
+  send_time_error:double (id: 3);
+}
+
+table ReplayTiming {
+  messages:[MessageTiming] (id: 0);
+}
+
+root_type ReplayTiming;