Run buffering in separate thread for realtime replay
This substantially improves timing accuracy in realtime replay by
buffering up log messages in a separate thread.
This adds an additional buffer past the channel_storage_duration
required for strict correctness, so that the replay has a higher chance
of keeping up.
The threading model feels a bit tenuous, since I didn't see an obviously
clean way to do this that would've been safe in a multi-node
world.
Change-Id: I471fefd96a4d043766b54dd4488726e24926a95f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index a4d6cd0..bdee44f 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -53,6 +53,13 @@
"in timing with the original logfile, but means that you lose "
"access to fetched low-frequency messages.");
+DEFINE_double(
+ threaded_look_ahead_seconds, 2.0,
+ "Time, in seconds, to add to look-ahead when using multi-threaded replay. "
+ "Can validly be zero, but higher values are encouraged for realtime replay "
+ "in order to prevent the replay from ever having to block on waiting for "
+ "the reader to find the next message.");
+
namespace aos {
namespace configuration {
// We don't really want to expose this publicly, but log reader doesn't really
@@ -417,6 +424,46 @@
state->OnStart(std::move(fn));
}
+void LogReader::State::QueueThreadUntil(BootTimestamp time) {
+ if (threading_ == ThreadedBuffering::kYes) {
+ CHECK(!message_queuer_.has_value()) << "Can't start thread twice.";
+ message_queuer_.emplace(
+ [this](const BootTimestamp queue_until) {
+ // This will be called whenever anything prompts us for any state
+ // change; there may be wakeups that result in us not having any new
+ // data to push (even if we aren't done), in which case we will return
+ // nullopt but not done().
+ if (last_queued_message_.has_value() &&
+ queue_until < last_queued_message_) {
+ return util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult{
+ std::nullopt, false,
+ last_queued_message_ == BootTimestamp::max_time()};
+ }
+ TimestampedMessage *message = timestamp_mapper_->Front();
+ // Upon reaching the end of the log, exit.
+ if (message == nullptr) {
+ last_queued_message_ = BootTimestamp::max_time();
+ return util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult{std::nullopt,
+ false, true};
+ }
+ last_queued_message_ = message->monotonic_event_time;
+ const util::ThreadedQueue<TimestampedMessage,
+ BootTimestamp>::PushResult result{
+ *message, queue_until >= last_queued_message_, false};
+ timestamp_mapper_->PopFront();
+ SeedSortedMessages();
+ return result;
+ },
+ time);
+ // Spin until the first few seconds of messages are queued up so that we
+ // don't end up with delays/inconsistent timing during the first few seconds
+ // of replay.
+ message_queuer_->WaitForNoMoreWork();
+ }
+}
+
void LogReader::State::OnStart(std::function<void()> fn) {
on_starts_.emplace_back(std::move(fn));
}
@@ -476,6 +523,9 @@
stopped_ = true;
started_ = true;
+ if (message_queuer_.has_value()) {
+ message_queuer_->StopPushing();
+ }
}
void LogReader::Register() {
@@ -502,11 +552,15 @@
std::vector<LogParts> filtered_parts = FilterPartsForNode(
log_files_, node != nullptr ? node->name()->string_view() : "");
+ // We don't run with threading on the buffering for simulated event loops
+ // because we haven't attempted to validate how the interactions between the
+ // buffering and the timestamp mapper work when running multiple nodes
+ // concurrently.
states_[node_index] = std::make_unique<State>(
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- filters_.get(), node);
+ filters_.get(), node, State::ThreadedBuffering::kNo);
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -544,7 +598,7 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
continue;
}
++live_nodes_;
@@ -695,7 +749,7 @@
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- filters_.get(), node);
+ filters_.get(), node, State::ThreadedBuffering::kYes);
State *state = states_[node_index].get();
state->SetChannelCount(logged_configuration()->channels()->size());
@@ -733,7 +787,7 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
return;
}
@@ -845,10 +899,11 @@
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
- if (state->OldestMessageTime() == BootTimestamp::max_time()) {
+ if (state->MultiThreadedOldestMessageTime() == BootTimestamp::max_time()) {
--live_nodes_;
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
- if (exit_on_finish_ && live_nodes_ == 0) {
+ if (exit_on_finish_ && live_nodes_ == 0 &&
+ event_loop_factory_ != nullptr) {
CHECK_NOTNULL(event_loop_factory_)->Exit();
}
return;
@@ -1064,7 +1119,7 @@
<< " timestamped_message.data is null";
}
- const BootTimestamp next_time = state->OldestMessageTime();
+ const BootTimestamp next_time = state->MultiThreadedOldestMessageTime();
if (next_time != BootTimestamp::max_time()) {
if (next_time.boot != state->boot_count()) {
VLOG(1) << "Next message for "
@@ -1085,6 +1140,8 @@
<< "wakeup for " << next_time.time << ", now is "
<< state->monotonic_now();
}
+ // TODO(james): This can result in negative times getting passed-through
+ // in realtime replay.
state->Setup(next_time.time);
} else {
VLOG(1) << MaybeNodeName(state->event_loop()->node())
@@ -1106,7 +1163,9 @@
<< state->monotonic_now();
}));
- if (state->OldestMessageTime() != BootTimestamp::max_time()) {
+ state->SeedSortedMessages();
+
+ if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
state->set_startup_timer(
event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
if (start_time_ != realtime_clock::min_time) {
@@ -1116,8 +1175,15 @@
state->SetEndTimeFlag(end_time_);
}
event_loop->OnRun([state]() {
- BootTimestamp next_time = state->OldestMessageTime();
+ BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
CHECK_EQ(next_time.boot, state->boot_count());
+ // Queue up messages and then set clock offsets (we don't want to set
+ // clock offsets before we've done the work of getting the first messages
+ // primed).
+ state->QueueThreadUntil(
+ next_time + std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(
+ FLAGS_threaded_look_ahead_seconds)));
state->SetClockOffset();
state->Setup(next_time.time);
state->SetupStartupTimer();
@@ -1536,10 +1602,11 @@
LogReader::State::State(
std::unique_ptr<TimestampMapper> timestamp_mapper,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
- const Node *node)
+ const Node *node, LogReader::State::ThreadedBuffering threading)
: timestamp_mapper_(std::move(timestamp_mapper)),
node_(node),
- multinode_filters_(multinode_filters) {}
+ multinode_filters_(multinode_filters),
+ threading_(threading) {}
void LogReader::State::AddPeer(State *peer) {
if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1585,6 +1652,52 @@
factory_channel_index_[logged_channel_index] = factory_channel_index;
}
+void LogReader::State::TrackMessageSendTiming(
+ const RawSender &sender, monotonic_clock::time_point expected_send_time) {
+ if (event_loop_ == nullptr || !timing_statistics_sender_.valid()) {
+ return;
+ }
+
+ timing::MessageTimingT sample;
+ sample.channel = configuration::ChannelIndex(event_loop_->configuration(),
+ sender.channel());
+ sample.expected_send_time = expected_send_time.time_since_epoch().count();
+ sample.actual_send_time =
+ sender.monotonic_sent_time().time_since_epoch().count();
+ sample.send_time_error = aos::time::DurationInSeconds(
+ expected_send_time - sender.monotonic_sent_time());
+ send_timings_.push_back(sample);
+
+ // Somewhat arbitrarily send out timing information in batches of 100. No need
+ // to create excessive overhead in regenerated logfiles.
+ // TODO(james): The overhead may be fine.
+ constexpr size_t kMaxTimesPerStatisticsMessage = 100;
+ CHECK(timing_statistics_sender_.valid());
+ if (send_timings_.size() == kMaxTimesPerStatisticsMessage) {
+ SendMessageTimings();
+ }
+}
+
+void LogReader::State::SendMessageTimings() {
+ if (send_timings_.empty() || !timing_statistics_sender_.valid()) {
+ return;
+ }
+ auto builder = timing_statistics_sender_.MakeBuilder();
+ std::vector<flatbuffers::Offset<timing::MessageTiming>> timing_offsets;
+ for (const auto &timing : send_timings_) {
+ timing_offsets.push_back(
+ timing::MessageTiming::Pack(*builder.fbb(), &timing));
+ }
+ send_timings_.clear();
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<timing::MessageTiming>>>
+ timings_offset = builder.fbb()->CreateVector(timing_offsets);
+ timing::ReplayTiming::Builder timing_builder =
+ builder.MakeBuilder<timing::ReplayTiming>();
+ timing_builder.add_messages(timings_offset);
+ timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
+}
+
bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
CHECK(sender);
@@ -1692,6 +1805,13 @@
timestamped_message.monotonic_remote_time.boot)
: event_loop_->boot_uuid()));
if (err != RawSender::Error::kOk) return false;
+ if (monotonic_start_time(timestamped_message.monotonic_event_time.boot) <=
+ timestamped_message.monotonic_event_time.time) {
+ // Only track errors for non-fetched messages.
+ TrackMessageSendTiming(
+ *sender,
+ timestamped_message.monotonic_event_time.time + clock_offset());
+ }
if (queue_index_map_[timestamped_message.channel_index]) {
CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
@@ -1914,27 +2034,55 @@
}
TimestampedMessage LogReader::State::PopOldest() {
- CHECK(timestamp_mapper_ != nullptr);
- TimestampedMessage *result_ptr = timestamp_mapper_->Front();
- CHECK(result_ptr != nullptr);
+ if (message_queuer_.has_value()) {
+ std::optional<TimestampedMessage> message = message_queuer_->Pop();
+ CHECK(message.has_value()) << ": Unexpectedly ran out of messages.";
+ message_queuer_->SetState(
+ message.value().monotonic_event_time +
+ std::chrono::duration_cast<std::chrono::nanoseconds>(
+ std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
+ return message.value();
+ } else {
+ CHECK(timestamp_mapper_ != nullptr);
+ TimestampedMessage *result_ptr = timestamp_mapper_->Front();
+ CHECK(result_ptr != nullptr);
- TimestampedMessage result = std::move(*result_ptr);
+ TimestampedMessage result = std::move(*result_ptr);
- VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
- << result.monotonic_event_time;
- timestamp_mapper_->PopFront();
- SeedSortedMessages();
+ VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
+ << result.monotonic_event_time;
+ timestamp_mapper_->PopFront();
+ SeedSortedMessages();
- CHECK_EQ(result.monotonic_event_time.boot, boot_count());
+ CHECK_EQ(result.monotonic_event_time.boot, boot_count());
- VLOG(1) << "Popped " << result
- << configuration::CleanedChannelToString(
- event_loop_->configuration()->channels()->Get(
- factory_channel_index_[result.channel_index]));
- return result;
+ VLOG(1) << "Popped " << result
+ << configuration::CleanedChannelToString(
+ event_loop_->configuration()->channels()->Get(
+ factory_channel_index_[result.channel_index]));
+ return result;
+ }
}
-BootTimestamp LogReader::State::OldestMessageTime() {
+BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
+ if (!message_queuer_.has_value()) {
+ return SingleThreadedOldestMessageTime();
+ }
+ std::optional<TimestampedMessage> message = message_queuer_->Peek();
+ if (!message.has_value()) {
+ return BootTimestamp::max_time();
+ }
+ if (message.value().monotonic_event_time.boot == boot_count()) {
+ ObserveNextMessage(message.value().monotonic_event_time.time,
+ message.value().realtime_event_time);
+ }
+ return message.value().monotonic_event_time;
+}
+
+BootTimestamp LogReader::State::SingleThreadedOldestMessageTime() {
+ CHECK(!message_queuer_.has_value())
+ << "Cannot use SingleThreadedOldestMessageTime() once the queuer thread "
+ "is created.";
if (timestamp_mapper_ == nullptr) {
return BootTimestamp::max_time();
}
@@ -1944,12 +2092,10 @@
}
VLOG(2) << MaybeNodeName(node()) << "oldest message at "
<< result_ptr->monotonic_event_time.time;
-
if (result_ptr->monotonic_event_time.boot == boot_count()) {
ObserveNextMessage(result_ptr->monotonic_event_time.time,
result_ptr->realtime_event_time);
}
-
return result_ptr->monotonic_event_time;
}
@@ -1974,6 +2120,7 @@
event_loop_ = nullptr;
timer_handler_ = nullptr;
node_event_loop_factory_ = nullptr;
+ timing_statistics_sender_ = Sender<timing::ReplayTiming>();
}
void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {