Run buffering in separate thread for realtime replay
This improves timing accuracy in replay substantially
by running all the buffering up of log messages in a separate thread.
This adds an additional buffer past the channel_storage_duration
required for strict correctness, so that the replay has a higher chance
of keeping up.
The threading model feels a bit tenuous, since I didn't see an obviously
clean way to do this in a way that would've been safe in a multi-node
world.
Change-Id: I471fefd96a4d043766b54dd4488726e24926a95f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index f7a5551..b045e14 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,10 +1,11 @@
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
load("//aos:config.bzl", "aos_config")
+load("//aos:flatbuffers.bzl", "cc_static_flatbuffer")
flatbuffer_cc_library(
name = "logger_fbs",
srcs = ["logger.fbs"],
- gen_reflections = 1,
+ gen_reflections = True,
includes = [
"//aos:configuration_fbs_includes",
],
@@ -12,6 +13,20 @@
visibility = ["//visibility:public"],
)
+flatbuffer_cc_library(
+ name = "replay_timing_fbs",
+ srcs = ["replay_timing.fbs"],
+ gen_reflections = True,
+ target_compatible_with = ["@platforms//os:linux"],
+)
+
+cc_static_flatbuffer(
+ name = "replay_timing_schema",
+ function = "aos::timing::ReplayTimingSchema",
+ target = ":replay_timing_fbs_reflection_out",
+ visibility = ["//visibility:public"],
+)
+
cc_library(
name = "boot_timestamp",
srcs = ["boot_timestamp.cc"],
@@ -265,10 +280,13 @@
":log_writer",
":logfile_utils",
":logger_fbs",
+ ":replay_timing_fbs",
+ "//aos:condition",
"//aos:uuid",
"//aos/events:event_loop",
"//aos/events:shm_event_loop",
"//aos/events:simulated_event_loop",
+ "//aos/mutex",
"//aos/network:message_bridge_server_fbs",
"//aos/network:multinode_timestamp_filter",
"//aos/network:remote_message_fbs",
@@ -277,6 +295,7 @@
"//aos/network:timestamp_filter",
"//aos/time",
"//aos/util:file",
+ "//aos/util:threaded_queue",
"@com_github_google_flatbuffers//:flatbuffers",
"@com_google_absl//absl/strings",
],
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index a4d6cd0..bdee44f 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -53,6 +53,13 @@
"in timing with the original logfile, but means that you lose "
"access to fetched low-frequency messages.");
+DEFINE_double(
+ threaded_look_ahead_seconds, 2.0,
+ "Time, in seconds, to add to look-ahead when using multi-threaded replay. "
+ "Can validly be zero, but higher values are encouraged for realtime replay "
+ "in order to prevent the replay from ever having to block on waiting for "
+ "the reader to find the next message.");
+
namespace aos {
namespace configuration {
// We don't really want to expose this publicly, but log reader doesn't really
@@ -417,6 +424,46 @@
state->OnStart(std::move(fn));
}
+void LogReader::State::QueueThreadUntil(BootTimestamp time) {
+ if (threading_ == ThreadedBuffering::kYes) {
+ CHECK(!message_queuer_.has_value()) << "Can't start thread twice.";
+ message_queuer_.emplace(
+ [this](const BootTimestamp queue_until) {
+ // This will be called whenever anything prompts us for any state
+ // change; there may be wakeups that result in us not having any new
+ // data to push (even if we aren't done), in which case we will return
+ // nullopt but not done().
+ if (last_queued_message_.has_value() &&
+ queue_until < last_queued_message_) {
+ return util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult{
+ std::nullopt, false,
+ last_queued_message_ == BootTimestamp::max_time()};
+ }
+ TimestampedMessage *message = timestamp_mapper_->Front();
+ // Upon reaching the end of the log, exit.
+ if (message == nullptr) {
+ last_queued_message_ = BootTimestamp::max_time();
+ return util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult{std::nullopt,
+ false, true};
+ }
+ last_queued_message_ = message->monotonic_event_time;
+ const util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult result{
+ *message, queue_until >= last_queued_message_, false};
+ timestamp_mapper_->PopFront();
+ SeedSortedMessages();
+ return result;
+ },
+ time);
+ // Spin until the first few seconds of messages are queued up so that we
+ // don't end up with delays/inconsistent timing during the first few seconds
+ // of replay.
+ message_queuer_->WaitForNoMoreWork();
+ }
+}
+
void LogReader::State::OnStart(std::function<void()> fn) {
on_starts_.emplace_back(std::move(fn));
}
@@ -476,6 +523,9 @@
stopped_ = true;
started_ = true;
+ if (message_queuer_.has_value()) {
+ message_queuer_->StopPushing();
+ }
}
void LogReader::Register() {
@@ -502,11 +552,15 @@
std::vector<LogParts> filtered_parts = FilterPartsForNode(
log_files_, node != nullptr ? node->name()->string_view() : "");
+ // We don't run with threading on the buffering for simulated event loops
+ // because we haven't attempted to validate how the interactions between the
+ // buffering and the timestamp mapper work when running multiple nodes
+ // concurrently.
states_[node_index] = std::make_unique<State>(
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- filters_.get(), node);
+ filters_.get(), node, State::ThreadedBuffering::kNo);
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -544,7 +598,7 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
continue;
}
++live_nodes_;
@@ -695,7 +749,7 @@
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- filters_.get(), node);
+ filters_.get(), node, State::ThreadedBuffering::kYes);
State *state = states_[node_index].get();
state->SetChannelCount(logged_configuration()->channels()->size());
@@ -733,7 +787,7 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
return;
}
@@ -845,10 +899,11 @@
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->MultiThreadedOldestMessageTime() == BootTimestamp::max_time()) {
--live_nodes_;
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
- if (exit_on_finish_ && live_nodes_ == 0) {
+ if (exit_on_finish_ && live_nodes_ == 0 &&
+ event_loop_factory_ != nullptr) {
CHECK_NOTNULL(event_loop_factory_)->Exit();
}
return;
@@ -1064,7 +1119,7 @@
<< " timestamped_message.data is null";
}
- const BootTimestamp next_time = state->OldestMessageTime();
+ const BootTimestamp next_time = state->MultiThreadedOldestMessageTime();
if (next_time != BootTimestamp::max_time()) {
if (next_time.boot != state->boot_count()) {
VLOG(1) << "Next message for "
@@ -1085,6 +1140,8 @@
<< "wakeup for " << next_time.time << ", now is "
<< state->monotonic_now();
}
+ // TODO(james): This can result in negative times getting passed-through
+ // in realtime replay.
state->Setup(next_time.time);
} else {
VLOG(1) << MaybeNodeName(state->event_loop()->node())
@@ -1106,7 +1163,9 @@
<< state->monotonic_now();
}));
- if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+ state->SeedSortedMessages();
+
+ if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
state->set_startup_timer(
event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
if (start_time_ != realtime_clock::min_time) {
@@ -1116,8 +1175,15 @@
state->SetEndTimeFlag(end_time_);
}
event_loop->OnRun([state]() {
- BootTimestamp next_time = state->OldestMessageTime();
+ BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
CHECK_EQ(next_time.boot, state->boot_count());
+ // Queue up messages and then set clock offsets (we don't want to set
+ // clock offsets before we've done the work of getting the first messages
+ // primed).
+ state->QueueThreadUntil(
+ next_time + std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(
+ FLAGS_threaded_look_ahead_seconds)));
state->SetClockOffset();
state->Setup(next_time.time);
state->SetupStartupTimer();
@@ -1536,10 +1602,11 @@
LogReader::State::State(
std::unique_ptr<TimestampMapper> timestamp_mapper,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
- const Node *node)
+ const Node *node, LogReader::State::ThreadedBuffering threading)
: timestamp_mapper_(std::move(timestamp_mapper)),
node_(node),
- multinode_filters_(multinode_filters) {}
+ multinode_filters_(multinode_filters),
+ threading_(threading) {}
void LogReader::State::AddPeer(State *peer) {
if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1585,6 +1652,52 @@
factory_channel_index_[logged_channel_index] = factory_channel_index;
}
+void LogReader::State::TrackMessageSendTiming(
+ const RawSender &sender, monotonic_clock::time_point expected_send_time) {
+ if (event_loop_ == nullptr || !timing_statistics_sender_.valid()) {
+ return;
+ }
+
+ timing::MessageTimingT sample;
+ sample.channel = configuration::ChannelIndex(event_loop_->configuration(),
+ sender.channel());
+ sample.expected_send_time = expected_send_time.time_since_epoch().count();
+ sample.actual_send_time =
+ sender.monotonic_sent_time().time_since_epoch().count();
+ sample.send_time_error = aos::time::DurationInSeconds(
+ expected_send_time - sender.monotonic_sent_time());
+ send_timings_.push_back(sample);
+
+ // Somewhat arbitrarily send out timing information in batches of 100. No need
+ // to create excessive overhead in regenerated logfiles.
+ // TODO(james): The overhead may be fine.
+ constexpr size_t kMaxTimesPerStatisticsMessage = 100;
+ CHECK(timing_statistics_sender_.valid());
+ if (send_timings_.size() == kMaxTimesPerStatisticsMessage) {
+ SendMessageTimings();
+ }
+}
+
+void LogReader::State::SendMessageTimings() {
+ if (send_timings_.empty() || !timing_statistics_sender_.valid()) {
+ return;
+ }
+ auto builder = timing_statistics_sender_.MakeBuilder();
+ std::vector<flatbuffers::Offset<timing::MessageTiming>> timing_offsets;
+ for (const auto &timing : send_timings_) {
+ timing_offsets.push_back(
+ timing::MessageTiming::Pack(*builder.fbb(), &timing));
+ }
+ send_timings_.clear();
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<timing::MessageTiming>>>
+ timings_offset = builder.fbb()->CreateVector(timing_offsets);
+ timing::ReplayTiming::Builder timing_builder =
+ builder.MakeBuilder<timing::ReplayTiming>();
+ timing_builder.add_messages(timings_offset);
+ timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
+}
+
bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
CHECK(sender);
@@ -1692,6 +1805,13 @@
timestamped_message.monotonic_remote_time.boot)
: event_loop_->boot_uuid()));
if (err != RawSender::Error::kOk) return false;
+ if (monotonic_start_time(timestamped_message.monotonic_event_time.boot) <=
+ timestamped_message.monotonic_event_time.time) {
+ // Only track errors for non-fetched messages.
+ TrackMessageSendTiming(
+ *sender,
+ timestamped_message.monotonic_event_time.time + clock_offset());
+ }
if (queue_index_map_[timestamped_message.channel_index]) {
CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
@@ -1914,27 +2034,55 @@
}
TimestampedMessage LogReader::State::PopOldest() {
- CHECK(timestamp_mapper_ != nullptr);
- TimestampedMessage *result_ptr = timestamp_mapper_->Front();
- CHECK(result_ptr != nullptr);
+ if (message_queuer_.has_value()) {
+ std::optional<TimestampedMessage> message = message_queuer_->Pop();
+ CHECK(message.has_value()) << ": Unexpectedly ran out of messages.";
+ message_queuer_->SetState(
+ message.value().monotonic_event_time +
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
+ return message.value();
+ } else {
+ CHECK(timestamp_mapper_ != nullptr);
+ TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+ CHECK(result_ptr != nullptr);
- TimestampedMessage result = std::move(*result_ptr);
+ TimestampedMessage result = std::move(*result_ptr);
- VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
- << result.monotonic_event_time;
- timestamp_mapper_->PopFront();
- SeedSortedMessages();
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+ << result.monotonic_event_time;
+ timestamp_mapper_->PopFront();
+ SeedSortedMessages();
- CHECK_EQ(result.monotonic_event_time.boot, boot_count());
+ CHECK_EQ(result.monotonic_event_time.boot, boot_count());
- VLOG(1) << "Popped " << result
- << configuration::CleanedChannelToString(
- event_loop_->configuration()->channels()->Get(
- factory_channel_index_[result.channel_index]));
- return result;
+ VLOG(1) << "Popped " << result
+ << configuration::CleanedChannelToString(
+ event_loop_->configuration()->channels()->Get(
+ factory_channel_index_[result.channel_index]));
+ return result;
+ }
}
-BootTimestamp LogReader::State::OldestMessageTime() {
+BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
+ if (!message_queuer_.has_value()) {
+ return SingleThreadedOldestMessageTime();
+ }
+ std::optional<TimestampedMessage> message = message_queuer_->Peek();
+ if (!message.has_value()) {
+ return BootTimestamp::max_time();
+ }
+ if (message.value().monotonic_event_time.boot == boot_count()) {
+ ObserveNextMessage(message.value().monotonic_event_time.time,
+ message.value().realtime_event_time);
+ }
+ return message.value().monotonic_event_time;
+}
+
+BootTimestamp LogReader::State::SingleThreadedOldestMessageTime() {
+ CHECK(!message_queuer_.has_value())
+ << "Cannot use SingleThreadedOldestMessageTime() once the queuer thread "
+ "is created.";
if (timestamp_mapper_ == nullptr) {
return BootTimestamp::max_time();
}
@@ -1944,12 +2092,10 @@
}
VLOG(2) << MaybeNodeName(node()) << "oldest message at "
<< result_ptr->monotonic_event_time.time;
-
if (result_ptr->monotonic_event_time.boot == boot_count()) {
ObserveNextMessage(result_ptr->monotonic_event_time.time,
result_ptr->realtime_event_time);
}
-
return result_ptr->monotonic_event_time;
}
@@ -1974,6 +2120,7 @@
event_loop_ = nullptr;
timer_handler_ = nullptr;
node_event_loop_factory_ = nullptr;
+ timing_statistics_sender_ = Sender<timing::ReplayTiming>();
}
void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index 1b87e9d..6ea3193 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -4,6 +4,7 @@
#include <chrono>
#include <deque>
#include <string_view>
+#include <queue>
#include <tuple>
#include <vector>
@@ -11,6 +12,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/logging/replay_timing_generated.h"
#include "aos/events/shm_event_loop.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
@@ -19,6 +21,9 @@
#include "aos/network/timestamp_filter.h"
#include "aos/time/time.h"
#include "aos/uuid.h"
+#include "aos/util/threaded_queue.h"
+#include "aos/mutex/mutex.h"
+#include "aos/condition.h"
#include "flatbuffers/flatbuffers.h"
namespace aos {
@@ -98,6 +103,14 @@
// only useful when replaying live.
void Register(EventLoop *event_loop);
+ // Sets a sender that should be used for tracking timing statistics. If not
+ // set, no statistics will be recorded.
+ void set_timing_accuracy_sender(
+ const Node *node, aos::Sender<timing::ReplayTiming> timing_sender) {
+ states_[configuration::GetNodeIndex(configuration(), node)]
+ ->set_timing_accuracy_sender(std::move(timing_sender));
+ }
+
// Called whenever a log file starts for a node.
void OnStart(std::function<void()> fn);
void OnStart(const Node *node, std::function<void()> fn);
@@ -287,9 +300,13 @@
// State per node.
class State {
public:
+ // Whether we should spin up a separate thread for buffering up messages.
+ // Only allowed in realtime replay--see comments on threading_ member for
+ // details.
+ enum class ThreadedBuffering { kYes, kNo };
State(std::unique_ptr<TimestampMapper> timestamp_mapper,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
- const Node *node);
+ const Node *node, ThreadedBuffering threading);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -301,7 +318,10 @@
TimestampedMessage PopOldest();
// Returns the monotonic time of the oldest message.
- BootTimestamp OldestMessageTime();
+ BootTimestamp SingleThreadedOldestMessageTime();
+ // Returns the monotonic time of the oldest message, handling querying the
+ // separate thread if ThreadedBuffering was set.
+ BootTimestamp MultiThreadedOldestMessageTime();
size_t boot_count() const {
// If we are replaying directly into an event loop, we can't reboot. So
@@ -544,7 +564,22 @@
return last_message_[channel_index];
}
+ void set_timing_accuracy_sender(
+ aos::Sender<timing::ReplayTiming> timing_sender) {
+ timing_statistics_sender_ = std::move(timing_sender);
+ OnEnd([this]() { SendMessageTimings(); });
+ }
+
+ // If running with ThreadedBuffering::kYes, will start the processing thread
+ // and queue up messages until the specified time. No-op if
+ // ThreadedBuffering::kNo is set. Should only be called once.
+ void QueueThreadUntil(BootTimestamp time);
+
private:
+ void TrackMessageSendTiming(
+ const RawSender &sender,
+ monotonic_clock::time_point expected_send_time);
+ void SendMessageTimings();
// Log file.
std::unique_ptr<TimestampMapper> timestamp_mapper_;
@@ -629,11 +664,38 @@
std::vector<std::function<void()>> on_starts_;
std::vector<std::function<void()>> on_ends_;
- bool stopped_ = false;
- bool started_ = false;
+ std::atomic<bool> stopped_ = false;
+ std::atomic<bool> started_ = false;
bool found_last_message_ = false;
std::vector<bool> last_message_;
+
+ std::vector<timing::MessageTimingT> send_timings_;
+ aos::Sender<timing::ReplayTiming> timing_statistics_sender_;
+
+ // Protects access to any internal state after Run() is called. Designed
+ // assuming that only one node is actually executing in replay.
+ // Threading design:
+ // * The worker passed to message_queuer_ has full ownership over all
+ // the log-reading code, timestamp filters, last_queued_message_, etc.
+ // * The main thread should only have exclusive access to the replay
+ // event loop and associated features (mainly senders).
+ // It will pop an item out of the queue (which does maintain a shared_ptr
+ // reference which may also be being used by the message_queuer_ thread,
+ // but having shared_ptr's accessing the same memory from
+ // separate threads is permissible).
+ // Enabling this in simulation is currently infeasible due to a lack of
+ // synchronization in the MultiNodeNoncausalOffsetEstimator. Essentially,
+ // when the message_queuer_ thread attempts to read/pop messages from the
+ // timestamp_mapper_, it will end up calling callbacks that update the
+ // internal state of the MultiNodeNoncausalOffsetEstimator. Simultaneously,
+ // the event scheduler that is running in the main thread to orchestrate the
+ // simulation will be querying the estimator to know what the clocks on the
+ // various nodes are at, leading to potential issues.
+ ThreadedBuffering threading_;
+ std::optional<BootTimestamp> last_queued_message_;
+ std::optional<util::ThreadedQueue<TimestampedMessage, BootTimestamp>>
+ message_queuer_;
};
// Node index -> State.
diff --git a/aos/events/logging/replay_timing.fbs b/aos/events/logging/replay_timing.fbs
new file mode 100644
index 0000000..dbe7159
--- /dev/null
+++ b/aos/events/logging/replay_timing.fbs
@@ -0,0 +1,16 @@
+namespace aos.timing;
+
+table MessageTiming {
+ channel:uint (id: 0);
+ // Expected and actual monotonic send times, in nanoseconds.
+ expected_send_time:int64 (id: 1);
+ actual_send_time:int64 (id: 2);
+ // expected - actual, in seconds (provides no additional information).
+ send_time_error:double (id: 3);
+}
+
+table ReplayTiming {
+ messages:[MessageTiming] (id: 0);
+}
+
+root_type ReplayTiming;