Switch LogReader to the new API, and remove the old
Now that we have a fancy new log sorter, let's flip over to using it.
This doesn't yet simplify anything on the LogReader side to take
advantage of the simpler code.
While we are here, the new API really wants a LogFiles object instead
of vectors of strings, so convert the remaining call sites over to the
new LogFiles API.
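As a sketch of the pattern applied throughout (taken from the
log_stats.cc hunk below; SortParts does the sorting up front and
produces the sorted log files the reader now consumes):

    // Old: hand LogReader raw filenames directly.
    aos::logger::LogReader reader(FLAGS_logfile);

    // New: sort the parts first, and construct LogReader from the
    // result.
    aos::logger::LogReader reader(
        aos::logger::SortParts({FLAGS_logfile}));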
The drivetrain replay log file was updated to add UUIDs. They are
required with multi-node log files, and it doesn't seem worth changing
that requirement for this old log.
Change-Id: I84bd63c7339ec43ed01c106131153e1cb6d213bb
diff --git a/WORKSPACE b/WORKSPACE
index a8c8a19..464d2e2 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -652,8 +652,8 @@
http_file(
name = "drivetrain_replay",
downloaded_file_path = "spinning_wheels_while_still.bfbs",
- sha256 = "d724dbf0acae894b30c9bb62006a2633a7f4c478db48548e76cbec03cbb07f46",
- urls = ["https://www.frc971.org/Build-Dependencies/spinning_wheels_while_still3.bfbs"],
+ sha256 = "8abe3bbf7ac7a3ab37ad8a313ec22fc244899d916f5e9037100b02e242f5fb45",
+ urls = ["https://www.frc971.org/Build-Dependencies/spinning_wheels_while_still4.bfbs"],
)
# OpenCV armhf (for raspberry pi)
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index eac8d08..f901056 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -61,7 +61,7 @@
std::vector<ChannelStats> channel_stats;
// Open LogFile
- aos::logger::LogReader reader(FLAGS_logfile);
+ aos::logger::LogReader reader(aos::logger::SortParts({FLAGS_logfile}));
aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
reader.Register(&log_reader_factory);
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index ec79f17..9316795 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -664,7 +664,6 @@
TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
: node_merger_(std::move(parts)),
- node_(node_merger_.node()),
message_{.channel_index = 0xffffffff,
.queue_index = 0xffffffff,
.monotonic_event_time = monotonic_clock::min_time,
@@ -962,1175 +961,6 @@
return ss.str();
}
-SplitMessageReader::SplitMessageReader(
- const std::vector<std::string> &filenames)
- : filenames_(filenames),
- log_file_header_(SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
- CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";
-
- // Grab any log file header. They should all match (and we will check as we
- // open more of them).
- log_file_header_ = message_reader_->raw_log_file_header();
-
- for (size_t i = 1; i < filenames_.size(); ++i) {
- MessageReader message_reader(filenames_[i]);
-
- const monotonic_clock::time_point new_monotonic_start_time(
- chrono::nanoseconds(
- message_reader.log_file_header()->monotonic_start_time()));
- const realtime_clock::time_point new_realtime_start_time(
- chrono::nanoseconds(
- message_reader.log_file_header()->realtime_start_time()));
-
- // There are 2 types of part files. Part files from before time estimation
- // has started, and part files after. We don't declare a log file "started"
- // until time estimation is up. And once a log file starts, it should never
- // stop again, and should remain constant.
- // To compare both types of headers, we mutate our saved copy of the header
- // to match the next chunk by updating time if we detect a stopped ->
- // started transition.
- if (monotonic_start_time() == monotonic_clock::min_time) {
- CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
- // We should only be missing the monotonic start time when logging data
- // for remote nodes. We don't have a good way to determine the remote
- // realtime offset, so it shouldn't be filled out.
- // TODO(austin): If we have a good way, feel free to fill it out. It
- // probably won't be better than we could do in post though with the same
- // data.
- CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
- if (new_monotonic_start_time != monotonic_clock::min_time) {
- // If we finally found our start time, update the header. Do this once
- // because it should never change again.
- log_file_header_.mutable_message()->mutate_monotonic_start_time(
- new_monotonic_start_time.time_since_epoch().count());
- log_file_header_.mutable_message()->mutate_realtime_start_time(
- new_realtime_start_time.time_since_epoch().count());
- }
- }
-
- // We don't have a good way to set the realtime start time on remote nodes.
- // Confirm it remains consistent.
- CHECK_EQ(log_file_header_.mutable_message()->has_realtime_start_time(),
- message_reader.log_file_header()->has_realtime_start_time());
-
- // Parts index will *not* match unless we set them to match. We only want
- // to accept the start time and parts mismatching, so set them.
- log_file_header_.mutable_message()->mutate_parts_index(
- message_reader.log_file_header()->parts_index());
-
- // Now compare that the headers match.
- if (!CompareFlatBuffer(message_reader.raw_log_file_header(),
- log_file_header_)) {
- if (message_reader.log_file_header()->has_log_event_uuid() &&
- log_file_header_.message().has_log_event_uuid() &&
- message_reader.log_file_header()->log_event_uuid()->string_view() !=
- log_file_header_.message().log_event_uuid()->string_view()) {
- LOG(FATAL) << "Logger UUIDs don't match between log file chunks "
- << filenames_[0] << " and " << filenames_[i]
- << ", this is not supported.";
- }
- if (message_reader.log_file_header()->has_parts_uuid() &&
- log_file_header_.message().has_parts_uuid() &&
- message_reader.log_file_header()->parts_uuid()->string_view() !=
- log_file_header_.message().parts_uuid()->string_view()) {
- LOG(FATAL) << "Parts UUIDs don't match between log file chunks "
- << filenames_[0] << " and " << filenames_[i]
- << ", this is not supported.";
- }
-
- LOG(FATAL) << "Header is different between log file chunks "
- << filenames_[0] << " and " << filenames_[i]
- << ", this is not supported.";
- }
- }
- // Put the parts index back to the first log file chunk.
- log_file_header_.mutable_message()->mutate_parts_index(
- message_reader_->log_file_header()->parts_index());
-
- // Setup per channel state.
- channels_.resize(configuration()->channels()->size());
- for (ChannelData &channel_data : channels_) {
- channel_data.data.split_reader = this;
- // Build up the timestamp list.
- if (configuration::MultiNode(configuration())) {
- channel_data.timestamps.resize(configuration()->nodes()->size());
- for (MessageHeaderQueue &queue : channel_data.timestamps) {
- queue.timestamps = true;
- queue.split_reader = this;
- }
- }
- }
-
- // Build up channels_to_write_ as an optimization to make it fast to figure
- // out which datastructure to place any new data from a channel on.
- for (const Channel *channel : *configuration()->channels()) {
- // This is the main case. We will only see data on this node.
- if (configuration::ChannelIsSendableOnNode(channel, node())) {
- channels_to_write_.emplace_back(
- &channels_[channels_to_write_.size()].data);
- } else
- // If we can't send, but can receive, we should be able to see
- // timestamps here.
- if (configuration::ChannelIsReadableOnNode(channel, node())) {
- channels_to_write_.emplace_back(
- &(channels_[channels_to_write_.size()]
- .timestamps[configuration::GetNodeIndex(configuration(),
- node())]));
- } else {
- channels_to_write_.emplace_back(nullptr);
- }
- }
-}
-
-bool SplitMessageReader::NextLogFile() {
- if (next_filename_index_ == filenames_.size()) {
- return false;
- }
- message_reader_ =
- std::make_unique<MessageReader>(filenames_[next_filename_index_]);
-
- // We can't support the config diverging between two log file headers. See if
- // they are the same.
- if (next_filename_index_ != 0) {
- // In order for the headers to identically compare, they need to have the
- // same parts_index. Rewrite the saved header with the new parts_index,
- // compare, and then restore.
- const int32_t original_parts_index =
- log_file_header_.message().parts_index();
- log_file_header_.mutable_message()->mutate_parts_index(
- message_reader_->log_file_header()->parts_index());
-
- CHECK(CompareFlatBuffer(message_reader_->raw_log_file_header(),
- log_file_header_))
- << ": Header is different between log file chunks "
- << filenames_[next_filename_index_] << " and "
- << filenames_[next_filename_index_ - 1] << ", this is not supported.";
-
- log_file_header_.mutable_message()->mutate_parts_index(
- original_parts_index);
- }
-
- ++next_filename_index_;
- return true;
-}
-
-bool SplitMessageReader::QueueMessages(
- monotonic_clock::time_point last_dequeued_time) {
- // TODO(austin): Once we are happy that everything works, read a 256kb chunk
- // to reduce the need to re-heap down below.
-
- // Special case no more data. Otherwise we blow up on the CHECK statement
- // confirming that we have enough data queued.
- if (at_end_) {
- return false;
- }
-
- // If this isn't the first time around, confirm that we had enough data queued
- // to follow the contract.
- if (time_to_queue_ != monotonic_clock::min_time) {
- CHECK_LE(last_dequeued_time,
- newest_timestamp() - max_out_of_order_duration())
- << " node " << FlatbufferToJson(node()) << " on " << this;
-
- // Bail if there is enough data already queued.
- if (last_dequeued_time < time_to_queue_) {
- VLOG(1) << MaybeNodeName(target_node_) << "All up to date on " << this
- << ", dequeued " << last_dequeued_time << " queue time "
- << time_to_queue_;
- return true;
- }
- } else {
- // Startup takes a special dance. We want to queue up until the start time,
- // but we then want to find the next message to read. The conservative
- // answer is to immediately trigger a second requeue to get things moving.
- time_to_queue_ = monotonic_start_time();
- CHECK_NE(time_to_queue_, monotonic_clock::min_time);
- QueueMessages(time_to_queue_);
- }
-
- // If we are asked to queue, queue for at least max_out_of_order_duration past
- // the last known time in the log file (ie the newest timestep read). As long
- // as we requeue exactly when time_to_queue_ is dequeued and go no further, we
- // are safe. And since we pop in order, that works.
- //
- // Special case the start of the log file. There should be at most 1 message
- // from each channel at the start of the log file. So always force the start
- // of the log file to just be read.
- time_to_queue_ = std::max(time_to_queue_, newest_timestamp());
- VLOG(1) << MaybeNodeName(target_node_) << "Queueing, going until "
- << time_to_queue_ << " " << filename();
-
- bool was_emplaced = false;
- while (true) {
- // Stop if we have enough.
- if (newest_timestamp() > time_to_queue_ + max_out_of_order_duration() &&
- was_emplaced) {
- VLOG(1) << MaybeNodeName(target_node_) << "Done queueing on " << this
- << ", queued to " << newest_timestamp() << " with requeue time "
- << time_to_queue_;
- return true;
- }
-
- if (std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
- message_reader_->ReadMessage()) {
- const MessageHeader &header = msg.value().message();
-
- const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
- chrono::nanoseconds(header.monotonic_sent_time()));
-
- if (VLOG_IS_ON(2)) {
- LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
- << filename() << " ttq: " << time_to_queue_ << " now "
- << newest_timestamp() << " start time "
- << monotonic_start_time() << " " << FlatbufferToJson(&header);
- } else if (VLOG_IS_ON(1)) {
- SizePrefixedFlatbufferVector<MessageHeader> copy = msg.value();
- copy.mutable_message()->clear_data();
- LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
- << filename() << " ttq: " << time_to_queue_ << " now "
- << newest_timestamp() << " start time "
- << monotonic_start_time() << " " << FlatbufferToJson(copy);
- }
-
- const int channel_index = header.channel_index();
- was_emplaced = channels_to_write_[channel_index]->emplace_back(
- std::move(msg.value()));
- if (was_emplaced) {
- newest_timestamp_ = std::max(newest_timestamp_, timestamp);
- }
- } else {
- if (!NextLogFile()) {
- VLOG(1) << MaybeNodeName(target_node_) << "No more files, last was "
- << filenames_.back();
- at_end_ = true;
- for (MessageHeaderQueue *queue : channels_to_write_) {
- if (queue == nullptr || queue->timestamp_merger == nullptr) {
- continue;
- }
- queue->timestamp_merger->NoticeAtEnd();
- }
- return false;
- }
- }
- }
-}
-
-void SplitMessageReader::SetTimestampMerger(TimestampMerger *timestamp_merger,
- int channel_index,
- const Node *target_node) {
- const Node *reinterpreted_target_node =
- configuration::GetNodeOrDie(configuration(), target_node);
- target_node_ = reinterpreted_target_node;
-
- const Channel *const channel =
- configuration()->channels()->Get(channel_index);
-
- VLOG(1) << " Configuring merger " << this << " for channel " << channel_index
- << " "
- << configuration::CleanedChannelToString(
- configuration()->channels()->Get(channel_index));
-
- MessageHeaderQueue *message_header_queue = nullptr;
-
- // Figure out if this log file is from our point of view, or the other node's
- // point of view.
- if (node() == reinterpreted_target_node) {
- VLOG(1) << " Replaying as logged node " << filename();
-
- if (configuration::ChannelIsSendableOnNode(channel, node())) {
- VLOG(1) << " Data on node";
- message_header_queue = &(channels_[channel_index].data);
- } else if (configuration::ChannelIsReadableOnNode(channel, node())) {
- VLOG(1) << " Timestamps on node";
- message_header_queue =
- &(channels_[channel_index].timestamps[configuration::GetNodeIndex(
- configuration(), node())]);
- } else {
- VLOG(1) << " Dropping";
- }
- } else {
- VLOG(1) << " Replaying as other node " << filename();
- // We are replaying from another node's point of view. The only interesting
- // data is data that is sent from our node and received on theirs.
- if (configuration::ChannelIsReadableOnNode(channel,
- reinterpreted_target_node) &&
- configuration::ChannelIsSendableOnNode(channel, node())) {
- VLOG(1) << " Readable on target node";
- // Data from another node.
- message_header_queue = &(channels_[channel_index].data);
- } else {
- VLOG(1) << " Dropping";
- // This is either not sendable on the other node, or is a timestamp and
- // therefore not interesting.
- }
- }
-
- // If we found one, write it down. This will be nullptr when there is nothing
- // relevant on this channel on this node for the target node. In that case,
- // we want to drop the message instead of queueing it.
- if (message_header_queue != nullptr) {
- message_header_queue->timestamp_merger = timestamp_merger;
- }
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldest(int channel_index) {
- CHECK_GT(channels_[channel_index].data.size(), 0u);
- const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp = channels_[channel_index].data.front_timestamp();
- SizePrefixedFlatbufferVector<MessageHeader> front =
- std::move(channels_[channel_index].data.front());
- channels_[channel_index].data.PopFront();
-
- VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
- << std::get<0>(timestamp) << " for "
- << configuration::StrippedChannelToString(
- configuration()->channels()->Get(channel_index))
- << " (" << channel_index << ")";
-
- QueueMessages(std::get<0>(timestamp));
-
- return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
- std::move(front));
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
- CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
- const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp = channels_[channel].timestamps[node_index].front_timestamp();
- SizePrefixedFlatbufferVector<MessageHeader> front =
- std::move(channels_[channel].timestamps[node_index].front());
- channels_[channel].timestamps[node_index].PopFront();
-
- VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
- << std::get<0>(timestamp) << " for "
- << configuration::StrippedChannelToString(
- configuration()->channels()->Get(channel))
- << " on "
- << configuration()->nodes()->Get(node_index)->name()->string_view()
- << " (" << node_index << ")";
-
- QueueMessages(std::get<0>(timestamp));
-
- return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
- std::move(front));
-}
-
-bool SplitMessageReader::MessageHeaderQueue::emplace_back(
- SizePrefixedFlatbufferVector<MessageHeader> &&msg) {
- CHECK(split_reader != nullptr);
-
- // If there is no timestamp merger for this queue, nobody is listening. Drop
- // the message. This happens when a log file from another node is replayed,
- // and the timestamp mergers down stream just don't care.
- if (timestamp_merger == nullptr) {
- return false;
- }
-
- CHECK(timestamps != msg.message().has_data())
- << ": Got timestamps and data mixed up on a node. "
- << FlatbufferToJson(msg);
-
- data_.emplace_back(std::move(msg));
-
- if (data_.size() == 1u) {
- // Yup, new data. Notify.
- if (timestamps) {
- timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
- } else {
- timestamp_merger->Update(split_reader, front_timestamp());
- }
- }
-
- return true;
-}
-
-void SplitMessageReader::MessageHeaderQueue::PopFront() {
- data_.pop_front();
- if (data_.size() != 0u) {
- // Yup, new data.
- if (timestamps) {
- timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
- } else {
- timestamp_merger->Update(split_reader, front_timestamp());
- }
- } else {
- // Poke anyways to update the heap.
- if (timestamps) {
- timestamp_merger->UpdateTimestamp(
- nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
- } else {
- timestamp_merger->Update(
- nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
- }
- }
-}
-
-namespace {
-
-bool SplitMessageReaderHeapCompare(
- const std::tuple<monotonic_clock::time_point, uint32_t,
- SplitMessageReader *>
- first,
- const std::tuple<monotonic_clock::time_point, uint32_t,
- SplitMessageReader *>
- second) {
- if (std::get<0>(first) > std::get<0>(second)) {
- return true;
- } else if (std::get<0>(first) == std::get<0>(second)) {
- if (std::get<1>(first) > std::get<1>(second)) {
- return true;
- } else if (std::get<1>(first) == std::get<1>(second)) {
- return std::get<2>(first) > std::get<2>(second);
- } else {
- return false;
- }
- } else {
- return false;
- }
-}
-
-bool ChannelHeapCompare(
- const std::pair<monotonic_clock::time_point, int> first,
- const std::pair<monotonic_clock::time_point, int> second) {
- if (first.first > second.first) {
- return true;
- } else if (first.first == second.first) {
- return first.second > second.second;
- } else {
- return false;
- }
-}
-
-} // namespace
-
-TimestampMerger::TimestampMerger(
- const Configuration *configuration,
- std::vector<SplitMessageReader *> split_message_readers, int channel_index,
- const Node *target_node, ChannelMerger *channel_merger)
- : configuration_(configuration),
- split_message_readers_(std::move(split_message_readers)),
- channel_index_(channel_index),
- node_index_(configuration::MultiNode(configuration)
- ? configuration::GetNodeIndex(configuration, target_node)
- : -1),
- channel_merger_(channel_merger) {
- // Tell the readers we care so they know who to notify.
- VLOG(1) << "Configuring channel " << channel_index << " target node "
- << FlatbufferToJson(target_node);
- for (SplitMessageReader *reader : split_message_readers_) {
- reader->SetTimestampMerger(this, channel_index, target_node);
- }
-
- // And then determine if we need to track timestamps.
- const Channel *channel = configuration->channels()->Get(channel_index);
- if (!configuration::ChannelIsSendableOnNode(channel, target_node) &&
- configuration::ChannelIsReadableOnNode(channel, target_node)) {
- has_timestamps_ = true;
- }
-}
-
-void TimestampMerger::PushMessageHeap(
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp,
- SplitMessageReader *split_message_reader) {
- if (split_message_reader != nullptr) {
- DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == message_heap_.end())
- << ": Pushing message when it is already in the heap.";
-
- message_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
-
- std::push_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
- }
-
- // If we are just a data merger, don't wait for timestamps.
- if (!has_timestamps_) {
- if (!message_heap_.empty()) {
- channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
- pushed_ = true;
- } else {
- // Remove ourselves if we are empty.
- channel_merger_->Update(monotonic_clock::min_time, channel_index_);
- }
- }
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-TimestampMerger::oldest_message() const {
- CHECK_GT(message_heap_.size(), 0u);
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_message_reader = message_heap_.front();
- return std::get<2>(oldest_message_reader)->oldest_message(channel_index_);
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-TimestampMerger::oldest_timestamp() const {
- CHECK_GT(timestamp_heap_.size(), 0u);
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_message_reader = timestamp_heap_.front();
- return std::get<2>(oldest_message_reader)
- ->oldest_message(channel_index_, node_index_);
-}
-
-void TimestampMerger::PushTimestampHeap(
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp,
- SplitMessageReader *split_message_reader) {
- if (split_message_reader != nullptr) {
- DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == timestamp_heap_.end())
- << ": Pushing timestamp when it is already in the heap.";
-
- timestamp_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
-
- std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- SplitMessageReaderHeapCompare);
- }
-
- // If we are a timestamp merger, don't wait for data. Missing data will be
- // caught at read time.
- if (has_timestamps_) {
- if (!timestamp_heap_.empty()) {
- channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
- pushed_ = true;
- } else {
- // Remove ourselves if we are empty.
- channel_merger_->Update(monotonic_clock::min_time, channel_index_);
- }
- }
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopMessageHeap() {
- // Pop the oldest message reader pointer off the heap.
- CHECK_GT(message_heap_.size(), 0u);
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_message_reader = message_heap_.front();
-
- std::pop_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
- message_heap_.pop_back();
-
- // Pop the oldest message. This re-pushes any messages from the reader to the
- // message heap.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_message =
- std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
-
- // Confirm that the time and queue_index we have recorded matches.
- CHECK_EQ(std::get<0>(oldest_message), std::get<0>(oldest_message_reader));
- CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
-
- // Now, keep reading until we have found all duplicates.
- while (!message_heap_.empty()) {
- // See if it is a duplicate.
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- next_oldest_message_reader = message_heap_.front();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- next_oldest_message_time = std::get<2>(next_oldest_message_reader)
- ->oldest_message(channel_index_);
-
- if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
- std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
- // Pop the message reader pointer.
- std::pop_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
- message_heap_.pop_back();
-
- // Pop the next oldest message. This re-pushes any messages from the
- // reader.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- next_oldest_message = std::get<2>(next_oldest_message_reader)
- ->PopOldest(channel_index_);
-
- // And make sure the message matches in it's entirety.
- CHECK(std::get<2>(oldest_message).span() ==
- std::get<2>(next_oldest_message).span())
- << ": Data at the same timestamp doesn't match.";
- } else {
- break;
- }
- }
-
- return oldest_message;
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopTimestampHeap() {
- // Pop the oldest message reader pointer off the heap.
- CHECK_GT(timestamp_heap_.size(), 0u);
-
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_timestamp_reader = timestamp_heap_.front();
-
- std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- &SplitMessageReaderHeapCompare);
- timestamp_heap_.pop_back();
-
- CHECK(node_index_ != -1) << ": Timestamps in a single node environment";
-
- // Pop the oldest message. This re-pushes any timestamps from the reader to
- // the timestamp heap.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_timestamp = std::get<2>(oldest_timestamp_reader)
- ->PopOldestTimestamp(channel_index_, node_index_);
-
- // Confirm that the time we have recorded matches.
- CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
- CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
-
- // Now, keep reading until we have found all duplicates.
- while (!timestamp_heap_.empty()) {
- // See if it is a duplicate.
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- next_oldest_timestamp_reader = timestamp_heap_.front();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- next_oldest_timestamp_time =
- std::get<2>(next_oldest_timestamp_reader)
- ->oldest_message(channel_index_, node_index_);
-
- if (std::get<0>(next_oldest_timestamp_time) ==
- std::get<0>(oldest_timestamp) &&
- std::get<1>(next_oldest_timestamp_time) ==
- std::get<1>(oldest_timestamp)) {
- // Pop the timestamp reader pointer.
- std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- &SplitMessageReaderHeapCompare);
- timestamp_heap_.pop_back();
-
- // Pop the next oldest timestamp. This re-pushes any messages from the
- // reader.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- next_oldest_timestamp =
- std::get<2>(next_oldest_timestamp_reader)
- ->PopOldestTimestamp(channel_index_, node_index_);
-
- // And make sure the contents matches in it's entirety.
- CHECK(std::get<2>(oldest_timestamp).span() ==
- std::get<2>(next_oldest_timestamp).span())
- << ": Data at the same timestamp doesn't match, "
- << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
- << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
- << absl::BytesToHexString(std::string_view(
- reinterpret_cast<const char *>(
- std::get<2>(oldest_timestamp).span().data()),
- std::get<2>(oldest_timestamp).span().size()))
- << " vs "
- << absl::BytesToHexString(std::string_view(
- reinterpret_cast<const char *>(
- std::get<2>(next_oldest_timestamp).span().data()),
- std::get<2>(next_oldest_timestamp).span().size()));
-
- } else {
- break;
- }
- }
-
- return oldest_timestamp;
-}
-
-std::tuple<TimestampMerger::DeliveryTimestamp,
- SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopOldest() {
- if (has_timestamps_) {
- VLOG(1) << "Looking for matching timestamp for "
- << configuration::StrippedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ") "
- << " at " << std::get<0>(oldest_timestamp());
-
- // Read the timestamps.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_timestamp = PopTimestampHeap();
-
- TimestampMerger::DeliveryTimestamp timestamp;
- timestamp.monotonic_event_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp).message().monotonic_sent_time()));
- timestamp.realtime_event_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp).message().realtime_sent_time()));
- timestamp.queue_index =
- std::get<2>(oldest_timestamp).message().queue_index();
-
- // Consistency check.
- CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
- CHECK_EQ(std::get<2>(oldest_timestamp).message().queue_index(),
- std::get<1>(oldest_timestamp));
-
- monotonic_clock::time_point remote_timestamp_monotonic_time(
- chrono::nanoseconds(
- std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
-
- // See if we have any data. If not, pass the problem up the chain.
- if (message_heap_.empty()) {
- LOG(WARNING) << MaybeNodeName(configuration_->nodes()->Get(node_index_))
- << "No data to match timestamp on "
- << configuration::CleanedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
- return std::make_tuple(timestamp,
- std::move(std::get<2>(oldest_timestamp)));
- }
-
- while (true) {
- {
- // Ok, now try grabbing data until we find one which matches.
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message_ref = oldest_message();
-
- // Time at which the message was sent (this message is written from the
- // sending node's perspective.
- monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
- std::get<2>(oldest_message_ref)->monotonic_sent_time()));
-
- if (remote_monotonic_time < remote_timestamp_monotonic_time) {
- LOG(WARNING) << configuration_->nodes()
- ->Get(node_index_)
- ->name()
- ->string_view()
- << " Undelivered message, skipping. Remote time is "
- << remote_monotonic_time << " timestamp is "
- << remote_timestamp_monotonic_time << " on channel "
- << configuration::StrippedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
- PopMessageHeap();
- continue;
- } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
- LOG(WARNING) << configuration_->nodes()
- ->Get(node_index_)
- ->name()
- ->string_view()
- << " Data not found. Remote time should be "
- << remote_timestamp_monotonic_time
- << ", message time is " << remote_monotonic_time
- << " on channel "
- << configuration::StrippedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")"
- << (VLOG_IS_ON(1) ? DebugString() : "");
- return std::make_tuple(timestamp,
- std::move(std::get<2>(oldest_timestamp)));
- }
-
- timestamp.monotonic_remote_time = remote_monotonic_time;
- }
-
- VLOG(1) << "Found matching data "
- << configuration::StrippedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_message = PopMessageHeap();
-
- timestamp.realtime_remote_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_message).message().realtime_sent_time()));
- timestamp.remote_queue_index =
- std::get<2>(oldest_message).message().queue_index();
-
- CHECK_EQ(timestamp.monotonic_remote_time,
- remote_timestamp_monotonic_time);
-
- CHECK_EQ(timestamp.remote_queue_index,
- std::get<2>(oldest_timestamp).message().remote_queue_index())
- << ": " << FlatbufferToJson(&std::get<2>(oldest_timestamp).message())
- << " data "
- << FlatbufferToJson(&std::get<2>(oldest_message).message());
-
- return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
- }
- } else {
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_message = PopMessageHeap();
-
- TimestampMerger::DeliveryTimestamp timestamp;
- timestamp.monotonic_event_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_message).message().monotonic_sent_time()));
- timestamp.realtime_event_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_message).message().realtime_sent_time()));
- timestamp.queue_index = std::get<2>(oldest_message).message().queue_index();
- timestamp.remote_queue_index = 0xffffffff;
-
- CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
- CHECK_EQ(std::get<1>(oldest_message),
- std::get<2>(oldest_message).message().queue_index());
-
- return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
- }
-}
-
-void TimestampMerger::NoticeAtEnd() { channel_merger_->NoticeAtEnd(); }
-
-namespace {
-std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
- const std::vector<std::vector<std::string>> &filenames) {
- CHECK_GT(filenames.size(), 0u);
- // Build up all the SplitMessageReaders.
- std::vector<std::unique_ptr<SplitMessageReader>> result;
- for (const std::vector<std::string> &filenames : filenames) {
- result.emplace_back(std::make_unique<SplitMessageReader>(filenames));
- }
- return result;
-}
-} // namespace
-
-ChannelMerger::ChannelMerger(
- const std::vector<std::vector<std::string>> &filenames)
- : split_message_readers_(MakeSplitMessageReaders(filenames)),
- log_file_header_(split_message_readers_[0]->raw_log_file_header()) {
- // Now, confirm that the configuration matches for each and pick a start time.
- // Also return the list of possible nodes.
- for (const std::unique_ptr<SplitMessageReader> &reader :
- split_message_readers_) {
- CHECK(CompareFlatBuffer(log_file_header_.message().configuration(),
- reader->log_file_header()->configuration()))
- << ": Replaying log files with different configurations isn't "
- "supported";
- }
-
- nodes_ = configuration::GetNodes(configuration());
-}
-
-bool ChannelMerger::SetNode(const Node *target_node) {
- std::vector<SplitMessageReader *> split_message_readers;
- for (const std::unique_ptr<SplitMessageReader> &reader :
- split_message_readers_) {
- split_message_readers.emplace_back(reader.get());
- }
-
- // Go find a log_file_header for this node.
- {
- bool found_node = false;
-
- for (const std::unique_ptr<SplitMessageReader> &reader :
- split_message_readers_) {
- // In order to identify which logfile(s) map to the target node, do a
- // logical comparison of the nodes, by confirming that we are either in a
- // single-node setup (where the nodes will both be nullptr) or that the
- // node names match (but the other node fields--e.g., hostname lists--may
- // not).
- const bool both_null =
- reader->node() == nullptr && target_node == nullptr;
- const bool both_have_name =
- (reader->node() != nullptr) && (target_node != nullptr) &&
- (reader->node()->has_name() && target_node->has_name());
- const bool node_names_identical =
- both_have_name && (reader->node()->name()->string_view() ==
- target_node->name()->string_view());
- if (both_null || node_names_identical) {
- if (!found_node) {
- found_node = true;
- log_file_header_ = reader->raw_log_file_header();
- VLOG(1) << "Found log file " << reader->filename() << " with node "
- << FlatbufferToJson(reader->node()) << " start_time "
- << monotonic_start_time();
- } else {
- // Find the earliest start time. That way, if we get a full log file
- // directly from the node, and a partial later, we start with the
- // full. Update our header to match that.
- const monotonic_clock::time_point new_monotonic_start_time(
- chrono::nanoseconds(
- reader->log_file_header()->monotonic_start_time()));
- const realtime_clock::time_point new_realtime_start_time(
- chrono::nanoseconds(
- reader->log_file_header()->realtime_start_time()));
-
- if (monotonic_start_time() == monotonic_clock::min_time ||
- (new_monotonic_start_time != monotonic_clock::min_time &&
- new_monotonic_start_time < monotonic_start_time())) {
- log_file_header_.mutable_message()->mutate_monotonic_start_time(
- new_monotonic_start_time.time_since_epoch().count());
- log_file_header_.mutable_message()->mutate_realtime_start_time(
- new_realtime_start_time.time_since_epoch().count());
- VLOG(1) << "Updated log file " << reader->filename()
- << " with node " << FlatbufferToJson(reader->node())
- << " start_time " << new_monotonic_start_time;
- }
- }
- }
- }
-
- if (!found_node) {
- LOG(WARNING) << "Failed to find log file for node "
- << FlatbufferToJson(target_node);
- return false;
- }
- }
-
- // Build up all the timestamp mergers. This connects up all the
- // SplitMessageReaders.
- timestamp_mergers_.reserve(configuration()->channels()->size());
- for (size_t channel_index = 0;
- channel_index < configuration()->channels()->size(); ++channel_index) {
- timestamp_mergers_.emplace_back(
- configuration(), split_message_readers, channel_index,
- configuration::GetNode(configuration(), target_node), this);
- }
-
- // And prime everything.
- for (std::unique_ptr<SplitMessageReader> &split_message_reader :
- split_message_readers_) {
- split_message_reader->QueueMessages(
- split_message_reader->monotonic_start_time());
- }
-
- node_ = configuration::GetNodeOrDie(configuration(), target_node);
- return true;
-}
-
-monotonic_clock::time_point ChannelMerger::OldestMessageTime() const {
- if (channel_heap_.empty()) {
- return monotonic_clock::max_time;
- }
- return channel_heap_.front().first;
-}
-
-void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index) {
- // Pop and recreate the heap if it has already been pushed. And since we are
- // pushing again, we don't need to clear pushed.
- if (timestamp_mergers_[channel_index].pushed()) {
- const auto channel_iterator = std::find_if(
- channel_heap_.begin(), channel_heap_.end(),
- [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
- return x.second == channel_index;
- });
- DCHECK(channel_iterator != channel_heap_.end());
- if (std::get<0>(*channel_iterator) == timestamp) {
- // It's already in the heap, in the correct spot, so nothing
- // more for us to do here.
- return;
- }
- channel_heap_.erase(channel_iterator);
- std::make_heap(channel_heap_.begin(), channel_heap_.end(),
- ChannelHeapCompare);
- }
-
- if (timestamp == monotonic_clock::min_time) {
- timestamp_mergers_[channel_index].set_pushed(false);
- return;
- }
-
- channel_heap_.push_back(std::make_pair(timestamp, channel_index));
-
- // The default sort puts the newest message first. Use a custom comparator to
- // put the oldest message first.
- std::push_heap(channel_heap_.begin(), channel_heap_.end(),
- ChannelHeapCompare);
-}
-
-void ChannelMerger::VerifyHeaps() {
- std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
- channel_heap_;
- std::make_heap(channel_heap.begin(), channel_heap.end(), &ChannelHeapCompare);
-
- for (size_t i = 0; i < channel_heap_.size(); ++i) {
- CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
- CHECK_EQ(
- std::get<0>(channel_heap[i]),
- timestamp_mergers_[std::get<1>(channel_heap[i])].channel_merger_time());
- }
-}
-
-std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
-ChannelMerger::PopOldest() {
- CHECK_GT(channel_heap_.size(), 0u);
- std::pair<monotonic_clock::time_point, int> oldest_channel_data =
- channel_heap_.front();
- int channel_index = oldest_channel_data.second;
- std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
- &ChannelHeapCompare);
- channel_heap_.pop_back();
-
- timestamp_mergers_[channel_index].set_pushed(false);
-
- TimestampMerger *merger = &timestamp_mergers_[channel_index];
-
- // Merger handles any queueing needed from here.
- std::tuple<TimestampMerger::DeliveryTimestamp,
- SizePrefixedFlatbufferVector<MessageHeader>>
- message = merger->PopOldest();
- DCHECK_EQ(std::get<0>(message).monotonic_event_time,
- oldest_channel_data.first)
- << ": channel_heap_ was corrupted for " << channel_index << ": "
- << DebugString();
-
- CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
- << ": " << MaybeNodeName(log_file_header()->node())
- << "Messages came off the queue out of order. " << DebugString();
- last_popped_time_ = std::get<0>(message).monotonic_event_time;
-
- VLOG(1) << "Popped " << last_popped_time_ << " "
- << configuration::StrippedChannelToString(
- configuration()->channels()->Get(channel_index))
- << " (" << channel_index << ")";
-
- return std::make_tuple(std::get<0>(message), channel_index,
- std::move(std::get<1>(message)));
-}
-
-std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
- std::stringstream ss;
- for (size_t i = 0; i < data_.size(); ++i) {
- if (i < 5 || i + 5 > data_.size()) {
- if (timestamps) {
- ss << " msg: ";
- } else {
- ss << " timestamp: ";
- }
- ss << monotonic_clock::time_point(
- chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
- << " ("
- << realtime_clock::time_point(
- chrono::nanoseconds(data_[i].message().realtime_sent_time()))
- << ") " << data_[i].message().queue_index();
- if (timestamps) {
- ss << " <- remote "
- << monotonic_clock::time_point(chrono::nanoseconds(
- data_[i].message().monotonic_remote_time()))
- << " ("
- << realtime_clock::time_point(chrono::nanoseconds(
- data_[i].message().realtime_remote_time()))
- << ")";
- }
- ss << "\n";
- } else if (i == 5) {
- ss << " ...\n";
- }
- }
-
- return ss.str();
-}
-
-std::string SplitMessageReader::DebugString(int channel) const {
- std::stringstream ss;
- ss << "[\n";
- ss << channels_[channel].data.DebugString();
- ss << " ]";
- return ss.str();
-}
-
-std::string SplitMessageReader::DebugString(int channel, int node_index) const {
- std::stringstream ss;
- ss << "[\n";
- ss << channels_[channel].timestamps[node_index].DebugString();
- ss << " ]";
- return ss.str();
-}
-
-std::string TimestampMerger::DebugString() const {
- std::stringstream ss;
-
- if (timestamp_heap_.size() > 0) {
- ss << " timestamp_heap {\n";
- std::vector<
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
- timestamp_heap = timestamp_heap_;
- while (timestamp_heap.size() > 0u) {
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_timestamp_reader = timestamp_heap.front();
-
- ss << " " << std::get<2>(oldest_timestamp_reader) << " "
- << std::get<0>(oldest_timestamp_reader) << " queue_index ("
- << std::get<1>(oldest_timestamp_reader) << ") ttq "
- << std::get<2>(oldest_timestamp_reader)->time_to_queue() << " "
- << std::get<2>(oldest_timestamp_reader)->filename() << " -> "
- << std::get<2>(oldest_timestamp_reader)
- ->DebugString(channel_index_, node_index_)
- << "\n";
-
- std::pop_heap(timestamp_heap.begin(), timestamp_heap.end(),
- &SplitMessageReaderHeapCompare);
- timestamp_heap.pop_back();
- }
- ss << " }\n";
- }
-
- ss << " message_heap {\n";
- {
- std::vector<
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
- message_heap = message_heap_;
- while (!message_heap.empty()) {
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_message_reader = message_heap.front();
-
- ss << " " << std::get<2>(oldest_message_reader) << " "
- << std::get<0>(oldest_message_reader) << " queue_index ("
- << std::get<1>(oldest_message_reader) << ") ttq "
- << std::get<2>(oldest_message_reader)->time_to_queue() << " "
- << std::get<2>(oldest_message_reader)->filename() << " -> "
- << std::get<2>(oldest_message_reader)->DebugString(channel_index_)
- << "\n";
-
- std::pop_heap(message_heap.begin(), message_heap.end(),
- &SplitMessageReaderHeapCompare);
- message_heap.pop_back();
- }
- }
- ss << " }";
-
- return ss.str();
-}
-
-std::string ChannelMerger::DebugString() const {
- std::stringstream ss;
- ss << "start_time " << realtime_start_time() << " " << monotonic_start_time()
- << "\n";
- ss << "channel_heap {\n";
- std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
- channel_heap_;
- while (!channel_heap.empty()) {
- std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
- ss << " " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
- << configuration::CleanedChannelToString(
- configuration()->channels()->Get(std::get<1>(channel)))
- << "\n";
-
- ss << timestamp_mergers_[std::get<1>(channel)].DebugString() << "\n";
-
- std::pop_heap(channel_heap.begin(), channel_heap.end(),
- &ChannelHeapCompare);
- channel_heap.pop_back();
- }
- ss << "}";
-
- return ss.str();
-}
-
std::string MaybeNodeName(const Node *node) {
if (node != nullptr) {
return node->name()->str() + " ";
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index d24e0af..fd600d3 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -478,7 +478,7 @@
}
// Returns which node this is sorting for.
- size_t node() const { return node_; }
+ size_t node() const { return node_merger_.node(); }
// The start time of this log.
monotonic_clock::time_point monotonic_start_time() const {
@@ -550,8 +550,6 @@
// The node merger to source messages from.
NodeMerger node_merger_;
- // Our node.
- const size_t node_;
// The buffer of messages for this node. These are not matched with any
// remote data.
std::deque<Message> messages_;
@@ -580,454 +578,6 @@
monotonic_clock::time_point queued_until_ = monotonic_clock::min_time;
};
-class TimestampMerger;
-
-// A design requirement is that the relevant data for a channel is not more than
-// max_out_of_order_duration out of order. We approach sorting in layers.
-//
-// 1) Split each (maybe chunked) log file into one queue per channel. Read this
-// log file looking for data pertaining to a specific node.
-// (SplitMessageReader)
-// 2) Merge all the data per channel from the different log files into a sorted
-// list of timestamps and messages. (TimestampMerger)
-// 3) Combine the timestamps and messages. (TimestampMerger)
-// 4) Merge all the channels to produce the next message on a node.
-// (ChannelMerger)
-// 5) Duplicate this entire stack per node.
-
-// This class splits messages and timestamps up into a queue per channel, and
-// handles reading data from multiple chunks.
-class SplitMessageReader {
- public:
- SplitMessageReader(const std::vector<std::string> &filenames);
-
- // Sets the TimestampMerger that gets notified for each channel. The node
- // that the TimestampMerger is merging as needs to be passed in.
- void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
- const Node *target_node);
-
- // Returns the (timestamp, queue_index, message_header) for the oldest message
- // in a channel, or max_time if there is nothing in the channel.
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message(int channel) {
- return channels_[channel].data.front_timestamp();
- }
-
- // Returns the (timestamp, queue_index, message_header) for the oldest
- // delivery time in a channel, or max_time if there is nothing in the channel.
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message(int channel, int destination_node) {
- return channels_[channel].timestamps[destination_node].front_timestamp();
- }
-
- // Returns the timestamp, queue_index, and message for the oldest data on a
- // channel. Requeues data as needed.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldest(int channel_index);
-
- // Returns the timestamp, queue_index, and message for the oldest timestamp on
- // a channel delivered to a node. Requeues data as needed.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldestTimestamp(int channel, int node_index);
-
- // Returns the header for the log files.
- const LogFileHeader *log_file_header() const {
- return &log_file_header_.message();
- }
-
- const SizePrefixedFlatbufferVector<LogFileHeader> &raw_log_file_header()
- const {
- return log_file_header_;
- }
-
- // Returns the starting time for this set of log files.
- monotonic_clock::time_point monotonic_start_time() {
- return monotonic_clock::time_point(
- std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
- }
- realtime_clock::time_point realtime_start_time() {
- return realtime_clock::time_point(
- std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
- }
-
- // Returns the configuration from the log file header.
- const Configuration *configuration() const {
- return log_file_header()->configuration();
- }
-
- // Returns the node who's point of view this log file is from. Make sure this
- // is a pointer in the configuration() nodes list so it can be consumed
- // elsewhere.
- const Node *node() const {
- if (configuration()->has_nodes()) {
- return configuration::GetNodeOrDie(configuration(),
- log_file_header()->node());
- } else {
- CHECK(!log_file_header()->has_node());
- return nullptr;
- }
- }
-
- // Returns the timestamp of the newest message read from the log file, and the
- // timestamp that we need to re-queue data.
- monotonic_clock::time_point newest_timestamp() const {
- return newest_timestamp_;
- }
-
- // Returns the next time to trigger a requeue.
- monotonic_clock::time_point time_to_queue() const { return time_to_queue_; }
-
- // Returns the minimum amount of data needed to queue up for sorting before
- // we are guarenteed to not see data out of order.
- std::chrono::nanoseconds max_out_of_order_duration() const {
- return message_reader_->max_out_of_order_duration();
- }
-
- std::string_view filename() const { return message_reader_->filename(); }
-
- // Adds more messages to the sorted list. This reads enough data such that
- // oldest_message_time can be replayed safely. Returns false if the log file
- // has all been read.
- bool QueueMessages(monotonic_clock::time_point oldest_message_time);
-
- // Returns debug strings for a channel, and timestamps for a node.
- std::string DebugString(int channel) const;
- std::string DebugString(int channel, int node_index) const;
-
- // Returns true if all the messages have been queued from the last log file in
- // the list of log files chunks.
- bool at_end() const { return at_end_; }
-
- private:
- // TODO(austin): Need to copy or refcount the message instead of running
- // multiple copies of the reader. Or maybe have a "as_node" index and hide it
- // inside.
-
- // Moves to the next log file in the list.
- bool NextLogFile();
-
- // Filenames of the log files.
- std::vector<std::string> filenames_;
- // And the index of the next file to open.
- size_t next_filename_index_ = 0;
-
- // Node we are reading as.
- const Node *target_node_ = nullptr;
-
- // Log file header to report. This is a copy.
- SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
- // Current log file being read.
- std::unique_ptr<MessageReader> message_reader_;
-
- // Datastructure to hold the list of messages, cached timestamp for the
- // oldest message, and sender to send with.
- struct MessageHeaderQueue {
- // If true, this is a timestamp queue.
- bool timestamps = false;
-
- // Returns a reference to the the oldest message.
- SizePrefixedFlatbufferVector<MessageHeader> &front() {
- CHECK_GT(data_.size(), 0u);
- return data_.front();
- }
-
- // Adds a message to the back of the queue. Returns true if it was actually
- // emplaced.
- bool emplace_back(SizePrefixedFlatbufferVector<MessageHeader> &&msg);
-
- // Drops the front message. Invalidates the front() reference.
- void PopFront();
-
- // The size of the queue.
- size_t size() { return data_.size(); }
-
- // Returns a debug string with info about each message in the queue.
- std::string DebugString() const;
-
- // Returns the (timestamp, queue_index, message_header) for the oldest
- // message.
- const std::tuple<monotonic_clock::time_point, uint32_t,
- const MessageHeader *>
- front_timestamp() {
- const MessageHeader &message = front().message();
- return std::make_tuple(
- monotonic_clock::time_point(
- std::chrono::nanoseconds(message.monotonic_sent_time())),
- message.queue_index(), &message);
- }
-
- // Pointer to the timestamp merger for this queue if available.
- TimestampMerger *timestamp_merger = nullptr;
- // Pointer to the reader which feeds this queue.
- SplitMessageReader *split_reader = nullptr;
-
- private:
- // The data.
- std::deque<SizePrefixedFlatbufferVector<MessageHeader>> data_;
- };
-
- // All the queues needed for a channel. There isn't going to be data in all
- // of these.
- struct ChannelData {
- // The data queue for the channel.
- MessageHeaderQueue data;
- // Queues for timestamps for each node.
- std::vector<MessageHeaderQueue> timestamps;
- };
-
- // Data for all the channels.
- std::vector<ChannelData> channels_;
-
- // Once we know the node that this SplitMessageReader will be writing as,
- // there will be only one MessageHeaderQueue that a specific channel matches.
- // Precompute this here for efficiency.
- std::vector<MessageHeaderQueue *> channels_to_write_;
-
- monotonic_clock::time_point time_to_queue_ = monotonic_clock::min_time;
-
- // Latches true when we hit the end of the last log file and there is no sense
- // poking it further.
- bool at_end_ = false;
-
- // Timestamp of the newest message that was read and actually queued. We want
- // to track this independently from the log file because we need the
- // timestamps here to be timestamps of messages that are queued.
- monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
-};
-
-class ChannelMerger;
-
-// Sorts channels (and timestamps) from multiple log files for a single channel.
-class TimestampMerger {
- public:
- TimestampMerger(const Configuration *configuration,
- std::vector<SplitMessageReader *> split_message_readers,
- int channel_index, const Node *target_node,
- ChannelMerger *channel_merger);
-
- // Metadata used to schedule the message.
- struct DeliveryTimestamp {
- monotonic_clock::time_point monotonic_event_time =
- monotonic_clock::min_time;
- realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
- uint32_t queue_index = 0xffffffff;
-
- monotonic_clock::time_point monotonic_remote_time =
- monotonic_clock::min_time;
- realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
- uint32_t remote_queue_index = 0xffffffff;
- };
-
- // Pushes SplitMessageReader onto the timestamp heap. This should only be
- // called when timestamps are placed in the channel this class is merging for
- // the reader.
- void UpdateTimestamp(
- SplitMessageReader *split_message_reader,
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message_time) {
- PushTimestampHeap(oldest_message_time, split_message_reader);
- }
- // Pushes SplitMessageReader onto the message heap. This should only be
- // called when data is placed in the channel this class is merging for the
- // reader.
- void Update(
- SplitMessageReader *split_message_reader,
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message_time) {
- PushMessageHeap(oldest_message_time, split_message_reader);
- }
-
- // Returns the oldest combined timestamp and data for this channel. If there
- // isn't a matching piece of data, returns only the timestamp with no data.
- // The caller can determine what the appropriate action is to recover.
- std::tuple<DeliveryTimestamp, SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldest();
-
- // Tracks if the channel merger has pushed this onto it's heap or not.
- bool pushed() { return pushed_; }
- // Sets if this has been pushed to the channel merger heap. Should only be
- // called by the channel merger.
- void set_pushed(bool pushed) { pushed_ = pushed; }
-
- // Returns a debug string with the heaps printed out.
- std::string DebugString() const;
-
- // Returns true if we have timestamps.
- bool has_timestamps() const { return has_timestamps_; }
-
- // Records that one of the log files ran out of data. This should only be
- // called by a SplitMessageReader.
- void NoticeAtEnd();
-
- aos::monotonic_clock::time_point channel_merger_time() {
- if (has_timestamps_) {
- return std::get<0>(timestamp_heap_[0]);
- } else {
- return std::get<0>(message_heap_[0]);
- }
- }
-
- private:
- // Pushes messages and timestamps to the corresponding heaps.
- void PushMessageHeap(
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp,
- SplitMessageReader *split_message_reader);
- void PushTimestampHeap(
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp,
- SplitMessageReader *split_message_reader);
-
- // Pops a message from the message heap. This automatically triggers the
- // split message reader to re-fetch any new data.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopMessageHeap();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message() const;
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_timestamp() const;
- // Pops a message from the timestamp heap. This automatically triggers the
- // split message reader to re-fetch any new data.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopTimestampHeap();
-
- const Configuration *configuration_;
-
- // If true, this is a forwarded channel and timestamps should be matched.
- bool has_timestamps_ = false;
-
- // Tracks if the ChannelMerger has pushed this onto its queue.
- bool pushed_ = false;
-
- // The split message readers used for source data.
- std::vector<SplitMessageReader *> split_message_readers_;
-
- // The channel to merge.
- int channel_index_;
-
- // Our node.
- int node_index_;
-
- // Heaps for messages and timestamps.
- std::vector<
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
- message_heap_;
- std::vector<
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
- timestamp_heap_;
-
- // Parent channel merger.
- ChannelMerger *channel_merger_;
-};
-
-// This class handles constructing all the split message readers, channel
-// mergers, and combining the results.
-class ChannelMerger {
- public:
- // Builds a ChannelMerger around a set of log files. These are of the format:
- // {
- // {log1_part0, log1_part1, ...},
- // {log2}
- // }
- // The inner vector is a list of log file chunks which together form a log file.
- // The outer vector is a list of log files with subsets of the messages, or
- // messages from different nodes.
- ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
-
- // Returns the nodes that we know how to merge.
- const std::vector<const Node *> nodes() const;
- // Sets the node that we will return messages as. Returns true if the node
- // has log files and will produce data. This can only be called once, and
- // will likely corrupt state if called a second time.
- bool SetNode(const Node *target_node);
-
- // Everything else needs the node set before it works.
-
- // Returns a timestamp for the oldest message in this group of logfiles.
- monotonic_clock::time_point OldestMessageTime() const;
- // Pops the oldest message.
- std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldest();
-
- // Returns the config for this set of log files.
- const Configuration *configuration() const {
- return log_file_header()->configuration();
- }
-
- const LogFileHeader *log_file_header() const {
- return &log_file_header_.message();
- }
-
- // Returns the start times for the configured node's log files.
- monotonic_clock::time_point monotonic_start_time() const {
- return monotonic_clock::time_point(
- std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
- }
- realtime_clock::time_point realtime_start_time() const {
- return realtime_clock::time_point(
- std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
- }
-
- // Returns the node set by SetNode above.
- const Node *node() const { return node_; }
-
- // Called by the TimestampMerger when new data is available with the provided
- // timestamp and channel_index.
- void Update(monotonic_clock::time_point timestamp, int channel_index) {
- PushChannelHeap(timestamp, channel_index);
- }
-
- // Returns a debug string with all the heaps in it. Generally only useful for
- // debugging what went wrong.
- std::string DebugString() const;
-
- // Returns true if one of the log files has finished reading everything. When
- // log file chunks are involved, this means that the last chunk in a log file
- // has been read. It is acceptable to be missing data at this point in time.
- bool at_end() const { return at_end_; }
-
- // Marks that one of the log files is at the end. This should only be called
- // by timestamp mergers.
- void NoticeAtEnd() { at_end_ = true; }
-
- private:
- // Pushes the timestamp for new data on the provided channel.
- void PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index);
-
- // CHECKs that channel_heap_ and timestamp_heap_ are valid heaps.
- void VerifyHeaps();
-
- // All the message readers.
- std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
-
- // The log header we are claiming to be.
- SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
-
- // The timestamp mergers which combine data from the split message readers.
- std::vector<TimestampMerger> timestamp_mergers_;
-
- // A heap of the channel readers and timestamps for the oldest data in each.
- std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
-
- // Configured node.
- const Node *node_;
-
- bool at_end_ = false;
-
- // Cached copy of the list of nodes.
- std::vector<const Node *> nodes_;
-
- // Last time popped. Used to detect events being returned out of order.
- monotonic_clock::time_point last_popped_time_ = monotonic_clock::min_time;
-};
-
// Returns the node name with a trailing space, or an empty string if we are on
// a single node.
std::string MaybeNodeName(const Node *);
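
For context on what is being deleted: SplitMessageReader, TimestampMerger, and ChannelMerger together implemented a two-stage k-way merge, with min-heaps of (oldest time, source) entries picking the globally-oldest message across readers and channels. A minimal, self-contained sketch of a single stage of that heap pattern (all names here are illustrative, not aos types):

#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <utility>
#include <vector>

// Stand-in for a reader that yields its messages in sorted order.
struct FakeReader {
  std::vector<int64_t> times;  // Sorted monotonic times.
  size_t index = 0;
  bool empty() const { return index == times.size(); }
  int64_t front() const { return times[index]; }
  void pop() { ++index; }
};

int main() {
  std::vector<FakeReader> readers = {{{1, 4, 9}}, {{2, 3, 10}}, {{5}}};

  // Min-heap of (oldest time, reader index), mirroring the removed
  // message_heap_ / channel_heap_ members.
  std::vector<std::pair<int64_t, size_t>> heap;
  for (size_t i = 0; i < readers.size(); ++i) {
    if (!readers[i].empty()) heap.emplace_back(readers[i].front(), i);
  }
  std::make_heap(heap.begin(), heap.end(), std::greater<>());

  while (!heap.empty()) {
    std::pop_heap(heap.begin(), heap.end(), std::greater<>());
    const auto [time, i] = heap.back();
    heap.pop_back();
    std::cout << time << "\n";  // Emit the globally-oldest message.
    readers[i].pop();
    if (!readers[i].empty()) {
      heap.emplace_back(readers[i].front(), i);
      std::push_heap(heap.begin(), heap.end(), std::greater<>());
    }
  }
}
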
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index b32c748..5a6bb8f 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -38,11 +38,12 @@
namespace {
// Helper to safely read a header, or CHECK.
SizePrefixedFlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
- const std::vector<std::vector<std::string>> &filenames) {
- CHECK_GE(filenames.size(), 1u) << ": Empty filenames list";
- CHECK_GE(filenames[0].size(), 1u) << ": Empty filenames list";
+ const std::vector<LogFile> &log_files) {
+ CHECK_GE(log_files.size(), 1u) << ": Empty filenames list";
+ CHECK_GE(log_files[0].parts.size(), 1u) << ": Empty filenames list";
+ CHECK_GE(log_files[0].parts[0].parts.size(), 1u) << ": Empty filenames list";
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> result =
- ReadHeader(filenames[0][0]);
+ ReadHeader(log_files[0].parts[0].parts[0]);
CHECK(result);
return result.value();
}
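
The triple index in the rewritten helper follows the nesting of the sorted-log types: a LogFile holds one or more streams of parts, and each stream holds its part filenames. A rough sketch of the shape the three CHECKs guard (fields abbreviated; the real structs carry more metadata than shown):

#include <string>
#include <vector>

struct LogParts {
  std::vector<std::string> parts;  // Part filenames, in sorted order.
};
struct LogFile {
  std::vector<LogParts> parts;  // One entry per stream of parts.
};

// The header is read from the first filename of the first parts stream of
// the first log file, i.e. log_files[0].parts[0].parts[0].
std::string FirstFilename(const std::vector<LogFile> &log_files) {
  return log_files[0].parts[0].parts[0];
}
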
@@ -717,24 +718,12 @@
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration)
- : LogReader(std::vector<std::string>{std::string(filename)},
- replay_configuration) {}
+ : LogReader(SortParts({std::string(filename)}), replay_configuration) {}
-LogReader::LogReader(const std::vector<std::string> &filenames,
+LogReader::LogReader(std::vector<LogFile> log_files,
const Configuration *replay_configuration)
- : LogReader(std::vector<std::vector<std::string>>{filenames},
- replay_configuration) {}
-
-// TODO(austin): Make this the base and kill the others. This has much better
-// context for sorting.
-LogReader::LogReader(const std::vector<LogFile> &log_files,
- const Configuration *replay_configuration)
- : LogReader(ToLogReaderVector(log_files), replay_configuration) {}
-
-LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
- const Configuration *replay_configuration)
- : filenames_(filenames),
- log_file_header_(MaybeReadHeaderOrDie(filenames)),
+ : log_files_(std::move(log_files)),
+ log_file_header_(MaybeReadHeaderOrDie(log_files_)),
replay_configuration_(replay_configuration) {
MakeRemappedConfig();
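
With the old constructors gone, call sites look like the following sketch (the paths are made up for illustration):

#include "aos/events/logging/logger.h"

int main() {
  // Single file: the delegating constructor calls SortParts internally.
  aos::logger::LogReader single("/tmp/log_part0.bfbs");

  // Multiple parts: sort them explicitly and hand over the LogFile list.
  aos::logger::LogReader multi(
      aos::logger::SortParts({"/tmp/log_part0.bfbs", "/tmp/log_part1.bfbs"}));
}
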
@@ -762,8 +751,8 @@
}
if (!configuration::MultiNode(configuration())) {
- states_.emplace_back(
- std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
+ states_.emplace_back(std::make_unique<State>(
+ std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -851,8 +840,12 @@
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
- states_[node_index] =
- std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
+ std::vector<LogParts> filtered_parts = FilterPartsForNode(
+ log_files_, node != nullptr ? node->name()->string_view() : "");
+ states_[node_index] = std::make_unique<State>(
+ filtered_parts.size() == 0u
+ ? nullptr
+ : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
State *state = states_[node_index].get();
state->set_event_loop(state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node)));
@@ -860,6 +853,20 @@
state->SetChannelCount(logged_configuration()->channels()->size());
}
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ State *state = states_[node_index].get();
+ for (const Node *other_node : configuration::GetNodes(configuration())) {
+ const size_t other_node_index =
+ configuration::GetNodeIndex(configuration(), other_node);
+ State *other_state = states_[other_node_index].get();
+ if (other_state != state) {
+ state->AddPeer(other_state);
+ }
+ }
+ }
+
// Register after making all the State objects so we can build references
// between them.
for (const Node *node : configuration::GetNodes(configuration())) {
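
The loop added above wires every State to every other State, so each node's TimestampMapper knows about the mappers for the other nodes (the AddPeer plumbing appears further down in this diff). It is plain all-pairs wiring, quadratic in the node count, which is fine for the handful of nodes in a log. Reduced to its essentials (State here is a stand-in, not the real class):

#include <memory>
#include <vector>

struct State {
  std::vector<State *> peers;
  void AddPeer(State *peer) { peers.push_back(peer); }
};

int main() {
  std::vector<std::unique_ptr<State>> states;
  for (int i = 0; i < 3; ++i) states.push_back(std::make_unique<State>());
  // Same shape as the loop in the diff: every state peers with every other.
  for (const auto &a : states) {
    for (const auto &b : states) {
      if (a.get() != b.get()) a->AddPeer(b.get());
    }
  }
}
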
@@ -971,6 +978,9 @@
VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
<< MaybeNodeName(state->event_loop()->node()) << "now "
<< state->monotonic_now();
+ if (state->monotonic_start_time() == monotonic_clock::min_time) {
+ continue;
+ }
// And start computing the start time on the distributed clock now that
// that works.
start_time = std::max(
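
The added min_time check makes the start-time pass skip nodes whose filtered parts produced no start time (for example, nodes with no log files at all), so the sentinel never reaches the distributed-clock conversion. A hypothetical reduction of that sentinel-skip pattern:

#include <algorithm>
#include <chrono>
#include <vector>

using monotonic = std::chrono::steady_clock::time_point;

// Nodes without log data report monotonic::min(); skip them instead of
// converting the sentinel onto the distributed clock.
monotonic LatestStart(const std::vector<monotonic> &starts) {
  monotonic latest = monotonic::min();
  for (const monotonic start : starts) {
    if (start == monotonic::min()) continue;  // No log data for this node.
    latest = std::max(latest, start);
  }
  return latest;
}
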
@@ -1221,8 +1231,6 @@
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
- const bool has_data = state->SetNode();
-
for (size_t logged_channel_index = 0;
logged_channel_index < logged_configuration()->channels()->size();
++logged_channel_index) {
@@ -1267,7 +1275,7 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (!has_data) {
+ if (state->OldestMessageTime() == monotonic_clock::max_time) {
return;
}
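
The removed has_data boolean from SetNode() is replaced by a time probe: a State with no TimestampMapper, or one whose mapper has no front message, reports monotonic_clock::max_time from OldestMessageTime(), and that doubles as the "no data" signal here. A sketch of the convention with simplified types:

#include <chrono>

using monotonic = std::chrono::steady_clock::time_point;

struct Message {
  monotonic event_time;
};

// A null front message maps to max_time, so "has data" stays a plain time
// comparison instead of a separate boolean.
monotonic OldestMessageTime(const Message *front) {
  return front == nullptr ? monotonic::max() : front->event_time;
}

bool HasData(const Message *front) {
  return OldestMessageTime(front) != monotonic::max();
}
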
@@ -1283,45 +1291,39 @@
}
return;
}
- TimestampMerger::DeliveryTimestamp channel_timestamp;
- int channel_index;
- SizePrefixedFlatbufferVector<MessageHeader> channel_data =
- SizePrefixedFlatbufferVector<MessageHeader>::Empty();
-
if (VLOG_IS_ON(1)) {
LogFit("Offset was");
}
bool update_time;
- std::tie(channel_timestamp, channel_index, channel_data) =
- state->PopOldest(&update_time);
+ TimestampedMessage timestamped_message = state->PopOldest(&update_time);
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
if (!FLAGS_skip_order_validation) {
- CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
+ CHECK(monotonic_now == timestamped_message.monotonic_event_time)
<< ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
<< monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
+ << timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
- } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
+ } else if (monotonic_now != timestamped_message.monotonic_event_time) {
LOG(WARNING) << "Check failed: monotonic_now == "
- "channel_timestamp.monotonic_event_time) ("
+ "timestamped_message.monotonic_event_time) ("
<< monotonic_now << " vs. "
- << channel_timestamp.monotonic_event_time
+ << timestamped_message.monotonic_event_time
<< "): " << FlatbufferToJson(state->event_loop()->node())
<< " Now " << monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
+ << timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
}
- if (channel_timestamp.monotonic_event_time >
+ if (timestamped_message.monotonic_event_time >
state->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
!state->at_end()) ||
- channel_data.message().data() != nullptr) {
- CHECK(channel_data.message().data() != nullptr)
+ timestamped_message.data.span().size() != 0u) {
+ CHECK_NE(timestamped_message.data.span().size(), 0u)
<< ": Got a message without data. Forwarding entry which was "
"not matched? Use --skip_missing_forwarding_entries to "
"ignore this.";
@@ -1331,28 +1333,38 @@
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
if (!FLAGS_skip_order_validation) {
- CHECK_LT(channel_timestamp.monotonic_remote_time,
- state->monotonic_remote_now(channel_index))
+ CHECK_LT(
+ timestamped_message.monotonic_remote_time,
+ state->monotonic_remote_now(timestamped_message.channel_index))
<< state->event_loop()->node()->name()->string_view() << " to "
- << state->remote_node(channel_index)->name()->string_view()
+ << state->remote_node(timestamped_message.channel_index)
+ ->name()
+ ->string_view()
<< " " << state->DebugString();
- } else if (channel_timestamp.monotonic_remote_time >=
- state->monotonic_remote_now(channel_index)) {
+ } else if (timestamped_message.monotonic_remote_time >=
+ state->monotonic_remote_now(
+ timestamped_message.channel_index)) {
LOG(WARNING)
- << "Check failed: channel_timestamp.monotonic_remote_time < "
- "state->monotonic_remote_now(channel_index) ("
- << channel_timestamp.monotonic_remote_time << " vs. "
- << state->monotonic_remote_now(channel_index) << ") "
- << state->event_loop()->node()->name()->string_view() << " to "
- << state->remote_node(channel_index)->name()->string_view()
- << " currently " << channel_timestamp.monotonic_event_time
+ << "Check failed: timestamped_message.monotonic_remote_time < "
+ "state->monotonic_remote_now(timestamped_message.channel_"
+ "index) ("
+ << timestamped_message.monotonic_remote_time << " vs. "
+ << state->monotonic_remote_now(
+ timestamped_message.channel_index)
+ << ") " << state->event_loop()->node()->name()->string_view()
+ << " to "
+ << state->remote_node(timestamped_message.channel_index)
+ ->name()
+ ->string_view()
+ << " currently " << timestamped_message.monotonic_event_time
<< " ("
<< state->ToDistributedClock(
- channel_timestamp.monotonic_event_time)
+ timestamped_message.monotonic_event_time)
<< ") remote event time "
- << channel_timestamp.monotonic_remote_time << " ("
+ << timestamped_message.monotonic_remote_time << " ("
<< state->RemoteToDistributedClock(
- channel_index, channel_timestamp.monotonic_remote_time)
+ timestamped_message.channel_index,
+ timestamped_message.monotonic_remote_time)
<< ") " << state->DebugString();
}
@@ -1362,12 +1374,12 @@
fprintf(
offset_fp_,
"# time_since_start, offset node 0, offset node 1, ...\n");
- first_time_ = channel_timestamp.realtime_event_time;
+ first_time_ = timestamped_message.realtime_event_time;
}
fprintf(offset_fp_, "%.9f",
std::chrono::duration_cast<std::chrono::duration<double>>(
- channel_timestamp.realtime_event_time - first_time_)
+ timestamped_message.realtime_event_time - first_time_)
.count());
for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
fprintf(offset_fp_, ", %.9f",
@@ -1383,15 +1395,14 @@
}
// If we have access to the factory, use it to fix the realtime time.
- state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
- channel_timestamp.realtime_event_time);
+ state->SetRealtimeOffset(timestamped_message.monotonic_event_time,
+ timestamped_message.realtime_event_time);
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
- << channel_timestamp.monotonic_event_time;
+ << timestamped_message.monotonic_event_time;
// TODO(austin): std::move timestamped_message.data in and make that
// efficient in simulation.
- state->Send(channel_index, channel_data.message().data()->Data(),
- channel_data.message().data()->size(), channel_timestamp);
+ state->Send(std::move(timestamped_message));
} else if (state->at_end() && !ignore_missing_data_) {
// We are at the end of the log file and found missing data. Finish
// reading the rest of the log file and call it quits. We don't want
@@ -1401,15 +1412,15 @@
state->PopOldest(&update_time_dummy);
}
} else {
- CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
+ CHECK(timestamped_message.data.span().data() == nullptr) << ": Nullptr";
}
} else {
LOG(WARNING)
<< "Not sending data from before the start of the log file. "
- << channel_timestamp.monotonic_event_time.time_since_epoch().count()
+ << timestamped_message.monotonic_event_time.time_since_epoch().count()
<< " start " << monotonic_start_time().time_since_epoch().count()
<< " "
- << FlatbufferToJson(channel_data,
+ << FlatbufferToJson(timestamped_message.data,
{.multi_line = false, .max_vector_size = 100});
}
@@ -1742,8 +1753,14 @@
return remapped_channel;
}
-LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
- : channel_merger_(std::move(channel_merger)) {}
+LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
+ : timestamp_mapper_(std::move(timestamp_mapper)) {}
+
+void LogReader::State::AddPeer(State *peer) {
+ if (timestamp_mapper_ && peer->timestamp_mapper_) {
+ timestamp_mapper_->AddPeer(peer->timestamp_mapper_.get());
+ }
+}
EventLoop *LogReader::State::SetNodeEventLoopFactory(
NodeEventLoopFactory *node_event_loop_factory) {
@@ -1783,22 +1800,20 @@
factory_channel_index_[logged_channel_index] = factory_channel_index;
}
-bool LogReader::State::Send(
- size_t channel_index, const void *data, size_t size,
- const TimestampMerger::DeliveryTimestamp &delivery_timestamp) {
- aos::RawSender *sender = channels_[channel_index].get();
+bool LogReader::State::Send(const TimestampedMessage ×tamped_message) {
+ aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
uint32_t remote_queue_index = 0xffffffff;
- if (remote_timestamp_senders_[channel_index] != nullptr) {
- std::vector<SentTimestamp> *queue_index_map =
- CHECK_NOTNULL(CHECK_NOTNULL(channel_source_state_[channel_index])
- ->queue_index_map_[channel_index]
- .get());
+ if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
+ std::vector<SentTimestamp> *queue_index_map = CHECK_NOTNULL(
+ CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
+ ->queue_index_map_[timestamped_message.channel_index]
+ .get());
SentTimestamp search;
- search.monotonic_event_time = delivery_timestamp.monotonic_remote_time;
- search.realtime_event_time = delivery_timestamp.realtime_remote_time;
- search.queue_index = delivery_timestamp.remote_queue_index;
+ search.monotonic_event_time = timestamped_message.monotonic_remote_time;
+ search.realtime_event_time = timestamped_message.realtime_remote_time;
+ search.queue_index = timestamped_message.remote_queue_index;
// Find the sent time if available.
auto element = std::lower_bound(
@@ -1828,10 +1843,10 @@
// receive time.
if (element != queue_index_map->end()) {
CHECK_EQ(element->monotonic_event_time,
- delivery_timestamp.monotonic_remote_time);
+ timestamped_message.monotonic_remote_time);
CHECK_EQ(element->realtime_event_time,
- delivery_timestamp.realtime_remote_time);
- CHECK_EQ(element->queue_index, delivery_timestamp.remote_queue_index);
+ timestamped_message.realtime_remote_time);
+ CHECK_EQ(element->queue_index, timestamped_message.remote_queue_index);
remote_queue_index = element->actual_queue_index;
}
@@ -1839,27 +1854,32 @@
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
- const bool sent =
- sender->Send(data, size, delivery_timestamp.monotonic_remote_time,
- delivery_timestamp.realtime_remote_time, remote_queue_index);
+ const bool sent = sender->Send(
+ timestamped_message.data.message().data()->Data(),
+ timestamped_message.data.message().data()->size(),
+ timestamped_message.monotonic_remote_time,
+ timestamped_message.realtime_remote_time, remote_queue_index);
if (!sent) return false;
- if (queue_index_map_[channel_index]) {
+ if (queue_index_map_[timestamped_message.channel_index]) {
SentTimestamp timestamp;
- timestamp.monotonic_event_time = delivery_timestamp.monotonic_event_time;
- timestamp.realtime_event_time = delivery_timestamp.realtime_event_time;
- timestamp.queue_index = delivery_timestamp.queue_index;
+ timestamp.monotonic_event_time = timestamped_message.monotonic_event_time;
+ timestamp.realtime_event_time = timestamped_message.realtime_event_time;
+ timestamp.queue_index = timestamped_message.queue_index;
timestamp.actual_queue_index = sender->sent_queue_index();
- queue_index_map_[channel_index]->emplace_back(timestamp);
- } else if (remote_timestamp_senders_[channel_index] != nullptr) {
+ queue_index_map_[timestamped_message.channel_index]->emplace_back(
+ timestamp);
+ } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
+ nullptr) {
aos::Sender<MessageHeader>::Builder builder =
- remote_timestamp_senders_[channel_index]->MakeBuilder();
+ remote_timestamp_senders_[timestamped_message.channel_index]
+ ->MakeBuilder();
logger::MessageHeader::Builder message_header_builder =
builder.MakeBuilder<logger::MessageHeader>();
message_header_builder.add_channel_index(
- factory_channel_index_[channel_index]);
+ factory_channel_index_[timestamped_message.channel_index]);
// Swap the remote and sent metrics. They are from the sender's
// perspective, not the receiver's perspective.
@@ -1870,9 +1890,9 @@
message_header_builder.add_queue_index(sender->sent_queue_index());
message_header_builder.add_monotonic_remote_time(
- delivery_timestamp.monotonic_remote_time.time_since_epoch().count());
+ timestamped_message.monotonic_remote_time.time_since_epoch().count());
message_header_builder.add_realtime_remote_time(
- delivery_timestamp.realtime_remote_time.time_since_epoch().count());
+ timestamped_message.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(remote_queue_index);
@@ -1899,28 +1919,23 @@
return &(sender->second);
}
-std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
-LogReader::State::PopOldest(bool *update_time) {
+TimestampedMessage LogReader::State::PopOldest(bool *update_time) {
CHECK_GT(sorted_messages_.size(), 0u);
- std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>,
- message_bridge::NoncausalOffsetEstimator *>
+ std::tuple<TimestampedMessage, message_bridge::NoncausalOffsetEstimator *>
result = std::move(sorted_messages_.front());
VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
<< std::get<0>(result).monotonic_event_time;
sorted_messages_.pop_front();
SeedSortedMessages();
- if (std::get<3>(result) != nullptr) {
- *update_time = std::get<3>(result)->Pop(
+ if (std::get<1>(result) != nullptr) {
+ *update_time = std::get<1>(result)->Pop(
event_loop_->node(), std::get<0>(result).monotonic_event_time);
} else {
*update_time = false;
}
- return std::make_tuple(std::get<0>(result), std::get<1>(result),
- std::move(std::get<2>(result)));
+ return std::move(std::get<0>(result));
}
monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
@@ -1930,18 +1945,25 @@
return std::get<0>(sorted_messages_.front()).monotonic_event_time;
}
- return channel_merger_->OldestMessageTime();
+ TimestampedMessage *m =
+ timestamp_mapper_ ? timestamp_mapper_->Front() : nullptr;
+ if (m == nullptr) {
+ return monotonic_clock::max_time;
+ }
+ return m->monotonic_event_time;
}
void LogReader::State::SeedSortedMessages() {
+ if (!timestamp_mapper_) return;
const aos::monotonic_clock::time_point end_queue_time =
(sorted_messages_.size() > 0
? std::get<0>(sorted_messages_.front()).monotonic_event_time
- : channel_merger_->monotonic_start_time()) +
+ : timestamp_mapper_->monotonic_start_time()) +
std::chrono::seconds(2);
while (true) {
- if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
+ TimestampedMessage *m = timestamp_mapper_->Front();
+ if (m == nullptr) {
return;
}
if (sorted_messages_.size() > 0) {
@@ -1953,31 +1975,25 @@
}
}
- TimestampMerger::DeliveryTimestamp channel_timestamp;
- int channel_index;
- SizePrefixedFlatbufferVector<MessageHeader> channel_data =
- SizePrefixedFlatbufferVector<MessageHeader>::Empty();
-
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- std::tie(channel_timestamp, channel_index, channel_data) =
- channel_merger_->PopOldest();
+ TimestampedMessage timestamped_message = std::move(*m);
+ timestamp_mapper_->PopFront();
// Skip any messages without forwarding information.
- if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
+ if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
// Got a forwarding timestamp!
- filter = filters_[channel_index];
+ filter = filters_[timestamped_message.channel_index];
CHECK(filter != nullptr);
// Call the correct method depending on if we are the forward or
// reverse direction here.
filter->Sample(event_loop_->node(),
- channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_remote_time);
+ timestamped_message.monotonic_event_time,
+ timestamped_message.monotonic_remote_time);
}
- sorted_messages_.emplace_back(channel_timestamp, channel_index,
- std::move(channel_data), filter);
+ sorted_messages_.emplace_back(std::move(timestamped_message), filter);
}
}
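
SeedSortedMessages() keeps roughly two seconds of messages buffered past the front of the queue, now fed by TimestampMapper::Front()/PopFront() instead of ChannelMerger::PopOldest(). The read-ahead shape, as a simplified stand-in rather than the aos implementation:

#include <chrono>
#include <cstddef>
#include <deque>
#include <vector>

using monotonic = std::chrono::nanoseconds;  // Stand-in for a time_point.

struct Source {
  std::vector<monotonic> times;  // Sorted.
  size_t index = 0;
  const monotonic *Front() const {
    return index < times.size() ? &times[index] : nullptr;
  }
  void PopFront() { ++index; }
};

// Buffer until the queue covers ~2 seconds past its front, mirroring the
// end_queue_time computation in SeedSortedMessages().
void SeedAhead(Source *source, std::deque<monotonic> *buffered,
               monotonic start_time) {
  const monotonic end_queue_time =
      (buffered->empty() ? start_time : buffered->front()) +
      std::chrono::seconds(2);
  while (const monotonic *m = source->Front()) {
    if (!buffered->empty() && *m > end_queue_time) return;
    buffered->push_back(*m);
    source->PopFront();
  }
}
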
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index f6a037b..f0f0a69 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -319,24 +319,10 @@
// pass it in here. It must provide all the channels that the original logged
// config did.
//
- // Log filenames are in the following format:
- //
- // {
- // {log1_part0, log1_part1, ...},
- // {log2}
- // }
- // The inner vector is a list of log file chunks which together form a log file.
- // The outer vector is a list of log files with subsets of the messages, or
- // messages from different nodes.
- //
- // If the outer vector isn't provided, it is assumed to be of size 1.
+ // The single file constructor calls SortParts internally.
LogReader(std::string_view filename,
const Configuration *replay_configuration = nullptr);
- LogReader(const std::vector<std::string> &filenames,
- const Configuration *replay_configuration = nullptr);
- LogReader(const std::vector<std::vector<std::string>> &filenames,
- const Configuration *replay_configuration = nullptr);
- LogReader(const std::vector<LogFile> &log_files,
+ LogReader(std::vector<LogFile> log_files,
const Configuration *replay_configuration = nullptr);
~LogReader();
@@ -450,7 +436,7 @@
: logged_configuration()->nodes()->size();
}
- const std::vector<std::vector<std::string>> filenames_;
+ const std::vector<LogFile> log_files_;
// This is *a* log file header used to provide the logged config. The rest of
// the header is likely distracting.
@@ -466,14 +452,15 @@
// State per node.
class State {
public:
- State(std::unique_ptr<ChannelMerger> channel_merger);
+ State(std::unique_ptr<TimestampMapper> timestamp_mapper);
+
+ // Connects up the timestamp mappers.
+ void AddPeer(State *peer);
// Returns the oldest message, with its timestamps and channel_index bundled
// in the returned TimestampedMessage. update_time will be set to true when
// popping this message causes the filter to change the time offset
// estimation function.
- std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldest(bool *update_time);
+ TimestampedMessage PopOldest(bool *update_time);
// Returns the monotonic time of the oldest message.
monotonic_clock::time_point OldestMessageTime() const;
@@ -484,10 +471,12 @@
// Returns the starting time for this node.
monotonic_clock::time_point monotonic_start_time() const {
- return channel_merger_->monotonic_start_time();
+ return timestamp_mapper_ ? timestamp_mapper_->monotonic_start_time()
+ : monotonic_clock::min_time;
}
realtime_clock::time_point realtime_start_time() const {
- return channel_merger_->realtime_start_time();
+ return timestamp_mapper_ ? timestamp_mapper_->realtime_start_time()
+ : realtime_clock::min_time;
}
// Sets the node event loop factory for replaying into a
@@ -555,10 +544,6 @@
return node_event_loop_factory_->monotonic_now();
}
- // Sets the node we will be merging as, and returns true if there is any
- // data on it.
- bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
-
// Sets the number of channels.
void SetChannelCount(size_t count);
@@ -570,7 +555,9 @@
State *source_state);
// Returns if we have read all the messages from all the logs.
- bool at_end() const { return channel_merger_->at_end(); }
+ bool at_end() const {
+ return timestamp_mapper_ ? timestamp_mapper_->Front() == nullptr : true;
+ }
// Unregisters everything so we can destroy the event loop.
void Deregister();
@@ -586,8 +573,7 @@
}
// Sends the provided message on the channel it was logged on.
- bool Send(size_t channel_index, const void *data, size_t size,
- const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
+ bool Send(const TimestampedMessage ×tamped_message);
// Returns a debug string for the timestamp mapper.
std::string DebugString() const {
@@ -599,22 +585,24 @@
<< "]: " << std::get<0>(message).monotonic_event_time << " "
<< configuration::StrippedChannelToString(
event_loop_->configuration()->channels()->Get(
- std::get<2>(message).message().channel_index()))
+ std::get<0>(message).channel_index))
<< "\n";
} else if (i == 7) {
messages << "...\n";
}
++i;
}
- return messages.str() + channel_merger_->DebugString();
+ if (!timestamp_mapper_) {
+ return messages.str();
+ }
+ return messages.str() + timestamp_mapper_->DebugString();
}
private:
// Log file.
- std::unique_ptr<ChannelMerger> channel_merger_;
+ std::unique_ptr<TimestampMapper> timestamp_mapper_;
- std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>,
+ std::deque<std::tuple<TimestampedMessage,
message_bridge::NoncausalOffsetEstimator *>>
sorted_messages_;
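
Since TimestampedMessage bundles the channel index and payload that used to ride alongside DeliveryTimestamp, the buffered entries shrink from a 4-tuple to a 2-tuple. A stand-in sketch of the resulting shape (placeholder types, not the real aos definitions):

#include <deque>
#include <tuple>

// Placeholders, just to show the container shape.
struct TimestampedMessage {};
namespace message_bridge {
class NoncausalOffsetEstimator;
}

// TimestampedMessage already carries channel index and data, so only the
// offset estimator rides alongside it.
std::deque<std::tuple<TimestampedMessage,
                      message_bridge::NoncausalOffsetEstimator *>>
    sorted_messages;
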
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index dbc8a78..f2969ad 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -279,8 +279,7 @@
// Even though it doesn't make any difference here, exercise the logic for
// passing in a separate config.
- LogReader reader(std::vector<std::string>{logfile0, logfile1},
- &config_.message());
+ LogReader reader(SortParts({logfile0, logfile1}), &config_.message());
// Confirm that we can remap logged channels to point to new buses.
reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
@@ -762,7 +761,7 @@
"/pi2/aos", "aos.message_bridge.Timestamp", 190)));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -938,7 +937,8 @@
}
)");
- EXPECT_DEATH(LogReader(structured_logfiles_, &extra_nodes_config.message()),
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
"Log file and replay config need to have matching nodes lists.");
}
@@ -960,7 +960,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1100,7 +1100,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(400));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1318,7 +1318,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
// Remap just on pi1.
reader.RemapLoggedChannel<aos::timing::Report>(
@@ -1384,7 +1384,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));