Run buffering in separate thread for realtime replay

This substantially improves timing accuracy in replay
by buffering up log messages in a separate thread.
This adds an additional buffer past the channel_storage_duration
required for strict correctness, so that the replay has a higher chance
of keeping up.

The threading model feels a bit tenuous, since I didn't see an obviously
clean way to do this that would've been safe in a multi-node world.

Change-Id: I471fefd96a4d043766b54dd4488726e24926a95f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 1b87e9d..6ea3193 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -4,6 +4,7 @@
 #include <chrono>
 #include <deque>
 #include <string_view>
+#include <queue>
 #include <tuple>
 #include <vector>
 
@@ -11,6 +12,7 @@
 #include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logfile_utils.h"
 #include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/replay_timing_generated.h"
 #include "aos/events/shm_event_loop.h"
 #include "aos/events/simulated_event_loop.h"
 #include "aos/network/message_bridge_server_generated.h"
@@ -19,6 +21,9 @@
 #include "aos/network/timestamp_filter.h"
 #include "aos/time/time.h"
 #include "aos/uuid.h"
+#include "aos/util/threaded_queue.h"
+#include "aos/mutex/mutex.h"
+#include "aos/condition.h"
 #include "flatbuffers/flatbuffers.h"
 
 namespace aos {
@@ -98,6 +103,14 @@
   // only useful when replaying live.
   void Register(EventLoop *event_loop);
 
+  // Sets a sender that should be used for tracking timing statistics. If not
+  // set, no statistics will be recorded.
+  void set_timing_accuracy_sender(
+      const Node *node, aos::Sender<timing::ReplayTiming> timing_sender) {
+    states_[configuration::GetNodeIndex(configuration(), node)]
+        ->set_timing_accuracy_sender(std::move(timing_sender));
+  }
+
   // Called whenever a log file starts for a node.
   void OnStart(std::function<void()> fn);
   void OnStart(const Node *node, std::function<void()> fn);
@@ -287,9 +300,13 @@
   // State per node.
   class State {
    public:
+    // Whether we should spin up a separate thread for buffering up messages.
+    // Only allowed in realtime replay--see comments on threading_ member for
+    // details.
+    enum class ThreadedBuffering { kYes, kNo };
     State(std::unique_ptr<TimestampMapper> timestamp_mapper,
           message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
-          const Node *node);
+          const Node *node, ThreadedBuffering threading);
 
     // Connects up the timestamp mappers.
     void AddPeer(State *peer);
@@ -301,7 +318,10 @@
     TimestampedMessage PopOldest();
 
     // Returns the monotonic time of the oldest message.
-    BootTimestamp OldestMessageTime();
+    BootTimestamp SingleThreadedOldestMessageTime();
+    // Returns the monotonic time of the oldest message, handling querying the
+    // separate thread if ThreadedBuffering was set.
+    BootTimestamp MultiThreadedOldestMessageTime();
 
     size_t boot_count() const {
       // If we are replaying directly into an event loop, we can't reboot.  So
@@ -544,7 +564,22 @@
       return last_message_[channel_index];
     }
 
+    void set_timing_accuracy_sender(
+        aos::Sender<timing::ReplayTiming> timing_sender) {
+      timing_statistics_sender_ = std::move(timing_sender);
+      OnEnd([this]() { SendMessageTimings(); });
+    }
+
+    // If running with ThreadedBuffering::kYes, will start the processing thread
+    // and queue up messages until the specified time. No-op if
+    // ThreadedBuffering::kNo is set. Should only be called once.
+    void QueueThreadUntil(BootTimestamp time);
+
    private:
+    void TrackMessageSendTiming(
+        const RawSender &sender,
+        monotonic_clock::time_point expected_send_time);
+    void SendMessageTimings();
     // Log file.
     std::unique_ptr<TimestampMapper> timestamp_mapper_;
 
@@ -629,11 +664,38 @@
     std::vector<std::function<void()>> on_starts_;
     std::vector<std::function<void()>> on_ends_;
 
-    bool stopped_ = false;
-    bool started_ = false;
+    std::atomic<bool> stopped_ = false;
+    std::atomic<bool> started_ = false;
 
     bool found_last_message_ = false;
     std::vector<bool> last_message_;
+
+    std::vector<timing::MessageTimingT> send_timings_;
+    aos::Sender<timing::ReplayTiming> timing_statistics_sender_;
+
+    // Protects access to any internal state after Run() is called. Designed
+    // assuming that only one node is actually executing in replay.
+    // Threading design:
+    // * The worker passed to message_queuer_ has full ownership over all
+    //   the log-reading code, timestamp filters, last_queued_message_, etc.
+    // * The main thread should only have exclusive access to the replay
+    //   event loop and associated features (mainly senders).
+    //   It will pop an item out of the queue (which maintains a shared_ptr
+    //   reference that may also be in use by the message_queuer_ thread,
+    //   but having shared_ptrs access the same memory from separate
+    //   threads is permissible).
+    // Enabling this in simulation is currently infeasible due to a lack of
+    // synchronization in the MultiNodeNoncausalOffsetEstimator. Essentially,
+    // when the message_queuer_ thread attempts to read/pop messages from the
+    // timestamp_mapper_, it will end up calling callbacks that update the
+    // internal state of the MultiNodeNoncausalOffsetEstimator. Simultaneously,
+    // the event scheduler that is running in the main thread to orchestrate the
+    // simulation will be querying the estimator to know what the clocks on the
+    // various nodes are at, leading to potential issues.
+    ThreadedBuffering threading_;
+    std::optional<BootTimestamp> last_queued_message_;
+    std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
+        message_queuer_;
   };
 
   // Node index -> State.