| #include "aos/events/logging/logger.h" |
| |
| #include <fcntl.h> |
| #include <limits.h> |
| #include <sys/stat.h> |
| #include <sys/types.h> |
| #include <sys/uio.h> |
| #include <vector> |
| |
| #include "absl/types/span.h" |
| #include "aos/events/event_loop.h" |
| #include "aos/events/logging/logger_generated.h" |
| #include "aos/flatbuffer_merge.h" |
| #include "aos/network/team_number.h" |
| #include "aos/time/time.h" |
| #include "flatbuffers/flatbuffers.h" |
| |
| DEFINE_bool(skip_missing_forwarding_entries, false, |
| "If true, drop any forwarding entries with missing data. If " |
| "false, CHECK."); |
| |
| namespace aos { |
| namespace logger { |
| |
| namespace chrono = std::chrono; |
| |
// Constructs a Logger that polls every channel which should be logged on this
// node and queues the packed messages onto |writer|.  Nothing is written until
// the event loop actually starts running (see the OnRun handler below), at
// which point the log header is emitted and periodic polling begins.
Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      writer_(writer),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period) {
  // Build one FetcherStruct per configured channel (even unlogged ones, which
  // keep a null fetcher) so that fetchers_ indices line up with the
  // configuration's channel indices.
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    FetcherStruct fs;
    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

    // Delivery times only exist in a multi-node configuration; with no node
    // there is no forwarding and hence nothing to timestamp.
    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    if (log_message || log_delivery_times) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        if (log_message) {
          VLOG(1) << " Logging message and delivery times";
          fs.log_type = LogType::kLogMessageAndDeliveryTime;
        } else {
          VLOG(1) << " Logging delivery times only";
          fs.log_type = LogType::kLogDeliveryTimeOnly;
        }
      } else {
        // We don't have a particularly great use case right now for logging a
        // forwarded message, but either not logging the delivery times, or
        // logging them on another node. Fail rather than produce bad results.
        CHECK(configuration::ChannelIsSendableOnNode(channel,
                                                     event_loop_->node()))
            << ": Logger only knows how to log remote messages with "
               "forwarding timestamps.";
        VLOG(1) << " Logging message only";
        fs.log_type = LogType::kLogMessage;
      }
    }

    // written == true means "nothing pending to write" for this channel.
    fs.written = false;
    fetchers_.emplace_back(std::move(fs));
  }

  // When things start, we want to log the header, then the most recent messages
  // available on each fetcher to capture the previous state, then start
  // polling.
  event_loop_->OnRun([this, polling_period]() {
    // Grab data from each channel right before we declare the log file started
    // so we can capture the latest message on each channel. This lets us have
    // non periodic messages with configuration that now get logged.
    for (FetcherStruct &f : fetchers_) {
      if (f.fetcher.get() != nullptr) {
        // A successful Fetch() leaves the latest message pending (written =
        // false) so the first DoLogData() pass writes it out.
        f.written = !f.fetcher->Fetch();
      }
    }

    // We need to pick a point in time to declare the log file "started". This
    // starts here. It needs to be after everything is fetched so that the
    // fetchers are all pointed at the most recent message before the start
    // time.
    const monotonic_clock::time_point monotonic_now =
        event_loop_->monotonic_now();
    const realtime_clock::time_point realtime_now = event_loop_->realtime_now();
    last_synchronized_time_ = monotonic_now;

    {
      // Now write the header with this timestamp in it.
      flatbuffers::FlatBufferBuilder fbb;
      // Force default-valued fields to be serialized so readers can mutate
      // them in place later.
      fbb.ForceDefaults(1);

      flatbuffers::Offset<aos::Configuration> configuration_offset =
          CopyFlatBuffer(event_loop_->configuration(), &fbb);

      flatbuffers::Offset<flatbuffers::String> string_offset =
          fbb.CreateString(network::GetHostname());

      flatbuffers::Offset<Node> node_offset;
      if (event_loop_->node() != nullptr) {
        node_offset = CopyFlatBuffer(event_loop_->node(), &fbb);
      }
      LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node());

      aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

      log_file_header_builder.add_name(string_offset);

      // Only add the node if we are running in a multinode configuration.
      if (event_loop_->node() != nullptr) {
        log_file_header_builder.add_node(node_offset);
      }

      log_file_header_builder.add_configuration(configuration_offset);
      // The worst case theoretical out of order is the polling period times 2.
      // One message could get logged right after the boundary, but be for right
      // before the next boundary. And the reverse could happen for another
      // message. Report back 3x to be extra safe, and because the cost isn't
      // huge on the read side.
      log_file_header_builder.add_max_out_of_order_duration(
          std::chrono::duration_cast<std::chrono::nanoseconds>(3 *
                                                               polling_period)
              .count());

      log_file_header_builder.add_monotonic_start_time(
          std::chrono::duration_cast<std::chrono::nanoseconds>(
              monotonic_now.time_since_epoch())
              .count());
      log_file_header_builder.add_realtime_start_time(
          std::chrono::duration_cast<std::chrono::nanoseconds>(
              realtime_now.time_since_epoch())
              .count());

      fbb.FinishSizePrefixed(log_file_header_builder.Finish());
      writer_->QueueSizedFlatbuffer(&fbb);
    }

    // Kick off periodic polling one polling_period from now.
    timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
                          polling_period);
  });
}
| |
// Timer callback: drains every fetcher up to a synchronization point, packs
// each pending message into the log, and flushes the writer.
void Logger::DoLogData() {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.
  const monotonic_clock::time_point monotonic_now = monotonic_clock::now();

  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    last_synchronized_time_ =
        std::min(last_synchronized_time_ + polling_period_, monotonic_now);
    size_t channel_index = 0;
    // Write each channel to disk, one at a time.
    for (FetcherStruct &f : fetchers_) {
      // Skip any channels which we aren't supposed to log.
      if (f.fetcher.get() != nullptr) {
        while (true) {
          if (f.written) {
            // The pending message has already been logged; try to fetch the
            // next one.
            if (!f.fetcher->FetchNext()) {
              VLOG(2) << "No new data on "
                      << configuration::CleanedChannelToString(
                             f.fetcher->channel());
              break;
            } else {
              f.written = false;
            }
          }

          CHECK(!f.written);

          // TODO(james): Write tests to exercise this logic.
          // Only log messages from before the sync point; anything newer
          // waits for the next cycle so the out-of-order bound holds.
          if (f.fetcher->context().monotonic_event_time <
              last_synchronized_time_) {
            // Write!
            flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                               max_header_size_);
            fbb.ForceDefaults(1);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               channel_index, f.log_type));

            VLOG(2) << "Writing data for channel "
                    << configuration::CleanedChannelToString(
                         f.fetcher->channel());

            // Track the largest header overhead seen so future builders are
            // pre-sized big enough to avoid reallocating mid-pack.
            max_header_size_ = std::max(
                max_header_size_, fbb.GetSize() - f.fetcher->context().size);
            writer_->QueueSizedFlatbuffer(&fbb);

            f.written = true;
          } else {
            break;
          }
        }
      }

      ++channel_index;
    }

    CHECK_EQ(channel_index, fetchers_.size());

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < monotonic_now);

  writer_->Flush();
}
| |
| LogReader::LogReader(std::string_view filename, |
| const Configuration *replay_configuration) |
| : sorted_message_reader_(filename), |
| replay_configuration_(replay_configuration) { |
| channels_.resize(logged_configuration()->channels()->size()); |
| } |
| |
| LogReader::~LogReader() { |
| Deregister(); |
| } |
| |
// Returns the configuration that was recorded into the log file (as opposed
// to the possibly-remapped configuration used for replay).
const Configuration *LogReader::logged_configuration() const {
  return sorted_message_reader_.configuration();
}
| |
// Returns the configuration replay runs against, including any remapped
// channels.  Only valid after Register() has generated it.
const Configuration *LogReader::configuration() const {
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register() before the remapped config will be "
         "generated.";
  return remapped_configuration_;
}
| |
| const Node *LogReader::node() const { |
| // Because the Node pointer will only be valid if it actually points to memory |
| // owned by remapped_configuration_, we need to wait for the |
| // remapped_configuration_ to be populated before accessing it. |
| CHECK(remapped_configuration_ != nullptr) |
| << ": Need to call Register before the node() pointer will be valid."; |
| if (sorted_message_reader_.node() == nullptr) { |
| return nullptr; |
| } |
| return configuration::GetNode( |
| configuration(), sorted_message_reader_.node()->name()->string_view()); |
| } |
| |
// Returns the monotonic time at which the log file was declared started.
monotonic_clock::time_point LogReader::monotonic_start_time() {
  return sorted_message_reader_.monotonic_start_time();
}
| |
// Returns the realtime (wall clock) time at which the log file was started.
realtime_clock::time_point LogReader::realtime_start_time() {
  return sorted_message_reader_.realtime_start_time();
}
| |
| void LogReader::Register() { |
| MakeRemappedConfig(); |
| event_loop_factory_unique_ptr_ = |
| std::make_unique<SimulatedEventLoopFactory>(configuration(), node()); |
| Register(event_loop_factory_unique_ptr_.get()); |
| } |
| |
// Registers against a caller-provided simulated event loop factory: makes an
// event loop from it, wires up replay, then advances the simulation to the
// log's start time so messages come out at their logged timestamps.
void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;
  event_loop_unique_ptr_ = event_loop_factory_->MakeEventLoop("log_reader");
  // We don't run timing reports when trying to print out logged data, because
  // otherwise we would end up printing out the timing reports themselves...
  // This is only really relevant when we are replaying into a simulation.
  event_loop_unique_ptr_->SkipTimingReport();

  Register(event_loop_unique_ptr_.get());
  // Fast-forward from the simulation's epoch to the logged start time.
  event_loop_factory_->RunFor(monotonic_start_time() -
                              event_loop_->monotonic_now());
}
| |
// Core registration: creates a sender for every logged channel (honoring any
// remaps) and sets up a timer which replays the log one message at a time, in
// timestamp order, rescheduling itself for the next oldest message.
void LogReader::Register(EventLoop *event_loop) {
  event_loop_ = event_loop;

  // Otherwise we replay the timing report and try to resend it...
  event_loop_->SkipTimingReport();

  for (size_t i = 0; i < channels_.size(); ++i) {
    const Channel *const original_channel =
        logged_configuration()->channels()->Get(i);

    std::string_view channel_name = original_channel->name()->string_view();
    std::string_view channel_type = original_channel->type()->string_view();
    // If the channel is remapped, find the correct channel name to use.
    if (remapped_channels_.count(i) > 0) {
      VLOG(2) << "Got remapped channel on "
              << configuration::CleanedChannelToString(original_channel);
      channel_name = remapped_channels_[i];
    }
    VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
    // The sender must exist in the event loop's (remapped) configuration;
    // CHECK_NOTNULL fails loudly if the lookup misses.
    channels_[i] = event_loop_->MakeRawSender(CHECK_NOTNULL(
        configuration::GetChannel(event_loop_->configuration(), channel_name,
                                  channel_type, "", nullptr)));
  }

  timer_handler_ = event_loop_->AddTimer([this]() {
    // Nothing left to replay: stop the simulation.
    if (sorted_message_reader_.active_channel_count() == 0u) {
      event_loop_factory_->Exit();
      return;
    }
    monotonic_clock::time_point channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    // Pop the globally-oldest message across all channels.
    std::tie(channel_timestamp, channel_index, channel_data) =
        sorted_message_reader_.PopOldestChannel();

    // The timer should have fired exactly at the message's timestamp; anything
    // else means the replay scheduling is broken.
    const monotonic_clock::time_point monotonic_now =
        event_loop_->context().monotonic_event_time;
    CHECK(monotonic_now == channel_timestamp)
        << ": Now " << monotonic_now.time_since_epoch().count()
        << " trying to send " << channel_timestamp.time_since_epoch().count();

    // Pre-start messages are only replayed when running in a simulation
    // (where the factory has already been fast-forwarded appropriately).
    if (channel_timestamp > monotonic_start_time() ||
        event_loop_factory_ != nullptr) {
      if (!FLAGS_skip_missing_forwarding_entries ||
          channel_data.message().data() != nullptr) {
        CHECK(channel_data.message().data() != nullptr)
            << ": Got a message without data. Forwarding entry which was "
               "not "
               "matched? Use --skip_missing_forwarding_entries to ignore "
               "this.";

        // If we have access to the factory, use it to fix the realtime time.
        if (event_loop_factory_ != nullptr) {
          event_loop_factory_->SetRealtimeOffset(
              monotonic_clock::time_point(chrono::nanoseconds(
                  channel_data.message().monotonic_sent_time())),
              realtime_clock::time_point(chrono::nanoseconds(
                  channel_data.message().realtime_sent_time())));
        }

        // Resend the logged payload with its original remote timestamps and
        // queue index attached.
        channels_[channel_index]->Send(
            channel_data.message().data()->Data(),
            channel_data.message().data()->size(),
            monotonic_clock::time_point(chrono::nanoseconds(
                channel_data.message().monotonic_remote_time())),
            realtime_clock::time_point(chrono::nanoseconds(
                channel_data.message().realtime_remote_time())),
            channel_data.message().remote_queue_index());
      }
    } else {
      LOG(WARNING) << "Not sending data from before the start of the log file. "
                   << channel_timestamp.time_since_epoch().count() << " start "
                   << monotonic_start_time().time_since_epoch().count() << " "
                   << FlatbufferToJson(channel_data);
    }

    if (sorted_message_reader_.active_channel_count() > 0u) {
      // Schedule the next wakeup for the next oldest message.
      timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
    } else {
      // Set a timer up immediately after now to die. If we don't do this, then
      // the senders waiting on the message we just read will never get called.
      timer_handler_->Setup(monotonic_now + event_loop_factory_->send_delay() +
                            std::chrono::nanoseconds(1));
    }
  });

  // Arm the timer for the first message once the event loop starts running.
  if (sorted_message_reader_.active_channel_count() > 0u) {
    event_loop_->OnRun([this]() {
      timer_handler_->Setup(sorted_message_reader_.oldest_message().first);
    });
  }
}
| |
| void LogReader::Deregister() { |
| // Make sure that things get destroyed in the correct order, rather than |
| // relying on getting the order correct in the class definition. |
| for (size_t i = 0; i < channels_.size(); ++i) { |
| channels_[i].reset(); |
| } |
| |
| event_loop_unique_ptr_.reset(); |
| event_loop_ = nullptr; |
| event_loop_factory_unique_ptr_.reset(); |
| event_loop_factory_ = nullptr; |
| } |
| |
| void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type, |
| std::string_view add_prefix) { |
| CHECK(remapped_configuration_ == nullptr) |
| << "Must call RemapLoggedChannel before calling Register()."; |
| for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) { |
| const Channel *const channel = logged_configuration()->channels()->Get(ii); |
| if (channel->name()->str() == name && |
| channel->type()->string_view() == type) { |
| CHECK_EQ(0u, remapped_channels_.count(ii)) |
| << "Already remapped channel " |
| << configuration::CleanedChannelToString(channel); |
| remapped_channels_[ii] = std::string(add_prefix) + std::string(name); |
| VLOG(1) << "Remapping channel " |
| << configuration::CleanedChannelToString(channel) |
| << " to have name " << remapped_channels_[ii]; |
| return; |
| } |
| } |
| LOG(FATAL) << "Unabled to locate channel with name " << name << " and type " |
| << type; |
| } |
| |
// Builds remapped_configuration_: the config replay will run against, which
// is the base config plus one extra Channel per remap entry (an exact copy of
// the original channel with the new name merged in).
void LogReader::MakeRemappedConfig() {
  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = sorted_message_reader_.configuration();
    return;
  }
  // Config to copy Channel definitions from. Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit of
  // a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list of
  //    the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that only
  //       contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we are
  //       trying to copy, built in the flatbuffer builder made in 1. This gives
  //       us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  new_config_fbb.ForceDefaults(1);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(1);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is
    // actually present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_name_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);
  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_name_vector_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  remapped_configuration_ = &remapped_configuration_buffer_->message();
}
| |
| } // namespace logger |
| } // namespace aos |