Fix AOS support for realtime replay
This patch fixes the use case where you provide a single event loop and
just want to replay logged events from the perspective of that event
loop's node.
Includes test coverage for running a ShmEventLoop against a single-node
logfile and for replaying a multi-node log into a single EventLoop in
simulation (the choice of single- vs. multi-node here is arbitrary; it
should work with both single- and multi-node configs in both
simulation and shm).
This has a few caveats:
* Doesn't replay remote timestamps currently.
* Doesn't correct for implied changes in node<->node offsets due to
changes in the clock.
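* Had to add a flag, --drop_realtime_messages_before_start, to choose
whether messages from before the start of the log are replayed in
realtime replay (dropping them keeps timing consistent with the original
log, but fetchers lose access to low-frequency messages).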
Change-Id: I8f101e8774e0923bacc4f7e1bf58c9da02fd0d3f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1c3a349..a4d6cd0 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -15,6 +15,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
#include "aos/network/multinode_timestamp_filter.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/remote_message_schema.h"
@@ -46,6 +47,12 @@
end_time, "",
"If set, end at this point in time in the log on the realtime clock.");
+DEFINE_bool(drop_realtime_messages_before_start, false,  // Ignored when replaying into a simulated event loop factory.
+ "If set, will drop any messages sent before the start of the "
+ "logfile in realtime replay. Setting this guarantees consistency "
+ "in timing with the original logfile, but means that you lose "
+ "access to fetched low-frequency messages.");  // NOTE(review): the replay check uses '>', so messages stamped exactly at the start time are dropped too.
+
namespace aos {
namespace configuration {
// We don't really want to expose this publicly, but log reader doesn't really
@@ -162,6 +169,11 @@
~EventNotifier() { event_timer_->Disable(); }
+ // Sets the offset added to event times before arming the timer (realtime playback).
+ void SetClockOffset(std::chrono::nanoseconds clock_offset) {
+ clock_offset_ = clock_offset;
+ }
+
// Returns the event trigger time.
realtime_clock::time_point realtime_event_time() const {
return realtime_event_time_;
@@ -189,7 +201,7 @@
// Whops, time went backwards. Just do it now.
HandleTime();
} else {
- event_timer_->Setup(candidate_monotonic);
+ event_timer_->Setup(candidate_monotonic + clock_offset_);
}
}
@@ -208,6 +220,8 @@
const realtime_clock::time_point realtime_event_time_ =
realtime_clock::min_time;
+ std::chrono::nanoseconds clock_offset_{0};
+
bool called_ = false;
};
@@ -325,9 +339,7 @@
}
if (!configuration::MultiNode(configuration())) {
- states_.emplace_back(std::make_unique<State>(
- std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
- nullptr));
+ states_.resize(1);
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -494,7 +506,7 @@
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- node);
+ filters_.get(), node);
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -661,8 +673,58 @@
return nullptr;
}
+// TODO(jkuszmaul): Make in-line modifications to
+// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
+// avoid messing up anything that depends on them having valid offsets.
void LogReader::Register(EventLoop *event_loop) {
- Register(event_loop, event_loop->node());
+ filters_ =  // Offset estimator shared by every node's State created below.
+ std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
+ event_loop->configuration(), logged_configuration(),
+ log_files_[0].boots, FLAGS_skip_order_validation,
+ chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
+ // Create one State per node in the config, replacing any existing State.
+ std::vector<TimestampMapper *> timestamp_mappers;
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ std::vector<LogParts> filtered_parts = FilterPartsForNode(
+ log_files_, node != nullptr ? node->name()->string_view() : "");
+ // A node with no log parts gets a State without a TimestampMapper.
+ states_[node_index] = std::make_unique<State>(
+ filtered_parts.size() == 0u
+ ? nullptr
+ : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+ filters_.get(), node);
+ State *state = states_[node_index].get();
+ // NOTE(review): timestamp_mapper() may be null here; presumably SetTimestampMappers() tolerates that — confirm.
+ state->SetChannelCount(logged_configuration()->channels()->size());
+ timestamp_mappers.emplace_back(state->timestamp_mapper());
+ }
+ // Hand the per-node mappers (in GetNodes() order) to the estimator.
+ filters_->SetTimestampMappers(std::move(timestamp_mappers));
+ // Make every node's State a peer of every other node's State.
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ State *state = states_[node_index].get();
+ for (const Node *other_node : configuration::GetNodes(configuration())) {
+ const size_t other_node_index =
+ configuration::GetNodeIndex(configuration(), other_node);
+ State *other_state = states_[other_node_index].get();
+ if (other_state != state) {
+ state->AddPeer(other_state);
+ }
+ }
+ }
+ for (const Node *node : configuration::GetNodes(configuration())) {  // Only event_loop's own node is registered with a live EventLoop.
+ if (node == nullptr || node->name()->string_view() ==
+ event_loop->node()->name()->string_view()) {  // NOTE(review): assumes event_loop->node() is non-null whenever node is non-null.
+ Register(event_loop, event_loop->node());
+ } else {
+ Register(nullptr, node);  // Other nodes are registered without an EventLoop.
+ }
+ }
}
void LogReader::Register(EventLoop *event_loop, const Node *node) {
@@ -674,7 +736,10 @@
if (state->OldestMessageTime() == BootTimestamp::max_time()) {
return;
}
- ++live_nodes_;
+
+ if (event_loop != nullptr) {
+ ++live_nodes_;
+ }
if (event_loop_factory_ != nullptr) {
event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
@@ -687,14 +752,14 @@
}
void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
- if (event_loop) {
+ if (event_loop != nullptr) {
CHECK(event_loop->configuration() == configuration());
}
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
- if (!event_loop) {
+ if (event_loop == nullptr) {
state->ClearTimeFlags();
}
@@ -703,7 +768,7 @@
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
- if (event_loop) {
+ if (event_loop != nullptr) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
}
@@ -716,10 +781,10 @@
logged_configuration()->channels()->Get(logged_channel_index));
const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
-
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
State *source_state = nullptr;
+
if (!configuration::ChannelIsSendableOnNode(channel, node) &&
configuration::ChannelIsReadableOnNode(channel, node)) {
const Node *source_node = configuration::GetNode(
@@ -741,7 +806,10 @@
state->SetChannel(
logged_channel_index,
configuration::ChannelIndex(configuration(), channel),
- event_loop && logged ? event_loop->MakeRawSender(channel) : nullptr,
+ event_loop && logged &&
+ configuration::ChannelIsReadableOnNode(channel, node)
+ ? event_loop->MakeRawSender(channel)
+ : nullptr,
filter, is_forwarded, source_state);
if (is_forwarded && logged) {
@@ -758,10 +826,12 @@
states_[configuration::GetNodeIndex(
configuration(), connection->name()->string_view())]
.get();
- destination_state->SetRemoteTimestampSender(
- logged_channel_index,
- event_loop ? state->RemoteTimestampSender(channel, connection)
- : nullptr);
+ if (destination_state) {
+ destination_state->SetRemoteTimestampSender(
+ logged_channel_index,
+ event_loop ? state->RemoteTimestampSender(channel, connection)
+ : nullptr);
+ }
}
}
}
@@ -775,14 +845,11 @@
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
- VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
- << "at " << state->event_loop()->context().monotonic_event_time
- << " now " << state->monotonic_now();
if (state->OldestMessageTime() == BootTimestamp::max_time()) {
--live_nodes_;
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (exit_on_finish_ && live_nodes_ == 0) {
- event_loop_factory_->Exit();
+ CHECK_NOTNULL(event_loop_factory_)->Exit();
}
return;
}
@@ -794,32 +861,37 @@
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
- if (!FLAGS_skip_order_validation) {
- CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
- << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
- << monotonic_now << " trying to send "
- << timestamped_message.monotonic_event_time << " failure "
- << state->DebugString();
- } else if (BootTimestamp{.boot = state->boot_count(),
- .time = monotonic_now} !=
- timestamped_message.monotonic_event_time) {
- LOG(WARNING) << "Check failed: monotonic_now == "
- "timestamped_message.monotonic_event_time) ("
- << monotonic_now << " vs. "
- << timestamped_message.monotonic_event_time
- << "): " << FlatbufferToJson(state->event_loop()->node())
- << " Now " << monotonic_now << " trying to send "
- << timestamped_message.monotonic_event_time << " failure "
- << state->DebugString();
+ if (event_loop_factory_ != nullptr) {
+ // Only enforce exact timing in simulation.
+ if (!FLAGS_skip_order_validation) {
+ CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
+ << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+ << monotonic_now << " trying to send "
+ << timestamped_message.monotonic_event_time << " failure "
+ << state->DebugString();
+ } else if (BootTimestamp{.boot = state->boot_count(),
+ .time = monotonic_now} !=
+ timestamped_message.monotonic_event_time) {
+ LOG(WARNING) << "Check failed: monotonic_now == "
+ "timestamped_message.monotonic_event_time) ("
+ << monotonic_now << " vs. "
+ << timestamped_message.monotonic_event_time
+ << "): " << FlatbufferToJson(state->event_loop()->node())
+ << " Now " << monotonic_now << " trying to send "
+ << timestamped_message.monotonic_event_time << " failure "
+ << state->DebugString();
+ }
}
if (timestamped_message.monotonic_event_time.time >
state->monotonic_start_time(
timestamped_message.monotonic_event_time.boot) ||
- event_loop_factory_ != nullptr) {
+ event_loop_factory_ != nullptr ||
+ !FLAGS_drop_realtime_messages_before_start) {
if (timestamped_message.data != nullptr && !state->found_last_message()) {
if (timestamped_message.monotonic_remote_time !=
- BootTimestamp::min_time()) {
+ BootTimestamp::min_time() &&
+ !FLAGS_skip_order_validation && event_loop_factory_ != nullptr) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
@@ -890,7 +962,8 @@
timestamped_message.realtime_event_time);
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
- << timestamped_message.monotonic_event_time;
+ << timestamped_message.monotonic_event_time << " "
+ << state->DebugString();
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
state->Send(std::move(timestamped_message));
@@ -982,13 +1055,13 @@
}
}
} else {
- LOG(WARNING) << "Not sending data from before the start of the log file. "
- << timestamped_message.monotonic_event_time.time
- .time_since_epoch()
- .count()
- << " start "
- << monotonic_start_time().time_since_epoch().count() << " "
- << *timestamped_message.data;
+ LOG(WARNING)
+ << "Not sending data from before the start of the log file. "
+ << timestamped_message.monotonic_event_time.time.time_since_epoch()
+ .count()
+ << " start "
+ << monotonic_start_time(state->node()).time_since_epoch().count()
+ << " timestamped_message.data is null";
}
const BootTimestamp next_time = state->OldestMessageTime();
@@ -1002,18 +1075,26 @@
state->NotifyLogfileEnd();
return;
}
- VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
- << "wakeup for " << next_time.time << "("
- << state->ToDistributedClock(next_time.time)
- << " distributed), now is " << state->monotonic_now();
+ if (event_loop_factory_ != nullptr) {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time.time << "("
+ << state->ToDistributedClock(next_time.time)
+ << " distributed), now is " << state->monotonic_now();
+ } else {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time.time << ", now is "
+ << state->monotonic_now();
+ }
state->Setup(next_time.time);
} else {
VLOG(1) << MaybeNodeName(state->event_loop()->node())
<< "No next message, scheduling shutdown";
state->NotifyLogfileEnd();
// Set a timer up immediately after now to die. If we don't do this,
- // then the senders waiting on the message we just read will never get
+ // then the watchers waiting on the message we just read will never get
// called.
+ // Doesn't apply to single-EventLoop replay since the watchers in question
+ // are not under our control.
if (event_loop_factory_ != nullptr) {
state->Setup(monotonic_now + event_loop_factory_->send_delay() +
std::chrono::nanoseconds(1));
@@ -1037,6 +1118,7 @@
event_loop->OnRun([state]() {
BootTimestamp next_time = state->OldestMessageTime();
CHECK_EQ(next_time.boot, state->boot_count());
+ state->SetClockOffset();
state->Setup(next_time.time);
state->SetupStartupTimer();
});
@@ -1451,9 +1533,13 @@
return remapped_channel;
}
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
- const Node *node)
- : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
+LogReader::State::State(
+ std::unique_ptr<TimestampMapper> timestamp_mapper,  // Null when the node has no log parts.
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,  // Non-owning (LogReader passes filters_.get()); null is checked for before use.
+ const Node *node)
+ : timestamp_mapper_(std::move(timestamp_mapper)),
+ node_(node),
+ multinode_filters_(multinode_filters) {}
void LogReader::State::AddPeer(State *peer) {
if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1572,6 +1658,23 @@
source_state->boot_count());
}
+ if (event_loop_factory_ != nullptr &&
+ channel_source_state_[timestamped_message.channel_index] != nullptr &&
+ multinode_filters_ != nullptr) {
+ // Sanity check that we are using consistent boot uuids.
+ State *source_state =
+ channel_source_state_[timestamped_message.channel_index];
+ CHECK_EQ(multinode_filters_->boot_uuid(
+ configuration::GetNodeIndex(event_loop_->configuration(),
+ source_state->node()),
+ timestamped_message.monotonic_remote_time.boot),
+ CHECK_NOTNULL(
+ CHECK_NOTNULL(
+ channel_source_state_[timestamped_message.channel_index])
+ ->event_loop_)
+ ->boot_uuid());
+ }
+
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
const auto err = sender->Send(
@@ -1580,9 +1683,13 @@
timestamped_message.monotonic_remote_time.time,
timestamped_message.realtime_remote_time, remote_queue_index,
(channel_source_state_[timestamped_message.channel_index] != nullptr
- ? CHECK_NOTNULL(
- channel_source_state_[timestamped_message.channel_index])
- ->event_loop_->boot_uuid()
+ ? CHECK_NOTNULL(multinode_filters_)
+ ->boot_uuid(configuration::GetNodeIndex(
+ event_loop_->configuration(),
+ channel_source_state_[timestamped_message
+ .channel_index]
+ ->node()),
+ timestamped_message.monotonic_remote_time.boot)
: event_loop_->boot_uuid()));
if (err != RawSender::Error::kOk) return false;
@@ -1629,6 +1736,11 @@
// map.
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
+ // TODO(james): Currently, if running replay against a single event loop,
+ // remote timestamps will not get replayed because this code-path only
+ // gets triggered on the event loop that receives the forwarded message
+ // that the timestamps correspond to. This code, as written, also doesn't
+ // correctly handle a non-zero clock_offset for the *_remote_time fields.
State *source_state =
CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
@@ -1934,5 +2046,20 @@
}
}
+void LogReader::State::SetClockOffset() {
+ if (node_event_loop_factory_ == nullptr) {
+ // Not running with a simulated event loop: map the log's start time
+ // onto the event loop's current monotonic time.
+ clock_offset_ = event_loop()->monotonic_now() - monotonic_start_time(0);  // NOTE(review): boot index 0 is hard-coded; confirm for logs where this node's first boot is not 0.
+ // Keep the start/end notifiers' timers on the same shifted clock.
+ if (start_event_notifier_) {
+ start_event_notifier_->SetClockOffset(clock_offset_);
+ }
+ if (end_event_notifier_) {
+ end_event_notifier_->SetClockOffset(clock_offset_);
+ }
+ }
+}
+
} // namespace logger
} // namespace aos