Add multi-node log file reading
This handles delivery timestamps, sorting messages by time, and merging
timestamps with their data.
For simplicity, we read the log files once per node. Once benchmarks
show whether this is a bad idea, we can fix it.
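
As a rough sketch, replaying a multi-node log set now looks something
like this (the log file paths are hypothetical and replay_configuration
is assumed to keep its default; the rest of the API is from this patch):

    #include <string>
    #include <vector>

    #include "aos/events/logging/logger.h"
    #include "aos/events/simulated_event_loop.h"

    int main() {
      // One inner vector of log files per node; LogReader sorts and
      // merges messages across all of them.
      aos::logger::LogReader reader(
          std::vector<std::vector<std::string>>{{"/tmp/pi1.bfbs"},
                                                {"/tmp/pi2.bfbs"}});

      // Replay into a simulation built from the logged configuration.
      // Register() runs the simulation up to the last of the nodes'
      // start times; Run() then plays the merged messages back.
      aos::SimulatedEventLoopFactory factory(reader.configuration());
      reader.Register(&factory);
      factory.Run();
      reader.Deregister();
    }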
Change-Id: I445ac5bfc7186bda25cc899602ac8d95a4cb946d
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index c501d7b..84e1f43 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -26,12 +26,22 @@
Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
std::chrono::milliseconds polling_period)
+ : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
+ event_loop, polling_period) {}
+
+Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
+ std::chrono::milliseconds polling_period)
: event_loop_(event_loop),
- writer_(writer),
+ log_namer_(std::move(log_namer)),
timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
polling_period_(polling_period) {
+ VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
+ int channel_index = 0;
for (const Channel *channel : *event_loop_->configuration()->channels()) {
FetcherStruct fs;
+ const bool is_local =
+ configuration::ChannelIsSendableOnNode(channel, event_loop_->node());
+
const bool is_readable =
configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
@@ -50,28 +60,21 @@
<< configuration::CleanedChannelToString(channel);
if (log_delivery_times) {
- if (log_message) {
- VLOG(1) << " Logging message and delivery times";
- fs.log_type = LogType::kLogMessageAndDeliveryTime;
- } else {
- VLOG(1) << " Logging delivery times only";
- fs.log_type = LogType::kLogDeliveryTimeOnly;
- }
- } else {
- // We don't have a particularly great use case right now for logging a
- // forwarded message, but either not logging the delivery times, or
- // logging them on another node. Fail rather than produce bad results.
- CHECK(configuration::ChannelIsSendableOnNode(channel,
- event_loop_->node()))
- << ": Logger only knows how to log remote messages with "
- "forwarding timestamps.";
- VLOG(1) << " Logging message only";
- fs.log_type = LogType::kLogMessage;
+ VLOG(1) << " Delivery times";
+ fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
}
+ if (log_message) {
+ VLOG(1) << " Data";
+ fs.writer = log_namer_->MakeWriter(channel);
+ if (!is_local) {
+ fs.log_type = LogType::kLogRemoteMessage;
+ }
+ }
+ fs.channel_index = channel_index;
+ fs.written = false;
+ fetchers_.emplace_back(std::move(fs));
}
-
- fs.written = false;
- fetchers_.emplace_back(std::move(fs));
+ ++channel_index;
}
// When things start, we want to log the header, then the most recent messages
@@ -82,9 +85,7 @@
// so we can capture the latest message on each channel. This lets us have
// non periodic messages with configuration that now get logged.
for (FetcherStruct &f : fetchers_) {
- if (f.fetcher.get() != nullptr) {
- f.written = !f.fetcher->Fetch();
- }
+ f.written = !f.fetcher->Fetch();
}
// We need to pick a point in time to declare the log file "started". This
@@ -105,6 +106,11 @@
}
void Logger::WriteHeader() {
+ for (const Node *node : log_namer_->nodes()) {
+ WriteHeader(node);
+ }
+}
+void Logger::WriteHeader(const Node *node) {
// Now write the header with this timestamp in it.
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(1);
@@ -117,7 +123,7 @@
flatbuffers::Offset<Node> node_offset;
if (event_loop_->node() != nullptr) {
- node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
+ node_offset = CopyFlatBuffer(node, &fbb);
}
aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);
@@ -125,7 +131,7 @@
log_file_header_builder.add_name(string_offset);
// Only add the node if we are running in a multinode configuration.
- if (event_loop_->node() != nullptr) {
+ if (node != nullptr) {
log_file_header_builder.add_node(node_offset);
}
@@ -149,15 +155,32 @@
.count());
fbb.FinishSizePrefixed(log_file_header_builder.Finish());
- writer_->QueueSizedFlatbuffer(&fbb);
+ log_namer_->WriteHeader(&fbb, node);
}
void Logger::Rotate(DetachedBufferWriter *writer) {
+ Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
+}
+
+void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
// Force data up until now to be written.
DoLogData();
// Swap the writer out, and re-write the header.
- writer_ = writer;
+ log_namer_ = std::move(log_namer);
+
+ // And then update the writers.
+ for (FetcherStruct &f : fetchers_) {
+ const Channel *channel =
+ event_loop_->configuration()->channels()->Get(f.channel_index);
+ if (f.timestamp_writer != nullptr) {
+ f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
+ }
+ if (f.writer != nullptr) {
+ f.writer = log_namer_->MakeWriter(channel);
+ }
+ }
+
WriteHeader();
}
@@ -173,61 +196,81 @@
// per iteration, even if it is small.
last_synchronized_time_ =
std::min(last_synchronized_time_ + polling_period_, monotonic_now);
- size_t channel_index = 0;
// Write each channel to disk, one at a time.
for (FetcherStruct &f : fetchers_) {
- // Skip any channels which we aren't supposed to log.
- if (f.fetcher.get() != nullptr) {
- while (true) {
- if (f.written) {
- if (!f.fetcher->FetchNext()) {
- VLOG(2) << "No new data on "
- << configuration::CleanedChannelToString(
- f.fetcher->channel());
- break;
- } else {
- f.written = false;
- }
+ while (true) {
+ if (f.written) {
+ if (!f.fetcher->FetchNext()) {
+ VLOG(2) << "No new data on "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel());
+ break;
+ } else {
+ f.written = false;
}
+ }
- CHECK(!f.written);
+ CHECK(!f.written);
- // TODO(james): Write tests to exercise this logic.
- if (f.fetcher->context().monotonic_event_time <
- last_synchronized_time_) {
+ // TODO(james): Write tests to exercise this logic.
+ if (f.fetcher->context().monotonic_event_time <
+ last_synchronized_time_) {
+ if (f.writer != nullptr) {
// Write!
flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
max_header_size_);
fbb.ForceDefaults(1);
fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
- channel_index, f.log_type));
+ f.channel_index, f.log_type));
- VLOG(2) << "Writing data for channel "
+ VLOG(1) << "Writing data as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
<< configuration::CleanedChannelToString(
- f.fetcher->channel());
+ f.fetcher->channel())
+ << " to " << f.writer->filename()
+ << " data "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
max_header_size_ = std::max(
max_header_size_, fbb.GetSize() - f.fetcher->context().size);
- writer_->QueueSizedFlatbuffer(&fbb);
-
- f.written = true;
- } else {
- break;
+ f.writer->QueueSizedFlatbuffer(&fbb);
}
+
+ if (f.timestamp_writer != nullptr) {
+ // And now handle timestamps.
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.ForceDefaults(1);
+
+ fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
+ f.channel_index,
+ LogType::kLogDeliveryTimeOnly));
+
+ VLOG(1) << "Writing timestamps as node "
+ << FlatbufferToJson(event_loop_->node()) << " for channel "
+ << configuration::CleanedChannelToString(
+ f.fetcher->channel())
+ << " to " << f.timestamp_writer->filename()
+ << " timestamp "
+ << FlatbufferToJson(
+ flatbuffers::GetSizePrefixedRoot<MessageHeader>(
+ fbb.GetBufferPointer()));
+
+ f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
+ }
+
+ f.written = true;
+ } else {
+ break;
}
}
-
- ++channel_index;
}
- CHECK_EQ(channel_index, fetchers_.size());
-
// If we missed cycles, we could be pretty far behind. Spin until we are
// caught up.
} while (last_synchronized_time_ + polling_period_ < monotonic_now);
-
- writer_->Flush();
}
LogReader::LogReader(std::string_view filename,
@@ -237,41 +280,58 @@
LogReader::LogReader(const std::vector<std::string> &filenames,
const Configuration *replay_configuration)
- : sorted_message_reader_(filenames),
+ : LogReader(std::vector<std::vector<std::string>>{filenames},
+ replay_configuration) {}
+
+LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
+ const Configuration *replay_configuration)
+ : filenames_(filenames),
+ log_file_header_(ReadHeader(filenames[0][0])),
replay_configuration_(replay_configuration) {
- channels_.resize(logged_configuration()->channels()->size());
MakeRemappedConfig();
+
+ if (!configuration::MultiNode(configuration())) {
+ auto it = channel_mergers_.insert(std::make_pair(nullptr, State{}));
+ State *state = &(it.first->second);
+
+ state->channel_merger = std::make_unique<ChannelMerger>(filenames);
+ }
}
LogReader::~LogReader() { Deregister(); }
const Configuration *LogReader::logged_configuration() const {
- return sorted_message_reader_.configuration();
+ return log_file_header_.message().configuration();
}
const Configuration *LogReader::configuration() const {
return remapped_configuration_;
}
-const Node *LogReader::node() const {
+std::vector<const Node *> LogReader::Nodes() const {
// Because the Node pointer will only be valid if it actually points to memory
// owned by remapped_configuration_, we need to wait for the
// remapped_configuration_ to be populated before accessing it.
+ //
+  // Also note that whenever a map is changed, the nodes in here are
+ // invalidated.
CHECK(remapped_configuration_ != nullptr)
<< ": Need to call Register before the node() pointer will be valid.";
- if (sorted_message_reader_.node() == nullptr) {
- return nullptr;
- }
- return configuration::GetNode(
- configuration(), sorted_message_reader_.node()->name()->string_view());
+ return configuration::GetNodes(remapped_configuration_);
}
-monotonic_clock::time_point LogReader::monotonic_start_time() {
- return sorted_message_reader_.monotonic_start_time();
+monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
+ auto it = channel_mergers_.find(node);
+ CHECK(it != channel_mergers_.end())
+ << ": Unknown node " << FlatbufferToJson(node);
+ return it->second.channel_merger->monotonic_start_time();
}
-realtime_clock::time_point LogReader::realtime_start_time() {
- return sorted_message_reader_.realtime_start_time();
+realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
+ auto it = channel_mergers_.find(node);
+ CHECK(it != channel_mergers_.end())
+ << ": Unknown node " << FlatbufferToJson(node);
+ return it->second.channel_merger->realtime_start_time();
}
void LogReader::Register() {
@@ -282,126 +342,156 @@
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
event_loop_factory_ = event_loop_factory;
- node_event_loop_factory_ =
- event_loop_factory_->GetNodeEventLoopFactory(node());
- event_loop_unique_ptr_ =
- event_loop_factory->MakeEventLoop("log_reader", node());
- // We don't run timing reports when trying to print out logged data, because
- // otherwise we would end up printing out the timing reports themselves...
- // This is only really relevant when we are replaying into a simulation.
- event_loop_unique_ptr_->SkipTimingReport();
+  // We want to start replay at the latest start time across all the nodes'
+  // log files. Compute how long each node's simulation needs to run to
+  // move time to this point.
+ monotonic_clock::duration run_time = monotonic_clock::duration(0);
- Register(event_loop_unique_ptr_.get());
- event_loop_factory_->RunFor(monotonic_start_time() -
- event_loop_->monotonic_now());
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ auto it = channel_mergers_.insert(std::make_pair(node, State{}));
+
+ State *state = &(it.first->second);
+
+ state->channel_merger = std::make_unique<ChannelMerger>(filenames_);
+
+ state->node_event_loop_factory =
+ event_loop_factory_->GetNodeEventLoopFactory(node);
+ state->event_loop_unique_ptr =
+ event_loop_factory->MakeEventLoop("log_reader", node);
+
+ Register(state->event_loop_unique_ptr.get());
+
+ const monotonic_clock::duration startup_time =
+ state->channel_merger->monotonic_start_time() -
+ state->event_loop->monotonic_now();
+ if (startup_time > run_time) {
+ run_time = startup_time;
+ }
+ }
+
+  // Forwarding is tracked per channel. If it is enabled, we want to turn it
+  // off. Otherwise, replayed messages would get forwarded across to the other
+  // nodes and also be replayed there from their own logs, duplicating them.
+  // This may not satisfy all our users, but it'll start the discussion.
+ if (configuration::MultiNode(event_loop_factory_->configuration())) {
+ for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
+ const Channel *channel = logged_configuration()->channels()->Get(i);
+ const Node *node = configuration::GetNode(
+ configuration(), channel->source_node()->string_view());
+
+ auto state_pair = channel_mergers_.find(node);
+ CHECK(state_pair != channel_mergers_.end());
+ State *state = &(state_pair->second);
+
+ const Channel *remapped_channel =
+ RemapChannel(state->event_loop, channel);
+
+ event_loop_factory_->DisableForwarding(remapped_channel);
+ }
+ }
+
+ event_loop_factory_->RunFor(run_time);
}
void LogReader::Register(EventLoop *event_loop) {
- event_loop_ = event_loop;
+ auto state_pair = channel_mergers_.find(event_loop->node());
+ CHECK(state_pair != channel_mergers_.end());
+ State *state = &(state_pair->second);
+
+ state->event_loop = event_loop;
// We don't run timing reports when trying to print out logged data, because
// otherwise we would end up printing out the timing reports themselves...
// This is only really relevant when we are replaying into a simulation.
- // Otherwise we replay the timing report and try to resend it...
- event_loop_->SkipTimingReport();
- event_loop_->SkipAosLog();
+ event_loop->SkipTimingReport();
+ event_loop->SkipAosLog();
- for (size_t i = 0; i < channels_.size(); ++i) {
- const Channel *const original_channel =
- logged_configuration()->channels()->Get(i);
+ state->channel_merger->SetNode(event_loop->node());
- std::string_view channel_name = original_channel->name()->string_view();
- std::string_view channel_type = original_channel->type()->string_view();
- // If the channel is remapped, find the correct channel name to use.
- if (remapped_channels_.count(i) > 0) {
- VLOG(2) << "Got remapped channel on "
- << configuration::CleanedChannelToString(original_channel);
- channel_name = remapped_channels_[i];
- }
+ state->channels.resize(logged_configuration()->channels()->size());
- VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
- const Channel *channel = configuration::GetChannel(
- event_loop_->configuration(), channel_name, channel_type,
- event_loop_->name(), event_loop_->node());
+ for (size_t i = 0; i < state->channels.size(); ++i) {
+ const Channel *channel =
+ RemapChannel(event_loop, logged_configuration()->channels()->Get(i));
- CHECK(channel != nullptr)
- << ": Unable to send {\"name\": \"" << channel_name
- << "\", \"type\": \"" << channel_type
- << "\"} because it is not in the provided configuration.";
-
- channels_[i] = event_loop_->MakeRawSender(channel);
+ state->channels[i] = event_loop->MakeRawSender(channel);
}
- timer_handler_ = event_loop_->AddTimer([this]() {
- if (sorted_message_reader_.active_channel_count() == 0u) {
- event_loop_factory_->Exit();
+ state->timer_handler = event_loop->AddTimer([this, state]() {
+ if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
+ --live_nodes_;
+ if (live_nodes_ == 0) {
+ event_loop_factory_->Exit();
+ }
return;
}
- monotonic_clock::time_point channel_timestamp;
+ TimestampMerger::DeliveryTimestamp channel_timestamp;
int channel_index;
FlatbufferVector<MessageHeader> channel_data =
FlatbufferVector<MessageHeader>::Empty();
std::tie(channel_timestamp, channel_index, channel_data) =
- sorted_message_reader_.PopOldestChannel();
+ state->channel_merger->PopOldest();
const monotonic_clock::time_point monotonic_now =
- event_loop_->context().monotonic_event_time;
- CHECK(monotonic_now == channel_timestamp)
+ state->event_loop->context().monotonic_event_time;
+ CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
<< ": Now " << monotonic_now.time_since_epoch().count()
- << " trying to send " << channel_timestamp.time_since_epoch().count();
+ << " trying to send "
+ << channel_timestamp.monotonic_event_time.time_since_epoch().count();
- if (channel_timestamp > monotonic_start_time() ||
+ if (channel_timestamp.monotonic_event_time >
+ state->channel_merger->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
if (!FLAGS_skip_missing_forwarding_entries ||
channel_data.message().data() != nullptr) {
CHECK(channel_data.message().data() != nullptr)
<< ": Got a message without data. Forwarding entry which was "
- "not "
- "matched? Use --skip_missing_forwarding_entries to ignore "
+ "not matched? Use --skip_missing_forwarding_entries to ignore "
"this.";
// If we have access to the factory, use it to fix the realtime time.
- if (node_event_loop_factory_ != nullptr) {
- node_event_loop_factory_->SetRealtimeOffset(
- monotonic_clock::time_point(chrono::nanoseconds(
- channel_data.message().monotonic_sent_time())),
- realtime_clock::time_point(chrono::nanoseconds(
- channel_data.message().realtime_sent_time())));
+ if (state->node_event_loop_factory != nullptr) {
+ state->node_event_loop_factory->SetRealtimeOffset(
+ channel_timestamp.monotonic_event_time,
+ channel_timestamp.realtime_event_time);
}
- channels_[channel_index]->Send(
+ state->channels[channel_index]->Send(
channel_data.message().data()->Data(),
channel_data.message().data()->size(),
- monotonic_clock::time_point(chrono::nanoseconds(
- channel_data.message().monotonic_remote_time())),
- realtime_clock::time_point(chrono::nanoseconds(
- channel_data.message().realtime_remote_time())),
- channel_data.message().remote_queue_index());
+ channel_timestamp.monotonic_remote_time,
+ channel_timestamp.realtime_remote_time,
+ channel_timestamp.remote_queue_index);
}
} else {
- LOG(WARNING) << "Not sending data from before the start of the log file. "
- << channel_timestamp.time_since_epoch().count() << " start "
- << monotonic_start_time().time_since_epoch().count() << " "
- << FlatbufferToJson(channel_data);
+ LOG(WARNING)
+ << "Not sending data from before the start of the log file. "
+ << channel_timestamp.monotonic_event_time.time_since_epoch().count()
+ << " start " << monotonic_start_time().time_since_epoch().count()
+ << " " << FlatbufferToJson(channel_data);
}
- if (sorted_message_reader_.active_channel_count() > 0u) {
- timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+ const monotonic_clock::time_point next_time =
+ state->channel_merger->OldestMessage();
+ if (next_time != monotonic_clock::max_time) {
+ state->timer_handler->Setup(next_time);
} else {
// Set a timer up immediately after now to die. If we don't do this, then
// the senders waiting on the message we just read will never get called.
if (event_loop_factory_ != nullptr) {
- timer_handler_->Setup(monotonic_now +
- event_loop_factory_->send_delay() +
- std::chrono::nanoseconds(1));
+ state->timer_handler->Setup(monotonic_now +
+ event_loop_factory_->send_delay() +
+ std::chrono::nanoseconds(1));
}
}
});
- if (sorted_message_reader_.active_channel_count() > 0u) {
- event_loop_->OnRun([this]() {
- timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
+ ++live_nodes_;
+
+ if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
+ event_loop->OnRun([state]() {
+ state->timer_handler->Setup(state->channel_merger->OldestMessage());
});
}
}
@@ -409,15 +499,20 @@
void LogReader::Deregister() {
// Make sure that things get destroyed in the correct order, rather than
// relying on getting the order correct in the class definition.
- for (size_t i = 0; i < channels_.size(); ++i) {
- channels_[i].reset();
+ for (const Node *node : Nodes()) {
+ auto state_pair = channel_mergers_.find(node);
+ CHECK(state_pair != channel_mergers_.end());
+ State *state = &(state_pair->second);
+ for (size_t i = 0; i < state->channels.size(); ++i) {
+ state->channels[i].reset();
+ }
+ state->event_loop_unique_ptr.reset();
+ state->event_loop = nullptr;
+ state->node_event_loop_factory = nullptr;
}
- event_loop_unique_ptr_.reset();
- event_loop_ = nullptr;
event_loop_factory_unique_ptr_.reset();
event_loop_factory_ = nullptr;
- node_event_loop_factory_ = nullptr;
}
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
@@ -442,13 +537,15 @@
}
void LogReader::MakeRemappedConfig() {
- CHECK(!event_loop_)
- << ": Can't change the mapping after the events are scheduled.";
+ for (std::pair<const Node *const, State> &state : channel_mergers_) {
+ CHECK(!state.second.event_loop)
+ << ": Can't change the mapping after the events are scheduled.";
+ }
// If no remapping occurred and we are using the original config, then there
// is nothing interesting to do here.
if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
- remapped_configuration_ = sorted_message_reader_.configuration();
+ remapped_configuration_ = logged_configuration();
return;
}
// Config to copy Channel definitions from. Use the specified
@@ -526,5 +623,30 @@
remapped_configuration_ = &remapped_configuration_buffer_->message();
}
+const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
+ const Channel *channel) {
+ std::string_view channel_name = channel->name()->string_view();
+ std::string_view channel_type = channel->type()->string_view();
+ const int channel_index =
+ configuration::ChannelIndex(logged_configuration(), channel);
+ // If the channel is remapped, find the correct channel name to use.
+ if (remapped_channels_.count(channel_index) > 0) {
+ VLOG(2) << "Got remapped channel on "
+ << configuration::CleanedChannelToString(channel);
+ channel_name = remapped_channels_[channel_index];
+ }
+
+ VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
+ const Channel *remapped_channel = configuration::GetChannel(
+ event_loop->configuration(), channel_name, channel_type,
+ event_loop->name(), event_loop->node());
+
+ CHECK(remapped_channel != nullptr)
+ << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
+ << channel_type << "\"} because it is not in the provided configuration.";
+
+ return remapped_channel;
+}
+
} // namespace logger
} // namespace aos