#include "aos/events/logging/logger.h"

#include <fcntl.h>
#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <vector>

#include "Eigen/Dense"
#include "absl/types/span.h"
#include "aos/events/event_loop.h"
#include "aos/events/logging/logger_generated.h"
#include "aos/flatbuffer_merge.h"
#include "aos/network/team_number.h"
#include "aos/time/time.h"
#include "flatbuffers/flatbuffers.h"

DEFINE_bool(skip_missing_forwarding_entries, false,
            "If true, drop any forwarding entries with missing data. If "
            "false, CHECK.");

DEFINE_bool(timestamps_to_csv, false,
            "If true, write all the time synchronization information to a set "
            "of CSV files in /tmp/. This should only be needed when debugging "
            "time synchronization.");

namespace aos {
namespace logger {

namespace chrono = std::chrono;

Logger::Logger(DetachedBufferWriter *writer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : Logger(std::make_unique<LocalLogNamer>(writer, event_loop->node()),
             event_loop, polling_period) {}

Logger::Logger(std::unique_ptr<LogNamer> log_namer, EventLoop *event_loop,
               std::chrono::milliseconds polling_period)
    : event_loop_(event_loop),
      log_namer_(std::move(log_namer)),
      timer_handler_(event_loop_->AddTimer([this]() { DoLogData(); })),
      polling_period_(polling_period) {
  VLOG(1) << "Starting logger for " << FlatbufferToJson(event_loop_->node());
  int channel_index = 0;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    FetcherStruct fs;
    const bool is_local =
        configuration::ChannelIsSendableOnNode(channel, event_loop_->node());

    const bool is_readable =
        configuration::ChannelIsReadableOnNode(channel, event_loop_->node());
    const bool log_message = configuration::ChannelMessageIsLoggedOnNode(
                                 channel, event_loop_->node()) &&
                             is_readable;

    const bool log_delivery_times =
        (event_loop_->node() == nullptr)
            ? false
            : configuration::ConnectionDeliveryTimeIsLoggedOnNode(
                  channel, event_loop_->node(), event_loop_->node());

    if (log_message || log_delivery_times) {
      fs.fetcher = event_loop->MakeRawFetcher(channel);
      VLOG(1) << "Logging channel "
              << configuration::CleanedChannelToString(channel);

      if (log_delivery_times) {
        VLOG(1) << " Delivery times";
        fs.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
      }
      if (log_message) {
        VLOG(1) << " Data";
        fs.writer = log_namer_->MakeWriter(channel);
        if (!is_local) {
          fs.log_type = LogType::kLogRemoteMessage;
        }
      }
      fs.channel_index = channel_index;
      fs.written = false;
      fetchers_.emplace_back(std::move(fs));
    }
    ++channel_index;
  }
  // When things start, we want to log the header, then the most recent
  // messages available on each fetcher to capture the previous state, then
  // start polling.
  event_loop_->OnRun([this, polling_period]() {
    // Grab data from each channel right before we declare the log file
    // started so we can capture the latest message on each channel. This
    // lets non-periodic messages, like the configuration, which were sent
    // before the log started, still make it into the log.
    for (FetcherStruct &f : fetchers_) {
      // A failed fetch means there is nothing to write for this channel yet.
      f.written = !f.fetcher->Fetch();
    }

    // We need to pick a point in time to declare the log file "started". This
    // starts here. It needs to be after everything is fetched so that the
    // fetchers are all pointed at the most recent message before the start
    // time.
    monotonic_start_time_ = event_loop_->monotonic_now();
    realtime_start_time_ = event_loop_->realtime_now();
    last_synchronized_time_ = monotonic_start_time_;

    LOG(INFO) << "Logging node as " << FlatbufferToJson(event_loop_->node())
              << " start_time " << monotonic_start_time_;

    WriteHeader();

    timer_handler_->Setup(event_loop_->monotonic_now() + polling_period,
                          polling_period);
  });
}
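
// A minimal usage sketch (the file path and event loop wiring here are
// hypothetical; see logger.h for the constructors):
//
//   aos::logger::DetachedBufferWriter writer("/tmp/log.bfbs");
//   aos::logger::Logger logger(&writer, &shm_event_loop,
//                              std::chrono::milliseconds(100));
//   shm_event_loop.Run();
//
// The Logger registers an OnRun handler, so it just needs to outlive the
// event loop's run.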

// TODO(austin): Set the remote start time to the first time we see a remote
// message when we are logging those messages separately? We need to signal
// what to do, or how to get a good timestamp.
void Logger::WriteHeader() {
  for (const Node *node : log_namer_->nodes()) {
    WriteHeader(node);
  }
}

void Logger::WriteHeader(const Node *node) {
  // Now write the header with this timestamp in it.
  flatbuffers::FlatBufferBuilder fbb;
  fbb.ForceDefaults(true);

  flatbuffers::Offset<aos::Configuration> configuration_offset =
      CopyFlatBuffer(event_loop_->configuration(), &fbb);

  flatbuffers::Offset<flatbuffers::String> string_offset =
      fbb.CreateString(network::GetHostname());

  flatbuffers::Offset<Node> node_offset;
  if (node != nullptr) {
    node_offset = CopyFlatBuffer(node, &fbb);
  }

  aos::logger::LogFileHeader::Builder log_file_header_builder(fbb);

  log_file_header_builder.add_name(string_offset);

  // Only add the node if we are running in a multinode configuration.
  if (node != nullptr) {
    log_file_header_builder.add_node(node_offset);
  }

  log_file_header_builder.add_configuration(configuration_offset);
  // The worst-case theoretical out-of-order duration is twice the polling
  // period. One message could get logged right after the boundary, but be
  // from right before the next boundary. And the reverse could happen for
  // another message. Report back 3x to be extra safe, and because the cost
  // isn't huge on the read side.
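  // For example, a 100 ms polling period reports a 300 ms max out-of-order
  // duration here.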
  log_file_header_builder.add_max_out_of_order_duration(
      std::chrono::duration_cast<std::chrono::nanoseconds>(3 * polling_period_)
          .count());

  log_file_header_builder.add_monotonic_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          monotonic_start_time_.time_since_epoch())
          .count());
  log_file_header_builder.add_realtime_start_time(
      std::chrono::duration_cast<std::chrono::nanoseconds>(
          realtime_start_time_.time_since_epoch())
          .count());

  fbb.FinishSizePrefixed(log_file_header_builder.Finish());
  log_namer_->WriteHeader(&fbb, node);
}

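// A sketch of rotating to a new file mid-run (the writer's lifetime is
// managed by the caller, and the path is hypothetical):
//
//   aos::logger::DetachedBufferWriter next_writer("/tmp/log_part2.bfbs");
//   logger.Rotate(&next_writer);
//
// Rotate flushes everything logged so far, swaps in the new writer, and
// re-writes the header so each file is independently readable.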
void Logger::Rotate(DetachedBufferWriter *writer) {
  Rotate(std::make_unique<LocalLogNamer>(writer, event_loop_->node()));
}

void Logger::Rotate(std::unique_ptr<LogNamer> log_namer) {
  // Force data up until now to be written.
  DoLogData();

  // Swap the writer out, and re-write the header.
  log_namer_ = std::move(log_namer);

  // And then update the writers.
  for (FetcherStruct &f : fetchers_) {
    const Channel *channel =
        event_loop_->configuration()->channels()->Get(f.channel_index);
    if (f.timestamp_writer != nullptr) {
      f.timestamp_writer = log_namer_->MakeTimestampWriter(channel);
    }
    if (f.writer != nullptr) {
      f.writer = log_namer_->MakeWriter(channel);
    }
  }

  WriteHeader();
}

void Logger::DoLogData() {
  // We want to guarantee that messages aren't out of order by more than
  // max_out_of_order_duration. To do this, we need sync points. Every write
  // cycle should be a sync point.
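  // For example, if this callback runs 250 ms late with a 100 ms polling
  // period, the do/while loop below emits three sync points 100 ms apart
  // rather than one 350 ms jump.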
  const monotonic_clock::time_point monotonic_now =
      event_loop_->monotonic_now();

  do {
    // Move the sync point up by at most polling_period. This forces one sync
    // per iteration, even if it is small.
    last_synchronized_time_ =
        std::min(last_synchronized_time_ + polling_period_, monotonic_now);
    // Write each channel to disk, one at a time.
    for (FetcherStruct &f : fetchers_) {
      while (true) {
        if (f.written) {
          if (!f.fetcher->FetchNext()) {
            VLOG(2) << "No new data on "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel());
            break;
          } else {
            f.written = false;
          }
        }

        CHECK(!f.written);

        // TODO(james): Write tests to exercise this logic.
        if (f.fetcher->context().monotonic_event_time <
            last_synchronized_time_) {
          if (f.writer != nullptr) {
            // Write the message. Size the builder for the message plus the
            // largest header we have seen so far, so we usually avoid a
            // reallocation while packing.
            flatbuffers::FlatBufferBuilder fbb(f.fetcher->context().size +
                                               max_header_size_);
            fbb.ForceDefaults(true);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               f.channel_index, f.log_type));

            VLOG(2) << "Writing data as node "
                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel())
                    << " to " << f.writer->filename() << " data "
                    << FlatbufferToJson(
                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                               fbb.GetBufferPointer()));

            // Track the largest header we have seen so far so the
            // preallocation above keeps up with reality.
            max_header_size_ = std::max(
                max_header_size_, fbb.GetSize() - f.fetcher->context().size);
            f.writer->QueueSizedFlatbuffer(&fbb);
          }

          if (f.timestamp_writer != nullptr) {
            // And now handle timestamps.
            flatbuffers::FlatBufferBuilder fbb;
            fbb.ForceDefaults(true);

            fbb.FinishSizePrefixed(PackMessage(&fbb, f.fetcher->context(),
                                               f.channel_index,
                                               LogType::kLogDeliveryTimeOnly));

            VLOG(2) << "Writing timestamps as node "
                    << FlatbufferToJson(event_loop_->node()) << " for channel "
                    << configuration::CleanedChannelToString(
                           f.fetcher->channel())
                    << " to " << f.timestamp_writer->filename()
                    << " timestamp "
                    << FlatbufferToJson(
                           flatbuffers::GetSizePrefixedRoot<MessageHeader>(
                               fbb.GetBufferPointer()));

            f.timestamp_writer->QueueSizedFlatbuffer(&fbb);
          }

          f.written = true;
        } else {
          break;
        }
      }
    }

    // If we missed cycles, we could be pretty far behind. Spin until we are
    // caught up.
  } while (last_synchronized_time_ + polling_period_ < monotonic_now);
}

LogReader::LogReader(std::string_view filename,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::string>{std::string(filename)},
                replay_configuration) {}

LogReader::LogReader(const std::vector<std::string> &filenames,
                     const Configuration *replay_configuration)
    : LogReader(std::vector<std::vector<std::string>>{filenames},
                replay_configuration) {}

LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
                     const Configuration *replay_configuration)
    : filenames_(filenames),
      log_file_header_(ReadHeader(filenames[0][0])),
      replay_configuration_(replay_configuration) {
  MakeRemappedConfig();

  if (replay_configuration) {
    CHECK_EQ(configuration::MultiNode(configuration()),
             configuration::MultiNode(replay_configuration))
        << ": Log file and replay config need to both be multi or single node.";
  }

  if (!configuration::MultiNode(configuration())) {
    states_.emplace_back(std::make_unique<State>());
    State *state = states_[0].get();

    state->channel_merger = std::make_unique<ChannelMerger>(filenames);
  } else {
    if (replay_configuration) {
      CHECK_EQ(configuration()->nodes()->size(),
               replay_configuration->nodes()->size())
          << ": Log file and replay config need to have matching nodes lists.";
    }
    states_.resize(configuration()->nodes()->size());
  }
}
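
// A minimal replay sketch, using only the interfaces visible here (the log
// path is hypothetical):
//
//   aos::logger::LogReader reader("/tmp/log.bfbs");
//   aos::SimulatedEventLoopFactory factory(reader.configuration());
//   reader.Register(&factory);
//   factory.Run();
//   reader.Deregister();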

LogReader::~LogReader() {
  Deregister();
  if (offset_fp_ != nullptr) {
    fclose(offset_fp_);
  }
}

const Configuration *LogReader::logged_configuration() const {
  return log_file_header_.message().configuration();
}

const Configuration *LogReader::configuration() const {
  return remapped_configuration_;
}

std::vector<const Node *> LogReader::Nodes() const {
  // Because the Node pointer will only be valid if it actually points to
  // memory owned by remapped_configuration_, we need to wait for the
  // remapped_configuration_ to be populated before accessing it.
  //
  // Also note that whenever a map is changed, the Node pointers in here are
  // invalidated.
  CHECK(remapped_configuration_ != nullptr)
      << ": Need to call Register before the node() pointer will be valid.";
  return configuration::GetNodes(remapped_configuration_);
}

monotonic_clock::time_point LogReader::monotonic_start_time(const Node *node) {
  State *state =
      states_[configuration::GetNodeIndex(configuration(), node)].get();
  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);

  return state->channel_merger->monotonic_start_time();
}

realtime_clock::time_point LogReader::realtime_start_time(const Node *node) {
  State *state =
      states_[configuration::GetNodeIndex(configuration(), node)].get();
  CHECK(state != nullptr) << ": Unknown node " << FlatbufferToJson(node);

  return state->channel_merger->realtime_start_time();
}

void LogReader::Register() {
  event_loop_factory_unique_ptr_ =
      std::make_unique<SimulatedEventLoopFactory>(configuration());
  Register(event_loop_factory_unique_ptr_.get());
}

void LogReader::Register(SimulatedEventLoopFactory *event_loop_factory) {
  event_loop_factory_ = event_loop_factory;

  for (const Node *node : configuration::GetNodes(configuration())) {
    const size_t node_index =
        configuration::GetNodeIndex(configuration(), node);
    states_[node_index] = std::make_unique<State>();
    State *state = states_[node_index].get();

    state->channel_merger = std::make_unique<ChannelMerger>(filenames_);

    state->node_event_loop_factory =
        event_loop_factory_->GetNodeEventLoopFactory(node);
    state->event_loop_unique_ptr =
        event_loop_factory->MakeEventLoop("log_reader", node);

    Register(state->event_loop_unique_ptr.get());
  }

  // We need to now seed our per-node time offsets and get everything set up
  // to run.
  const size_t num_nodes = configuration::MultiNode(logged_configuration())
                               ? logged_configuration()->nodes()->size()
                               : 1u;

  // It is easiest to solve for per node offsets with a matrix rather than
  // trying to solve the equations by hand. So let's get after it.
  //
  // Now, build up the map matrix.
  //
  //   sample_matrix_ = map_matrix_ * offset_matrix_
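  //
  // As a sketch, for two nodes a and b with a single filter between them,
  // this looks like:
  //
  //   [ 0         ]   [ 1   1 ] [ offset_a ]
  //   [ sample_ab ] = [ 1  -1 ] [ offset_b ]
  //
  // The first row forces the offsets to sum to zero, and each later row
  // encodes one a - b measurement.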
  map_matrix_ = Eigen::MatrixXd::Zero(filters_.size() + 1, num_nodes);

  sample_matrix_ = Eigen::VectorXd::Zero(filters_.size() + 1);
  offset_matrix_ = Eigen::VectorXd::Zero(num_nodes);

  // And the base offset matrix, which will be a copy of the initial offset
  // matrix.
  base_offset_matrix_ =
      Eigen::Matrix<std::chrono::nanoseconds, Eigen::Dynamic, 1>::Zero(
          num_nodes);

  // All offsets should sum to 0. Add that as the first constraint in our
  // least squares.
  map_matrix_.row(0).setOnes();

  {
    // Now, add the a - b -> sample elements.
    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   message_bridge::ClippedAverageFilter> &filter : filters_) {
      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      // +a
      map_matrix_(i, node_a_index) = 1.0;
      // -b
      map_matrix_(i, node_b_index) = -1.0;

      // -> sample
      filter.second.set_sample_pointer(&sample_matrix_(i, 0));

      ++i;
    }
  }

  // The rank of the map matrix tells us whether all the nodes are in
  // communication with each other, which in turn tells us whether the
  // per-node offsets are observable.
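  // For example, with three nodes where only a and b exchange timestamps,
  // the rows are [1 1 1] and [1 -1 0]; the rank is 2 < 3, so the third
  // node's offset is unobservable.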
  const size_t connected_nodes =
      Eigen::FullPivLU<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic>>(
          map_matrix_)
          .rank();

  // We don't need to support isolated nodes until someone has a real use case.
  CHECK_EQ(connected_nodes, num_nodes)
      << ": There is a node which isn't communicating with the rest.";

  // Now, iterate through all the timestamps from all the nodes and seed
  // everything.
  for (std::unique_ptr<State> &state : states_) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      TimestampMerger::DeliveryTimestamp timestamp =
          state->channel_merger->OldestTimestampForChannel(i);
      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
      }
    }
  }

  // Make sure all the samples have been seeded.
  for (int i = 1; i < sample_matrix_.rows(); ++i) {
    // The seeding logic is pretty basic right now because we don't have great
    // use cases yet. It wants to see data from every node. Blow up for now,
    // and once we have a reason to do something different, update this logic.
    // Maybe read further in the log file? Or seed off the realtime time?
    CHECK_NE(sample_matrix_(i, 0), 0.0)
        << ": Sample " << i << " is not seeded.";
  }

  // And solve.
  offset_matrix_ = SolveOffsets();

  // Save off the base offsets so we can work in deltas from here out. That
  // will significantly simplify the numerical precision problems.
  for (size_t i = 0; i < num_nodes; ++i) {
    base_offset_matrix_(i, 0) =
        std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::duration<double>(offset_matrix_(i, 0)));
  }

  {
    // Shift everything so we never could (reasonably) require the
    // distributed clock to have a large backwards jump in time. This makes
    // it so the boot time on the node that has been up the longest will
    // essentially start matching the distributed clock.
    const chrono::nanoseconds offset = -base_offset_matrix_.maxCoeff();
    for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
      base_offset_matrix_(i, 0) += offset;
    }
  }

  {
    // Re-compute the samples and set up all the filters so that they
    // subtract this base offset.

    size_t i = 1;
    for (std::pair<const std::tuple<const Node *, const Node *>,
                   message_bridge::ClippedAverageFilter> &filter : filters_) {
      CHECK(filter.second.sample_pointer() == &sample_matrix_(i, 0));

      const Node *const node_a = std::get<0>(filter.first);
      const Node *const node_b = std::get<1>(filter.first);

      const size_t node_a_index =
          configuration::GetNodeIndex(configuration(), node_a);
      const size_t node_b_index =
          configuration::GetNodeIndex(configuration(), node_b);

      filter.second.set_base_offset(base_offset_matrix_(node_a_index) -
                                    base_offset_matrix_(node_b_index));

      ++i;
    }
  }

  // Now, iterate again through all the offsets now that we have set the base
  // offset to something sane. This will seed everything with an accurate
  // initial offset.
  for (std::unique_ptr<State> &state : states_) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      TimestampMerger::DeliveryTimestamp timestamp =
          state->channel_merger->OldestTimestampForChannel(i);
      if (timestamp.monotonic_event_time != monotonic_clock::min_time) {
        CHECK(state->MaybeUpdateTimestamp(timestamp, i));
      }
    }
  }

  UpdateOffsets();

  // We want to start the log file at the last start time of the log files
  // from all the nodes. Compute how long each node's simulation needs to run
  // to move time to this point.
  distributed_clock::time_point start_time = distributed_clock::min_time;

  for (std::unique_ptr<State> &state : states_) {
    // Set up the realtime clock to have something sane in it now.
    state->node_event_loop_factory->SetRealtimeOffset(
        state->channel_merger->monotonic_start_time(),
        state->channel_merger->realtime_start_time());
    // And then compute the start time on the distributed clock now that the
    // realtime offset is set.
    start_time = std::max(start_time,
                          state->node_event_loop_factory->ToDistributedClock(
                              state->channel_merger->monotonic_start_time()));
  }
  CHECK_GE(start_time, distributed_clock::epoch());

  // Forwarding is tracked per channel. If it is enabled, we want to turn it
  // off. Otherwise messages replayed will get forwarded across to the other
  // nodes, and also replayed on the other nodes. This may not satisfy all
  // our users, but it'll start the discussion.
  if (configuration::MultiNode(event_loop_factory_->configuration())) {
    for (size_t i = 0; i < logged_configuration()->channels()->size(); ++i) {
      const Channel *channel = logged_configuration()->channels()->Get(i);
      const Node *node = configuration::GetNode(
          configuration(), channel->source_node()->string_view());

      State *state =
          states_[configuration::GetNodeIndex(configuration(), node)].get();

      const Channel *remapped_channel =
          RemapChannel(state->event_loop, channel);

      event_loop_factory_->DisableForwarding(remapped_channel);
    }
  }

  // While we are starting the system up, we might be relying on matching
  // data to timestamps on log files where the timestamp log file starts
  // before the data. In this case, it is reasonable to expect missing data.
  ignore_missing_data_ = true;
  event_loop_factory_->RunFor(start_time.time_since_epoch());
  // Now that we are running for real, missing data means that the log file
  // is corrupted or something went wrong.
  ignore_missing_data_ = false;
}

void LogReader::UpdateOffsets() {
  // TODO(austin): Evaluate less accurate inverses. We might be able to
  // do some tricks to keep the accuracy up.
  offset_matrix_ = SolveOffsets();

  size_t node_index = 0;
  for (std::unique_ptr<State> &state : states_) {
    state->node_event_loop_factory->SetDistributedOffset(offset(node_index));
    ++node_index;
  }
}

std::tuple<message_bridge::ClippedAverageFilter *, bool> LogReader::GetFilter(
    const Node *node_a, const Node *node_b) {
  CHECK_NE(node_a, node_b);
  CHECK_EQ(configuration::GetNode(configuration(), node_a), node_a);
  CHECK_EQ(configuration::GetNode(configuration(), node_b), node_b);

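  // Filters are stored with the lower-addressed node first. The bool in the
  // returned tuple is true when the caller's (node_a, node_b) order matches
  // the stored forward direction.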
  if (node_a > node_b) {
    return std::make_tuple(std::get<0>(GetFilter(node_b, node_a)), false);
  }

  auto tuple = std::make_tuple(node_a, node_b);

  auto it = filters_.find(tuple);

  if (it == filters_.end()) {
    auto &x = filters_
                  .insert(std::make_pair(
                      tuple, message_bridge::ClippedAverageFilter()))
                  .first->second;
    if (FLAGS_timestamps_to_csv) {
      std::string fwd_name =
          absl::StrCat("/tmp/timestamp_", node_a->name()->string_view(), "_",
                       node_b->name()->string_view(), ".csv");
      x.fwd_fp = fopen(fwd_name.c_str(), "w");
      std::string rev_name =
          absl::StrCat("/tmp/timestamp_", node_b->name()->string_view(), "_",
                       node_a->name()->string_view(), ".csv");
      x.rev_fp = fopen(rev_name.c_str(), "w");
    }

    return std::make_tuple(&x, true);
  } else {
    return std::make_tuple(&(it->second), true);
  }
}

bool LogReader::State::MaybeUpdateTimestamp(
    const TimestampMerger::DeliveryTimestamp &channel_timestamp,
    int channel_index) {
  if (channel_timestamp.monotonic_remote_time == monotonic_clock::min_time) {
    return false;
  }

  // Got a forwarding timestamp!
  CHECK(std::get<0>(filters[channel_index]) != nullptr);

  // Call the correct method depending on whether we are the forward or
  // reverse direction here.
  if (std::get<1>(filters[channel_index])) {
    std::get<0>(filters[channel_index])
        ->FwdSample(channel_timestamp.monotonic_event_time,
                    channel_timestamp.monotonic_event_time -
                        channel_timestamp.monotonic_remote_time);
  } else {
    std::get<0>(filters[channel_index])
        ->RevSample(channel_timestamp.monotonic_event_time,
                    channel_timestamp.monotonic_event_time -
                        channel_timestamp.monotonic_remote_time);
  }
  return true;
}

void LogReader::Register(EventLoop *event_loop) {
  State *state =
      states_[configuration::GetNodeIndex(configuration(), event_loop->node())]
          .get();

  state->event_loop = event_loop;

  // We don't run timing reports when trying to print out logged data, because
  // otherwise we would end up printing out the timing reports themselves...
  // This is only really relevant when we are replaying into a simulation.
  event_loop->SkipTimingReport();
  event_loop->SkipAosLog();

  const bool has_data = state->channel_merger->SetNode(event_loop->node());

  state->channels.resize(logged_configuration()->channels()->size());
  state->filters.resize(state->channels.size());

  state->channel_target_event_loop_factory.resize(state->channels.size());

  for (size_t i = 0; i < state->channels.size(); ++i) {
    const Channel *channel =
        RemapChannel(event_loop, logged_configuration()->channels()->Get(i));

    state->channels[i] = event_loop->MakeRawSender(channel);

    state->filters[i] = std::make_tuple(nullptr, false);

    if (!configuration::ChannelIsSendableOnNode(channel, event_loop->node()) &&
        configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
      const Node *target_node = configuration::GetNode(
          event_loop->configuration(), channel->source_node()->string_view());
      state->filters[i] = GetFilter(event_loop->node(), target_node);

      if (event_loop_factory_ != nullptr) {
        state->channel_target_event_loop_factory[i] =
            event_loop_factory_->GetNodeEventLoopFactory(target_node);
      }
    }
  }

  // If we didn't find any log files with data in them, we won't ever get a
  // callback or be live. So skip the rest of the setup.
  if (!has_data) {
    return;
  }

  state->timer_handler = event_loop->AddTimer([this, state]() {
    if (state->channel_merger->OldestMessage() == monotonic_clock::max_time) {
      --live_nodes_;
      VLOG(1) << "Node down!";
      if (live_nodes_ == 0) {
        event_loop_factory_->Exit();
      }
      return;
    }
    bool update_offsets = false;
    TimestampMerger::DeliveryTimestamp channel_timestamp;
    int channel_index;
    FlatbufferVector<MessageHeader> channel_data =
        FlatbufferVector<MessageHeader>::Empty();

    std::tie(channel_timestamp, channel_index, channel_data) =
        state->channel_merger->PopOldest();

    const monotonic_clock::time_point monotonic_now =
        state->event_loop->context().monotonic_event_time;
    CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
        << ": " << FlatbufferToJson(state->event_loop->node()) << " Now "
        << monotonic_now << " trying to send "
        << channel_timestamp.monotonic_event_time << " failure "
        << state->channel_merger->DebugString();

    if (channel_timestamp.monotonic_event_time >
            state->channel_merger->monotonic_start_time() ||
        event_loop_factory_ != nullptr) {
      if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
           !state->channel_merger->at_end()) ||
          channel_data.message().data() != nullptr) {
        CHECK(channel_data.message().data() != nullptr)
            << ": Got a message without data. Forwarding entry which was "
               "not matched? Use --skip_missing_forwarding_entries to ignore "
               "this.";

        if (state->MaybeUpdateTimestamp(channel_timestamp, channel_index)) {
          // Confirm that the message was sent on the sending node before the
          // destination node (this node). As a proxy, do this by making sure
          // that time on the source node is past when the message was sent.
          CHECK_LT(channel_timestamp.monotonic_remote_time,
                   state->channel_target_event_loop_factory[channel_index]
                       ->monotonic_now());

          update_offsets = true;

          if (FLAGS_timestamps_to_csv) {
            if (offset_fp_ == nullptr) {
              offset_fp_ = fopen("/tmp/offsets.csv", "w");
              fprintf(
                  offset_fp_,
                  "# time_since_start, offset node 0, offset node 1, ...\n");
              first_time_ = channel_timestamp.realtime_event_time;
            }

            fprintf(offset_fp_, "%.9f",
                    std::chrono::duration_cast<std::chrono::duration<double>>(
                        channel_timestamp.realtime_event_time - first_time_)
                        .count());
            for (int i = 0; i < base_offset_matrix_.rows(); ++i) {
              fprintf(
                  offset_fp_, ", %.9f",
                  offset_matrix_(i, 0) +
                      std::chrono::duration_cast<std::chrono::duration<double>>(
                          base_offset_matrix_(i, 0))
                          .count());
            }
            fprintf(offset_fp_, "\n");
          }

        } else {
          CHECK(std::get<0>(state->filters[channel_index]) == nullptr);
        }

        // If we have access to the factory, use it to fix the realtime time.
        if (state->node_event_loop_factory != nullptr) {
          state->node_event_loop_factory->SetRealtimeOffset(
              channel_timestamp.monotonic_event_time,
              channel_timestamp.realtime_event_time);
        }

        state->channels[channel_index]->Send(
            channel_data.message().data()->Data(),
            channel_data.message().data()->size(),
            channel_timestamp.monotonic_remote_time,
            channel_timestamp.realtime_remote_time,
            channel_timestamp.remote_queue_index);
      } else if (state->channel_merger->at_end()) {
        // We are at the end of the log file and found missing data. Finish
        // reading the rest of the log file and call it quits. We don't want
        // to replay partial data.
        while (state->channel_merger->OldestMessage() !=
               monotonic_clock::max_time) {
          state->channel_merger->PopOldest();
        }
      }

    } else {
      LOG(WARNING)
          << "Not sending data from before the start of the log file. "
          << channel_timestamp.monotonic_event_time.time_since_epoch().count()
          << " start " << monotonic_start_time().time_since_epoch().count()
          << " " << FlatbufferToJson(channel_data);
    }

    const monotonic_clock::time_point next_time =
        state->channel_merger->OldestMessage();
    if (next_time != monotonic_clock::max_time) {
      state->timer_handler->Setup(next_time);
    } else {
      // Set up a timer to fire just after now so this node can exit. If we
      // don't do this, the senders waiting on the message we just read will
      // never get called.
      if (event_loop_factory_ != nullptr) {
        state->timer_handler->Setup(monotonic_now +
                                    event_loop_factory_->send_delay() +
                                    std::chrono::nanoseconds(1));
      }
    }

    // Once we make this call, the current time changes. So do everything
    // which involves time before changing it. That especially includes
    // sending the message.
    if (update_offsets) {
      UpdateOffsets();
    }
  });

  ++live_nodes_;

  if (state->channel_merger->OldestMessage() != monotonic_clock::max_time) {
    event_loop->OnRun([state]() {
      state->timer_handler->Setup(state->channel_merger->OldestMessage());
    });
  }
}

void LogReader::Deregister() {
  // Make sure that things get destroyed in the correct order, rather than
  // relying on getting the order correct in the class definition.
  for (std::unique_ptr<State> &state : states_) {
    for (size_t i = 0; i < state->channels.size(); ++i) {
      state->channels[i].reset();
    }
    state->event_loop_unique_ptr.reset();
    state->event_loop = nullptr;
    state->node_event_loop_factory = nullptr;
  }

  event_loop_factory_unique_ptr_.reset();
  event_loop_factory_ = nullptr;
}

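// As a usage sketch (the channel name and type here are hypothetical):
//
//   reader.RemapLoggedChannel("/camera", "aos.examples.Image", "/original");
//
// replays the logged channel as "/original/camera", leaving "/camera" free
// for live senders in the replayed configuration.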
void LogReader::RemapLoggedChannel(std::string_view name, std::string_view type,
                                   std::string_view add_prefix) {
  for (size_t ii = 0; ii < logged_configuration()->channels()->size(); ++ii) {
    const Channel *const channel = logged_configuration()->channels()->Get(ii);
    if (channel->name()->str() == name &&
        channel->type()->string_view() == type) {
      CHECK_EQ(0u, remapped_channels_.count(ii))
          << "Already remapped channel "
          << configuration::CleanedChannelToString(channel);
      remapped_channels_[ii] = std::string(add_prefix) + std::string(name);
      VLOG(1) << "Remapping channel "
              << configuration::CleanedChannelToString(channel)
              << " to have name " << remapped_channels_[ii];
      MakeRemappedConfig();
      return;
    }
  }
  LOG(FATAL) << "Unable to locate channel with name " << name << " and type "
             << type;
}

void LogReader::MakeRemappedConfig() {
  for (std::unique_ptr<State> &state : states_) {
    if (state) {
      CHECK(!state->event_loop)
          << ": Can't change the mapping after the events are scheduled.";
    }
  }

  // If no remapping occurred and we are using the original config, then there
  // is nothing interesting to do here.
  if (remapped_channels_.empty() && replay_configuration_ == nullptr) {
    remapped_configuration_ = logged_configuration();
    return;
  }
  // Config to copy Channel definitions from. Use the specified
  // replay_configuration_ if it has been provided.
  const Configuration *const base_config = replay_configuration_ == nullptr
                                               ? logged_configuration()
                                               : replay_configuration_;
  // The remapped config will be identical to the base_config, except that it
  // will have a bunch of extra channels in the channel list, which are exact
  // copies of the remapped channels, but with different names.
  // Because the flatbuffers API is a pain to work with, this requires a bit
  // of a song-and-dance to get copied over.
  // The order of operations is to:
  // 1) Make a flatbuffer builder for a config that will just contain a list
  //    of the new channels that we want to add.
  // 2) For each channel that we are remapping:
  //    a) Make a buffer/builder and construct into it a Channel table that
  //       only contains the new name for the channel.
  //    b) Merge the new channel with just the name into the channel that we
  //       are trying to copy, built in the flatbuffer builder made in 1.
  //       This gives us the new channel definition that we need.
  // 3) Using this list of offsets, build the Configuration of just new
  //    Channels.
  // 4) Merge the Configuration with the new Channels into the base_config.
  // 5) Call MergeConfiguration() on that result to give MergeConfiguration a
  //    chance to sanitize the config.

  // This is the builder that we use for the config containing all the new
  // channels.
  flatbuffers::FlatBufferBuilder new_config_fbb;
  new_config_fbb.ForceDefaults(true);
  std::vector<flatbuffers::Offset<Channel>> channel_offsets;
  for (auto &pair : remapped_channels_) {
    // This is the builder that we use for creating the Channel with just the
    // new name.
    flatbuffers::FlatBufferBuilder new_name_fbb;
    new_name_fbb.ForceDefaults(true);
    const flatbuffers::Offset<flatbuffers::String> name_offset =
        new_name_fbb.CreateString(pair.second);
    ChannelBuilder new_name_builder(new_name_fbb);
    new_name_builder.add_name(name_offset);
    new_name_fbb.Finish(new_name_builder.Finish());
    const FlatbufferDetachedBuffer<Channel> new_name = new_name_fbb.Release();
    // Retrieve the channel that we want to copy, confirming that it is
    // actually present in base_config.
    const Channel *const base_channel = CHECK_NOTNULL(configuration::GetChannel(
        base_config, logged_configuration()->channels()->Get(pair.first), "",
        nullptr));
    // Actually create the new channel and put it into the vector of Offsets
    // that we will use to create the new Configuration.
    channel_offsets.emplace_back(MergeFlatBuffers<Channel>(
        reinterpret_cast<const flatbuffers::Table *>(base_channel),
        reinterpret_cast<const flatbuffers::Table *>(&new_name.message()),
        &new_config_fbb));
  }
  // Create the Configuration containing the new channels that we want to add.
  const auto new_name_vector_offsets =
      new_config_fbb.CreateVector(channel_offsets);
  ConfigurationBuilder new_config_builder(new_config_fbb);
  new_config_builder.add_channels(new_name_vector_offsets);
  new_config_fbb.Finish(new_config_builder.Finish());
  const FlatbufferDetachedBuffer<Configuration> new_name_config =
      new_config_fbb.Release();
  // Merge the new channels configuration into the base_config, giving us the
  // remapped configuration.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          MergeFlatBuffers<Configuration>(base_config,
                                          &new_name_config.message()));
  // Call MergeConfiguration to deal with sanitizing the config.
  remapped_configuration_buffer_ =
      std::make_unique<FlatbufferDetachedBuffer<Configuration>>(
          configuration::MergeConfiguration(*remapped_configuration_buffer_));

  remapped_configuration_ = &remapped_configuration_buffer_->message();
}

const Channel *LogReader::RemapChannel(const EventLoop *event_loop,
                                       const Channel *channel) {
  std::string_view channel_name = channel->name()->string_view();
  std::string_view channel_type = channel->type()->string_view();
  const int channel_index =
      configuration::ChannelIndex(logged_configuration(), channel);
  // If the channel is remapped, find the correct channel name to use.
  if (remapped_channels_.count(channel_index) > 0) {
    VLOG(2) << "Got remapped channel on "
            << configuration::CleanedChannelToString(channel);
    channel_name = remapped_channels_[channel_index];
  }

  VLOG(1) << "Going to remap channel " << channel_name << " " << channel_type;
  const Channel *remapped_channel = configuration::GetChannel(
      event_loop->configuration(), channel_name, channel_type,
      event_loop->name(), event_loop->node());

  CHECK(remapped_channel != nullptr)
      << ": Unable to send {\"name\": \"" << channel_name << "\", \"type\": \""
      << channel_type << "\"} because it is not in the provided configuration.";

  return remapped_channel;
}

}  // namespace logger
}  // namespace aos