#include "aos/events/logging/log_reader.h"
#include <dirent.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <climits>
#include <utility>
#include <vector>
#include "absl/strings/escaping.h"
#include "absl/types/span.h"
#include "flatbuffers/flatbuffers.h"
#include "openssl/sha.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/boot_timestamp.h"
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/network/multinode_timestamp_filter.h"
#include "aos/network/remote_message_generated.h"
#include "aos/network/remote_message_schema.h"
#include "aos/network/team_number.h"
#include "aos/network/timestamp_channel.h"
#include "aos/time/time.h"
#include "aos/util/file.h"
#include "aos/uuid.h"
DEFINE_bool(skip_missing_forwarding_entries, false,
"If true, drop any forwarding entries with missing data. If "
"false, CHECK.");
DECLARE_bool(timestamps_to_csv);
DEFINE_bool(
enable_timestamp_loading, true,
"Enable loading all the timestamps into RAM at startup if they are in "
"separate files. This fixes any timestamp queueing problems for the cost "
"of storing timestamps in RAM only on logs with timestamps logged in "
"separate files from data. Only disable this if you are reading a really "
"long log file and are experiencing memory problems due to loading all the "
"timestamps into RAM.");
DEFINE_bool(
force_timestamp_loading, false,
"Force loading all the timestamps into RAM at startup. This fixes any "
"timestamp queueing problems for the cost of storing timestamps in RAM and "
"potentially reading each log twice.");
DEFINE_bool(skip_order_validation, false,
"If true, ignore any out of orderness in replay");
DEFINE_double(
time_estimation_buffer_seconds, 2.0,
"The time to buffer ahead in the log file to accurately reconstruct time.");
DEFINE_string(
start_time, "",
"If set, start at this point in time in the log on the realtime clock.");
DEFINE_string(
end_time, "",
"If set, end at this point in time in the log on the realtime clock.");
DEFINE_bool(drop_realtime_messages_before_start, false,
"If set, will drop any messages sent before the start of the "
"logfile in realtime replay. Setting this guarantees consistency "
"in timing with the original logfile, but means that you lose "
"access to fetched low-frequency messages.");
DEFINE_double(
threaded_look_ahead_seconds, 2.0,
"Time, in seconds, to add to look-ahead when using multi-threaded replay. "
"Can validly be zero, but higher values are encouraged for realtime replay "
"in order to prevent the replay from ever having to block on waiting for "
"the reader to find the next message.");
namespace aos {
namespace configuration {
// We don't really want to expose this publicly, but log reader doesn't really
// want to re-implement it.
void HandleMaps(const flatbuffers::Vector<flatbuffers::Offset<aos::Map>> *maps,
std::string *name, std::string_view type, const Node *node);
} // namespace configuration
namespace logger {
namespace {
namespace chrono = std::chrono;
using message_bridge::RemoteMessage;
} // namespace
// Class to manage triggering events on the RT clock while replaying logs. Since
// the RT clock can only change when we get a message, we only need to update
// our timers when new messages are read.
class EventNotifier {
public:
EventNotifier(EventLoop *event_loop, std::function<void()> fn,
std::string_view name,
realtime_clock::time_point realtime_event_time)
: event_loop_(event_loop),
fn_(std::move(fn)),
realtime_event_time_(realtime_event_time) {
CHECK(event_loop_);
event_timer_ = event_loop->AddTimer([this]() { HandleTime(); });
if (event_loop_->node() != nullptr) {
event_timer_->set_name(
absl::StrCat(event_loop_->node()->name()->string_view(), "_", name));
} else {
event_timer_->set_name(name);
}
}
~EventNotifier() { event_timer_->Disable(); }
// Sets the clock offset for realtime playback.
void SetClockOffset(std::chrono::nanoseconds clock_offset) {
clock_offset_ = clock_offset;
}
// Returns the event trigger time.
realtime_clock::time_point realtime_event_time() const {
return realtime_event_time_;
}
// Observes the next message and potentially calls the callback or updates the
// timer.
void ObserveNextMessage(monotonic_clock::time_point monotonic_message_time,
realtime_clock::time_point realtime_message_time) {
if (realtime_message_time < realtime_event_time_) {
return;
}
if (called_) {
return;
}
// Move the callback wakeup time to the correct time (or make it now if
// there's a gap in time) now that we know it is before the next
// message.
const monotonic_clock::time_point candidate_monotonic =
(realtime_event_time_ - realtime_message_time) + monotonic_message_time;
const monotonic_clock::time_point monotonic_now =
event_loop_->monotonic_now();
if (candidate_monotonic < monotonic_now) {
// Whoops, time went backwards. Just do it now.
HandleTime();
} else {
event_timer_->Schedule(candidate_monotonic + clock_offset_);
}
}
private:
void HandleTime() {
if (!called_) {
called_ = true;
fn_();
}
}
EventLoop *event_loop_ = nullptr;
TimerHandler *event_timer_ = nullptr;
std::function<void()> fn_;
const realtime_clock::time_point realtime_event_time_ =
realtime_clock::min_time;
std::chrono::nanoseconds clock_offset_{0};
bool called_ = false;
};
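// Example (sketch): how a per-node State can use EventNotifier to fire a
// callback once replay reaches a realtime time. The event loop, callback, and
// parsed_start_time below are illustrative placeholders.
//
//   EventNotifier start_notifier(
//       event_loop, []() { LOG(INFO) << "Reached --start_time"; }, "start",
//       parsed_start_time);
//   // For each message replayed on this node:
//   start_notifier.ObserveNextMessage(monotonic_event_time,
//                                     realtime_event_time);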
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration,
const ReplayChannels *replay_channels)
: LogReader(LogFilesContainer(SortParts({std::string(filename)})),
replay_configuration, replay_channels) {}
LogReader::LogReader(std::vector<LogFile> log_files,
const Configuration *replay_configuration,
const ReplayChannels *replay_channels)
: LogReader(LogFilesContainer(std::move(log_files)), replay_configuration,
replay_channels) {}
LogReader::LogReader(LogFilesContainer log_files,
const Configuration *replay_configuration,
const ReplayChannels *replay_channels)
: log_files_(std::move(log_files)),
replay_configuration_(replay_configuration),
replay_channels_(replay_channels),
config_remapper_(log_files_.config().get(), replay_configuration_,
replay_channels_) {
SetStartTime(FLAGS_start_time);
SetEndTime(FLAGS_end_time);
{
// Log files container validates that log files shared the same config.
const Configuration *config = log_files_.config().get();
CHECK_NOTNULL(config);
}
if (replay_channels_ != nullptr) {
CHECK(!replay_channels_->empty()) << "replay_channels is empty which means "
"no messages will get replayed.";
}
if (!configuration::MultiNode(configuration())) {
states_.resize(1);
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
replay_configuration->nodes()->size())
<< ": Log file and replay config need to have matching nodes "
"lists.";
for (const Node *node : *logged_configuration()->nodes()) {
if (configuration::GetNode(replay_configuration, node) == nullptr) {
LOG(FATAL) << "Found node " << FlatbufferToJson(node)
<< " in logged config that is not present in the replay "
"config.";
}
}
}
states_.resize(configuration()->nodes()->size());
}
before_send_callbacks_.resize(configuration()->channels()->size());
}
LogReader::~LogReader() {
if (event_loop_factory_unique_ptr_) {
Deregister();
} else if (event_loop_factory_ != nullptr) {
LOG(FATAL) << "Must call Deregister before the SimulatedEventLoopFactory "
"is destroyed";
}
}
const Configuration *LogReader::logged_configuration() const {
return config_remapper_.original_configuration();
}
const Configuration *LogReader::configuration() const {
return config_remapper_.remapped_configuration();
}
std::vector<const Node *> LogReader::LoggedNodes() const {
return configuration::GetNodes(logged_configuration());
}
monotonic_clock::time_point LogReader::monotonic_start_time(
const Node *node) const {
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
return state->monotonic_start_time(state->boot_count());
}
realtime_clock::time_point LogReader::realtime_start_time(
const Node *node) const {
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
return state->realtime_start_time(state->boot_count());
}
void LogReader::OnStart(std::function<void()> fn) {
CHECK(!configuration::MultiNode(configuration()));
OnStart(nullptr, std::move(fn));
}
void LogReader::OnStart(const Node *node, std::function<void()> fn) {
const int node_index = configuration::GetNodeIndex(configuration(), node);
CHECK_GE(node_index, 0);
CHECK_LT(node_index, static_cast<int>(states_.size()));
State *state = states_[node_index].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
state->OnStart(std::move(fn));
}
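// Example (sketch): installing per-node start/end hooks before registering.
// The reader, node pointer, and lambdas are illustrative placeholders.
//
//   reader.OnStart(node, []() { LOG(INFO) << "Replay started"; });
//   reader.OnEnd(node, []() { LOG(INFO) << "Replay finished"; });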
void LogReader::State::QueueThreadUntil(BootTimestamp time) {
if (threading_ == ThreadedBuffering::kYes) {
CHECK(!message_queuer_.has_value()) << "Can't start thread twice.";
message_queuer_.emplace(
[this](const BootTimestamp queue_until) {
// This will be called whenever anything prompts us for any state
// change; there may be wakeups that result in us not having any new
// data to push (even if we aren't done), in which case we will return
// nullopt but not done().
if (last_queued_message_.has_value() &&
queue_until < last_queued_message_) {
return util::ThreadedQueue<TimestampedMessage,
BootTimestamp>::PushResult{
std::nullopt, false,
last_queued_message_ == BootTimestamp::max_time()};
}
TimestampedMessage *message = timestamp_mapper_->Front();
// Upon reaching the end of the log, exit.
if (message == nullptr) {
last_queued_message_ = BootTimestamp::max_time();
return util::ThreadedQueue<TimestampedMessage,
BootTimestamp>::PushResult{std::nullopt,
false, true};
}
last_queued_message_ = message->monotonic_event_time;
const util::ThreadedQueue<TimestampedMessage,
BootTimestamp>::PushResult result{
*message, queue_until >= last_queued_message_, false};
timestamp_mapper_->PopFront();
MaybeSeedSortedMessages();
return result;
},
time);
// Spin until the first few seconds of messages are queued up so that we
// don't end up with delays/inconsistent timing during the first few seconds
// of replay.
message_queuer_->WaitForNoMoreWork();
}
}
void LogReader::State::OnStart(std::function<void()> fn) {
on_starts_.emplace_back(std::move(fn));
}
void LogReader::State::RunOnStart() {
SetRealtimeOffset(monotonic_start_time(boot_count()),
realtime_start_time(boot_count()));
VLOG(1) << "Starting for node '" << MaybeNodeName(node()) << "' at time "
<< monotonic_start_time(boot_count());
auto fn = [this]() {
for (size_t i = 0; i < on_starts_.size(); ++i) {
on_starts_[i]();
}
};
if (event_loop_factory_) {
event_loop_factory_->AllowApplicationCreationDuring(std::move(fn));
} else {
fn();
}
stopped_ = false;
started_ = true;
}
void LogReader::OnEnd(std::function<void()> fn) {
CHECK(!configuration::MultiNode(configuration()));
OnEnd(nullptr, std::move(fn));
}
void LogReader::OnEnd(const Node *node, std::function<void()> fn) {
const int node_index = configuration::GetNodeIndex(configuration(), node);
CHECK_GE(node_index, 0);
CHECK_LT(node_index, static_cast<int>(states_.size()));
State *state = states_[node_index].get();
CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);
state->OnEnd(std::move(fn));
}
void LogReader::State::OnEnd(std::function<void()> fn) {
on_ends_.emplace_back(std::move(fn));
}
void LogReader::State::RunOnEnd() {
VLOG(1) << "Ending for node '" << MaybeNodeName(node()) << "' at time "
<< monotonic_start_time(boot_count());
auto fn = [this]() {
for (size_t i = 0; i < on_ends_.size(); ++i) {
on_ends_[i]();
}
};
if (event_loop_factory_) {
event_loop_factory_->AllowApplicationCreationDuring(std::move(fn));
} else {
fn();
}
stopped_ = true;
started_ = true;
}
std::vector<
std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
LogReader::State::NonExclusiveChannels() {
CHECK_NOTNULL(node_event_loop_factory_);
const aos::Configuration *config = node_event_loop_factory_->configuration();
std::vector<
std::pair<const aos::Channel *, NodeEventLoopFactory::ExclusiveSenders>>
result{// Timing reports can be sent by logged and replayed applications.
{aos::configuration::GetChannel(config, "/aos",
"aos.timing.Report", "", node_),
NodeEventLoopFactory::ExclusiveSenders::kNo},
// AOS_LOG may be used in the log and in replay.
{aos::configuration::GetChannel(
config, "/aos", "aos.logging.LogMessageFbs", "", node_),
NodeEventLoopFactory::ExclusiveSenders::kNo}};
for (const Node *const node : configuration::GetNodes(config)) {
if (node == nullptr) {
break;
}
const Channel *const old_timestamp_channel = aos::configuration::GetChannel(
config,
absl::StrCat("/aos/remote_timestamps/", node->name()->string_view()),
"aos.message_bridge.RemoteMessage", "", node_, /*quiet=*/true);
// The old-style remote timestamp channel can be populated from any
// channel, simulated or replayed.
if (old_timestamp_channel != nullptr) {
result.push_back(std::make_pair(
old_timestamp_channel, NodeEventLoopFactory::ExclusiveSenders::kNo));
}
}
// Remove any channels that weren't found due to not existing in the
// config.
for (size_t ii = 0; ii < result.size();) {
if (result[ii].first == nullptr) {
result.erase(result.begin() + ii);
} else {
++ii;
}
}
return result;
}
void LogReader::Register() {
event_loop_factory_unique_ptr_ =
std::make_unique<SimulatedEventLoopFactory>(configuration());
Register(event_loop_factory_unique_ptr_.get());
}
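// Example (sketch): a typical simulated replay flow using the explicit-factory
// Register() overload defined below. SortParts() comes from logfile_sorting.h;
// SimulatedEventLoopFactory::Run() is assumed from its header; the log path is
// a placeholder.
//
//   aos::logger::LogReader reader(
//       aos::logger::SortParts({"/tmp/log/part0.bfbs"}));
//   aos::SimulatedEventLoopFactory factory(reader.configuration());
//   reader.Register(&factory);
//   factory.Run();
//   reader.Deregister();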
void LogReader::RegisterWithoutStarting(
SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
config_remapper_.set_configuration(event_loop_factory_->configuration());
const TimestampQueueStrategy timestamp_queue_strategy =
ComputeTimestampQueueStrategy();
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop_factory_->configuration(), logged_configuration(),
log_files_.boots(), FLAGS_skip_order_validation,
timestamp_queue_strategy ==
TimestampQueueStrategy::kQueueTimestampsAtStartup
? chrono::seconds(0)
: chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(
FLAGS_time_estimation_buffer_seconds)));
std::vector<TimestampMapper *> timestamp_mappers;
for (const Node *node : configuration::GetNodes(configuration())) {
size_t node_index = configuration::GetNodeIndex(configuration(), node);
std::string_view node_name = MaybeNodeName(node);
// We don't run with threading on the buffering for simulated event loops
// because we haven't attempted to validate how the interactions between the
// buffering and the timestamp mapper work when running multiple nodes
// concurrently.
states_[node_index] = std::make_unique<State>(
!log_files_.ContainsPartsForNode(node_name)
? nullptr
: std::make_unique<TimestampMapper>(node_name, log_files_,
timestamp_queue_strategy),
timestamp_queue_strategy, filters_.get(),
std::bind(&LogReader::NoticeRealtimeEnd, this), node,
State::ThreadedBuffering::kNo, MaybeMakeReplayChannelIndices(node),
before_send_callbacks_);
State *state = states_[node_index].get();
state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node),
event_loop_factory_);
state->SetChannelCount(logged_configuration()->channels()->size());
timestamp_mappers.emplace_back(state->timestamp_mapper());
}
filters_->SetTimestampMappers(std::move(timestamp_mappers));
// Note: this needs to be set before any times are pulled, or we won't observe
// the timestamps.
event_loop_factory_->SetTimeConverter(filters_.get());
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
State *state = states_[node_index].get();
for (const Node *other_node : configuration::GetNodes(configuration())) {
const size_t other_node_index =
configuration::GetNodeIndex(configuration(), other_node);
State *other_state = states_[other_node_index].get();
if (other_state != state) {
state->AddPeer(other_state);
}
}
}
// Now that every state has a peer, load all the timestamps into RAM.
if (timestamp_queue_strategy ==
TimestampQueueStrategy::kQueueTimestampsAtStartup) {
for (std::unique_ptr<State> &state : states_) {
state->ReadTimestamps();
}
}
// Register after making all the State objects so we can build references
// between them.
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
State *state = states_[node_index].get();
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
continue;
}
++live_nodes_;
NodeEventLoopFactory *node_factory =
event_loop_factory_->GetNodeEventLoopFactory(node);
node_factory->OnStartup([this, state, node]() {
RegisterDuringStartup(state->MakeEventLoop(), node);
});
node_factory->OnShutdown([this, state, node]() {
RegisterDuringStartup(nullptr, node);
state->DestroyEventLoop();
});
}
if (live_nodes_ == 0) {
LOG(FATAL)
<< "Don't have logs from any of the nodes in the replay config--are "
"you sure that the replay config matches the original config?";
}
filters_->CheckGraph();
for (std::unique_ptr<State> &state : states_) {
state->MaybeSeedSortedMessages();
}
// Forwarding is tracked per channel. If it is enabled, we want to turn it
// off. Otherwise messages replayed will get forwarded across to the other
// nodes, and also replayed on the other nodes. This may not satisfy all
// our users, but it'll start the discussion.
if (configuration::NodesCount(event_loop_factory_->configuration()) > 1u) {
for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
const Channel *channel = logged_configuration()->channels()->Get(i);
const Node *node = configuration::GetNode(
configuration(), channel->source_node()->string_view());
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
const Channel *remapped_channel =
config_remapper_.RemapChannel(state->event_loop(), node, channel);
event_loop_factory_->DisableForwarding(remapped_channel);
}
// If we are replaying a log, we don't want a bunch of redundant messages
// from both the real message bridge and simulated message bridge.
event_loop_factory_->PermanentlyDisableStatistics();
}
// Write pseudo start times out to file now that we are all setup.
filters_->Start(event_loop_factory_);
}
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
RegisterWithoutStarting(event_loop_factory);
StartAfterRegister(event_loop_factory);
}
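// Example (sketch): split registration, for callers that need to create event
// loops or install OnStart()/OnEnd() hooks after the States exist but before
// simulated time starts advancing. The reader and factory are the same as in
// the sketch above.
//
//   reader.RegisterWithoutStarting(&factory);
//   // ... make event loops on the factory, install hooks, etc. ...
//   reader.StartAfterRegister(&factory);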
void LogReader::StartAfterRegister(
SimulatedEventLoopFactory *event_loop_factory) {
// We want to start the log file at the last start time of the log files
// from all the nodes. Compute how long each node's simulation needs to run
// to move time to this point.
distributed_clock::time_point start_time = distributed_clock::min_time;
// TODO(austin): We want an "OnStart" callback for each node rather than
// running until the last node.
for (std::unique_ptr<State> &state : states_) {
CHECK(state);
VLOG(1) << "Start time is " << state->monotonic_start_time(0)
<< " for node '" << MaybeNodeName(state->node()) << "' now "
<< (state->event_loop() != nullptr ? state->monotonic_now()
: monotonic_clock::min_time);
if (state->monotonic_start_time(0) == monotonic_clock::min_time) {
continue;
}
// And start computing the start time on the distributed clock now that
// that works.
start_time = std::max(
start_time, state->ToDistributedClock(state->monotonic_start_time(0)));
}
// TODO(austin): If a node doesn't have a start time, we might not queue
// enough. If this happens, we'll explode with a frozen error eventually.
CHECK_GE(start_time, distributed_clock::epoch())
<< ": Hmm, we have a node starting before the start of time. Offset "
"everything.";
{
VLOG(1) << "Running until " << start_time << " in Register";
event_loop_factory_->RunFor(start_time.time_since_epoch());
VLOG(1) << "At start time";
}
for (std::unique_ptr<State> &state : states_) {
// Make the RT clock be correct before handing it to the user.
if (state->realtime_start_time(0) != realtime_clock::min_time) {
state->SetRealtimeOffset(state->monotonic_start_time(0),
state->realtime_start_time(0));
}
VLOG(1) << "Start time is " << state->monotonic_start_time(0)
<< " for node '" << MaybeNodeName(state->node()) << "' now "
<< (state->event_loop() != nullptr ? state->monotonic_now()
: monotonic_clock::min_time);
}
if (FLAGS_timestamps_to_csv) {
filters_->Start(event_loop_factory);
}
}
message_bridge::NoncausalOffsetEstimator *LogReader::GetFilter(
const Node *node_a, const Node *node_b) {
if (filters_) {
return filters_->GetFilter(node_a, node_b);
}
return nullptr;
}
TimestampQueueStrategy LogReader::ComputeTimestampQueueStrategy() const {
if ((log_files_.TimestampsStoredSeparately() &&
FLAGS_enable_timestamp_loading) ||
FLAGS_force_timestamp_loading) {
return TimestampQueueStrategy::kQueueTimestampsAtStartup;
} else {
return TimestampQueueStrategy::kQueueTogether;
}
}
// TODO(jkuszmaul): Make in-line modifications to
// ServerStatistics/ClientStatistics messages for ShmEventLoop-based replay to
// avoid messing up anything that depends on them having valid offsets.
void LogReader::Register(EventLoop *event_loop) {
const TimestampQueueStrategy timestamp_queue_strategy =
ComputeTimestampQueueStrategy();
filters_ =
std::make_unique<message_bridge::MultiNodeNoncausalOffsetEstimator>(
event_loop->configuration(), logged_configuration(),
log_files_.boots(), FLAGS_skip_order_validation,
timestamp_queue_strategy ==
TimestampQueueStrategy::kQueueTimestampsAtStartup
? chrono::seconds(0)
: chrono::duration_cast<chrono::nanoseconds>(
chrono::duration<double>(
FLAGS_time_estimation_buffer_seconds)));
std::vector<TimestampMapper *> timestamp_mappers;
for (const Node *node : configuration::GetNodes(configuration())) {
auto node_name = MaybeNodeName(node);
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
states_[node_index] = std::make_unique<State>(
!log_files_.ContainsPartsForNode(node_name)
? nullptr
: std::make_unique<TimestampMapper>(node_name, log_files_,
timestamp_queue_strategy),
timestamp_queue_strategy, filters_.get(),
std::bind(&LogReader::NoticeRealtimeEnd, this), node,
State::ThreadedBuffering::kYes, MaybeMakeReplayChannelIndices(node),
before_send_callbacks_);
State *state = states_[node_index].get();
state->SetChannelCount(logged_configuration()->channels()->size());
timestamp_mappers.emplace_back(state->timestamp_mapper());
}
filters_->SetTimestampMappers(std::move(timestamp_mappers));
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
State *state = states_[node_index].get();
for (const Node *other_node : configuration::GetNodes(configuration())) {
const size_t other_node_index =
configuration::GetNodeIndex(configuration(), other_node);
State *other_state = states_[other_node_index].get();
if (other_state != state) {
state->AddPeer(other_state);
}
}
}
// Now that all the peers are added, we can buffer up all the timestamps if
// needed.
if (timestamp_queue_strategy ==
TimestampQueueStrategy::kQueueTimestampsAtStartup) {
for (std::unique_ptr<State> &state : states_) {
state->ReadTimestamps();
}
}
for (const Node *node : configuration::GetNodes(configuration())) {
if (node == nullptr || node->name()->string_view() ==
event_loop->node()->name()->string_view()) {
Register(event_loop, event_loop->node());
} else {
Register(nullptr, node);
}
}
}
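// Example (sketch): replaying against a single live event loop instead of a
// simulation (the ShmEventLoop-based path mentioned in the TODO above).
// aos::ShmEventLoop and its Run() are assumed from aos/events/shm_event_loop.h,
// which is not included here.
//
//   aos::ShmEventLoop event_loop(reader.configuration());
//   reader.Register(&event_loop);
//   event_loop.Run();
//   reader.Deregister();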
void LogReader::Register(EventLoop *event_loop, const Node *node) {
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
if (state->SingleThreadedOldestMessageTime() == BootTimestamp::max_time()) {
return;
}
if (event_loop != nullptr) {
++live_nodes_;
}
if (event_loop_factory_ != nullptr) {
event_loop_factory_->GetNodeEventLoopFactory(node)->OnStartup(
[this, event_loop, node]() {
RegisterDuringStartup(event_loop, node);
});
} else {
RegisterDuringStartup(event_loop, node);
}
}
void LogReader::RegisterDuringStartup(EventLoop *event_loop, const Node *node) {
if (event_loop != nullptr) {
CHECK(event_loop->configuration() == configuration());
}
State *state =
states_[configuration::GetNodeIndex(configuration(), node)].get();
if (event_loop == nullptr) {
state->ClearTimeFlags();
}
state->set_event_loop(event_loop);
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
if (event_loop != nullptr) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
}
for (size_t logged_channel_index = 0;
logged_channel_index < logged_configuration()->channels()->size();
++logged_channel_index) {
const Channel *channel = config_remapper_.RemapChannel(
event_loop, node,
logged_configuration()->channels()->Get(logged_channel_index));
const bool logged = channel->logger() != LoggerConfig::NOT_LOGGED;
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
State *source_state = nullptr;
if (!configuration::ChannelIsSendableOnNode(channel, node) &&
configuration::ChannelIsReadableOnNode(channel, node)) {
const Node *source_node = configuration::GetNode(
configuration(), channel->source_node()->string_view());
// We've got a message which is being forwarded to this node.
filter = GetFilter(node, source_node);
source_state =
states_[configuration::GetNodeIndex(configuration(), source_node)]
.get();
}
// We are the source, and it is forwarded.
const bool is_forwarded =
configuration::ChannelIsSendableOnNode(channel, node) &&
configuration::ConnectionCount(channel);
state->SetChannel(
logged_channel_index,
configuration::ChannelIndex(configuration(), channel),
event_loop && logged &&
configuration::ChannelIsReadableOnNode(channel, node)
? event_loop->MakeRawSender(channel)
: nullptr,
filter, is_forwarded, source_state);
if (is_forwarded && logged) {
const Node *source_node = configuration::GetNode(
configuration(), channel->source_node()->string_view());
for (const Connection *connection : *channel->destination_nodes()) {
const bool delivery_time_is_logged =
configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
source_node);
if (delivery_time_is_logged) {
State *destination_state =
states_[configuration::GetNodeIndex(
configuration(), connection->name()->string_view())]
.get();
if (destination_state) {
destination_state->SetRemoteTimestampSender(
logged_channel_index,
event_loop ? state->RemoteTimestampSender(channel, connection)
: nullptr);
}
}
}
}
}
if (!event_loop) {
state->ClearRemoteTimestampSenders();
state->set_timer_handler(nullptr);
state->set_startup_timer(nullptr);
return;
}
state->set_timer_handler(event_loop->AddTimer([this, state]() {
if (state->MultiThreadedOldestMessageTime() == BootTimestamp::max_time()) {
--live_nodes_;
VLOG(1) << "Node '" << MaybeNodeName(state->event_loop()->node())
<< "' down!";
if (exit_on_finish_ && live_nodes_ == 0 &&
event_loop_factory_ != nullptr) {
event_loop_factory_->Exit();
}
return;
}
TimestampedMessage timestamped_message = state->PopOldest();
CHECK_EQ(timestamped_message.monotonic_event_time.boot,
state->boot_count());
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
if (event_loop_factory_ != nullptr) {
// Only enforce exact timing in simulation.
if (!FLAGS_skip_order_validation) {
CHECK(monotonic_now == timestamped_message.monotonic_event_time.time)
<< ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
<< monotonic_now << " trying to send "
<< timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
} else if (BootTimestamp{.boot = state->boot_count(),
.time = monotonic_now} !=
timestamped_message.monotonic_event_time) {
LOG(WARNING) << "Check failed: monotonic_now == "
"timestamped_message.monotonic_event_time) ("
<< monotonic_now << " vs. "
<< timestamped_message.monotonic_event_time
<< "): " << FlatbufferToJson(state->event_loop()->node())
<< " Now " << monotonic_now << " trying to send "
<< timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
}
}
if (timestamped_message.monotonic_event_time.time >
state->monotonic_start_time(
timestamped_message.monotonic_event_time.boot) ||
event_loop_factory_ != nullptr ||
!FLAGS_drop_realtime_messages_before_start) {
if (timestamped_message.data != nullptr && !state->found_last_message()) {
if (timestamped_message.monotonic_remote_time !=
BootTimestamp::min_time() &&
!FLAGS_skip_order_validation && event_loop_factory_ != nullptr) {
// Confirm that the message was sent on the sending node before the
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
//
// TODO(austin): <= means that the cause message (which we know) could
// happen after the effect even though we know they are at the same
// time. I doubt anyone will notice for a bit, but we should really
// fix that.
BootTimestamp monotonic_remote_now =
state->monotonic_remote_now(timestamped_message.channel_index);
if (!FLAGS_skip_order_validation) {
CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
monotonic_remote_now.boot)
<< state->event_loop()->node()->name()->string_view() << " to "
<< state->remote_node(timestamped_message.channel_index)
->name()
->string_view()
<< " while trying to send a message on "
<< configuration::CleanedChannelToString(
logged_configuration()->channels()->Get(
timestamped_message.channel_index))
<< " " << timestamped_message << " " << state->DebugString();
CHECK_LE(timestamped_message.monotonic_remote_time,
monotonic_remote_now)
<< state->event_loop()->node()->name()->string_view() << " to "
<< state->remote_node(timestamped_message.channel_index)
->name()
->string_view()
<< " while trying to send a message on "
<< configuration::CleanedChannelToString(
logged_configuration()->channels()->Get(
timestamped_message.channel_index))
<< " " << state->DebugString();
} else if (monotonic_remote_now.boot !=
timestamped_message.monotonic_remote_time.boot) {
LOG(WARNING) << "Missmatched boots, " << monotonic_remote_now.boot
<< " vs "
<< timestamped_message.monotonic_remote_time.boot;
} else if (timestamped_message.monotonic_remote_time >
monotonic_remote_now) {
LOG(WARNING)
<< "Check failed: timestamped_message.monotonic_remote_time < "
"state->monotonic_remote_now(timestamped_message.channel_"
"index) ("
<< timestamped_message.monotonic_remote_time << " vs. "
<< state->monotonic_remote_now(
timestamped_message.channel_index)
<< ") " << state->event_loop()->node()->name()->string_view()
<< " to "
<< state->remote_node(timestamped_message.channel_index)
->name()
->string_view()
<< " currently " << timestamped_message.monotonic_event_time
<< " ("
<< state->ToDistributedClock(
timestamped_message.monotonic_event_time.time)
<< ") remote event time "
<< timestamped_message.monotonic_remote_time << " ("
<< state->RemoteToDistributedClock(
timestamped_message.channel_index,
timestamped_message.monotonic_remote_time.time)
<< ") " << state->DebugString();
}
}
// If we have access to the factory, use it to fix the realtime time.
state->SetRealtimeOffset(timestamped_message.monotonic_event_time.time,
timestamped_message.realtime_event_time);
VLOG(1) << "For node '" << MaybeNodeName(state->event_loop()->node())
<< "' sending at " << timestamped_message.monotonic_event_time
<< " : " << state->DebugString();
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
state->Send(std::move(timestamped_message));
} else if (state->found_last_message() ||
(!ignore_missing_data_ &&
// When starting up, we can have data which was sent before
// the log starts, but the timestamp was after the log
// starts. This is unreasonable to avoid, so ignore the
// missing data.
timestamped_message.monotonic_remote_time.time >=
state->monotonic_remote_start_time(
timestamped_message.monotonic_remote_time.boot,
timestamped_message.channel_index) &&
!FLAGS_skip_missing_forwarding_entries)) {
if (!state->found_last_message()) {
// We've found a timestamp without data that we expect to have data
// for. This likely means that we are at the end of the log file.
// Record it and CHECK that in the rest of the log file, we don't find
// any more data on that channel. Not all channels will end at the
// same point in time since they can be in different files.
VLOG(1) << "Found the last message on channel "
<< timestamped_message.channel_index << ", "
<< configuration::CleanedChannelToString(
logged_configuration()->channels()->Get(
timestamped_message.channel_index))
<< " on node '" << MaybeNodeName(state->event_loop()->node())
<< "' at " << timestamped_message;
// The user might be working with log files from 1 node but forgot to
// configure the infrastructure to log data for a remote channel on
// that node. That can be very hard to debug, even though the log
// reader is doing the right thing. At least log a warning in that
// case and tell the user what is happening so they can either update
// their config to log the channel or can find a log with the data.
const std::vector<std::string> logger_nodes =
log_files_.logger_nodes();
if (!logger_nodes.empty()) {
// We have old logs which don't have the logger nodes logged. In
// that case, we can't be helpful :(
bool data_logged = false;
const Channel *channel = logged_configuration()->channels()->Get(
timestamped_message.channel_index);
for (const std::string &node : logger_nodes) {
data_logged |=
configuration::ChannelMessageIsLoggedOnNode(channel, node);
}
if (!data_logged) {
LOG(WARNING) << "Got a timestamp without any logfiles which "
"could contain data for channel "
<< configuration::CleanedChannelToString(channel);
LOG(WARNING) << "Only have logs logged on ["
<< absl::StrJoin(logger_nodes, ", ") << "]";
LOG(WARNING)
<< "Dropping the rest of the data on "
<< state->event_loop()->node()->name()->string_view();
LOG(WARNING)
<< "Consider using --skip_missing_forwarding_entries to "
"bypass this, update your config to log it, or add data "
"from one of the nodes it is logged on.";
}
}
// The log file is now done, prod the callbacks.
state->NotifyLogfileEnd();
// Now that we found the end of one channel, artificially stop the
// rest by setting the found_last_message bit. It is confusing when
// part of your data gets replayed but not all. The rest of them will
// get dropped as they are replayed to keep memory usage down.
state->SetFoundLastMessage(true);
// Vector storing if we've seen a nullptr message or not per channel.
state->set_last_message(timestamped_message.channel_index);
}
// Make sure that once we have seen the last message on a channel,
// data doesn't start back up again. If the user wants to play
// through events like this, they can set
// --skip_missing_forwarding_entries or ignore_missing_data_.
if (timestamped_message.data == nullptr) {
state->set_last_message(timestamped_message.channel_index);
} else {
if (state->last_message(timestamped_message.channel_index)) {
LOG(FATAL) << "Found missing data in the middle of the log file on "
"channel "
<< timestamped_message.channel_index << " "
<< configuration::StrippedChannelToString(
logged_configuration()->channels()->Get(
timestamped_message.channel_index))
<< " " << timestamped_message << " "
<< state->DebugString();
}
}
}
} else {
LOG(WARNING)
<< "Not sending data from before the start of the log file. "
<< timestamped_message.monotonic_event_time.time.time_since_epoch()
.count()
<< " start "
<< monotonic_start_time(state->node()).time_since_epoch().count()
<< " timestamped_message.data is null";
}
const BootTimestamp next_time = state->MultiThreadedOldestMessageTime();
if (next_time != BootTimestamp::max_time()) {
if (next_time.boot != state->boot_count()) {
VLOG(1) << "Next message for node '"
<< MaybeNodeName(state->event_loop()->node())
<< "' is on the next boot, " << next_time << " now is "
<< state->monotonic_now();
CHECK(event_loop_factory_);
state->NotifyLogfileEnd();
return;
}
if (event_loop_factory_ != nullptr) {
VLOG(1) << "Scheduling for node '"
<< MaybeNodeName(state->event_loop()->node()) << "' wakeup for "
<< next_time.time << "("
<< state->ToDistributedClock(next_time.time)
<< " distributed), now is " << state->monotonic_now();
} else {
VLOG(1) << "Scheduling for node '"
<< MaybeNodeName(state->event_loop()->node()) << "' wakeup for "
<< next_time.time << ", now is " << state->monotonic_now();
}
// TODO(james): This can result in negative times getting passed-through
// in realtime replay.
state->Schedule(next_time.time);
} else {
VLOG(1) << "Node '" << MaybeNodeName(state->event_loop()->node())
<< "': No next message, scheduling shutdown";
state->NotifyLogfileEnd();
// Set a timer up immediately after now to die. If we don't do this,
// then the watchers waiting on the message we just read will never get
// called.
// Doesn't apply to single-EventLoop replay since the watchers in question
// are not under our control.
if (event_loop_factory_ != nullptr) {
state->Schedule(monotonic_now + event_loop_factory_->send_delay() +
std::chrono::nanoseconds(1));
}
}
VLOG(1) << "Node '" << MaybeNodeName(state->event_loop()->node())
<< "': Done sending at "
<< state->event_loop()->context().monotonic_event_time << " now "
<< state->monotonic_now();
}));
state->MaybeSeedSortedMessages();
if (state->SingleThreadedOldestMessageTime() != BootTimestamp::max_time()) {
state->set_startup_timer(
event_loop->AddTimer([state]() { state->NotifyLogfileStart(); }));
if (start_time_ != realtime_clock::min_time) {
state->SetStartTimeFlag(start_time_);
}
if (end_time_ != realtime_clock::max_time) {
state->SetEndTimeFlag(end_time_);
++live_nodes_with_realtime_time_end_;
}
event_loop->OnRun([state]() {
BootTimestamp next_time = state->SingleThreadedOldestMessageTime();
CHECK_EQ(next_time.boot, state->boot_count());
// Queue up messages and then set clock offsets (we don't want to set
// clock offsets before we've done the work of getting the first messages
// primed).
state->QueueThreadUntil(
next_time + std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(
FLAGS_threaded_look_ahead_seconds)));
state->MaybeSetClockOffset();
state->Schedule(next_time.time);
state->SetUpStartupTimer();
});
}
}
void LogReader::SetEndTime(std::string end_time) {
if (end_time.empty()) {
SetEndTime(realtime_clock::max_time);
} else {
std::optional<aos::realtime_clock::time_point> parsed_end_time =
aos::realtime_clock::FromString(end_time);
CHECK(parsed_end_time) << ": Failed to parse end time '" << end_time
<< "'. Expected a date in the format of "
"2021-01-15_15-30-35.000000000.";
SetEndTime(*parsed_end_time);
}
}
void LogReader::SetEndTime(realtime_clock::time_point end_time) {
end_time_ = end_time;
}
void LogReader::SetStartTime(std::string start_time) {
if (start_time.empty()) {
SetStartTime(realtime_clock::min_time);
} else {
std::optional<aos::realtime_clock::time_point> parsed_start_time =
aos::realtime_clock::FromString(start_time);
CHECK(parsed_start_time) << ": Failed to parse start time '" << start_time
<< "'. Expected a date in the format of "
"2021-01-15_15-30-35.000000000.";
SetStartTime(*parsed_start_time);
}
}
void LogReader::SetStartTime(realtime_clock::time_point start_time) {
start_time_ = start_time;
}
void LogReader::Deregister() {
// Make sure that things get destroyed in the correct order, rather than
// relying on getting the order correct in the class definition.
for (std::unique_ptr<State> &state : states_) {
state->Deregister();
}
event_loop_factory_unique_ptr_.reset();
event_loop_factory_ = nullptr;
}
namespace {
// Checks if the specified channel name/type exists in the config and, depending
// on the value of conflict_handling, calls conflict_handler or just dies.
template <typename F>
void CheckAndHandleRemapConflict(
std::string_view new_name, std::string_view new_type,
const Configuration *config,
ConfigRemapper::RemapConflict conflict_handling, F conflict_handler) {
const Channel *existing_channel =
configuration::GetChannel(config, new_name, new_type, "", nullptr, true);
if (existing_channel != nullptr) {
switch (conflict_handling) {
case ConfigRemapper::RemapConflict::kDisallow:
LOG(FATAL)
<< "Channel "
<< configuration::StrippedChannelToString(existing_channel)
<< " is already used--you can't remap a logged channel to it.";
break;
case ConfigRemapper::RemapConflict::kCascade:
LOG(INFO) << "Automatically remapping "
<< configuration::StrippedChannelToString(existing_channel)
<< " to avoid conflicts.";
conflict_handler();
break;
}
}
}
} // namespace
void LogReader::RemapLoggedChannel(
std::string_view name, std::string_view type, std::string_view add_prefix,
std::string_view new_type,
ConfigRemapper::RemapConflict conflict_handling) {
CheckEventsAreNotScheduled();
config_remapper_.RemapOriginalChannel(name, type, nullptr, add_prefix,
new_type, conflict_handling);
}
void LogReader::RemapLoggedChannel(
std::string_view name, std::string_view type, const Node *node,
std::string_view add_prefix, std::string_view new_type,
ConfigRemapper::RemapConflict conflict_handling) {
CheckEventsAreNotScheduled();
config_remapper_.RemapOriginalChannel(name, type, node, add_prefix, new_type,
conflict_handling);
}
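// Example (sketch): remap a logged channel so replayed data is sent on a
// prefixed name, leaving the original name free for live senders. The channel
// name and type are illustrative placeholders; all arguments are passed
// explicitly to match the overload above.
//
//   reader.RemapLoggedChannel("/camera", "vision.CameraImage", "/original", "",
//                             ConfigRemapper::RemapConflict::kCascade);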
void LogReader::RenameLoggedChannel(const std::string_view name,
const std::string_view type,
const std::string_view new_name,
const std::vector<MapT> &add_maps) {
CheckEventsAreNotScheduled();
RenameLoggedChannel(name, type, nullptr, new_name, add_maps);
}
void LogReader::RenameLoggedChannel(const std::string_view name,
const std::string_view type,
const Node *const node,
const std::string_view new_name,
const std::vector<MapT> &add_maps) {
CheckEventsAreNotScheduled();
config_remapper_.RenameOriginalChannel(name, type, node, new_name, add_maps);
}
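// Example (sketch): rename a logged channel outright (no prefixing). The names
// and type are illustrative placeholders; an empty map list is passed
// explicitly.
//
//   reader.RenameLoggedChannel("/old_name", "vision.CameraImage", "/new_name",
//                              {});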
void LogReader::CheckEventsAreNotScheduled() {
for (std::unique_ptr<State> &state : states_) {
if (state) {
CHECK(!state->event_loop())
<< ": Can't change the mapping after the events are scheduled.";
}
}
}
std::unique_ptr<const ReplayChannelIndices>
LogReader::MaybeMakeReplayChannelIndices(const Node *node) {
if (replay_channels_ == nullptr) {
return nullptr;
} else {
std::unique_ptr<ReplayChannelIndices> replay_channel_indices =
std::make_unique<ReplayChannelIndices>();
for (auto const &channel : *replay_channels_) {
const Channel *ch = configuration::GetChannel(
logged_configuration(), channel.first, channel.second, "", node);
if (ch == nullptr) {
LOG(WARNING) << "Channel: " << channel.first << " " << channel.second
<< " not found in configuration for node: "
<< node->name()->string_view() << " Skipping ...";
continue;
}
const size_t channel_index =
configuration::ChannelIndex(logged_configuration(), ch);
replay_channel_indices->emplace_back(channel_index);
}
std::sort(replay_channel_indices->begin(), replay_channel_indices->end());
return replay_channel_indices;
}
}
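// Example (sketch): replaying only a subset of channels by handing the
// constructor a ReplayChannels list of (name, type) pairs. The exact alias for
// ReplayChannels lives in log_reader.h; the channel and log_files here are
// placeholders.
//
//   const aos::logger::ReplayChannels replay_channels{
//       {"/aos", "aos.timing.Report"}};
//   aos::logger::LogReader reader(std::move(log_files), nullptr,
//                                 &replay_channels);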
std::vector<const Channel *> LogReader::RemappedChannels() const {
return config_remapper_.RemappedChannels();
}
const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
const Node *node,
const Channel *channel) {
return config_remapper_.RemapChannel(event_loop, node, channel);
}
LogReader::State::State(
std::unique_ptr<TimestampMapper> timestamp_mapper,
TimestampQueueStrategy timestamp_queue_strategy,
message_bridge::MultiNodeNoncausalOffsetEstimator *multinode_filters,
std::function<void()> notice_realtime_end, const Node *node,
LogReader::State::ThreadedBuffering threading,
std::unique_ptr<const ReplayChannelIndices> replay_channel_indices,
const std::vector<std::function<void(void *message)>>
&before_send_callbacks)
: timestamp_mapper_(std::move(timestamp_mapper)),
timestamp_queue_strategy_(timestamp_queue_strategy),
notice_realtime_end_(notice_realtime_end),
node_(node),
multinode_filters_(multinode_filters),
threading_(threading),
replay_channel_indices_(std::move(replay_channel_indices)),
before_send_callbacks_(before_send_callbacks) {
// If timestamp_mapper_ is nullptr, then there are no log parts associated
// with this node. If there are no log parts for the node, there will be no
// log data, and so we do not need to worry about the replay channel filters.
if (replay_channel_indices_ != nullptr && timestamp_mapper_ != nullptr) {
timestamp_mapper_->set_replay_channels_callback(
[filter = replay_channel_indices_.get()](
const TimestampedMessage &message) -> bool {
auto const begin = filter->cbegin();
auto const end = filter->cend();
// TODO: benchmark strategies for channel_index matching
return std::binary_search(begin, end, message.channel_index);
});
}
}
void LogReader::State::AddPeer(State *peer) {
if (timestamp_mapper_ && peer->timestamp_mapper_) {
timestamp_mapper_->AddPeer(peer->timestamp_mapper_.get());
}
}
void LogReader::State::SetNodeEventLoopFactory(
NodeEventLoopFactory *node_event_loop_factory,
SimulatedEventLoopFactory *event_loop_factory) {
node_event_loop_factory_ = node_event_loop_factory;
event_loop_factory_ = event_loop_factory;
}
void LogReader::State::SetChannelCount(size_t count) {
channels_.resize(count);
remote_timestamp_senders_.resize(count);
filters_.resize(count);
channel_source_state_.resize(count);
factory_channel_index_.resize(count);
queue_index_map_.resize(count);
}
void LogReader::State::SetRemoteTimestampSender(
size_t logged_channel_index, RemoteMessageSender *remote_timestamp_sender) {
remote_timestamp_senders_[logged_channel_index] = remote_timestamp_sender;
}
void LogReader::State::SetChannel(
size_t logged_channel_index, size_t factory_channel_index,
std::unique_ptr<RawSender> sender,
message_bridge::NoncausalOffsetEstimator *filter, bool is_forwarded,
State *source_state) {
channels_[logged_channel_index] = std::move(sender);
filters_[logged_channel_index] = filter;
channel_source_state_[logged_channel_index] = source_state;
if (is_forwarded) {
queue_index_map_[logged_channel_index] =
std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
}
factory_channel_index_[logged_channel_index] = factory_channel_index;
}
void LogReader::State::TrackMessageSendTiming(
const RawSender &sender, monotonic_clock::time_point expected_send_time) {
if (event_loop_ == nullptr || !timing_statistics_sender_.valid()) {
return;
}
timing::MessageTimingT sample;
sample.channel = configuration::ChannelIndex(event_loop_->configuration(),
sender.channel());
sample.expected_send_time = expected_send_time.time_since_epoch().count();
sample.actual_send_time =
sender.monotonic_sent_time().time_since_epoch().count();
sample.send_time_error = aos::time::DurationInSeconds(
expected_send_time - sender.monotonic_sent_time());
send_timings_.push_back(sample);
// Somewhat arbitrarily send out timing information in batches of 100. No need
// to create excessive overhead in regenerated logfiles.
// TODO(james): The overhead may be fine.
constexpr size_t kMaxTimesPerStatisticsMessage = 100;
CHECK(timing_statistics_sender_.valid());
if (send_timings_.size() == kMaxTimesPerStatisticsMessage) {
SendMessageTimings();
}
}
void LogReader::State::SendMessageTimings() {
if (send_timings_.empty() || !timing_statistics_sender_.valid()) {
return;
}
auto builder = timing_statistics_sender_.MakeBuilder();
std::vector<flatbuffers::Offset<timing::MessageTiming>> timing_offsets;
for (const auto &timing : send_timings_) {
timing_offsets.push_back(
timing::MessageTiming::Pack(*builder.fbb(), &timing));
}
send_timings_.clear();
flatbuffers::Offset<
flatbuffers::Vector<flatbuffers::Offset<timing::MessageTiming>>>
timings_offset = builder.fbb()->CreateVector(timing_offsets);
timing::ReplayTiming::Builder timing_builder =
builder.MakeBuilder<timing::ReplayTiming>();
timing_builder.add_messages(timings_offset);
timing_statistics_sender_.CheckOk(builder.Send(timing_builder.Finish()));
}
bool LogReader::State::Send(const TimestampedMessage &&timestamped_message) {
aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
CHECK(sender);
uint32_t remote_queue_index = 0xffffffff;
if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
State *source_state =
CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
std::vector<ContiguousSentTimestamp> *queue_index_map = CHECK_NOTNULL(
source_state->queue_index_map_[timestamped_message.channel_index]
.get());
struct SentTimestamp {
monotonic_clock::time_point monotonic_event_time;
uint32_t queue_index;
} search;
CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
source_state->boot_count());
search.monotonic_event_time =
timestamped_message.monotonic_remote_time.time;
search.queue_index = timestamped_message.remote_queue_index.index;
// Find the sent time if available.
auto element = std::lower_bound(
queue_index_map->begin(), queue_index_map->end(), search,
[](ContiguousSentTimestamp a, SentTimestamp b) {
if (a.ending_monotonic_event_time < b.monotonic_event_time) {
return true;
}
if (a.starting_monotonic_event_time > b.monotonic_event_time) {
return false;
}
if (a.ending_queue_index < b.queue_index) {
return true;
}
if (a.starting_queue_index >= b.queue_index) {
return false;
}
// If it isn't clearly below or above, it is below. Since we return
// the last element <, this will return a match.
return false;
});
// TODO(austin): Be a bit more principled here, but we will want to do that
// after the logger rewrite. We hit this when one node finishes, but the
// other node isn't done yet. So there is no send time, but there is a
// receive time.
if (element != queue_index_map->end()) {
CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
source_state->boot_count());
CHECK_GE(timestamped_message.monotonic_remote_time.time,
element->starting_monotonic_event_time);
CHECK_LE(timestamped_message.monotonic_remote_time.time,
element->ending_monotonic_event_time);
CHECK_GE(timestamped_message.remote_queue_index.index,
element->starting_queue_index);
CHECK_LE(timestamped_message.remote_queue_index.index,
element->ending_queue_index);
remote_queue_index = timestamped_message.remote_queue_index.index +
element->actual_queue_index -
element->starting_queue_index;
} else {
VLOG(1) << "No timestamp match in the map.";
}
CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
source_state->boot_count());
}
if (event_loop_factory_ != nullptr &&
channel_source_state_[timestamped_message.channel_index] != nullptr &&
multinode_filters_ != nullptr) {
// Sanity check that we are using consistent boot uuids.
State *source_state =
channel_source_state_[timestamped_message.channel_index];
CHECK_EQ(multinode_filters_->boot_uuid(
configuration::GetNodeIndex(event_loop_->configuration(),
source_state->node()),
timestamped_message.monotonic_remote_time.boot),
CHECK_NOTNULL(
CHECK_NOTNULL(
channel_source_state_[timestamped_message.channel_index])
->event_loop_)
->boot_uuid());
}
// Right before sending allow the user to process the message.
if (before_send_callbacks_[timestamped_message.channel_index]) {
// Only channels that are forwarded and sent from this State's node will be
// in the queue_index_map_
if (queue_index_map_[timestamped_message.channel_index]) {
before_send_callbacks_[timestamped_message.channel_index](
timestamped_message.data->mutable_data());
}
}
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
const RawSender::Error err = sender->Send(
SharedSpan(timestamped_message.data, &timestamped_message.data->span),
timestamped_message.monotonic_remote_time.time,
timestamped_message.realtime_remote_time, remote_queue_index,
(channel_source_state_[timestamped_message.channel_index] != nullptr
? CHECK_NOTNULL(multinode_filters_)
->boot_uuid(configuration::GetNodeIndex(
event_loop_->configuration(),
channel_source_state_[timestamped_message
.channel_index]
->node()),
timestamped_message.monotonic_remote_time.boot)
: event_loop_->boot_uuid()));
if (err != RawSender::Error::kOk) return false;
if (monotonic_start_time(timestamped_message.monotonic_event_time.boot) <=
timestamped_message.monotonic_event_time.time) {
// Only track errors for non-fetched messages.
TrackMessageSendTiming(
*sender,
timestamped_message.monotonic_event_time.time + clock_offset());
}
if (queue_index_map_[timestamped_message.channel_index]) {
CHECK_EQ(timestamped_message.monotonic_event_time.boot, boot_count());
if (queue_index_map_[timestamped_message.channel_index]->empty()) {
// Nothing here, start a range with 0 length.
ContiguousSentTimestamp timestamp;
timestamp.starting_monotonic_event_time =
timestamp.ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
timestamp.starting_queue_index = timestamp.ending_queue_index =
timestamped_message.queue_index.index;
timestamp.actual_queue_index = sender->sent_queue_index();
queue_index_map_[timestamped_message.channel_index]->emplace_back(
timestamp);
} else {
// We've got something. See if the next timestamp is still contiguous. If
// so, grow it.
ContiguousSentTimestamp *back =
&queue_index_map_[timestamped_message.channel_index]->back();
if ((back->starting_queue_index - back->actual_queue_index) ==
(timestamped_message.queue_index.index -
sender->sent_queue_index())) {
back->ending_queue_index = timestamped_message.queue_index.index;
back->ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
} else {
// Otherwise, make a new one.
ContiguousSentTimestamp timestamp;
timestamp.starting_monotonic_event_time =
timestamp.ending_monotonic_event_time =
timestamped_message.monotonic_event_time.time;
timestamp.starting_queue_index = timestamp.ending_queue_index =
timestamped_message.queue_index.index;
timestamp.actual_queue_index = sender->sent_queue_index();
queue_index_map_[timestamped_message.channel_index]->emplace_back(
timestamp);
}
}
// TODO(austin): Should we prune the map? On a many day log, I only saw the
// queue index diverge a couple of elements, which would be a very small
// map.
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
// TODO(james): Currently, If running replay against a single event loop,
// remote timestamps will not get replayed because this code-path only
// gets triggered on the event loop that receives the forwarded message
// that the timestamps correspond to. This code, as written, also doesn't
// correctly handle a non-zero clock_offset for the *_remote_time fields.
State *source_state =
CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index]);
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> boot_uuid_offset =
event_loop_->boot_uuid().PackVector(&fbb);
RemoteMessage::Builder message_header_builder(fbb);
message_header_builder.add_channel_index(
factory_channel_index_[timestamped_message.channel_index]);
// Swap the remote and sent metrics. They are from the sender's
// perspective, not the receiver's perspective.
message_header_builder.add_monotonic_sent_time(
sender->monotonic_sent_time().time_since_epoch().count());
message_header_builder.add_realtime_sent_time(
sender->realtime_sent_time().time_since_epoch().count());
message_header_builder.add_queue_index(sender->sent_queue_index());
CHECK_EQ(timestamped_message.monotonic_remote_time.boot,
source_state->boot_count());
message_header_builder.add_monotonic_remote_time(
timestamped_message.monotonic_remote_time.time.time_since_epoch()
.count());
message_header_builder.add_realtime_remote_time(
timestamped_message.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(remote_queue_index);
message_header_builder.add_boot_uuid(boot_uuid_offset);
fbb.Finish(message_header_builder.Finish());
remote_timestamp_senders_[timestamped_message.channel_index]->Send(
FlatbufferDetachedBuffer<RemoteMessage>(fbb.Release()),
timestamped_message.monotonic_timestamp_time,
source_state->boot_count());
}
return true;
}
LogReader::RemoteMessageSender::RemoteMessageSender(
aos::Sender<message_bridge::RemoteMessage> sender, EventLoop *event_loop)
: event_loop_(event_loop),
sender_(std::move(sender)),
timer_(event_loop->AddTimer([this]() { SendTimestamp(); })) {}
void LogReader::RemoteMessageSender::ScheduleTimestamp() {
if (remote_timestamps_.empty()) {
CHECK_NOTNULL(timer_);
timer_->Disable();
scheduled_time_ = monotonic_clock::min_time;
return;
}
if (scheduled_time_ != remote_timestamps_.front().monotonic_timestamp_time) {
CHECK_NOTNULL(timer_);
timer_->Schedule(remote_timestamps_.front().monotonic_timestamp_time);
scheduled_time_ = remote_timestamps_.front().monotonic_timestamp_time;
CHECK_GE(scheduled_time_, event_loop_->monotonic_now())
<< event_loop_->node()->name()->string_view();
}
}
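// Queues a RemoteMessage for replay at monotonic_timestamp_time, keeping
// remote_timestamps_ sorted by timestamp time, and reschedules the timer.
// Messages without a monotonic_timestamp_time are dropped; see the cases
// described below.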
void LogReader::RemoteMessageSender::Send(
FlatbufferDetachedBuffer<RemoteMessage> remote_message,
BootTimestamp monotonic_timestamp_time, size_t source_boot_count) {
// There are 2 variants of logs.
// 1) Logs without monotonic_timestamp_time
// 2) Logs with monotonic_timestamp_time
//
// As of Jan 2021, we shouldn't have any more logs without
// monotonic_timestamp_time. We don't have data locked up in those logs worth
// the effort of saving.
//
// This gives us 3 cases, 2 of which are indistinguishable.
// 1) Old log without monotonic_timestamp_time.
// 2) New log with monotonic_timestamp_time where the timestamp was logged
// remotely so we actually have monotonic_timestamp_time.
// 3) New log, but the timestamp was logged on the node receiving the message
// so there is no monotonic_timestamp_time.
//
// Our goal when replaying is to accurately reproduce the state of the world
// present when logging. If a timestamp wasn't sent back across the network,
// we shouldn't replay one back across the network.
//
// Given that we don't really care about 1, we can use the presence of the
// timestamp to distinguish 2 and 3, and ignore 1. If we don't have a
// monotonic_timestamp_time, this means the message was logged locally and
// remote timestamps can be ignored.
if (monotonic_timestamp_time == BootTimestamp::min_time()) {
return;
}
CHECK_EQ(monotonic_timestamp_time.boot, source_boot_count);
remote_timestamps_.emplace(
std::upper_bound(
remote_timestamps_.begin(), remote_timestamps_.end(),
monotonic_timestamp_time.time,
[](const aos::monotonic_clock::time_point monotonic_timestamp_time,
const Timestamp &timestamp) {
return monotonic_timestamp_time <
timestamp.monotonic_timestamp_time;
}),
std::move(remote_message), monotonic_timestamp_time.time);
ScheduleTimestamp();
}
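// Timer callback: sends every queued timestamp whose monotonic_timestamp_time
// matches the currently scheduled wakeup, then schedules the next batch.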
void LogReader::RemoteMessageSender::SendTimestamp() {
CHECK_EQ(event_loop_->context().monotonic_event_time, scheduled_time_)
<< event_loop_->node()->name()->string_view();
CHECK(!remote_timestamps_.empty());
// Send out all timestamps at the currently scheduled time.
while (remote_timestamps_.front().monotonic_timestamp_time ==
scheduled_time_) {
CHECK_EQ(sender_.Send(std::move(remote_timestamps_.front().remote_message)),
RawSender::Error::kOk);
remote_timestamps_.pop_front();
if (remote_timestamps_.empty()) {
break;
}
}
scheduled_time_ = monotonic_clock::min_time;
ScheduleTimestamp();
}
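// Returns the RemoteMessageSender for the timestamp channel that this
// channel/connection pair logs to, creating and caching it on first use.
// Both caches are kept in sync so connections which share a timestamp channel
// share a single sender.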
LogReader::RemoteMessageSender *LogReader::State::RemoteTimestampSender(
const Channel *channel, const Connection *connection) {
message_bridge::ChannelTimestampFinder finder(event_loop_);
// Look at any pre-created channel/connection pairs.
{
auto it =
channel_timestamp_loggers_.find(std::make_pair(channel, connection));
if (it != channel_timestamp_loggers_.end()) {
return it->second.get();
}
}
// That failed, so resolve the RemoteMessage channel that timestamps will be
// logged to.
const Channel *timestamp_channel = finder.ForChannel(channel, connection);
{
// See if that has been created before. If so, cache it in
// channel_timestamp_loggers_ and return.
auto it = timestamp_loggers_.find(timestamp_channel);
if (it != timestamp_loggers_.end()) {
CHECK(channel_timestamp_loggers_
.try_emplace(std::make_pair(channel, connection), it->second)
.second);
return it->second.get();
}
}
// Otherwise, make a sender, save it, and cache it.
auto result = channel_timestamp_loggers_.try_emplace(
std::make_pair(channel, connection),
std::make_shared<RemoteMessageSender>(
event_loop()->MakeSender<RemoteMessage>(
timestamp_channel->name()->string_view()),
event_loop()));
CHECK(timestamp_loggers_.try_emplace(timestamp_channel, result.first->second)
.second);
return result.first->second.get();
}
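// Removes and returns the next message to replay, either from the
// multi-threaded queuer (when it is running) or directly from the timestamp
// mapper.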
TimestampedMessage LogReader::State::PopOldest() {
// Multi-threaded path: pull the next message from the queuer thread.
if (message_queuer_.has_value()) {
std::optional<TimestampedMessage> message = message_queuer_->Pop();
CHECK(message.has_value()) << ": Unexpectedly ran out of messages.";
message_queuer_->SetState(
message.value().monotonic_event_time +
std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(FLAGS_threaded_look_ahead_seconds)));
VLOG(1) << "Popped " << message.value()
<< configuration::CleanedChannelToString(
event_loop_->configuration()->channels()->Get(
factory_channel_index_[message->channel_index]));
return message.value();
} else {  // Single-threaded path: pull directly from the timestamp mapper.
CHECK(timestamp_mapper_ != nullptr);
TimestampedMessage *result_ptr = timestamp_mapper_->Front();
CHECK(result_ptr != nullptr);
TimestampedMessage result = std::move(*result_ptr);
VLOG(2) << "Node '" << MaybeNodeName(event_loop_->node())
<< "': PopOldest Popping " << result.monotonic_event_time;
timestamp_mapper_->PopFront();
MaybeSeedSortedMessages();
CHECK_EQ(result.monotonic_event_time.boot, boot_count());
VLOG(1) << "Popped " << result
<< configuration::CleanedChannelToString(
event_loop_->configuration()->channels()->Get(
factory_channel_index_[result.channel_index]));
return result;
}
}
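// Returns the event time of the next message to replay when the queuer thread
// is running, falling back to the single-threaded path otherwise.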
BootTimestamp LogReader::State::MultiThreadedOldestMessageTime() {
if (!message_queuer_.has_value()) {
return SingleThreadedOldestMessageTime();
}
std::optional<TimestampedMessage> message = message_queuer_->Peek();
if (!message.has_value()) {
return BootTimestamp::max_time();
}
if (message.value().monotonic_event_time.boot == boot_count()) {
ObserveNextMessage(message.value().monotonic_event_time.time,
message.value().realtime_event_time);
}
return message.value().monotonic_event_time;
}
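// Returns the event time of the next message directly from the timestamp
// mapper, or max_time if there is nothing left to replay.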
BootTimestamp LogReader::State::SingleThreadedOldestMessageTime() {
CHECK(!message_queuer_.has_value())
<< "Cannot use SingleThreadedOldestMessageTime() once the queuer thread "
"is created.";
if (timestamp_mapper_ == nullptr) {
return BootTimestamp::max_time();
}
TimestampedMessage *result_ptr = timestamp_mapper_->Front();
if (result_ptr == nullptr) {
return BootTimestamp::max_time();
}
VLOG(2) << "Node '" << MaybeNodeName(node()) << "': oldest message at "
<< result_ptr->monotonic_event_time.time;
if (result_ptr->monotonic_event_time.boot == boot_count()) {
ObserveNextMessage(result_ptr->monotonic_event_time.time,
result_ptr->realtime_event_time);
}
return result_ptr->monotonic_event_time;
}
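// Eagerly queues all of the timestamps from the log up front (the
// kQueueTimestampsAtStartup path) so that later replay doesn't have to
// interleave timestamp reads with data reads.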
void LogReader::State::ReadTimestamps() {
if (!timestamp_mapper_) return;
timestamp_mapper_->QueueTimestamps();
// TODO(austin): Maybe make timestamp mapper do this so we don't need to clear
// it out?
//
// If we don't clear the timestamp callback, we end up getting a callback both
// when SplitTimestampBootMerger sees the timestamp, and when TimestampMapper
// sees it.
timestamp_mapper_->set_timestamp_callback([](TimestampedMessage *) {});
}
void LogReader::State::MaybeSeedSortedMessages() {
if (!timestamp_mapper_) return;
// The whole purpose of seeding is to make timestamp_mapper load timestamps.
// So if we have already loaded them, skip seeding.
if (timestamp_queue_strategy_ ==
TimestampQueueStrategy::kQueueTimestampsAtStartup)
return;
timestamp_mapper_->QueueFor(chrono::duration_cast<chrono::seconds>(
chrono::duration<double>(FLAGS_time_estimation_buffer_seconds)));
}
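// Tears down the per-channel senders, timestamp loggers, and time flags, and
// releases the event loop, leaving this State detached from any event loop.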
void LogReader::State::Deregister() {
if (started_ && !stopped_) {
NotifyLogfileEnd();
}
for (size_t i = 0; i < channels_.size(); ++i) {
channels_[i].reset();
}
ClearTimeFlags();
channel_timestamp_loggers_.clear();
timestamp_loggers_.clear();
event_loop_unique_ptr_.reset();
event_loop_ = nullptr;
timer_handler_ = nullptr;
node_event_loop_factory_ = nullptr;
timing_statistics_sender_ = Sender<timing::ReplayTiming>();
}
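// Installs an EventNotifier which fires NotifyFlagStart() once the realtime
// clock reaches the requested start time (e.g. from --start_time).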
void LogReader::State::SetStartTimeFlag(realtime_clock::time_point start_time) {
if (start_time != realtime_clock::min_time) {
start_event_notifier_ = std::make_unique<EventNotifier>(
event_loop_, [this]() { NotifyFlagStart(); }, "flag_start", start_time);
}
}
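// Installs an EventNotifier which fires NotifyFlagEnd() once the realtime
// clock reaches the requested end time (e.g. from --end_time).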
void LogReader::State::SetEndTimeFlag(realtime_clock::time_point end_time) {
if (end_time != realtime_clock::max_time) {
end_event_notifier_ = std::make_unique<EventNotifier>(
event_loop_, [this]() { NotifyFlagEnd(); }, "flag_end", end_time);
}
}
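// Forwards the upcoming message's event times to the start/end notifiers so
// they can track the realtime clock as replay progresses.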
void LogReader::State::ObserveNextMessage(
monotonic_clock::time_point monotonic_event,
realtime_clock::time_point realtime_event) {
if (start_event_notifier_) {
start_event_notifier_->ObserveNextMessage(monotonic_event, realtime_event);
}
if (end_event_notifier_) {
end_event_notifier_->ObserveNextMessage(monotonic_event, realtime_event);
}
}
void LogReader::State::ClearTimeFlags() {
start_event_notifier_.reset();
end_event_notifier_.reset();
}
void LogReader::State::NotifyLogfileStart() {
// If the start_event_notifier_ is set, that means that a realtime start time
// was set manually; when the override is set, we want to delay any startup
// handlers that would've happened before the requested start time until that
// start time.
if (start_event_notifier_) {
// Only call OnStart() here if the start time for this node
// (realtime_start_time()) is at or after the requested start time; otherwise
// NotifyFlagStart() will run the start handlers once the flag time is reached.
if (start_event_notifier_->realtime_event_time() >
realtime_start_time(boot_count())) {
VLOG(1) << "Skipping, " << start_event_notifier_->realtime_event_time()
<< " > " << realtime_start_time(boot_count());
return;
}
}
if (found_last_message_) {
VLOG(1) << "Last message already found, bailing";
return;
}
RunOnStart();
}
void LogReader::State::NotifyFlagStart() {
// Should only be called if start_event_notifier_ has been set (which happens
// as part of setting an explicit start time); only run the start handlers if
// this node's logged start time is at or before the start flag value.
if (start_event_notifier_->realtime_event_time() >=
realtime_start_time(boot_count())) {
RunOnStart();
}
}
void LogReader::State::NotifyLogfileEnd() {
// Don't execute the OnEnd handlers if the logfile was ended artificially
// early.
if (found_last_message_) {
return;
}
// Ensure that we only call OnEnd() if OnStart() was already called for this
// boot (and don't call OnEnd() twice).
if (!stopped_ && started_) {
RunOnEnd();
}
}
void LogReader::State::NotifyFlagEnd() {
// Ensure that we only call OnEnd() if OnStart() was already called for this
// boot (and don't call OnEnd() twice).
if (!stopped_ && started_) {
RunOnEnd();
SetFoundLastMessage(true);
CHECK(notice_realtime_end_);
notice_realtime_end_();
if (message_queuer_.has_value()) {
message_queuer_->StopPushing();
}
}
}
void LogReader::State::MaybeSetClockOffset() {
if (node_event_loop_factory_ == nullptr) {
// If not running with a simulated event loop, set the monotonic clock
// offset.
clock_offset_ = event_loop()->monotonic_now() - monotonic_start_time(0);
if (start_event_notifier_) {
start_event_notifier_->SetClockOffset(clock_offset_);
}
if (end_event_notifier_) {
end_event_notifier_->SetClockOffset(clock_offset_);
}
}
}
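// Sets the rate at which simulated time advances relative to wall-clock time
// during realtime replay.  A minimal usage sketch, assuming a LogReader named
// `reader` that has already had Register() called on it:
//
//   reader.SetRealtimeReplayRate(0.5);  // e.g. slow replay to half speed.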
void LogReader::SetRealtimeReplayRate(double replay_rate) {
CHECK(event_loop_factory_ != nullptr)
<< ": Can't set replay rate without an event loop factory (have you "
"called Register()?).";
event_loop_factory_->SetRealtimeReplayRate(replay_rate);
}
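// Called once per node when its realtime end time is reached.  When every live
// node has finished and exit_on_finish() is set, exits the event loop factory.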
void LogReader::NoticeRealtimeEnd() {
CHECK_GE(live_nodes_with_realtime_time_end_, 1u);
--live_nodes_with_realtime_time_end_;
if (live_nodes_with_realtime_time_end_ == 0 && exit_on_finish() &&
event_loop_factory_ != nullptr) {
event_loop_factory_->Exit();
}
}
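// Returns true if any per-node State has been created (typically as part of
// Register()).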
bool LogReader::AreStatesInitialized() const {
for (const auto &state : states_) {
if (state) {
return true;
}
}
return false;
}
} // namespace logger
} // namespace aos