Run buffering in separate thread for realtime replay
This substantially improves timing accuracy during replay
by moving all buffering of log messages into a separate thread.
This buffers an additional margin beyond the channel_storage_duration
required for strict correctness, so that the replay has a better chance
of keeping up.
The threading model feels a bit tenuous, since there was no obviously
clean way to do this that would also have been safe in a multi-node
world.
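
For context, here is a minimal, self-contained sketch of the
producer/consumer pattern this relies on. It is illustrative only--the
Message and BufferedReader names are invented here, not the actual
LogReader code, which uses aos::util::ThreadedQueue instead:

  #include <condition_variable>
  #include <cstdint>
  #include <deque>
  #include <functional>
  #include <mutex>
  #include <optional>
  #include <thread>

  // Stand-in for TimestampedMessage.
  struct Message {
    int64_t monotonic_time_ns;
  };

  // Single-producer/single-consumer buffer: a worker thread reads ahead of
  // the consumer up to a requested timestamp, so the (slow) log reads stay
  // off the critical path of the replay event loop.
  class BufferedReader {
   public:
    explicit BufferedReader(std::function<std::optional<Message>()> read_next)
        : read_next_(std::move(read_next)), worker_([this] { Run(); }) {}

    ~BufferedReader() {
      {
        std::unique_lock<std::mutex> lock(mutex_);
        done_ = true;
      }
      condition_.notify_all();
      worker_.join();
    }

    // Consumer side: ask the worker to buffer up through target_ns.
    void QueueUntil(int64_t target_ns) {
      {
        std::unique_lock<std::mutex> lock(mutex_);
        target_ns_ = target_ns;
      }
      condition_.notify_all();
    }

    // Consumer side: pop the oldest buffered message, if any.
    std::optional<Message> Pop() {
      std::unique_lock<std::mutex> lock(mutex_);
      if (buffer_.empty()) return std::nullopt;
      Message result = buffer_.front();
      buffer_.pop_front();
      return result;
    }

   private:
    void Run() {
      std::unique_lock<std::mutex> lock(mutex_);
      while (true) {
        // Read ahead while we are behind the requested target.
        while (!done_ && (buffer_.empty() ||
                          buffer_.back().monotonic_time_ns < target_ns_)) {
          lock.unlock();  // Do the slow log read outside the lock.
          std::optional<Message> next = read_next_();
          lock.lock();
          if (!next.has_value()) return;  // End of log.
          buffer_.push_back(*next);
        }
        if (done_) return;
        condition_.wait(lock);
      }
    }

    std::function<std::optional<Message>()> read_next_;
    std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<Message> buffer_;
    int64_t target_ns_ = 0;
    bool done_ = false;
    std::thread worker_;  // Declared last so it starts after the other members.
  };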
Change-Id: I471fefd96a4d043766b54dd4488726e24926a95f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 1b87e9d..6ea3193 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -4,6 +4,7 @@
#include <chrono>
#include <deque>
+#include <queue>
#include <string_view>
#include <tuple>
#include <vector>
@@ -11,6 +12,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/replay_timing_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
@@ -19,6 +21,9 @@
+#include "aos/condition.h"
+#include "aos/mutex/mutex.h"
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
+#include "aos/util/threaded_queue.h"
#include "aos/uuid.h"
#include "flatbuffers/flatbuffers.h"
namespace aos {
@@ -98,6 +103,14 @@
// only useful when replaying live.
void Register(EventLoop *event_loop);
+ // Sets a sender that should be used for tracking timing statistics. If not
+ // set, no statistics will be recorded.
+ void set_timing_accuracy_sender(
+ const Node *node, aos::Sender<timing::ReplayTiming> timing_sender) {
+ states_[configuration::GetNodeIndex(configuration(), node)]
+ ->set_timing_accuracy_sender(std::move(timing_sender));
+ }
+
// Called whenever a log file starts for a node.
void OnStart(std::function<void()> fn);
void OnStart(const Node *node, std::function<void()> fn);
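
A hypothetical call site, to show the intended wiring (the "/replay"
channel name and the surrounding setup are invented for illustration,
and assume the replay configuration has a sendable timing::ReplayTiming
channel):

  // logfiles from aos::logger::SortParts(...).
  aos::logger::LogReader reader(logfiles);
  aos::ShmEventLoop event_loop(reader.configuration());
  reader.Register(&event_loop);
  reader.set_timing_accuracy_sender(
      event_loop.node(),
      event_loop.MakeSender<aos::timing::ReplayTiming>("/replay"));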
@@ -287,9 +300,13 @@
// State per node.
class State {
public:
+ // Whether we should spin up a separate thread for buffering messages.
+ // Only allowed in realtime replay--see the comments on the threading_
+ // member for details.
+ enum class ThreadedBuffering { kYes, kNo };
State(std::unique_ptr<TimestampMapper> timestamp_mapper,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
- const Node *node);
+ const Node *node, ThreadedBuffering threading);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -301,7 +318,10 @@
TimestampedMessage PopOldest();
// Returns the monotonic time of the oldest message.
- BootTimestamp OldestMessageTime();
+ BootTimestamp SingleThreadedOldestMessageTime();
+ // Returns the monotonic time of the oldest message, querying the
+ // buffering thread if ThreadedBuffering::kYes was set.
+ BootTimestamp MultiThreadedOldestMessageTime();
size_t boot_count() const {
// If we are replaying directly into an event loop, we can't reboot. So
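
One plausible shape for the threaded variant (a sketch, not the actual
.cc implementation; it assumes QueueThreadUntil() has already started the
queue, and that ThreadedQueue exposes a blocking Peek() returning a
std::optional):

  BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
    if (threading_ == ThreadedBuffering::kNo) {
      return SingleThreadedOldestMessageTime();
    }
    // Blocks until the buffering thread has produced at least one message
    // or reported that it is done.
    std::optional<TimestampedMessage> oldest = message_queuer_->Peek();
    return oldest.has_value() ? oldest->monotonic_event_time
                              : BootTimestamp::max_time();
  }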
@@ -544,7 +564,22 @@
return last_message_[channel_index];
}
+ void set_timing_accuracy_sender(
+ aos::Sender<timing::ReplayTiming> timing_sender) {
+ timing_statistics_sender_ = std::move(timing_sender);
+ OnEnd([this]() { SendMessageTimings(); });
+ }
+
+ // If running with ThreadedBuffering::kYes, will start the processing thread
+ // and queue up messages until the specified time. No-op if
+ // ThreadedBuffering::kNo is set. Should only be called once.
+ void QueueThreadUntil(BootTimestamp time);
+
private:
+ // Records how far the actual send time of a message deviated from its
+ // expected send time, for reporting via timing_statistics_sender_.
+ void TrackMessageSendTiming(
+     const RawSender &sender,
+     monotonic_clock::time_point expected_send_time);
+ // Flushes the accumulated timings out on timing_statistics_sender_.
+ void SendMessageTimings();
// Log file.
std::unique_ptr<TimestampMapper> timestamp_mapper_;
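
Tying this back to the commit message's note about buffering past the
channel_storage_duration, a hypothetical caller might compute the
queueing target like this (state, event_loop, and kSlack are invented
names, and the Configuration-level channel_storage_duration() accessor
is assumed):

  // Queue ahead of "now" by the storage duration plus some slack so the
  // worker thread stays ahead of the replay event loop.
  const std::chrono::nanoseconds storage_duration(
      event_loop->configuration()->channel_storage_duration());
  constexpr std::chrono::seconds kSlack{1};  // Invented value.
  state->QueueThreadUntil(
      BootTimestamp{.boot = state->boot_count(),
                    .time = event_loop->monotonic_now() + storage_duration +
                            kSlack});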
@@ -629,11 +664,38 @@
std::vector<std::function<void()>> on_starts_;
std::vector<std::function<void()>> on_ends_;
- bool stopped_ = false;
- bool started_ = false;
+ std::atomic<bool> stopped_ = false;
+ std::atomic<bool> started_ = false;
bool found_last_message_ = false;
std::vector<bool> last_message_;
+
+ std::vector<timing::MessageTimingT> send_timings_;
+ aos::Sender<timing::ReplayTiming> timing_statistics_sender_;
+
+ // Notes on how internal state is protected after Run() is called.
+ // Designed assuming that only one node is actually executing in replay.
+ // Threading design:
+ // * The worker passed to message_queuer_ has full ownership of all
+ // the log-reading code, timestamp filters, last_queued_message_, etc.
+ // * The main thread has exclusive access to the replay event loop and
+ // associated features (mainly senders). It pops items out of the queue
+ // (a popped item retains a shared_ptr reference that may also be held
+ // by the message_queuer_ thread; distinct shared_ptr copies may safely
+ // access the same memory from separate threads).
+ // Enabling this in simulation is currently infeasible due to a lack of
+ // synchronization in the MultiNodeNoncausalOffsetEstimator. Essentially,
+ // when the message_queuer_ thread attempts to read/pop messages from the
+ // timestamp_mapper_, it ends up calling callbacks that update the
+ // internal state of the MultiNodeNoncausalOffsetEstimator. Simultaneously,
+ // the event scheduler running in the main thread to orchestrate the
+ // simulation queries the estimator for the current time on each node,
+ // leading to potential data races.
+ ThreadedBuffering threading_;
+ std::optional<BootTimestamp> last_queued_message_;
+ std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
+ message_queuer_;
};
// Node index -> State.
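
As an aside on the shared_ptr claim in the comment above: the guarantee
being relied on is that the shared_ptr control block's reference count is
atomic, so distinct copies may be used and released from different
threads. A self-contained illustration (not AOS code):

  #include <memory>
  #include <thread>
  #include <vector>

  int main() {
    auto data = std::make_shared<std::vector<char>>(1024, 'x');
    std::shared_ptr<std::vector<char>> queuer_ref = data;    // worker's copy
    std::shared_ptr<std::vector<char>> consumer_ref = data;  // main's copy
    data.reset();

    std::thread worker([ref = std::move(queuer_ref)]() mutable {
      char first = (*ref)[0];  // Concurrent reads of shared data are fine.
      static_cast<void>(first);
      ref.reset();  // Atomic reference-count decrement.
    });
    char first = (*consumer_ref)[0];
    static_cast<void>(first);
    consumer_ref.reset();  // Whichever copy releases last frees the buffer.
    worker.join();
  }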