Fix AOS support for realtime replay
This patch fixes the use-case where you provide a single event loop and
just want to replay logged events from the perspective of that event
loop's node.
Includes a test running a ShmEventLoop against a single-node logfile,
and running a multi-node replay into a single EventLoop in
simulation (the choice of single vs multi node here is arbitrary--it
should work with both single and multi node configs in both
simulation and shm).
This has a few caveats:
* Doesn't replay remote timestamps currently.
* Doesn't correct for implied changes in node<->node offsets due to
changes in the clock.
* Had to add a flag to choose how to manage fetcher behavior for
messages from before the start of the log.
Change-Id: I8f101e8774e0923bacc4f7e1bf58c9da02fd0d3f
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 9f6234a..f7a5551 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -267,6 +267,7 @@
":logger_fbs",
"//aos:uuid",
"//aos/events:event_loop",
+ "//aos/events:shm_event_loop",
"//aos/events:simulated_event_loop",
"//aos/network:message_bridge_server_fbs",
"//aos/network:multinode_timestamp_filter",
@@ -447,6 +448,26 @@
)
cc_test(
+ name = "realtime_replay_test",
+ srcs = ["realtime_replay_test.cc"],
+ data = [
+ "//aos/events:pingpong_config",
+ ],
+ target_compatible_with = ["@platforms//os:linux"],
+ deps = [
+ ":log_reader",
+ ":log_writer",
+ "//aos/events:ping_lib",
+ "//aos/events:pong_lib",
+ "//aos/events:shm_event_loop",
+ "//aos/events:simulated_event_loop",
+ "//aos/testing:googletest",
+ "//aos/testing:path",
+ "//aos/testing:tmpdir",
+ ],
+)
+
+cc_test(
name = "logger_test",
srcs = ["logger_test.cc"],
copts = select({
diff --git a/aos/events/logging/log_reader.cc b/aos/events/logging/log_reader.cc
index 1c3a349..a4d6cd0 100644
--- a/aos/events/logging/log_reader.cc
+++ b/aos/events/logging/log_reader.cc
@@ -15,6 +15,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
+#include "aos/json_to_flatbuffer.h"
#include "aos/network/multinode_timestamp_filter.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/remote_message_schema.h"
@@ -46,6 +47,12 @@
end_time, "",
"If set, end at this point in time in the log on the realtime clock.");
+DEFINE_bool(drop_realtime_messages_before_start, false,
+ "If set, will drop any messages sent before the start of the "
+ "logfile in realtime replay. Setting this guarantees consistency "
+ "in timing with the original logfile, but means that you lose "
+ "access to fetched low-frequency messages.");
+
namespace aos {
namespace configuration {
// We don't really want to expose this publicly, but log reader doesn't really
@@ -162,6 +169,11 @@
~EventNotifier() { event_timer_->Disable(); }
+ // Sets the clock offset for realtime playback.
+ void SetClockOffset(std::chrono::nanoseconds clock_offset) {
+ clock_offset_ = clock_offset;
+ }
+
// Returns the event trigger time.
realtime_clock::time_point realtime_event_time() const {
return realtime_event_time_;
@@ -189,7 +201,7 @@
// Whops, time went backwards. Just do it now.
HandleTime();
} else {
- event_timer_->Setup(candidate_monotonic);
+ event_timer_->Setup(candidate_monotonic + clock_offset_);
}
}
@@ -208,6 +220,8 @@
const realtime_clock::time_point realtime_event_time_ =
realtime_clock::min_time;
+ std::chrono::nanoseconds clock_offset_{0};
+
bool called_ = false;
};
@@ -325,9 +339,7 @@
}
if (!configuration::MultiNode(configuration())) {
- states_.emplace_back(std::make_unique<State>(
- std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, "")),
- nullptr));
+ states_.resize(1);
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -494,7 +506,7 @@
filtered_parts.size() == 0u
? nullptr
: std::make_unique<TimestampMapper>(std::move(filtered_parts)),
- node);
+ filters_.get(), node);
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
@@ -661,8 +673,58 @@
return nullptr;
}
+// TODO(jkuszmaul): Make in-line modifications to
+// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
+// avoid messing up anything that depends on them having valid offsets.
void LogReader::Register(EventLoop *event_loop) {
- Register(event_loop, event_loop->node());
+ filters_ =
+ std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
+ event_loop->configuration(), logged_configuration(),
+ log_files_[0].boots, FLAGS_skip_order_validation,
+ chrono::duration_cast<chrono::nanoseconds>(
+ chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
+
+ std::vector<TimestampMapper *> timestamp_mappers;
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ std::vector<LogParts> filtered_parts = FilterPartsForNode(
+ log_files_, node != nullptr ? node->name()->string_view() : "");
+
+ states_[node_index] = std::make_unique<State>(
+ filtered_parts.size() == 0u
+ ? nullptr
+ : std::make_unique<TimestampMapper>(std::move(filtered_parts)),
+ filters_.get(), node);
+ State *state = states_[node_index].get();
+
+ state->SetChannelCount(logged_configuration()->channels()->size());
+ timestamp_mappers.emplace_back(state->timestamp_mapper());
+ }
+
+ filters_->SetTimestampMappers(std::move(timestamp_mappers));
+
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ State *state = states_[node_index].get();
+ for (const Node *other_node : configuration::GetNodes(configuration())) {
+ const size_t other_node_index =
+ configuration::GetNodeIndex(configuration(), other_node);
+ State *other_state = states_[other_node_index].get();
+ if (other_state != state) {
+ state->AddPeer(other_state);
+ }
+ }
+ }
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ if (node == nullptr || node->name()->string_view() ==
+ event_loop->node()->name()->string_view()) {
+ Register(event_loop, event_loop->node());
+ } else {
+ Register(nullptr, node);
+ }
+ }
}
void LogReader::Register(EventLoop *event_loop, const Node *node) {
@@ -674,7 +736,10 @@
if (state->OldestMessageTime() == BootTimestamp::max_time()) {
return;
}
- ++live_nodes_;
+
+ if (event_loop != nullptr) {
+ ++live_nodes_;
+ }
if (event_loop_factory_ != nullptr) {
event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
@@ -687,14 +752,14 @@
}
void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
- if (event_loop) {
+ if (event_loop != nullptr) {
CHECK(event_loop->configuration() == configuration());
}
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
- if (!event_loop) {
+ if (event_loop == nullptr) {
state->ClearTimeFlags();
}
@@ -703,7 +768,7 @@
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
- if (event_loop) {
+ if (event_loop != nullptr) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
}
@@ -716,10 +781,10 @@
logged_configuration()->channels()->Get(logged_channel_index));
const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
-
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
State *source_state = nullptr;
+
if (!configuration::ChannelIsSendableOnNode(channel, node) &&
configuration::ChannelIsReadableOnNode(channel, node)) {
const Node *source_node = configuration::GetNode(
@@ -741,7 +806,10 @@
state->SetChannel(
logged_channel_index,
configuration::ChannelIndex(configuration(), channel),
- event_loop && logged ? event_loop->MakeRawSender(channel) : nullptr,
+ event_loop && logged &&
+ configuration::ChannelIsReadableOnNode(channel, node)
+ ? event_loop->MakeRawSender(channel)
+ : nullptr,
filter, is_forwarded, source_state);
if (is_forwarded && logged) {
@@ -758,10 +826,12 @@
states_[configuration::GetNodeIndex(
configuration(), connection->name()->string_view())]
.get();
- destination_state->SetRemoteTimestampSender(
- logged_channel_index,
- event_loop ? state->RemoteTimestampSender(channel, connection)
- : nullptr);
+ if (destination_state) {
+ destination_state->SetRemoteTimestampSender(
+ logged_channel_index,
+ event_loop ? state->RemoteTimestampSender(channel, connection)
+ : nullptr);
+ }
}
}
}
@@ -775,14 +845,11 @@
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
- VLOG(1) << "Starting sending " << MaybeNodeName(state->event_loop()->node())
- << "at " << state->event_loop()->context().monotonic_event_time
- << " now " << state->monotonic_now();
if (state->OldestMessageTime() == BootTimestamp::max_time()) {
--live_nodes_;
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Node down!";
if (exit_on_finish_ && live_nodes_ == 0) {
- event_loop_factory_->Exit();
+ CHECK_NOTNULL(event_loop_factory_)->Exit();
}
return;
}
@@ -794,32 +861,37 @@
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
- if (!FLAGS_skip_order_validation) {
- CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
- << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
- << monotonic_now << " trying to send "
- << timestamped_message.monotonic_event_time << " failure "
- << state->DebugString();
- } else if (BootTimestamp{.boot = state->boot_count(),
- .time = monotonic_now} !=
- timestamped_message.monotonic_event_time) {
- LOG(WARNING) << "Check failed: monotonic_now == "
- "timestamped_message.monotonic_event_time) ("
- << monotonic_now << " vs. "
- << timestamped_message.monotonic_event_time
- << "): " << FlatbufferToJson(state->event_loop()->node())
- << " Now " << monotonic_now << " trying to send "
- << timestamped_message.monotonic_event_time << " failure "
- << state->DebugString();
+ if (event_loop_factory_ != nullptr) {
+ // Only enforce exact timing in simulation.
+ if (!FLAGS_skip_order_validation) {
+ CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
+ << ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
+ << monotonic_now << " trying to send "
+ << timestamped_message.monotonic_event_time << " failure "
+ << state->DebugString();
+ } else if (BootTimestamp{.boot = state->boot_count(),
+ .time = monotonic_now} !=
+ timestamped_message.monotonic_event_time) {
+ LOG(WARNING) << "Check failed: monotonic_now == "
+ "timestamped_message.monotonic_event_time) ("
+ << monotonic_now << " vs. "
+ << timestamped_message.monotonic_event_time
+ << "): " << FlatbufferToJson(state->event_loop()->node())
+ << " Now " << monotonic_now << " trying to send "
+ << timestamped_message.monotonic_event_time << " failure "
+ << state->DebugString();
+ }
}
if (timestamped_message.monotonic_event_time.time >
state->monotonic_start_time(
timestamped_message.monotonic_event_time.boot) ||
- event_loop_factory_ != nullptr) {
+ event_loop_factory_ != nullptr ||
+ !FLAGS_drop_realtime_messages_before_start) {
if (timestamped_message.data != nullptr && !state->found_last_message()) {
if (timestamped_message.monotonic_remote_time !=
- BootTimestamp::min_time()) {
+ BootTimestamp::min_time() &&
+ !FLAGS_skip_order_validation && event_loop_factory_ != nullptr) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
@@ -890,7 +962,8 @@
timestamped_message.realtime_event_time);
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
- << timestamped_message.monotonic_event_time;
+ << timestamped_message.monotonic_event_time << " "
+ << state->DebugString();
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
state->Send(std::move(timestamped_message));
@@ -982,13 +1055,13 @@
}
}
} else {
- LOG(WARNING) << "Not sending data from before the start of the log file. "
- << timestamped_message.monotonic_event_time.time
- .time_since_epoch()
- .count()
- << " start "
- << monotonic_start_time().time_since_epoch().count() << " "
- << *timestamped_message.data;
+ LOG(WARNING)
+ << "Not sending data from before the start of the log file. "
+ << timestamped_message.monotonic_event_time.time.time_since_epoch()
+ .count()
+ << " start "
+ << monotonic_start_time(state->node()).time_since_epoch().count()
+ << " timestamped_message.data is null";
}
const BootTimestamp next_time = state->OldestMessageTime();
@@ -1002,18 +1075,26 @@
state->NotifyLogfileEnd();
return;
}
- VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
- << "wakeup for " << next_time.time << "("
- << state->ToDistributedClock(next_time.time)
- << " distributed), now is " << state->monotonic_now();
+ if (event_loop_factory_ != nullptr) {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time.time << "("
+ << state->ToDistributedClock(next_time.time)
+ << " distributed), now is " << state->monotonic_now();
+ } else {
+ VLOG(1) << "Scheduling " << MaybeNodeName(state->event_loop()->node())
+ << "wakeup for " << next_time.time << ", now is "
+ << state->monotonic_now();
+ }
state->Setup(next_time.time);
} else {
VLOG(1) << MaybeNodeName(state->event_loop()->node())
<< "No next message, scheduling shutdown";
state->NotifyLogfileEnd();
// Set a timer up immediately after now to die. If we don't do this,
- // then the senders waiting on the message we just read will never get
+ // then the watchers waiting on the message we just read will never get
// called.
+ // Doesn't apply to single-EventLoop replay since the watchers in question
+ // are not under our control.
if (event_loop_factory_ != nullptr) {
state->Setup(monotonic_now + event_loop_factory_->send_delay() +
std::chrono::nanoseconds(1));
@@ -1037,6 +1118,7 @@
event_loop->OnRun([state]() {
BootTimestamp next_time = state->OldestMessageTime();
CHECK_EQ(next_time.boot, state->boot_count());
+ state->SetClockOffset();
state->Setup(next_time.time);
state->SetupStartupTimer();
});
@@ -1451,9 +1533,13 @@
return remapped_channel;
}
-LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper,
- const Node *node)
- : timestamp_mapper_(std::move(timestamp_mapper)), node_(node) {}
+LogReader::State::State(
+ std::unique_ptr<TimestampMapper> timestamp_mapper,
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+ const Node *node)
+ : timestamp_mapper_(std::move(timestamp_mapper)),
+ node_(node),
+ multinode_filters_(multinode_filters) {}
void LogReader::State::AddPeer(State *peer) {
if (timestamp_mapper_ && peer->timestamp_mapper_) {
@@ -1572,6 +1658,23 @@
source_state->boot_count());
}
+ if (event_loop_factory_ != nullptr &&
+ channel_source_state_[timestamped_message.channel_index] != nullptr &&
+ multinode_filters_ != nullptr) {
+ // Sanity check that we are using consistent boot uuids.
+ State *source_state =
+ channel_source_state_[timestamped_message.channel_index];
+ CHECK_EQ(multinode_filters_->boot_uuid(
+ configuration::GetNodeIndex(event_loop_->configuration(),
+ source_state->node()),
+ timestamped_message.monotonic_remote_time.boot),
+ CHECK_NOTNULL(
+ CHECK_NOTNULL(
+ channel_source_state_[timestamped_message.channel_index])
+ ->event_loop_)
+ ->boot_uuid());
+ }
+
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
const auto err = sender->Send(
@@ -1580,9 +1683,13 @@
timestamped_message.monotonic_remote_time.time,
timestamped_message.realtime_remote_time, remote_queue_index,
(channel_source_state_[timestamped_message.channel_index] != nullptr
- ? CHECK_NOTNULL(
- channel_source_state_[timestamped_message.channel_index])
- ->event_loop_->boot_uuid()
+ ? CHECK_NOTNULL(multinode_filters_)
+ ->boot_uuid(configuration::GetNodeIndex(
+ event_loop_->configuration(),
+ channel_source_state_[timestamped_message
+ .channel_index]
+ ->node()),
+ timestamped_message.monotonic_remote_time.boot)
: event_loop_->boot_uuid()));
if (err != RawSender::Error::kOk) return false;
@@ -1629,6 +1736,11 @@
// map.
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
+ // TODO(james): Currently, If running replay against a single event loop,
+ // remote timestamps will not get replayed because this code-path only
+ // gets triggered on the event loop that receives the forwarded message
+ // that the timestamps correspond to. This code, as written, also doesn't
+ // correctly handle a non-zero clock_offset for the *_remote_time fields.
State *source_state =
CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
@@ -1934,5 +2046,20 @@
}
}
+void LogReader::State::SetClockOffset() {
+ if (node_event_loop_factory_ == nullptr) {
+ // If not running with simulated event loop, set the monotonic clock
+ // offset.
+ clock_offset_ = event_loop()->monotonic_now() - monotonic_start_time(0);
+
+ if (start_event_notifier_) {
+ start_event_notifier_->SetClockOffset(clock_offset_);
+ }
+ if (end_event_notifier_) {
+ end_event_notifier_->SetClockOffset(clock_offset_);
+ }
+ }
+}
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/log_reader.h b/aos/events/logging/log_reader.h
index c1333c6..2c31e0f 100644
--- a/aos/events/logging/log_reader.h
+++ b/aos/events/logging/log_reader.h
@@ -11,6 +11,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logfile_utils.h"
#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
#include "aos/events/simulated_event_loop.h"
#include "aos/network/message_bridge_server_generated.h"
#include "aos/network/multinode_timestamp_filter.h"
@@ -92,6 +93,7 @@
// Creates an SimulatedEventLoopFactory accessible via event_loop_factory(),
// and then calls Register.
void Register();
+
// Registers callbacks for all the events after the log file starts. This is
// only useful when replaying live.
void Register(EventLoop *event_loop);
@@ -285,7 +287,9 @@
// State per node.
class State {
public:
- State(std::unique_ptr<TimestampMapper> timestamp_mapper, const Node *node);
+ State(std::unique_ptr<TimestampMapper> timestamp_mapper,
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
+ const Node *node);
// Connects up the timestamp mappers.
void AddPeer(State *peer);
@@ -302,7 +306,17 @@
size_t boot_count() const {
// If we are replaying directly into an event loop, we can't reboot. So
// we will stay stuck on the 0th boot.
- if (!node_event_loop_factory_) return 0u;
+ if (!node_event_loop_factory_) {
+ if (event_loop_ == nullptr) {
+ // If boot_count is being checked after startup for any of the
+ // non-primary nodes, then returning 0 may not be accurate (since
+ // remote nodes *can* reboot even if the EventLoop being played to
+ // can't).
+ CHECK(!started_);
+ CHECK(!stopped_);
+ }
+ return 0u;
+ }
return node_event_loop_factory_->boot_count();
}
@@ -319,8 +333,10 @@
NotifyLogfileStart();
return;
}
- CHECK_GE(start_time, event_loop_->monotonic_now());
- startup_timer_->Setup(start_time);
+ if (node_event_loop_factory_) {
+ CHECK_GE(start_time + clock_offset(), event_loop_->monotonic_now());
+ }
+ startup_timer_->Setup(start_time + clock_offset());
}
void set_startup_timer(TimerHandler *timer_handler) {
@@ -382,6 +398,7 @@
// distributed clock.
distributed_clock::time_point ToDistributedClock(
monotonic_clock::time_point time) {
+ CHECK(node_event_loop_factory_);
return node_event_loop_factory_->ToDistributedClock(time);
}
@@ -415,6 +432,7 @@
distributed_clock::time_point RemoteToDistributedClock(
size_t channel_index, monotonic_clock::time_point time) {
+ CHECK(node_event_loop_factory_);
return channel_source_state_[channel_index]
->node_event_loop_factory_->ToDistributedClock(time);
}
@@ -425,7 +443,7 @@
}
monotonic_clock::time_point monotonic_now() const {
- return node_event_loop_factory_->monotonic_now();
+ return event_loop_->monotonic_now();
}
// Sets the number of channels.
@@ -487,12 +505,15 @@
// Sets the next wakeup time on the replay callback.
void Setup(monotonic_clock::time_point next_time) {
- timer_handler_->Setup(next_time);
+ timer_handler_->Setup(next_time + clock_offset());
}
// Sends a buffer on the provided channel index.
bool Send(const TimestampedMessage ×tamped_message);
+ void SetClockOffset();
+ std::chrono::nanoseconds clock_offset() const { return clock_offset_; }
+
// Returns a debug string for the channel merger.
std::string DebugString() const {
if (!timestamp_mapper_) {
@@ -582,6 +603,7 @@
// going between 2 nodes. The second element in the tuple indicates if this
// is the primary direction or not.
std::vector<message_bridge::NoncausalOffsetEstimator *> filters_;
+ message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters_;
// List of NodeEventLoopFactorys (or nullptr if it isn't a forwarded
// channel) which correspond to the originating node.
@@ -598,6 +620,11 @@
absl::btree_map<const Channel *, std::shared_ptr<RemoteMessageSender>>
timestamp_loggers_;
+ // Time offset between the log's monotonic clock and the current event
+ // loop's monotonic clock. Useful when replaying logs with non-simulated
+ // event loops.
+ std::chrono::nanoseconds clock_offset_{0};
+
std::vector<std::function<void()>> on_starts_;
std::vector<std::function<void()>> on_ends_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 5f265e2..02081fd 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -2192,6 +2192,149 @@
reader.Deregister();
}
+// Tests that we observe all the same events in log replay (for a given node)
+// whether we just register an event loop for that node or if we register a full
+// event loop factory.
+TEST_P(MultinodeLoggerTest, SingleNodeReplay) {
+ time_converter_.StartEqual();
+ constexpr chrono::milliseconds kStartupDelay(95);
+ {
+ LoggerState pi1_logger = MakeLogger(pi1_);
+ LoggerState pi2_logger = MakeLogger(pi2_);
+
+ event_loop_factory_.RunFor(kStartupDelay);
+
+ StartLogger(&pi1_logger);
+ StartLogger(&pi2_logger);
+
+ event_loop_factory_.RunFor(chrono::milliseconds(20000));
+ }
+
+ LogReader full_reader(SortParts(logfiles_));
+ LogReader single_node_reader(SortParts(logfiles_));
+
+ SimulatedEventLoopFactory full_factory(full_reader.configuration());
+ SimulatedEventLoopFactory single_node_factory(
+ single_node_reader.configuration());
+ std::unique_ptr<EventLoop> replay_event_loop =
+ single_node_factory.GetNodeEventLoopFactory("pi1")->MakeEventLoop(
+ "log_reader");
+
+ full_reader.Register(&full_factory);
+ single_node_reader.Register(replay_event_loop.get());
+ single_node_factory.SkipTimingReport();
+ single_node_factory.DisableStatistics();
+
+ const Node *full_pi1 =
+ configuration::GetNode(full_factory.configuration(), "pi1");
+
+ // Confirm we can read the data on the remapped channel, just for pi1. Nothing
+ // else should have moved.
+ std::unique_ptr<EventLoop> full_event_loop =
+ full_factory.MakeEventLoop("test", full_pi1);
+ full_event_loop->SkipTimingReport();
+ full_event_loop->SkipAosLog();
+ // maps are indexed on channel index.
+ // observed_messages: {channel_index: [(message_sent_time, was_fetched),...]}
+ std::map<size_t, std::vector<std::pair<monotonic_clock::time_point, bool>>>
+ observed_messages;
+ std::map<size_t, std::unique_ptr<RawFetcher>> fetchers;
+ for (size_t ii = 0; ii < full_event_loop->configuration()->channels()->size();
+ ++ii) {
+ const Channel *channel =
+ full_event_loop->configuration()->channels()->Get(ii);
+ // We currently don't support replaying remote timestamp channels in
+ // realtime replay.
+ if (channel->name()->string_view().find("remote_timestamp") !=
+ std::string_view::npos) {
+ continue;
+ }
+ if (configuration::ChannelIsReadableOnNode(channel, full_pi1)) {
+ observed_messages[ii] = {};
+ fetchers[ii] = full_event_loop->MakeRawFetcher(channel);
+ full_event_loop->OnRun([ii, &observed_messages, &fetchers]() {
+ if (fetchers[ii]->Fetch()) {
+ observed_messages[ii].push_back(std::make_pair(
+ fetchers[ii]->context().monotonic_event_time, true));
+ }
+ });
+ full_event_loop->MakeRawNoArgWatcher(
+ channel, [ii, &observed_messages](const Context &context) {
+ observed_messages[ii].push_back(
+ std::make_pair(context.monotonic_event_time, false));
+ });
+ }
+ }
+
+ full_factory.Run();
+ fetchers.clear();
+ full_reader.Deregister();
+
+ const Node *single_node_pi1 =
+ configuration::GetNode(single_node_factory.configuration(), "pi1");
+ std::map<size_t, std::unique_ptr<RawFetcher>> single_node_fetchers;
+
+ std::unique_ptr<EventLoop> single_node_event_loop =
+ single_node_factory.MakeEventLoop("test", single_node_pi1);
+ single_node_event_loop->SkipTimingReport();
+ single_node_event_loop->SkipAosLog();
+ for (size_t ii = 0;
+ ii < single_node_event_loop->configuration()->channels()->size(); ++ii) {
+ const Channel *channel =
+ single_node_event_loop->configuration()->channels()->Get(ii);
+ single_node_factory.DisableForwarding(channel);
+ if (configuration::ChannelIsReadableOnNode(channel, single_node_pi1)) {
+ single_node_fetchers[ii] =
+ single_node_event_loop->MakeRawFetcher(channel);
+ single_node_event_loop->OnRun([channel, ii, &single_node_fetchers]() {
+ EXPECT_FALSE(single_node_fetchers[ii]->Fetch())
+ << "Single EventLoop replay doesn't support pre-loading fetchers. "
+ << configuration::StrippedChannelToString(channel);
+ });
+ single_node_event_loop->MakeRawNoArgWatcher(
+ channel, [ii, &observed_messages, channel,
+ kStartupDelay](const Context &context) {
+ if (observed_messages[ii].empty()) {
+ FAIL() << "Observed extra message at "
+ << context.monotonic_event_time << " on "
+ << configuration::StrippedChannelToString(channel);
+ return;
+ }
+ const std::pair<monotonic_clock::time_point, bool> &message =
+ observed_messages[ii].front();
+ if (message.second) {
+ EXPECT_LE(message.first,
+ context.monotonic_event_time + kStartupDelay)
+ << "Mismatched message times " << context.monotonic_event_time
+ << " and " << message.first << " on "
+ << configuration::StrippedChannelToString(channel);
+ } else {
+ EXPECT_EQ(message.first,
+ context.monotonic_event_time + kStartupDelay)
+ << "Mismatched message times " << context.monotonic_event_time
+ << " and " << message.first << " on "
+ << configuration::StrippedChannelToString(channel);
+ }
+ observed_messages[ii].erase(observed_messages[ii].begin());
+ });
+ }
+ }
+
+ single_node_factory.Run();
+
+ single_node_fetchers.clear();
+
+ single_node_reader.Deregister();
+
+ for (const auto &pair : observed_messages) {
+ EXPECT_TRUE(pair.second.empty())
+ << "Missed " << pair.second.size() << " messages on "
+ << configuration::StrippedChannelToString(
+ single_node_event_loop->configuration()->channels()->Get(
+ pair.first));
+ }
+}
+
// Tests that we properly recreate forwarded timestamps when replaying a log.
// This should be enough that we can then re-run the logger and get a valid log
// back.
diff --git a/aos/events/logging/realtime_replay_test.cc b/aos/events/logging/realtime_replay_test.cc
new file mode 100644
index 0000000..c7744d4
--- /dev/null
+++ b/aos/events/logging/realtime_replay_test.cc
@@ -0,0 +1,76 @@
+#include "aos/events/logging/log_reader.h"
+#include "aos/events/logging/log_writer.h"
+#include "aos/events/ping_lib.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/json_to_flatbuffer.h"
+#include "aos/testing/path.h"
+#include "aos/testing/tmpdir.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+class RealtimeLoggerTest : public ::testing::Test {
+ protected:
+ RealtimeLoggerTest()
+ : shm_dir_(aos::testing::TestTmpDir() + "/aos"),
+ config_file_(
+ aos::testing::ArtifactPath("aos/events/pingpong_config.json")),
+ config_(aos::configuration::ReadConfig(config_file_)),
+ event_loop_factory_(&config_.message()),
+ ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
+ ping_(ping_event_loop_.get()) {
+ FLAGS_shm_base = shm_dir_;
+
+ // Nuke the shm dir, to ensure we aren't being affected by any preexisting
+ // tests.
+ aos::util::UnlinkRecursive(shm_dir_);
+ }
+
+ gflags::FlagSaver flag_saver_;
+ std::string shm_dir_;
+
+ const std::string config_file_;
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
+
+ // Factory and Ping class to generate a test logfile.
+ SimulatedEventLoopFactory event_loop_factory_;
+ std::unique_ptr<EventLoop> ping_event_loop_;
+ Ping ping_;
+};
+
+TEST_F(RealtimeLoggerTest, RealtimeReplay) {
+ const std::string tmpdir = aos::testing::TestTmpDir();
+ const std::string base_name = tmpdir + "/logfile/";
+ aos::util::UnlinkRecursive(base_name);
+ {
+ std::unique_ptr<EventLoop> logger_event_loop =
+ event_loop_factory_.MakeEventLoop("logger");
+
+ event_loop_factory_.RunFor(std::chrono::milliseconds(95));
+
+ Logger logger(logger_event_loop.get());
+ logger.set_separate_config(false);
+ logger.set_polling_period(std::chrono::milliseconds(100));
+ logger.StartLoggingLocalNamerOnRun(base_name);
+ event_loop_factory_.RunFor(std::chrono::milliseconds(2000));
+ }
+
+ LogReader reader(logger::SortParts(logger::FindLogs(base_name)));
+ ShmEventLoop shm_event_loop(reader.configuration());
+ reader.Register(&shm_event_loop);
+ reader.OnEnd(shm_event_loop.node(),
+ [&shm_event_loop]() { shm_event_loop.Exit(); });
+
+ Fetcher<examples::Ping> ping_fetcher =
+ shm_event_loop.MakeFetcher<examples::Ping>("/test");
+
+ shm_event_loop.AddTimer([]() { LOG(INFO) << "Hello, World!"; })
+ ->Setup(shm_event_loop.monotonic_now(), std::chrono::seconds(1));
+
+ shm_event_loop.Run();
+ reader.Deregister();
+
+ ASSERT_TRUE(ping_fetcher.Fetch());
+ ASSERT_EQ(ping_fetcher->value(), 210);
+}
+} // namespace aos::logger::testing