Merge "Support running logger_test under gdb"
diff --git a/WORKSPACE b/WORKSPACE
index a8c8a19..464d2e2 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -652,8 +652,8 @@
http_file(
name = "drivetrain_replay",
downloaded_file_path = "spinning_wheels_while_still.bfbs",
- sha256 = "d724dbf0acae894b30c9bb62006a2633a7f4c478db48548e76cbec03cbb07f46",
- urls = ["https://www.frc971.org/Build-Dependencies/spinning_wheels_while_still3.bfbs"],
+ sha256 = "8abe3bbf7ac7a3ab37ad8a313ec22fc244899d916f5e9037100b02e242f5fb45",
+ urls = ["https://www.frc971.org/Build-Dependencies/spinning_wheels_while_still4.bfbs"],
)
# OpenCV armhf (for raspberry pi)
diff --git a/aos/BUILD b/aos/BUILD
index 2761cbf..9a62e66 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -547,3 +547,16 @@
"//aos/testing:googletest",
],
)
+
+cc_test(
+ name = "flatbuffers_test",
+ srcs = [
+ "flatbuffers_test.cc",
+ ],
+ deps = [
+ ":flatbuffers",
+ ":json_to_flatbuffer",
+ ":json_to_flatbuffer_flatbuffer",
+ "//aos/testing:googletest",
+ ],
+)
diff --git a/aos/events/logging/log_cat.cc b/aos/events/logging/log_cat.cc
index caa2b90..7dabc18 100644
--- a/aos/events/logging/log_cat.cc
+++ b/aos/events/logging/log_cat.cc
@@ -33,6 +33,11 @@
DEFINE_bool(pretty, false,
"If true, pretty print the messages on multiple lines");
+// Returns true when the final characters of `str` are exactly `ending`.  An
+// empty `ending` matches any string.
+bool EndsWith(std::string_view str, std::string_view ending) {
+  if (ending.size() > str.size()) {
+    return false;
+  }
+  const size_t tail_start = str.size() - ending.size();
+  return str.compare(tail_start, ending.size(), ending) == 0;
+}
+
// Print the flatbuffer out to stdout, both to remove the unnecessary cruft from
// glog and to allow the user to readily redirect just the logged output
// independent of any debugging information on stderr.
@@ -66,7 +71,9 @@
// its not a directory
// it could be a file
// or it could not exist
- files->emplace_back(filename);
+ if (EndsWith(filename, ".bfbs") || EndsWith(filename, ".bfbs.xz")) {
+ files->emplace_back(filename);
+ }
return;
}
diff --git a/aos/events/logging/log_stats.cc b/aos/events/logging/log_stats.cc
index eac8d08..f901056 100644
--- a/aos/events/logging/log_stats.cc
+++ b/aos/events/logging/log_stats.cc
@@ -61,7 +61,7 @@
std::vector<ChannelStats> channel_stats;
// Open LogFile
- aos::logger::LogReader reader(FLAGS_logfile);
+ aos::logger::LogReader reader(aos::logger::SortParts({FLAGS_logfile}));
aos::SimulatedEventLoopFactory log_reader_factory(reader.configuration());
reader.Register(&log_reader_factory);
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 08a230c..168230e 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -257,6 +257,33 @@
return result;
}
+// Gathers the node name of every LogParts in every LogFile and returns each
+// distinct name once, in the sorted order produced by std::set.
+std::vector<std::string> FindNodes(const std::vector<LogFile> &parts) {
+  std::set<std::string> unique_nodes;
+  for (const LogFile &file : parts) {
+    for (const LogParts &file_part : file.parts) {
+      unique_nodes.insert(file_part.node);
+    }
+  }
+
+  // Drain the set front to back so the strings are moved out rather than
+  // copied.
+  std::vector<std::string> node_list;
+  node_list.reserve(unique_nodes.size());
+  while (!unique_nodes.empty()) {
+    auto handle = unique_nodes.extract(unique_nodes.begin());
+    node_list.emplace_back(std::move(handle.value()));
+  }
+  return node_list;
+}
+
+// Walks every LogFile in `parts` and returns a copy of each LogParts whose
+// node field equals `node`, in the order they are encountered.
+std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
+                                         std::string_view node) {
+  std::vector<LogParts> matching;
+  for (const LogFile &file : parts) {
+    for (const LogParts &candidate : file.parts) {
+      if (!(candidate.node == node)) {
+        continue;
+      }
+      matching.emplace_back(candidate);
+    }
+  }
+  return matching;
+}
+
std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
stream << "{";
if (!file.log_event_uuid.empty()) {
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 0d2a0fb..94cb771 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -56,6 +56,12 @@
// Takes a bunch of parts and sorts them based on part_uuid and part_index.
std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
+// Finds all the nodes which have parts logged from their point of view.
+// Each node name is returned once, and the names come back in sorted order.
+std::vector<std::string> FindNodes(const std::vector<LogFile> &parts);
+// Finds all the parts which are from the point of view of a single node.
+// Returns copies of every LogParts (across all of the LogFiles in parts) whose
+// node field compares equal to node.
+std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
+ std::string_view node);
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 330c78e..9316795 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -442,8 +442,15 @@
newest_timestamp_ = message_reader_.newest_timestamp();
const monotonic_clock::time_point monotonic_sent_time(
chrono::nanoseconds(message->message().monotonic_sent_time()));
- CHECK_GE(monotonic_sent_time,
- newest_timestamp_ - max_out_of_order_duration());
+ // TODO(austin): Does this work with startup? Might need to use the start
+ // time.
+ // TODO(austin): Does this work with startup when we don't know the remote
+ // start time too? Look at one of those logs to compare.
+ if (monotonic_sent_time > parts_.monotonic_start_time) {
+ CHECK_GE(monotonic_sent_time,
+ newest_timestamp_ - max_out_of_order_duration())
+ << ": Max out of order exceeded. " << parts_;
+ }
return message;
}
NextLog();
@@ -461,1172 +468,496 @@
++next_part_index_;
}
-SplitMessageReader::SplitMessageReader(
- const std::vector<std::string> &filenames)
- : filenames_(filenames),
- log_file_header_(SizePrefixedFlatbufferVector<LogFileHeader>::Empty()) {
- CHECK(NextLogFile()) << ": filenames is empty. Need files to read.";
-
- // Grab any log file header. They should all match (and we will check as we
- // open more of them).
- log_file_header_ = message_reader_->raw_log_file_header();
-
- for (size_t i = 1; i < filenames_.size(); ++i) {
- MessageReader message_reader(filenames_[i]);
-
- const monotonic_clock::time_point new_monotonic_start_time(
- chrono::nanoseconds(
- message_reader.log_file_header()->monotonic_start_time()));
- const realtime_clock::time_point new_realtime_start_time(
- chrono::nanoseconds(
- message_reader.log_file_header()->realtime_start_time()));
-
- // There are 2 types of part files. Part files from before time estimation
- // has started, and part files after. We don't declare a log file "started"
- // until time estimation is up. And once a log file starts, it should never
- // stop again, and should remain constant.
- // To compare both types of headers, we mutate our saved copy of the header
- // to match the next chunk by updating time if we detect a stopped ->
- // started transition.
- if (monotonic_start_time() == monotonic_clock::min_time) {
- CHECK_EQ(realtime_start_time(), realtime_clock::min_time);
- // We should only be missing the monotonic start time when logging data
- // for remote nodes. We don't have a good way to determine the remote
- // realtime offset, so it shouldn't be filled out.
- // TODO(austin): If we have a good way, feel free to fill it out. It
- // probably won't be better than we could do in post though with the same
- // data.
- CHECK(!log_file_header_.mutable_message()->has_realtime_start_time());
- if (new_monotonic_start_time != monotonic_clock::min_time) {
- // If we finally found our start time, update the header. Do this once
- // because it should never change again.
- log_file_header_.mutable_message()->mutate_monotonic_start_time(
- new_monotonic_start_time.time_since_epoch().count());
- log_file_header_.mutable_message()->mutate_realtime_start_time(
- new_realtime_start_time.time_since_epoch().count());
- }
- }
-
- // We don't have a good way to set the realtime start time on remote nodes.
- // Confirm it remains consistent.
- CHECK_EQ(log_file_header_.mutable_message()->has_realtime_start_time(),
- message_reader.log_file_header()->has_realtime_start_time());
-
- // Parts index will *not* match unless we set them to match. We only want
- // to accept the start time and parts mismatching, so set them.
- log_file_header_.mutable_message()->mutate_parts_index(
- message_reader.log_file_header()->parts_index());
-
- // Now compare that the headers match.
- if (!CompareFlatBuffer(message_reader.raw_log_file_header(),
- log_file_header_)) {
- if (message_reader.log_file_header()->has_log_event_uuid() &&
- log_file_header_.message().has_log_event_uuid() &&
- message_reader.log_file_header()->log_event_uuid()->string_view() !=
- log_file_header_.message().log_event_uuid()->string_view()) {
- LOG(FATAL) << "Logger UUIDs don't match between log file chunks "
- << filenames_[0] << " and " << filenames_[i]
- << ", this is not supported.";
- }
- if (message_reader.log_file_header()->has_parts_uuid() &&
- log_file_header_.message().has_parts_uuid() &&
- message_reader.log_file_header()->parts_uuid()->string_view() !=
- log_file_header_.message().parts_uuid()->string_view()) {
- LOG(FATAL) << "Parts UUIDs don't match between log file chunks "
- << filenames_[0] << " and " << filenames_[i]
- << ", this is not supported.";
- }
-
- LOG(FATAL) << "Header is different between log file chunks "
- << filenames_[0] << " and " << filenames_[i]
- << ", this is not supported.";
- }
- }
- // Put the parts index back to the first log file chunk.
- log_file_header_.mutable_message()->mutate_parts_index(
- message_reader_->log_file_header()->parts_index());
-
- // Setup per channel state.
- channels_.resize(configuration()->channels()->size());
- for (ChannelData &channel_data : channels_) {
- channel_data.data.split_reader = this;
- // Build up the timestamp list.
- if (configuration::MultiNode(configuration())) {
- channel_data.timestamps.resize(configuration()->nodes()->size());
- for (MessageHeaderQueue &queue : channel_data.timestamps) {
- queue.timestamps = true;
- queue.split_reader = this;
- }
- }
- }
-
- // Build up channels_to_write_ as an optimization to make it fast to figure
- // out which datastructure to place any new data from a channel on.
- for (const Channel *channel : *configuration()->channels()) {
- // This is the main case. We will only see data on this node.
- if (configuration::ChannelIsSendableOnNode(channel, node())) {
- channels_to_write_.emplace_back(
- &channels_[channels_to_write_.size()].data);
- } else
- // If we can't send, but can receive, we should be able to see
- // timestamps here.
- if (configuration::ChannelIsReadableOnNode(channel, node())) {
- channels_to_write_.emplace_back(
- &(channels_[channels_to_write_.size()]
- .timestamps[configuration::GetNodeIndex(configuration(),
- node())]));
- } else {
- channels_to_write_.emplace_back(nullptr);
- }
- }
-}
-
-bool SplitMessageReader::NextLogFile() {
- if (next_filename_index_ == filenames_.size()) {
- return false;
- }
- message_reader_ =
- std::make_unique<MessageReader>(filenames_[next_filename_index_]);
-
- // We can't support the config diverging between two log file headers. See if
- // they are the same.
- if (next_filename_index_ != 0) {
- // In order for the headers to identically compare, they need to have the
- // same parts_index. Rewrite the saved header with the new parts_index,
- // compare, and then restore.
- const int32_t original_parts_index =
- log_file_header_.message().parts_index();
- log_file_header_.mutable_message()->mutate_parts_index(
- message_reader_->log_file_header()->parts_index());
-
- CHECK(CompareFlatBuffer(message_reader_->raw_log_file_header(),
- log_file_header_))
- << ": Header is different between log file chunks "
- << filenames_[next_filename_index_] << " and "
- << filenames_[next_filename_index_ - 1] << ", this is not supported.";
-
- log_file_header_.mutable_message()->mutate_parts_index(
- original_parts_index);
- }
-
- ++next_filename_index_;
- return true;
-}
-
-bool SplitMessageReader::QueueMessages(
- monotonic_clock::time_point last_dequeued_time) {
- // TODO(austin): Once we are happy that everything works, read a 256kb chunk
- // to reduce the need to re-heap down below.
-
- // Special case no more data. Otherwise we blow up on the CHECK statement
- // confirming that we have enough data queued.
- if (at_end_) {
- return false;
- }
-
- // If this isn't the first time around, confirm that we had enough data queued
- // to follow the contract.
- if (time_to_queue_ != monotonic_clock::min_time) {
- CHECK_LE(last_dequeued_time,
- newest_timestamp() - max_out_of_order_duration())
- << " node " << FlatbufferToJson(node()) << " on " << this;
-
- // Bail if there is enough data already queued.
- if (last_dequeued_time < time_to_queue_) {
- VLOG(1) << MaybeNodeName(target_node_) << "All up to date on " << this
- << ", dequeued " << last_dequeued_time << " queue time "
- << time_to_queue_;
- return true;
- }
- } else {
- // Startup takes a special dance. We want to queue up until the start time,
- // but we then want to find the next message to read. The conservative
- // answer is to immediately trigger a second requeue to get things moving.
- time_to_queue_ = monotonic_start_time();
- CHECK_NE(time_to_queue_, monotonic_clock::min_time);
- QueueMessages(time_to_queue_);
- }
-
- // If we are asked to queue, queue for at least max_out_of_order_duration past
- // the last known time in the log file (ie the newest timestep read). As long
- // as we requeue exactly when time_to_queue_ is dequeued and go no further, we
- // are safe. And since we pop in order, that works.
- //
- // Special case the start of the log file. There should be at most 1 message
- // from each channel at the start of the log file. So always force the start
- // of the log file to just be read.
- time_to_queue_ = std::max(time_to_queue_, newest_timestamp());
- VLOG(1) << MaybeNodeName(target_node_) << "Queueing, going until "
- << time_to_queue_ << " " << filename();
-
- bool was_emplaced = false;
- while (true) {
- // Stop if we have enough.
- if (newest_timestamp() > time_to_queue_ + max_out_of_order_duration() &&
- was_emplaced) {
- VLOG(1) << MaybeNodeName(target_node_) << "Done queueing on " << this
- << ", queued to " << newest_timestamp() << " with requeue time "
- << time_to_queue_;
- return true;
- }
-
- if (std::optional<SizePrefixedFlatbufferVector<MessageHeader>> msg =
- message_reader_->ReadMessage()) {
- const MessageHeader &header = msg.value().message();
-
- const monotonic_clock::time_point timestamp = monotonic_clock::time_point(
- chrono::nanoseconds(header.monotonic_sent_time()));
-
- if (VLOG_IS_ON(2)) {
- LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
- << filename() << " ttq: " << time_to_queue_ << " now "
- << newest_timestamp() << " start time "
- << monotonic_start_time() << " " << FlatbufferToJson(&header);
- } else if (VLOG_IS_ON(1)) {
- SizePrefixedFlatbufferVector<MessageHeader> copy = msg.value();
- copy.mutable_message()->clear_data();
- LOG(INFO) << MaybeNodeName(target_node_) << "Queued " << this << " "
- << filename() << " ttq: " << time_to_queue_ << " now "
- << newest_timestamp() << " start time "
- << monotonic_start_time() << " " << FlatbufferToJson(copy);
- }
-
- const int channel_index = header.channel_index();
- was_emplaced = channels_to_write_[channel_index]->emplace_back(
- std::move(msg.value()));
- if (was_emplaced) {
- newest_timestamp_ = std::max(newest_timestamp_, timestamp);
- }
- } else {
- if (!NextLogFile()) {
- VLOG(1) << MaybeNodeName(target_node_) << "No more files, last was "
- << filenames_.back();
- at_end_ = true;
- for (MessageHeaderQueue *queue : channels_to_write_) {
- if (queue == nullptr || queue->timestamp_merger == nullptr) {
- continue;
- }
- queue->timestamp_merger->NoticeAtEnd();
- }
- return false;
- }
- }
- }
-}
-
-void SplitMessageReader::SetTimestampMerger(TimestampMerger *timestamp_merger,
- int channel_index,
- const Node *target_node) {
- const Node *reinterpreted_target_node =
- configuration::GetNodeOrDie(configuration(), target_node);
- target_node_ = reinterpreted_target_node;
-
- const Channel *const channel =
- configuration()->channels()->Get(channel_index);
-
- VLOG(1) << " Configuring merger " << this << " for channel " << channel_index
- << " "
- << configuration::CleanedChannelToString(
- configuration()->channels()->Get(channel_index));
-
- MessageHeaderQueue *message_header_queue = nullptr;
-
- // Figure out if this log file is from our point of view, or the other node's
- // point of view.
- if (node() == reinterpreted_target_node) {
- VLOG(1) << " Replaying as logged node " << filename();
-
- if (configuration::ChannelIsSendableOnNode(channel, node())) {
- VLOG(1) << " Data on node";
- message_header_queue = &(channels_[channel_index].data);
- } else if (configuration::ChannelIsReadableOnNode(channel, node())) {
- VLOG(1) << " Timestamps on node";
- message_header_queue =
- &(channels_[channel_index].timestamps[configuration::GetNodeIndex(
- configuration(), node())]);
- } else {
- VLOG(1) << " Dropping";
- }
- } else {
- VLOG(1) << " Replaying as other node " << filename();
- // We are replaying from another node's point of view. The only interesting
- // data is data that is sent from our node and received on theirs.
- if (configuration::ChannelIsReadableOnNode(channel,
- reinterpreted_target_node) &&
- configuration::ChannelIsSendableOnNode(channel, node())) {
- VLOG(1) << " Readable on target node";
- // Data from another node.
- message_header_queue = &(channels_[channel_index].data);
- } else {
- VLOG(1) << " Dropping";
- // This is either not sendable on the other node, or is a timestamp and
- // therefore not interesting.
- }
- }
-
- // If we found one, write it down. This will be nullptr when there is nothing
- // relevant on this channel on this node for the target node. In that case,
- // we want to drop the message instead of queueing it.
- if (message_header_queue != nullptr) {
- message_header_queue->timestamp_merger = timestamp_merger;
- }
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldest(int channel_index) {
- CHECK_GT(channels_[channel_index].data.size(), 0u);
- const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp = channels_[channel_index].data.front_timestamp();
- SizePrefixedFlatbufferVector<MessageHeader> front =
- std::move(channels_[channel_index].data.front());
- channels_[channel_index].data.PopFront();
-
- VLOG(1) << MaybeNodeName(target_node_) << "Popped Data " << this << " "
- << std::get<0>(timestamp) << " for "
- << configuration::StrippedChannelToString(
- configuration()->channels()->Get(channel_index))
- << " (" << channel_index << ")";
-
- QueueMessages(std::get<0>(timestamp));
-
- return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
- std::move(front));
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
-SplitMessageReader::PopOldestTimestamp(int channel, int node_index) {
- CHECK_GT(channels_[channel].timestamps[node_index].size(), 0u);
- const std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp = channels_[channel].timestamps[node_index].front_timestamp();
- SizePrefixedFlatbufferVector<MessageHeader> front =
- std::move(channels_[channel].timestamps[node_index].front());
- channels_[channel].timestamps[node_index].PopFront();
-
- VLOG(1) << MaybeNodeName(target_node_) << "Popped timestamp " << this << " "
- << std::get<0>(timestamp) << " for "
- << configuration::StrippedChannelToString(
- configuration()->channels()->Get(channel))
- << " on "
- << configuration()->nodes()->Get(node_index)->name()->string_view()
- << " (" << node_index << ")";
-
- QueueMessages(std::get<0>(timestamp));
-
- return std::make_tuple(std::get<0>(timestamp), std::get<1>(timestamp),
- std::move(front));
-}
-
-bool SplitMessageReader::MessageHeaderQueue::emplace_back(
- SizePrefixedFlatbufferVector<MessageHeader> &&msg) {
- CHECK(split_reader != nullptr);
-
- // If there is no timestamp merger for this queue, nobody is listening. Drop
- // the message. This happens when a log file from another node is replayed,
- // and the timestamp mergers down stream just don't care.
- if (timestamp_merger == nullptr) {
- return false;
- }
-
- CHECK(timestamps != msg.message().has_data())
- << ": Got timestamps and data mixed up on a node. "
- << FlatbufferToJson(msg);
-
- data_.emplace_back(std::move(msg));
-
- if (data_.size() == 1u) {
- // Yup, new data. Notify.
- if (timestamps) {
- timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
- } else {
- timestamp_merger->Update(split_reader, front_timestamp());
- }
- }
-
- return true;
-}
-
-void SplitMessageReader::MessageHeaderQueue::PopFront() {
- data_.pop_front();
- if (data_.size() != 0u) {
- // Yup, new data.
- if (timestamps) {
- timestamp_merger->UpdateTimestamp(split_reader, front_timestamp());
- } else {
- timestamp_merger->Update(split_reader, front_timestamp());
- }
- } else {
- // Poke anyways to update the heap.
- if (timestamps) {
- timestamp_merger->UpdateTimestamp(
- nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
- } else {
- timestamp_merger->Update(
- nullptr, std::make_tuple(monotonic_clock::min_time, 0, nullptr));
- }
- }
-}
-
-namespace {
-
-bool SplitMessageReaderHeapCompare(
- const std::tuple<monotonic_clock::time_point, uint32_t,
- SplitMessageReader *>
- first,
- const std::tuple<monotonic_clock::time_point, uint32_t,
- SplitMessageReader *>
- second) {
- if (std::get<0>(first) > std::get<0>(second)) {
+// Orders two messages by timestamp first, then by channel_index when the
+// timestamps are equal, and finally by queue_index when both of those tie.
+bool Message::operator<(const Message &m2) const {
+ if (this->timestamp < m2.timestamp) {
return true;
- } else if (std::get<0>(first) == std::get<0>(second)) {
- if (std::get<1>(first) > std::get<1>(second)) {
- return true;
- } else if (std::get<1>(first) == std::get<1>(second)) {
- return std::get<2>(first) > std::get<2>(second);
- } else {
- return false;
- }
- } else {
+ } else if (this->timestamp > m2.timestamp) {
return false;
}
-}
-bool ChannelHeapCompare(
- const std::pair<monotonic_clock::time_point, int> first,
- const std::pair<monotonic_clock::time_point, int> second) {
- if (first.first > second.first) {
+ if (this->channel_index < m2.channel_index) {
return true;
- } else if (first.first == second.first) {
- return first.second > second.second;
- } else {
+ } else if (this->channel_index > m2.channel_index) {
return false;
}
+
+ return this->queue_index < m2.queue_index;
}
-} // namespace
-
-TimestampMerger::TimestampMerger(
- const Configuration *configuration,
- std::vector<SplitMessageReader *> split_message_readers, int channel_index,
- const Node *target_node, ChannelMerger *channel_merger)
- : configuration_(configuration),
- split_message_readers_(std::move(split_message_readers)),
- channel_index_(channel_index),
- node_index_(configuration::MultiNode(configuration)
- ? configuration::GetNodeIndex(configuration, target_node)
- : -1),
- channel_merger_(channel_merger) {
- // Tell the readers we care so they know who to notify.
- VLOG(1) << "Configuring channel " << channel_index << " target node "
- << FlatbufferToJson(target_node);
- for (SplitMessageReader *reader : split_message_readers_) {
- reader->SetTimestampMerger(this, channel_index, target_node);
- }
-
- // And then determine if we need to track timestamps.
- const Channel *channel = configuration->channels()->Get(channel_index);
- if (!configuration::ChannelIsSendableOnNode(channel, target_node) &&
- configuration::ChannelIsReadableOnNode(channel, target_node)) {
- has_timestamps_ = true;
- }
+// Defined as the logical negation of operator<, so the two can never disagree.
+bool Message::operator>=(const Message &m2) const {
+  const bool strictly_less = (*this < m2);
+  return !strictly_less;
+}
+// Two messages are equal when their timestamp, channel_index and queue_index
+// all match.  These are the same three fields that operator< orders on.
+bool Message::operator==(const Message &m2) const {
+  const bool same_timestamp = (timestamp == m2.timestamp);
+  const bool same_channel = (channel_index == m2.channel_index);
+  const bool same_queue_index = (queue_index == m2.queue_index);
+  return same_timestamp && same_channel && same_queue_index;
}
-void TimestampMerger::PushMessageHeap(
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp,
- SplitMessageReader *split_message_reader) {
- if (split_message_reader != nullptr) {
- DCHECK(std::find_if(message_heap_.begin(), message_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == message_heap_.end())
- << ": Pushing message when it is already in the heap.";
-
- message_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
-
- std::push_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
+// Streams a Message as "{.channel_index=..., .queue_index=..., .timestamp=...}"
+// and returns os.  A ", .data=" entry holding the FlatbufferToJson rendering
+// of m.data is appended only when m.data.Verify() returns true.
+std::ostream &operator<<(std::ostream &os, const Message &m) {
+ os << "{.channel_index=" << m.channel_index
+ << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
+ if (m.data.Verify()) {
+ os << ", .data="
+ << aos::FlatbufferToJson(m.data,
+ {.multi_line = false, .max_vector_size = 1});
}
-
- // If we are just a data merger, don't wait for timestamps.
- if (!has_timestamps_) {
- if (!message_heap_.empty()) {
- channel_merger_->Update(std::get<0>(message_heap_[0]), channel_index_);
- pushed_ = true;
- } else {
- // Remove ourselves if we are empty.
- channel_merger_->Update(monotonic_clock::min_time, channel_index_);
- }
- }
+ os << "}";
+ return os;
}
-std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-TimestampMerger::oldest_message() const {
- CHECK_GT(message_heap_.size(), 0u);
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_message_reader = message_heap_.front();
- return std::get<2>(oldest_message_reader)->oldest_message(channel_index_);
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
-TimestampMerger::oldest_timestamp() const {
- CHECK_GT(timestamp_heap_.size(), 0u);
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_message_reader = timestamp_heap_.front();
- return std::get<2>(oldest_message_reader)
- ->oldest_message(channel_index_, node_index_);
-}
-
-void TimestampMerger::PushTimestampHeap(
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp,
- SplitMessageReader *split_message_reader) {
- if (split_message_reader != nullptr) {
- DCHECK(std::find_if(timestamp_heap_.begin(), timestamp_heap_.end(),
- [split_message_reader](
- const std::tuple<monotonic_clock::time_point,
- uint32_t, SplitMessageReader *>
- x) {
- return std::get<2>(x) == split_message_reader;
- }) == timestamp_heap_.end())
- << ": Pushing timestamp when it is already in the heap.";
-
- timestamp_heap_.push_back(std::make_tuple(
- std::get<0>(timestamp), std::get<1>(timestamp), split_message_reader));
-
- std::push_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- SplitMessageReaderHeapCompare);
+// Streams a TimestampedMessage as a brace-delimited list of ".field=value"
+// entries and returns os.  channel_index, queue_index and the two event times
+// are always printed.  remote_queue_index is skipped when it is 0xffffffff and
+// the remote times are skipped when they equal min_time.
+// NOTE(review): presumably those values mark the remote fields as unset;
+// confirm against the TimestampedMessage definition.
+// The ", .data=" entry is printed only when m.data.Verify() returns true.
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
+ os << "{.channel_index=" << m.channel_index
+ << ", .queue_index=" << m.queue_index
+ << ", .monotonic_event_time=" << m.monotonic_event_time
+ << ", .realtime_event_time=" << m.realtime_event_time;
+ if (m.remote_queue_index != 0xffffffff) {
+ os << ", .remote_queue_index=" << m.remote_queue_index;
}
-
- // If we are a timestamp merger, don't wait for data. Missing data will be
- // caught at read time.
- if (has_timestamps_) {
- if (!timestamp_heap_.empty()) {
- channel_merger_->Update(std::get<0>(timestamp_heap_[0]), channel_index_);
- pushed_ = true;
- } else {
- // Remove ourselves if we are empty.
- channel_merger_->Update(monotonic_clock::min_time, channel_index_);
- }
+ if (m.monotonic_remote_time != monotonic_clock::min_time) {
+ os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
}
+ if (m.realtime_remote_time != realtime_clock::min_time) {
+ os << ", .realtime_remote_time=" << m.realtime_remote_time;
+ }
+ if (m.data.Verify()) {
+ os << ", .data="
+ << aos::FlatbufferToJson(m.data,
+ {.multi_line = false, .max_vector_size = 1});
+ }
+ os << "}";
+ return os;
}
-std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopMessageHeap() {
- // Pop the oldest message reader pointer off the heap.
- CHECK_GT(message_heap_.size(), 0u);
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_message_reader = message_heap_.front();
+// Constructs the sorter by handing log_parts to parts_message_reader_.
+// log_parts is received by value, so it is moved into the reader rather than
+// copied a second time.
+LogPartsSorter::LogPartsSorter(LogParts log_parts)
+    : parts_message_reader_(std::move(log_parts)) {}
- std::pop_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
- message_heap_.pop_back();
-
- // Pop the oldest message. This re-pushes any messages from the reader to the
- // message heap.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_message =
- std::get<2>(oldest_message_reader)->PopOldest(channel_index_);
-
- // Confirm that the time and queue_index we have recorded matches.
- CHECK_EQ(std::get<0>(oldest_message), std::get<0>(oldest_message_reader));
- CHECK_EQ(std::get<1>(oldest_message), std::get<1>(oldest_message_reader));
-
- // Now, keep reading until we have found all duplicates.
- while (!message_heap_.empty()) {
- // See if it is a duplicate.
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- next_oldest_message_reader = message_heap_.front();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- next_oldest_message_time = std::get<2>(next_oldest_message_reader)
- ->oldest_message(channel_index_);
-
- if (std::get<0>(next_oldest_message_time) == std::get<0>(oldest_message) &&
- std::get<1>(next_oldest_message_time) == std::get<1>(oldest_message)) {
- // Pop the message reader pointer.
- std::pop_heap(message_heap_.begin(), message_heap_.end(),
- &SplitMessageReaderHeapCompare);
- message_heap_.pop_back();
-
- // Pop the next oldest message. This re-pushes any messages from the
- // reader.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- next_oldest_message = std::get<2>(next_oldest_message_reader)
- ->PopOldest(channel_index_);
-
- // And make sure the message matches in it's entirety.
- CHECK(std::get<2>(oldest_message).span() ==
- std::get<2>(next_oldest_message).span())
- << ": Data at the same timestamp doesn't match.";
- } else {
- break;
- }
- }
-
- return oldest_message;
-}
-
-std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopTimestampHeap() {
- // Pop the oldest message reader pointer off the heap.
- CHECK_GT(timestamp_heap_.size(), 0u);
-
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_timestamp_reader = timestamp_heap_.front();
-
- std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- &SplitMessageReaderHeapCompare);
- timestamp_heap_.pop_back();
-
- CHECK(node_index_ != -1) << ": Timestamps in a single node environment";
-
- // Pop the oldest message. This re-pushes any timestamps from the reader to
- // the timestamp heap.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_timestamp = std::get<2>(oldest_timestamp_reader)
- ->PopOldestTimestamp(channel_index_, node_index_);
-
- // Confirm that the time we have recorded matches.
- CHECK_EQ(std::get<0>(oldest_timestamp), std::get<0>(oldest_timestamp_reader));
- CHECK_EQ(std::get<1>(oldest_timestamp), std::get<1>(oldest_timestamp_reader));
-
- // Now, keep reading until we have found all duplicates.
- while (!timestamp_heap_.empty()) {
- // See if it is a duplicate.
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- next_oldest_timestamp_reader = timestamp_heap_.front();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- next_oldest_timestamp_time =
- std::get<2>(next_oldest_timestamp_reader)
- ->oldest_message(channel_index_, node_index_);
-
- if (std::get<0>(next_oldest_timestamp_time) ==
- std::get<0>(oldest_timestamp) &&
- std::get<1>(next_oldest_timestamp_time) ==
- std::get<1>(oldest_timestamp)) {
- // Pop the timestamp reader pointer.
- std::pop_heap(timestamp_heap_.begin(), timestamp_heap_.end(),
- &SplitMessageReaderHeapCompare);
- timestamp_heap_.pop_back();
-
- // Pop the next oldest timestamp. This re-pushes any messages from the
- // reader.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- next_oldest_timestamp =
- std::get<2>(next_oldest_timestamp_reader)
- ->PopOldestTimestamp(channel_index_, node_index_);
-
- // And make sure the contents matches in it's entirety.
- CHECK(std::get<2>(oldest_timestamp).span() ==
- std::get<2>(next_oldest_timestamp).span())
- << ": Data at the same timestamp doesn't match, "
- << aos::FlatbufferToJson(std::get<2>(oldest_timestamp)) << " vs "
- << aos::FlatbufferToJson(std::get<2>(next_oldest_timestamp)) << " "
- << absl::BytesToHexString(std::string_view(
- reinterpret_cast<const char *>(
- std::get<2>(oldest_timestamp).span().data()),
- std::get<2>(oldest_timestamp).span().size()))
- << " vs "
- << absl::BytesToHexString(std::string_view(
- reinterpret_cast<const char *>(
- std::get<2>(next_oldest_timestamp).span().data()),
- std::get<2>(next_oldest_timestamp).span().size()));
-
- } else {
- break;
- }
- }
-
- return oldest_timestamp;
-}
-
-std::tuple<TimestampMerger::DeliveryTimestamp,
- SizePrefixedFlatbufferVector<MessageHeader>>
-TimestampMerger::PopOldest() {
- if (has_timestamps_) {
- VLOG(1) << "Looking for matching timestamp for "
- << configuration::StrippedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ") "
- << " at " << std::get<0>(oldest_timestamp());
-
- // Read the timestamps.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_timestamp = PopTimestampHeap();
-
- TimestampMerger::DeliveryTimestamp timestamp;
- timestamp.monotonic_event_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp).message().monotonic_sent_time()));
- timestamp.realtime_event_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_timestamp).message().realtime_sent_time()));
- timestamp.queue_index =
- std::get<2>(oldest_timestamp).message().queue_index();
-
- // Consistency check.
- CHECK_EQ(timestamp.monotonic_event_time, std::get<0>(oldest_timestamp));
- CHECK_EQ(std::get<2>(oldest_timestamp).message().queue_index(),
- std::get<1>(oldest_timestamp));
-
- monotonic_clock::time_point remote_timestamp_monotonic_time(
- chrono::nanoseconds(
- std::get<2>(oldest_timestamp).message().monotonic_remote_time()));
-
- // See if we have any data. If not, pass the problem up the chain.
- if (message_heap_.empty()) {
- LOG(WARNING) << MaybeNodeName(configuration_->nodes()->Get(node_index_))
- << "No data to match timestamp on "
- << configuration::CleanedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
- return std::make_tuple(timestamp,
- std::move(std::get<2>(oldest_timestamp)));
- }
-
+// Returns a pointer to the oldest message held in messages_, or nullptr when no
+// messages are left. Before returning, reads from parts_message_reader_ until
+// the front message is older than sorted_until() or the reader runs out of data.
+Message *LogPartsSorter::Front() {
+ // Queue up data until enough data has been queued that the front message is
+ // sorted enough to be safe to pop. This may do nothing, so we should make
+ // sure the nothing path is checked quickly.
+ if (sorted_until() != monotonic_clock::max_time) {
 while (true) {
- {
- // Ok, now try grabbing data until we find one which matches.
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message_ref = oldest_message();
-
- // Time at which the message was sent (this message is written from the
- // sending node's perspective.
- monotonic_clock::time_point remote_monotonic_time(chrono::nanoseconds(
- std::get<2>(oldest_message_ref)->monotonic_sent_time()));
-
- if (remote_monotonic_time < remote_timestamp_monotonic_time) {
- LOG(WARNING) << configuration_->nodes()
- ->Get(node_index_)
- ->name()
- ->string_view()
- << " Undelivered message, skipping. Remote time is "
- << remote_monotonic_time << " timestamp is "
- << remote_timestamp_monotonic_time << " on channel "
- << configuration::StrippedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
- PopMessageHeap();
- continue;
- } else if (remote_monotonic_time > remote_timestamp_monotonic_time) {
- LOG(WARNING) << configuration_->nodes()
- ->Get(node_index_)
- ->name()
- ->string_view()
- << " Data not found. Remote time should be "
- << remote_timestamp_monotonic_time
- << ", message time is " << remote_monotonic_time
- << " on channel "
- << configuration::StrippedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")"
- << (VLOG_IS_ON(1) ? DebugString() : "");
- return std::make_tuple(timestamp,
- std::move(std::get<2>(oldest_timestamp)));
- }
-
- timestamp.monotonic_remote_time = remote_monotonic_time;
+ // Stop reading once the front message is older than sorted_until() and
+ // sorted_until() has reached the start time of the log.
+ if (!messages_.empty() && messages_.begin()->timestamp < sorted_until() &&
+ sorted_until() >= monotonic_start_time()) {
+ break;
 }
- VLOG(1) << "Found matching data "
- << configuration::StrippedChannelToString(
- configuration_->channels()->Get(channel_index_))
- << " (" << channel_index_ << ")";
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_message = PopMessageHeap();
+ std::optional<SizePrefixedFlatbufferVector<MessageHeader>> m =
+ parts_message_reader_.ReadMessage();
+ // No data left, sorted forever, work through what is left.
+ if (!m) {
+ sorted_until_ = monotonic_clock::max_time;
+ break;
+ }
- timestamp.realtime_remote_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_message).message().realtime_sent_time()));
- timestamp.remote_queue_index =
- std::get<2>(oldest_message).message().queue_index();
+ // Insert the message into messages_, keyed by the fields copied out of its
+ // header; the flatbuffer itself is moved into the new entry.
+ messages_.insert(
+ {.channel_index = m.value().message().channel_index(),
+ .queue_index = m.value().message().queue_index(),
+ .timestamp = monotonic_clock::time_point(std::chrono::nanoseconds(
+ m.value().message().monotonic_sent_time())),
+ .data = std::move(m.value())});
- CHECK_EQ(timestamp.monotonic_remote_time,
- remote_timestamp_monotonic_time);
-
- CHECK_EQ(timestamp.remote_queue_index,
- std::get<2>(oldest_timestamp).message().remote_queue_index())
- << ": " << FlatbufferToJson(&std::get<2>(oldest_timestamp).message())
- << " data "
- << FlatbufferToJson(&std::get<2>(oldest_message).message());
-
- return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
+ // Now, update sorted_until_ to match the new message.
+ // The comparison guards the subtraction below so sorted_until_ is clamped
+ // to min_time instead of going below it.
+ if (parts_message_reader_.newest_timestamp() >
+ monotonic_clock::min_time +
+ parts_message_reader_.max_out_of_order_duration()) {
+ sorted_until_ = parts_message_reader_.newest_timestamp() -
+ parts_message_reader_.max_out_of_order_duration();
+ } else {
+ sorted_until_ = monotonic_clock::min_time;
+ }
 }
- } else {
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- oldest_message = PopMessageHeap();
+ }
- TimestampMerger::DeliveryTimestamp timestamp;
- timestamp.monotonic_event_time =
- monotonic_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_message).message().monotonic_sent_time()));
- timestamp.realtime_event_time =
- realtime_clock::time_point(chrono::nanoseconds(
- std::get<2>(oldest_message).message().realtime_sent_time()));
- timestamp.queue_index = std::get<2>(oldest_message).message().queue_index();
- timestamp.remote_queue_index = 0xffffffff;
+ // Now that we have enough data queued, return a pointer to the oldest piece
+ // of data if it exists.
+ if (messages_.empty()) {
+ last_message_time_ = monotonic_clock::max_time;
+ return nullptr;
+ }
- CHECK_EQ(std::get<0>(oldest_message), timestamp.monotonic_event_time);
- CHECK_EQ(std::get<1>(oldest_message),
- std::get<2>(oldest_message).message().queue_index());
+ // Messages must be handed out in non-decreasing timestamp order.
+ CHECK_GE(messages_.begin()->timestamp, last_message_time_);
+ last_message_time_ = messages_.begin()->timestamp;
+ return &(*messages_.begin());
+}
- return std::make_tuple(timestamp, std::move(std::get<2>(oldest_message)));
+// Removes the first (oldest) entry from messages_.
+// NOTE(review): there is no emptiness check here, unlike the CHECK in
+// NodeMerger::PopFront() -- presumably callers only pop after Front() returned
+// a message; confirm.
+void LogPartsSorter::PopFront() { messages_.erase(messages_.begin()); }
+
+// Renders every entry of messages_, one per line, followed by the result of
+// parts_message_reader_.filename().
+std::string LogPartsSorter::DebugString() const {
+ std::stringstream out;
+ out << "messages: [\n";
+ for (auto it = messages_.begin(); it != messages_.end(); ++it) {
+ out << *it << "\n";
+ }
+ out << "] <- ";
+ out << parts_message_reader_.filename();
+ return out.str();
+}
+
+// Builds one LogPartsSorter per entry in `parts`. All parts must name the same
+// node (CHECKed), and at least one part is required. The start times are taken
+// from the sorter with the earliest monotonic start time.
+NodeMerger::NodeMerger(std::vector<LogParts> parts) {
+ CHECK_GE(parts.size(), 1u);
+ // Copy the node name out before the parts are moved from below.
+ const std::string part0_node = parts[0].node;
+ for (size_t i = 1; i < parts.size(); ++i) {
+ CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
+ }
+ for (LogParts &part : parts) {
+ parts_sorters_.emplace_back(std::move(part));
+ }
+
+ node_ = configuration::GetNodeIndex(log_file_header()->configuration(),
+ part0_node);
+
+ // Start from max_time so that any sorter's start time replaces it.
+ monotonic_start_time_ = monotonic_clock::max_time;
+ realtime_start_time_ = realtime_clock::max_time;
+ for (const LogPartsSorter &parts_sorter : parts_sorters_) {
+ if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
+ monotonic_start_time_ = parts_sorter.monotonic_start_time();
+ realtime_start_time_ = parts_sorter.realtime_start_time();
+ }
 }
}
-void TimestampMerger::NoticeAtEnd() { channel_merger_->NoticeAtEnd(); }
-
-namespace {
-std::vector<std::unique_ptr<SplitMessageReader>> MakeSplitMessageReaders(
- const std::vector<std::vector<std::string>> &filenames) {
- CHECK_GT(filenames.size(), 0u);
- // Build up all the SplitMessageReaders.
- std::vector<std::unique_ptr<SplitMessageReader>> result;
- for (const std::vector<std::string> &filenames : filenames) {
- result.emplace_back(std::make_unique<SplitMessageReader>(filenames));
+// Returns the oldest message across all parts_sorters_, or nullptr when every
+// sorter is out of messages. The sorter that owns the returned message is
+// remembered in current_ until PopFront() is called, so repeated calls return
+// the same message without searching again.
+Message *NodeMerger::Front() {
+ // Return the current Front if we have one, otherwise go compute one.
+ if (current_ != nullptr) {
+ Message *result = current_->Front();
+ CHECK_GE(result->timestamp, last_message_time_);
+ return result;
 }
+
+ // Otherwise, do a simple search for the oldest message, deduplicating any
+ // duplicates.
+ Message *oldest = nullptr;
+ sorted_until_ = monotonic_clock::max_time;
+ for (LogPartsSorter &parts_sorter : parts_sorters_) {
+ Message *m = parts_sorter.Front();
+ if (!m) {
+ // This sorter has nothing queued, but it still bounds sorted_until_.
+ sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
+ continue;
+ }
+ if (oldest == nullptr || *m < *oldest) {
+ oldest = m;
+ current_ = &parts_sorter;
+ } else if (*m == *oldest) {
+ // Found a duplicate. It doesn't matter which one we return. It is
+ // easiest to just drop the new one.
+ parts_sorter.PopFront();
+ }
+
+ // PopFront may change this, so compute it down here.
+ sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
+ }
+
+ if (oldest) {
+ // Messages must be handed out in non-decreasing timestamp order.
+ CHECK_GE(oldest->timestamp, last_message_time_);
+ last_message_time_ = oldest->timestamp;
+ } else {
+ last_message_time_ = monotonic_clock::max_time;
+ }
+
+ // Return the oldest message found. This will be nullptr if nothing was
+ // found, indicating there is nothing left.
+ return oldest;
+}
+
+// Drops the message last returned by Front() from the sorter that owns it, and
+// forgets that sorter so the next Front() call searches again.
+void NodeMerger::PopFront() {
+ CHECK(current_ != nullptr) << "Popping before calling Front()";
+ // Clear current_ and pop from the sorter it pointed at in one step.
+ std::exchange(current_, nullptr)->PopFront();
+}
+
+// Builds the mapper on top of a NodeMerger for `parts`. message_ starts out
+// filled with invalid sentinel values. For multi-node configurations this also
+// fills nodes_data_ (which channels sent on this node are delivered to each
+// other node) and source_node_ (the index of the sending node per channel).
+TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
+ : node_merger_(std::move(parts)),
+ message_{.channel_index = 0xffffffff,
+ .queue_index = 0xffffffff,
+ .monotonic_event_time = monotonic_clock::min_time,
+ .realtime_event_time = realtime_clock::min_time,
+ .remote_queue_index = 0xffffffff,
+ .monotonic_remote_time = monotonic_clock::min_time,
+ .realtime_remote_time = realtime_clock::min_time,
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()} {
+ const Configuration *config = log_file_header()->configuration();
+ // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
+ // pretty simple.
+ if (configuration::MultiNode(config)) {
+ nodes_data_.resize(config->nodes()->size());
+ const Node *my_node = config->nodes()->Get(node());
+ for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
+ const Node *node = config->nodes()->Get(node_index);
+ NodeData *node_data = &nodes_data_[node_index];
+ node_data->channels.resize(config->channels()->size());
+ // We should save the channel if it is delivered to the node represented
+ // by the NodeData, but not sent by that node. That combo means it is
+ // forwarded.
+ //
+ // Our own node is never a forwarding destination. Without this check,
+ // every channel that is both sent and readable locally would be marked
+ // delivered for our own entry, and Queue() would keep copying messages
+ // into a queue that no peer drains (AddPeer() requires the peer to be a
+ // different node).
+ const bool is_remote_node = (node != my_node);
+ size_t channel_index = 0;
+ node_data->any_delivered = false;
+ for (const Channel *channel : *config->channels()) {
+ node_data->channels[channel_index].delivered =
+ is_remote_node &&
+ configuration::ChannelIsReadableOnNode(channel, node) &&
+ configuration::ChannelIsSendableOnNode(channel, my_node);
+ node_data->any_delivered = node_data->any_delivered ||
+ node_data->channels[channel_index].delivered;
+ ++channel_index;
+ }
+ }
+
+ // Record which node sends each channel, indexed by channel index.
+ for (const Channel *channel : *config->channels()) {
+ source_node_.emplace_back(configuration::GetNodeIndex(
+ config, channel->source_node()->string_view()));
+ }
+ }
+}
+
+// Registers this mapper as the data source for `timestamp_mapper`, but only
+// when this node delivers at least one channel to that peer's node. Both
+// mappers must belong to different nodes of a multi-node configuration.
+void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
+ CHECK(configuration::MultiNode(log_file_header()->configuration()));
+ CHECK_NE(timestamp_mapper->node(), node());
+ CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
+
+ // Nothing is forwarded to the peer's node, so registering would only make us
+ // save data needlessly.
+ const NodeData &peer_node_data = nodes_data_[timestamp_mapper->node()];
+ if (!peer_node_data.any_delivered) {
+ return;
+ }
+
+ LOG(INFO) << "Registering on node " << node() << " for peer node "
+ << timestamp_mapper->node();
+ // A peer may only be registered once per node.
+ CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
+
+ timestamp_mapper->nodes_data_[node()].peer = this;
+}
+
+// Overwrites message_ with the contents of `m`, marking all of the remote
+// fields as invalid. The flatbuffer in m->data is moved out, so `m` no longer
+// holds its data afterwards.
+void TimestampMapper::FillMessage(Message *m) {
+ message_ = {
+ .channel_index = m->channel_index,
+ .queue_index = m->queue_index,
+ .monotonic_event_time = m->timestamp,
+ .realtime_event_time = aos::realtime_clock::time_point(
+ std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+ .remote_queue_index = 0xffffffff,
+ .monotonic_remote_time = monotonic_clock::min_time,
+ .realtime_remote_time = realtime_clock::min_time,
+ .data = std::move(m->data)};
+}
+
+// Returns the next message for this node with its remote timestamps filled in,
+// or nullptr when there is no more data. The result is cached in message_ (and
+// tracked by first_message_) until PopFront() is called.
+TimestampedMessage *TimestampMapper::Front() {
+ // No need to fetch anything new. A previous message still exists.
+ switch (first_message_) {
+ case FirstMessage::kNeedsUpdate:
+ break;
+ case FirstMessage::kInMessage:
+ return &message_;
+ case FirstMessage::kNullptr:
+ return nullptr;
+ }
+
+ if (nodes_data_.empty()) {
+ // Simple path. We are single node, so there are no timestamps to match!
+ CHECK_EQ(messages_.size(), 0u);
+ Message *m = node_merger_.Front();
+ if (!m) {
+ first_message_ = FirstMessage::kNullptr;
+ return nullptr;
+ }
+ // Fill in message_ so we have a place to associate remote timestamps, and
+ // return it.
+ FillMessage(m);
+
+ // Messages must be handed out in non-decreasing timestamp order.
+ CHECK_GE(message_.monotonic_event_time, last_message_time_);
+ last_message_time_ = message_.monotonic_event_time;
+ first_message_ = FirstMessage::kInMessage;
+ return &message_;
+ }
+
+ // We need to only add messages to the list so they get processed for messages
+ // which are delivered. Reuse the flow below which uses messages_ by just
+ // adding the new message to messages_ and continuing.
+ if (messages_.empty()) {
+ if (!Queue()) {
+ // Found nothing to add, we are out of data!
+ first_message_ = FirstMessage::kNullptr;
+ return nullptr;
+ }
+
+ // Now that it has been added (and cannibalized), forget about it upstream.
+ node_merger_.PopFront();
+ }
+
+ Message *m = &(messages_.front());
+
+ if (source_node_[m->channel_index] == node()) {
+ // From us, just forward it on, filling the remote data in as invalid.
+ FillMessage(m);
+ CHECK_GE(message_.monotonic_event_time, last_message_time_);
+ last_message_time_ = message_.monotonic_event_time;
+ first_message_ = FirstMessage::kInMessage;
+ return &message_;
+ } else {
+ // Got a timestamp, find the matching remote data, match it, and return it.
+ Message data = MatchingMessageFor(*m);
+
+ // Return the data from the remote. The local message only has timestamp
+ // info which isn't relevant anymore once extracted.
+ // The event and remote timestamps come from the local timestamp message
+ // `m`; only the payload comes from the matched remote message.
+ message_ = {
+ .channel_index = m->channel_index,
+ .queue_index = m->queue_index,
+ .monotonic_event_time = m->timestamp,
+ .realtime_event_time = aos::realtime_clock::time_point(
+ std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+ .remote_queue_index = m->data.message().remote_queue_index(),
+ .monotonic_remote_time =
+ monotonic_clock::time_point(std::chrono::nanoseconds(
+ m->data.message().monotonic_remote_time())),
+ .realtime_remote_time = realtime_clock::time_point(
+ std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
+ .data = std::move(data.data)};
+ CHECK_GE(message_.monotonic_event_time, last_message_time_);
+ last_message_time_ = message_.monotonic_event_time;
+ first_message_ = FirstMessage::kInMessage;
+ return &message_;
+ }
+}
+
+// Discards the message last returned by Front() so that the next Front() call
+// fetches a new one. Front() must have been called first.
+void TimestampMapper::PopFront() {
+ CHECK(first_message_ != FirstMessage::kNeedsUpdate);
+ first_message_ = FirstMessage::kNeedsUpdate;
+
+ if (!nodes_data_.empty()) {
+ // Multi-node: messages_ holds the data, so drop it from there.
+ messages_.pop_front();
+ return;
+ }
+
+ // Single node: we are a thin wrapper around node_merger_. Call it directly.
+ node_merger_.PopFront();
+}
+
+// Finds the data message on the peer (sending) node which corresponds to the
+// timestamp message `message`. Returns a Message whose data is Empty() when the
+// peer has no data queued for that channel or the remote queue index is outside
+// of the queued range. On a match, the matched message and everything queued
+// before it are removed from the peer's queue.
+Message TimestampMapper::MatchingMessageFor(const Message &message) {
+ TimestampMapper *peer =
+ CHECK_NOTNULL(nodes_data_[source_node_[message.channel_index]].peer);
+ // The queue which will have the matching data, if available.
+ std::deque<Message> *data_queue =
+ &peer->nodes_data_[node()].channels[message.channel_index].messages;
+
+ // Figure out what queue index we are looking for.
+ CHECK(message.data.message().has_remote_queue_index());
+ const uint32_t remote_queue_index =
+ message.data.message().remote_queue_index();
+
+ CHECK(message.data.message().has_monotonic_remote_time());
+ CHECK(message.data.message().has_realtime_remote_time());
+
+ const monotonic_clock::time_point monotonic_remote_time(
+ std::chrono::nanoseconds(message.data.message().monotonic_remote_time()));
+ const realtime_clock::time_point realtime_remote_time(
+ std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+
+ // Have the peer queue its messages up to the time the data was sent.
+ peer->QueueUntil(monotonic_remote_time);
+
+ if (data_queue->empty()) {
+ return Message{
+ .channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ }
+
+ // The algorithm below is constant time with some assumptions. We need there
+ // to be no missing messages in the data stream. This also assumes a queue
+ // hasn't wrapped. That is conservative, but should let us get started.
+ //
+ // TODO(austin): We can break these assumptions pretty easily once we have a
+ // need.
+ CHECK_EQ(
+ data_queue->back().queue_index - data_queue->front().queue_index + 1u,
+ data_queue->size());
+
+ // The requested index is not in the queued range, so there is no data.
+ if (remote_queue_index < data_queue->front().queue_index ||
+ remote_queue_index > data_queue->back().queue_index) {
+ return Message{
+ .channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ }
+
+ // Pull the data out and confirm that the timestamps match as expected.
+ Message result = std::move(
+ (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
+ CHECK_EQ(result.timestamp, monotonic_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
+ result.data.message().realtime_sent_time())),
+ realtime_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ // Now drop the data off the front. We have deduplicated timestamps, so we
+ // are done. And all the data is in order.
+ data_queue->erase(data_queue->begin(),
+ data_queue->begin() + (1 + remote_queue_index -
+ data_queue->front().queue_index));
 return result;
}
-} // namespace
-ChannelMerger::ChannelMerger(
- const std::vector<std::vector<std::string>> &filenames)
- : split_message_readers_(MakeSplitMessageReaders(filenames)),
- log_file_header_(split_message_readers_[0]->raw_log_file_header()) {
- // Now, confirm that the configuration matches for each and pick a start time.
- // Also return the list of possible nodes.
- for (const std::unique_ptr<SplitMessageReader> &reader :
- split_message_readers_) {
- CHECK(CompareFlatBuffer(log_file_header_.message().configuration(),
- reader->log_file_header()->configuration()))
- << ": Replaying log files with different configurations isn't "
- "supported";
+// Queues messages from node_merger_ until the newest message in messages_ is
+// newer than `t`, or until there is no more data. Does nothing when
+// queued_until_ is already past `t`.
+void TimestampMapper::QueueUntil(monotonic_clock::time_point t) {
+ if (queued_until_ > t) {
+ return;
 }
+ while (true) {
+ // Done once the newest queued message is past the requested time.
+ if (!messages_.empty() && messages_.back().timestamp > t) {
+ queued_until_ = std::max(queued_until_, messages_.back().timestamp);
+ return;
+ }
- nodes_ = configuration::GetNodes(configuration());
+ if (!Queue()) {
+ // Found nothing to add, we are out of data!
+ queued_until_ = monotonic_clock::max_time;
+ return;
+ }
+
+ // Now that it has been added (and cannibalized), forget about it upstream.
+ node_merger_.PopFront();
+ }
}
-bool ChannelMerger::SetNode(const Node *target_node) {
- std::vector<SplitMessageReader *> split_message_readers;
- for (const std::unique_ptr<SplitMessageReader> &reader :
- split_message_readers_) {
- split_message_readers.emplace_back(reader.get());
+// Takes the front message of node_merger_ and appends it to messages_, first
+// copying it into the per-channel queue of every node it is delivered to.
+// Returns false when node_merger_ has no message. The caller is responsible for
+// calling node_merger_.PopFront() afterwards, since the data has been moved out.
+bool TimestampMapper::Queue() {
+ Message *m = node_merger_.Front();
+ if (m == nullptr) {
+ return false;
 }
-
- // Go find a log_file_header for this node.
- {
- bool found_node = false;
-
- for (const std::unique_ptr<SplitMessageReader> &reader :
- split_message_readers_) {
- // In order to identify which logfile(s) map to the target node, do a
- // logical comparison of the nodes, by confirming that we are either in a
- // single-node setup (where the nodes will both be nullptr) or that the
- // node names match (but the other node fields--e.g., hostname lists--may
- // not).
- const bool both_null =
- reader->node() == nullptr && target_node == nullptr;
- const bool both_have_name =
- (reader->node() != nullptr) && (target_node != nullptr) &&
- (reader->node()->has_name() && target_node->has_name());
- const bool node_names_identical =
- both_have_name && (reader->node()->name()->string_view() ==
- target_node->name()->string_view());
- if (both_null || node_names_identical) {
- if (!found_node) {
- found_node = true;
- log_file_header_ = reader->raw_log_file_header();
- VLOG(1) << "Found log file " << reader->filename() << " with node "
- << FlatbufferToJson(reader->node()) << " start_time "
- << monotonic_start_time();
- } else {
- // Find the earliest start time. That way, if we get a full log file
- // directly from the node, and a partial later, we start with the
- // full. Update our header to match that.
- const monotonic_clock::time_point new_monotonic_start_time(
- chrono::nanoseconds(
- reader->log_file_header()->monotonic_start_time()));
- const realtime_clock::time_point new_realtime_start_time(
- chrono::nanoseconds(
- reader->log_file_header()->realtime_start_time()));
-
- if (monotonic_start_time() == monotonic_clock::min_time ||
- (new_monotonic_start_time != monotonic_clock::min_time &&
- new_monotonic_start_time < monotonic_start_time())) {
- log_file_header_.mutable_message()->mutate_monotonic_start_time(
- new_monotonic_start_time.time_since_epoch().count());
- log_file_header_.mutable_message()->mutate_realtime_start_time(
- new_realtime_start_time.time_since_epoch().count());
- VLOG(1) << "Updated log file " << reader->filename()
- << " with node " << FlatbufferToJson(reader->node())
- << " start_time " << new_monotonic_start_time;
- }
- }
- }
- }
-
- if (!found_node) {
- LOG(WARNING) << "Failed to find log file for node "
- << FlatbufferToJson(target_node);
- return false;
+ // Save a copy for each node which this channel is delivered to.
+ for (NodeData &node_data : nodes_data_) {
+ if (!node_data.any_delivered) continue;
+ if (node_data.channels[m->channel_index].delivered) {
+ // TODO(austin): This copies the data... Probably not worth stressing
+ // about yet.
+ // TODO(austin): Bound how big this can get. We tend not to send massive
+ // data, so we can probably ignore this for a bit.
+ node_data.channels[m->channel_index].messages.emplace_back(*m);
 }
 }
- // Build up all the timestamp mergers. This connects up all the
- // SplitMessageReaders.
- timestamp_mergers_.reserve(configuration()->channels()->size());
- for (size_t channel_index = 0;
- channel_index < configuration()->channels()->size(); ++channel_index) {
- timestamp_mergers_.emplace_back(
- configuration(), split_message_readers, channel_index,
- configuration::GetNode(configuration(), target_node), this);
- }
-
- // And prime everything.
- for (std::unique_ptr<SplitMessageReader> &split_message_reader :
- split_message_readers_) {
- split_message_reader->QueueMessages(
- split_message_reader->monotonic_start_time());
- }
-
- node_ = configuration::GetNodeOrDie(configuration(), target_node);
+ // Move the message itself last; the copies above need the data intact.
+ messages_.emplace_back(std::move(*m));
 return true;
}
-monotonic_clock::time_point ChannelMerger::OldestMessageTime() const {
- if (channel_heap_.empty()) {
- return monotonic_clock::max_time;
- }
- return channel_heap_.front().first;
-}
-
-void ChannelMerger::PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index) {
- // Pop and recreate the heap if it has already been pushed. And since we are
- // pushing again, we don't need to clear pushed.
- if (timestamp_mergers_[channel_index].pushed()) {
- const auto channel_iterator = std::find_if(
- channel_heap_.begin(), channel_heap_.end(),
- [channel_index](const std::pair<monotonic_clock::time_point, int> x) {
- return x.second == channel_index;
- });
- DCHECK(channel_iterator != channel_heap_.end());
- if (std::get<0>(*channel_iterator) == timestamp) {
- // It's already in the heap, in the correct spot, so nothing
- // more for us to do here.
- return;
- }
- channel_heap_.erase(channel_iterator);
- std::make_heap(channel_heap_.begin(), channel_heap_.end(),
- ChannelHeapCompare);
- }
-
- if (timestamp == monotonic_clock::min_time) {
- timestamp_mergers_[channel_index].set_pushed(false);
- return;
- }
-
- channel_heap_.push_back(std::make_pair(timestamp, channel_index));
-
- // The default sort puts the newest message first. Use a custom comparator to
- // put the oldest message first.
- std::push_heap(channel_heap_.begin(), channel_heap_.end(),
- ChannelHeapCompare);
-}
-
-void ChannelMerger::VerifyHeaps() {
- std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
- channel_heap_;
- std::make_heap(channel_heap.begin(), channel_heap.end(), &ChannelHeapCompare);
-
- for (size_t i = 0; i < channel_heap_.size(); ++i) {
- CHECK(channel_heap_[i] == channel_heap[i]) << ": Heaps diverged...";
- CHECK_EQ(
- std::get<0>(channel_heap[i]),
- timestamp_mergers_[std::get<1>(channel_heap[i])].channel_merger_time());
- }
-}
-
-std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
-ChannelMerger::PopOldest() {
- CHECK_GT(channel_heap_.size(), 0u);
- std::pair<monotonic_clock::time_point, int> oldest_channel_data =
- channel_heap_.front();
- int channel_index = oldest_channel_data.second;
- std::pop_heap(channel_heap_.begin(), channel_heap_.end(),
- &ChannelHeapCompare);
- channel_heap_.pop_back();
-
- timestamp_mergers_[channel_index].set_pushed(false);
-
- TimestampMerger *merger = &timestamp_mergers_[channel_index];
-
- // Merger handles any queueing needed from here.
- std::tuple<TimestampMerger::DeliveryTimestamp,
- SizePrefixedFlatbufferVector<MessageHeader>>
- message = merger->PopOldest();
- DCHECK_EQ(std::get<0>(message).monotonic_event_time,
- oldest_channel_data.first)
- << ": channel_heap_ was corrupted for " << channel_index << ": "
- << DebugString();
-
- CHECK_GE(std::get<0>(message).monotonic_event_time, last_popped_time_)
- << ": " << MaybeNodeName(log_file_header()->node())
- << "Messages came off the queue out of order. " << DebugString();
- last_popped_time_ = std::get<0>(message).monotonic_event_time;
-
- VLOG(1) << "Popped " << last_popped_time_ << " "
- << configuration::StrippedChannelToString(
- configuration()->channels()->Get(channel_index))
- << " (" << channel_index << ")";
-
- return std::make_tuple(std::get<0>(message), channel_index,
- std::move(std::get<1>(message)));
-}
-
-std::string SplitMessageReader::MessageHeaderQueue::DebugString() const {
+// Renders the messages queued in messages_ for this node, followed by, for each
+// registered peer, the non-empty per-channel queues that peer holds for this
+// node. Channels are labeled with their index in the peer's channel list.
+std::string TimestampMapper::DebugString() const {
 std::stringstream ss;
- for (size_t i = 0; i < data_.size(); ++i) {
- if (i < 5 || i + 5 > data_.size()) {
- if (timestamps) {
- ss << " msg: ";
- } else {
- ss << " timestamp: ";
+ ss << "node " << node() << " [\n";
+ for (const Message &message : messages_) {
+ ss << " " << message << "\n";
+ }
+ ss << "] queued_until " << queued_until_;
+ for (const NodeData &ns : nodes_data_) {
+ if (ns.peer == nullptr) continue;
+ ss << "\nnode " << ns.peer->node() << " remote_data [\n";
+ size_t channel_index = 0;
+ for (const NodeData::ChannelData &channel_data :
+ ns.peer->nodes_data_[node()].channels) {
+ if (channel_data.messages.empty()) {
+ // Skipped channels still have to be counted, otherwise every channel
+ // printed after an empty one is labeled with the wrong index.
+ ++channel_index;
+ continue;
 }
- ss << monotonic_clock::time_point(
- chrono::nanoseconds(data_[i].message().monotonic_sent_time()))
- << " ("
- << realtime_clock::time_point(
- chrono::nanoseconds(data_[i].message().realtime_sent_time()))
- << ") " << data_[i].message().queue_index();
- if (timestamps) {
- ss << " <- remote "
- << monotonic_clock::time_point(chrono::nanoseconds(
- data_[i].message().monotonic_remote_time()))
- << " ("
- << realtime_clock::time_point(chrono::nanoseconds(
- data_[i].message().realtime_remote_time()))
- << ")";
+
+ ss << " channel " << channel_index << " [\n";
+ for (const Message &m : channel_data.messages) {
+ ss << " " << m << "\n";
 }
- ss << "\n";
- } else if (i == 5) {
- ss << " ...\n";
+ ss << " ]\n";
+ ++channel_index;
 }
+ ss << "] queued_until " << ns.peer->queued_until_;
 }
-
- return ss.str();
-}
-
-std::string SplitMessageReader::DebugString(int channel) const {
- std::stringstream ss;
- ss << "[\n";
- ss << channels_[channel].data.DebugString();
- ss << " ]";
- return ss.str();
-}
-
-std::string SplitMessageReader::DebugString(int channel, int node_index) const {
- std::stringstream ss;
- ss << "[\n";
- ss << channels_[channel].timestamps[node_index].DebugString();
- ss << " ]";
- return ss.str();
-}
-
-std::string TimestampMerger::DebugString() const {
- std::stringstream ss;
-
- if (timestamp_heap_.size() > 0) {
- ss << " timestamp_heap {\n";
- std::vector<
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
- timestamp_heap = timestamp_heap_;
- while (timestamp_heap.size() > 0u) {
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_timestamp_reader = timestamp_heap.front();
-
- ss << " " << std::get<2>(oldest_timestamp_reader) << " "
- << std::get<0>(oldest_timestamp_reader) << " queue_index ("
- << std::get<1>(oldest_timestamp_reader) << ") ttq "
- << std::get<2>(oldest_timestamp_reader)->time_to_queue() << " "
- << std::get<2>(oldest_timestamp_reader)->filename() << " -> "
- << std::get<2>(oldest_timestamp_reader)
- ->DebugString(channel_index_, node_index_)
- << "\n";
-
- std::pop_heap(timestamp_heap.begin(), timestamp_heap.end(),
- &SplitMessageReaderHeapCompare);
- timestamp_heap.pop_back();
- }
- ss << " }\n";
- }
-
- ss << " message_heap {\n";
- {
- std::vector<
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
- message_heap = message_heap_;
- while (!message_heap.empty()) {
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>
- oldest_message_reader = message_heap.front();
-
- ss << " " << std::get<2>(oldest_message_reader) << " "
- << std::get<0>(oldest_message_reader) << " queue_index ("
- << std::get<1>(oldest_message_reader) << ") ttq "
- << std::get<2>(oldest_message_reader)->time_to_queue() << " "
- << std::get<2>(oldest_message_reader)->filename() << " -> "
- << std::get<2>(oldest_message_reader)->DebugString(channel_index_)
- << "\n";
-
- std::pop_heap(message_heap.begin(), message_heap.end(),
- &SplitMessageReaderHeapCompare);
- message_heap.pop_back();
- }
- }
- ss << " }";
-
- return ss.str();
-}
-
-std::string ChannelMerger::DebugString() const {
- std::stringstream ss;
- ss << "start_time " << realtime_start_time() << " " << monotonic_start_time()
- << "\n";
- ss << "channel_heap {\n";
- std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap =
- channel_heap_;
- while (!channel_heap.empty()) {
- std::tuple<monotonic_clock::time_point, int> channel = channel_heap.front();
- ss << " " << std::get<0>(channel) << " (" << std::get<1>(channel) << ") "
- << configuration::CleanedChannelToString(
- configuration()->channels()->Get(std::get<1>(channel)))
- << "\n";
-
- ss << timestamp_mergers_[std::get<1>(channel)].DebugString() << "\n";
-
- std::pop_heap(channel_heap.begin(), channel_heap.end(),
- &ChannelHeapCompare);
- channel_heap.pop_back();
- }
- ss << "}";
-
 return ss.str();
}
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 985a6bc..fd600d3 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -14,6 +14,7 @@
#include <utility>
#include <vector>
+#include "absl/container/btree_set.h"
#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/event_loop.h"
@@ -280,6 +281,13 @@
std::string_view filename() const { return message_reader_.filename(); }
+ // Returns the LogParts that holds the filenames we are reading.
+ const LogParts &parts() const { return parts_; }
+
+ const LogFileHeader *log_file_header() const {
+ return message_reader_.log_file_header();
+ }
+
// Returns the minimum amount of data needed to queue up for sorting before
// we are guarenteed to not see data out of order.
std::chrono::nanoseconds max_out_of_order_duration() const {
@@ -309,452 +317,265 @@
monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
};
-class TimestampMerger;
+// Struct to hold a message as it gets sorted on a single node.
+struct Message {
+ // The channel.
+ uint32_t channel_index = 0xffffffff;
+ // The local queue index.
+ uint32_t queue_index = 0xffffffff;
+ // The local timestamp on the monotonic clock.
+ monotonic_clock::time_point timestamp = monotonic_clock::min_time;
+ // The data (either a timestamp header, or a data header).
+ SizePrefixedFlatbufferVector<MessageHeader> data;
-// A design requirement is that the relevant data for a channel is not more than
-// max_out_of_order_duration out of order. We approach sorting in layers.
-//
-// 1) Split each (maybe chunked) log file into one queue per channel. Read this
-// log file looking for data pertaining to a specific node.
-// (SplitMessageReader)
-// 2) Merge all the data per channel from the different log files into a sorted
-// list of timestamps and messages. (TimestampMerger)
-// 3) Combine the timestamps and messages. (TimestampMerger)
-// 4) Merge all the channels to produce the next message on a node.
-// (ChannelMerger)
-// 5) Duplicate this entire stack per node.
-
-// This class splits messages and timestamps up into a queue per channel, and
-// handles reading data from multiple chunks.
-class SplitMessageReader {
- public:
- SplitMessageReader(const std::vector<std::string> &filenames);
-
- // Sets the TimestampMerger that gets notified for each channel. The node
- // that the TimestampMerger is merging as needs to be passed in.
- void SetTimestampMerger(TimestampMerger *timestamp_merger, int channel,
- const Node *target_node);
-
- // Returns the (timestamp, queue_index, message_header) for the oldest message
- // in a channel, or max_time if there is nothing in the channel.
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message(int channel) {
- return channels_[channel].data.front_timestamp();
- }
-
- // Returns the (timestamp, queue_index, message_header) for the oldest
- // delivery time in a channel, or max_time if there is nothing in the channel.
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message(int channel, int destination_node) {
- return channels_[channel].timestamps[destination_node].front_timestamp();
- }
-
- // Returns the timestamp, queue_index, and message for the oldest data on a
- // channel. Requeues data as needed.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldest(int channel_index);
-
- // Returns the timestamp, queue_index, and message for the oldest timestamp on
- // a channel delivered to a node. Requeues data as needed.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldestTimestamp(int channel, int node_index);
-
- // Returns the header for the log files.
- const LogFileHeader *log_file_header() const {
- return &log_file_header_.message();
- }
-
- const SizePrefixedFlatbufferVector<LogFileHeader> &raw_log_file_header()
- const {
- return log_file_header_;
- }
-
- // Returns the starting time for this set of log files.
- monotonic_clock::time_point monotonic_start_time() {
- return monotonic_clock::time_point(
- std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
- }
- realtime_clock::time_point realtime_start_time() {
- return realtime_clock::time_point(
- std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
- }
-
- // Returns the configuration from the log file header.
- const Configuration *configuration() const {
- return log_file_header()->configuration();
- }
-
- // Returns the node who's point of view this log file is from. Make sure this
- // is a pointer in the configuration() nodes list so it can be consumed
- // elsewhere.
- const Node *node() const {
- if (configuration()->has_nodes()) {
- return configuration::GetNodeOrDie(configuration(),
- log_file_header()->node());
- } else {
- CHECK(!log_file_header()->has_node());
- return nullptr;
- }
- }
-
- // Returns the timestamp of the newest message read from the log file, and the
- // timestamp that we need to re-queue data.
- monotonic_clock::time_point newest_timestamp() const {
- return newest_timestamp_;
- }
-
- // Returns the next time to trigger a requeue.
- monotonic_clock::time_point time_to_queue() const { return time_to_queue_; }
-
- // Returns the minimum amount of data needed to queue up for sorting before
- // we are guarenteed to not see data out of order.
- std::chrono::nanoseconds max_out_of_order_duration() const {
- return message_reader_->max_out_of_order_duration();
- }
-
- std::string_view filename() const { return message_reader_->filename(); }
-
- // Adds more messages to the sorted list. This reads enough data such that
- // oldest_message_time can be replayed safely. Returns false if the log file
- // has all been read.
- bool QueueMessages(monotonic_clock::time_point oldest_message_time);
-
- // Returns debug strings for a channel, and timestamps for a node.
- std::string DebugString(int channel) const;
- std::string DebugString(int channel, int node_index) const;
-
- // Returns true if all the messages have been queued from the last log file in
- // the list of log files chunks.
- bool at_end() const { return at_end_; }
-
- private:
- // TODO(austin): Need to copy or refcount the message instead of running
- // multiple copies of the reader. Or maybe have a "as_node" index and hide it
- // inside.
-
- // Moves to the next log file in the list.
- bool NextLogFile();
-
- // Filenames of the log files.
- std::vector<std::string> filenames_;
- // And the index of the next file to open.
- size_t next_filename_index_ = 0;
-
- // Node we are reading as.
- const Node *target_node_ = nullptr;
-
- // Log file header to report. This is a copy.
- SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
- // Current log file being read.
- std::unique_ptr<MessageReader> message_reader_;
-
- // Datastructure to hold the list of messages, cached timestamp for the
- // oldest message, and sender to send with.
- struct MessageHeaderQueue {
- // If true, this is a timestamp queue.
- bool timestamps = false;
-
- // Returns a reference to the the oldest message.
- SizePrefixedFlatbufferVector<MessageHeader> &front() {
- CHECK_GT(data_.size(), 0u);
- return data_.front();
- }
-
- // Adds a message to the back of the queue. Returns true if it was actually
- // emplaced.
- bool emplace_back(SizePrefixedFlatbufferVector<MessageHeader> &&msg);
-
- // Drops the front message. Invalidates the front() reference.
- void PopFront();
-
- // The size of the queue.
- size_t size() { return data_.size(); }
-
- // Returns a debug string with info about each message in the queue.
- std::string DebugString() const;
-
- // Returns the (timestamp, queue_index, message_header) for the oldest
- // message.
- const std::tuple<monotonic_clock::time_point, uint32_t,
- const MessageHeader *>
- front_timestamp() {
- const MessageHeader &message = front().message();
- return std::make_tuple(
- monotonic_clock::time_point(
- std::chrono::nanoseconds(message.monotonic_sent_time())),
- message.queue_index(), &message);
- }
-
- // Pointer to the timestamp merger for this queue if available.
- TimestampMerger *timestamp_merger = nullptr;
- // Pointer to the reader which feeds this queue.
- SplitMessageReader *split_reader = nullptr;
-
- private:
- // The data.
- std::deque<SizePrefixedFlatbufferVector<MessageHeader>> data_;
- };
-
- // All the queues needed for a channel. There isn't going to be data in all
- // of these.
- struct ChannelData {
- // The data queue for the channel.
- MessageHeaderQueue data;
- // Queues for timestamps for each node.
- std::vector<MessageHeaderQueue> timestamps;
- };
-
- // Data for all the channels.
- std::vector<ChannelData> channels_;
-
- // Once we know the node that this SplitMessageReader will be writing as,
- // there will be only one MessageHeaderQueue that a specific channel matches.
- // Precompute this here for efficiency.
- std::vector<MessageHeaderQueue *> channels_to_write_;
-
- monotonic_clock::time_point time_to_queue_ = monotonic_clock::min_time;
-
- // Latches true when we hit the end of the last log file and there is no sense
- // poking it further.
- bool at_end_ = false;
-
- // Timestamp of the newest message that was read and actually queued. We want
- // to track this independently from the log file because we need the
- // timestamps here to be timestamps of messages that are queued.
- monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+ bool operator<(const Message &m2) const;
+ bool operator>=(const Message &m2) const;
+ bool operator==(const Message &m2) const;
};
-class ChannelMerger;
+std::ostream &operator<<(std::ostream &os, const Message &m);
-// Sorts channels (and timestamps) from multiple log files for a single channel.
-class TimestampMerger {
- public:
- TimestampMerger(const Configuration *configuration,
- std::vector<SplitMessageReader *> split_message_readers,
- int channel_index, const Node *target_node,
- ChannelMerger *channel_merger);
+// Structure to hold a full message and all the timestamps, which may or may not
+// have been sent from a remote node. The remote_queue_index will be invalid if
+// this message is from the point of view of the node which sent it.
+struct TimestampedMessage {
+ uint32_t channel_index = 0xffffffff;
- // Metadata used to schedule the message.
- struct DeliveryTimestamp {
- monotonic_clock::time_point monotonic_event_time =
- monotonic_clock::min_time;
- realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
- uint32_t queue_index = 0xffffffff;
+ uint32_t queue_index = 0xffffffff;
+ monotonic_clock::time_point monotonic_event_time = monotonic_clock::min_time;
+ realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
- monotonic_clock::time_point monotonic_remote_time =
- monotonic_clock::min_time;
- realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
- uint32_t remote_queue_index = 0xffffffff;
- };
+ uint32_t remote_queue_index = 0xffffffff;
+ monotonic_clock::time_point monotonic_remote_time = monotonic_clock::min_time;
+ realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
- // Pushes SplitMessageReader onto the timestamp heap. This should only be
- // called when timestamps are placed in the channel this class is merging for
- // the reader.
- void UpdateTimestamp(
- SplitMessageReader *split_message_reader,
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message_time) {
- PushTimestampHeap(oldest_message_time, split_message_reader);
- }
- // Pushes SplitMessageReader onto the message heap. This should only be
- // called when data is placed in the channel this class is merging for the
- // reader.
- void Update(
- SplitMessageReader *split_message_reader,
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message_time) {
- PushMessageHeap(oldest_message_time, split_message_reader);
- }
-
- // Returns the oldest combined timestamp and data for this channel. If there
- // isn't a matching piece of data, returns only the timestamp with no data.
- // The caller can determine what the appropriate action is to recover.
- std::tuple<DeliveryTimestamp, SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldest();
-
- // Tracks if the channel merger has pushed this onto it's heap or not.
- bool pushed() { return pushed_; }
- // Sets if this has been pushed to the channel merger heap. Should only be
- // called by the channel merger.
- void set_pushed(bool pushed) { pushed_ = pushed; }
-
- // Returns a debug string with the heaps printed out.
- std::string DebugString() const;
-
- // Returns true if we have timestamps.
- bool has_timestamps() const { return has_timestamps_; }
-
- // Records that one of the log files ran out of data. This should only be
- // called by a SplitMessageReader.
- void NoticeAtEnd();
-
- aos::monotonic_clock::time_point channel_merger_time() {
- if (has_timestamps_) {
- return std::get<0>(timestamp_heap_[0]);
- } else {
- return std::get<0>(message_heap_[0]);
- }
- }
-
- private:
- // Pushes messages and timestamps to the corresponding heaps.
- void PushMessageHeap(
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp,
- SplitMessageReader *split_message_reader);
- void PushTimestampHeap(
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- timestamp,
- SplitMessageReader *split_message_reader);
-
- // Pops a message from the message heap. This automatically triggers the
- // split message reader to re-fetch any new data.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopMessageHeap();
-
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_message() const;
- std::tuple<monotonic_clock::time_point, uint32_t, const MessageHeader *>
- oldest_timestamp() const;
- // Pops a message from the timestamp heap. This automatically triggers the
- // split message reader to re-fetch any new data.
- std::tuple<monotonic_clock::time_point, uint32_t,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopTimestampHeap();
-
- const Configuration *configuration_;
-
- // If true, this is a forwarded channel and timestamps should be matched.
- bool has_timestamps_ = false;
-
- // Tracks if the ChannelMerger has pushed this onto it's queue.
- bool pushed_ = false;
-
- // The split message readers used for source data.
- std::vector<SplitMessageReader *> split_message_readers_;
-
- // The channel to merge.
- int channel_index_;
-
- // Our node.
- int node_index_;
-
- // Heaps for messages and timestamps.
- std::vector<
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
- message_heap_;
- std::vector<
- std::tuple<monotonic_clock::time_point, uint32_t, SplitMessageReader *>>
- timestamp_heap_;
-
- // Parent channel merger.
- ChannelMerger *channel_merger_;
+ SizePrefixedFlatbufferVector<MessageHeader> data;
};
-// This class handles constructing all the split message readers, channel
-// mergers, and combining the results.
-class ChannelMerger {
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
+
+// Class to sort the resulting messages from a PartsMessageReader.
+class LogPartsSorter {
public:
- // Builds a ChannelMerger around a set of log files. These are of the format:
- // {
- // {log1_part0, log1_part1, ...},
- // {log2}
- // }
- // The inner vector is a list of log file chunks which form up a log file.
- // The outer vector is a list of log files with subsets of the messages, or
- // messages from different nodes.
- ChannelMerger(const std::vector<std::vector<std::string>> &filenames);
+ LogPartsSorter(LogParts log_parts);
- // Returns the nodes that we know how to merge.
- const std::vector<const Node *> nodes() const;
- // Sets the node that we will return messages as. Returns true if the node
- // has log files and will produce data. This can only be called once, and
- // will likely corrupt state if called a second time.
- bool SetNode(const Node *target_node);
-
- // Everything else needs the node set before it works.
-
- // Returns a timestamp for the oldest message in this group of logfiles.
- monotonic_clock::time_point OldestMessageTime() const;
- // Pops the oldest message.
- std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldest();
-
- // Returns the config for this set of log files.
- const Configuration *configuration() const {
- return log_file_header()->configuration();
- }
-
+ // Returns the current log file header.
+ // TODO(austin): Is this the header we want to report? Do we want a better
+ // start time?
+ // TODO(austin): Report a start time from the LogParts. Figure out how that
+ // all works.
const LogFileHeader *log_file_header() const {
- return &log_file_header_.message();
+ return parts_message_reader_.log_file_header();
}
- // Returns the start times for the configured node's log files.
monotonic_clock::time_point monotonic_start_time() const {
- return monotonic_clock::time_point(
- std::chrono::nanoseconds(log_file_header()->monotonic_start_time()));
+ return parts_message_reader_.parts().monotonic_start_time;
}
realtime_clock::time_point realtime_start_time() const {
- return realtime_clock::time_point(
- std::chrono::nanoseconds(log_file_header()->realtime_start_time()));
+ return parts_message_reader_.parts().realtime_start_time;
}
- // Returns the node set by SetNode above.
- const Node *node() const { return node_; }
+ // The time this data is sorted until.
+ monotonic_clock::time_point sorted_until() const { return sorted_until_; }
- // Called by the TimestampMerger when new data is available with the provided
- // timestamp and channel_index.
- void Update(monotonic_clock::time_point timestamp, int channel_index) {
- PushChannelHeap(timestamp, channel_index);
- }
+ // Returns the next sorted message from the log file. It is safe to call
+ // std::move() on the result to move the data flatbuffer from it.
+ Message *Front();
+ // Pops the front message. This should only be called after a call to
+ // Front().
+ void PopFront();
- // Returns a debug string with all the heaps in it. Generally only useful for
- // debugging what went wrong.
+ // Returns a debug string representing the contents of this sorter.
std::string DebugString() const;
- // Returns true if one of the log files has finished reading everything. When
- // log file chunks are involved, this means that the last chunk in a log file
- // has been read. It is acceptable to be missing data at this point in time.
- bool at_end() const { return at_end_; }
+ private:
+ // Log parts reader we are wrapping.
+ PartsMessageReader parts_message_reader_;
+ // Cache of the time we are sorted until.
+ aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
- // Marks that one of the log files is at the end. This should only be called
- // by timestamp mergers.
- void NoticeAtEnd() { at_end_ = true; }
+ // Timestamp of the last message returned. Used to make sure nothing goes
+ // backwards.
+ monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
+
+ // Set used for efficient sorting of messages. We can benchmark and evaluate
+ // other data structures if this proves to be the bottleneck.
+ absl::btree_set<Message> messages_;
+};
+
+// Class to run merge sort on the messages from multiple LogPartsSorter
+// instances.
+class NodeMerger {
+ public:
+ NodeMerger(std::vector<LogParts> parts);
+
+ // Node index in the configuration of this node.
+ int node() const { return node_; }
+
+ // The log file header from the first parts sorter.
+ const LogFileHeader *log_file_header() const {
+ CHECK(!parts_sorters_.empty());
+ return parts_sorters_[0].log_file_header();
+ }
+
+ monotonic_clock::time_point monotonic_start_time() const {
+ return monotonic_start_time_;
+ }
+ realtime_clock::time_point realtime_start_time() const {
+ return realtime_start_time_;
+ }
+
+ // The time this data is sorted until.
+ monotonic_clock::time_point sorted_until() const { return sorted_until_; }
+
+ // Returns the next sorted message from the set of log files. It is safe to
+ // call std::move() on the result to move the data flatbuffer from it.
+ Message *Front();
+ // Pops the front message. This should only be called after a call to
+ // Front().
+ void PopFront();
private:
- // Pushes the timestamp for new data on the provided channel.
- void PushChannelHeap(monotonic_clock::time_point timestamp,
- int channel_index);
+ // Unsorted list of all parts sorters.
+ std::vector<LogPartsSorter> parts_sorters_;
+ // Pointer to the parts sorter holding the current Front message if one
+ // exists, or nullptr if a new one needs to be found.
+ LogPartsSorter *current_ = nullptr;
+ // Cached sorted_until value.
+ aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
- // CHECKs that channel_heap_ and timestamp_heap_ are valid heaps.
- void VerifyHeaps();
+ // Cached node.
+ int node_;
- // All the message readers.
- std::vector<std::unique_ptr<SplitMessageReader>> split_message_readers_;
+ // Timestamp of the last message returned. Used to make sure nothing goes
+ // backwards.
+ monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
- // The log header we are claiming to be.
- SizePrefixedFlatbufferVector<LogFileHeader> log_file_header_;
+ realtime_clock::time_point realtime_start_time_ = realtime_clock::max_time;
+ monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::max_time;
+};
- // The timestamp mergers which combine data from the split message readers.
- std::vector<TimestampMerger> timestamp_mergers_;
+// Class to match timestamps with the corresponding data from other nodes.
+class TimestampMapper {
+ public:
+ TimestampMapper(std::vector<LogParts> file);
- // A heap of the channel readers and timestamps for the oldest data in each.
- std::vector<std::pair<monotonic_clock::time_point, int>> channel_heap_;
+ // Copying and moving will mess up the internal raw pointers. Just don't do
+ // it.
+ TimestampMapper(TimestampMapper const &) = delete;
+ TimestampMapper(TimestampMapper &&) = delete;
+ void operator=(TimestampMapper const &) = delete;
+ void operator=(TimestampMapper &&) = delete;
- // Configured node.
- const Node *node_;
+ // TODO(austin): It would be super helpful to provide a way to queue up to
+ // time X without matching timestamps, and to then be able to pull the
+ // timestamps out of this queue. This lets us bootstrap time estimation
+ // without exploding memory usage worst case.
- bool at_end_ = false;
+ // Returns a log file header for this node.
+ const LogFileHeader *log_file_header() const {
+ return node_merger_.log_file_header();
+ }
- // Cached copy of the list of nodes.
- std::vector<const Node *> nodes_;
+ // Returns which node this is sorting for.
+ size_t node() const { return node_merger_.node(); }
- // Last time popped. Used to detect events being returned out of order.
- monotonic_clock::time_point last_popped_time_ = monotonic_clock::min_time;
+ // The start time of this log.
+ monotonic_clock::time_point monotonic_start_time() const {
+ return node_merger_.monotonic_start_time();
+ }
+ realtime_clock::time_point realtime_start_time() const {
+ return node_merger_.realtime_start_time();
+ }
+
+ // Uses timestamp_mapper as the peer for its node. Only one mapper may be set
+ // for each node. Peers are used to look up the data for timestamps on this
+ // node.
+ void AddPeer(TimestampMapper *timestamp_mapper);
+
+ // Time that we are sorted until internally.
+ monotonic_clock::time_point sorted_until() const {
+ return node_merger_.sorted_until();
+ }
+
+ // Returns the next message for this node.
+ TimestampedMessage *Front();
+ // Pops the next message. Front must be called first.
+ void PopFront();
+
+ // Returns debug information about this node.
+ std::string DebugString() const;
+
+ private:
+ // The state for a remote node. This holds the data that needs to be matched
+ // with the remote node's timestamps.
+ struct NodeData {
+ // True if we should save data here. This should be true if any of the
+ // per-channel delivered flags in channels below are true.
+ bool any_delivered = false;
+
+ // Peer pointer. This node is only to be considered if a peer is set.
+ TimestampMapper *peer = nullptr;
+
+ struct ChannelData {
+ // Deque per channel. This contains the data from the outside
+ // TimestampMapper node which is relevant for the node this NodeData
+ // points to.
+ std::deque<Message> messages;
+ // Bool tracking per channel if a message is delivered to the node this
+ // NodeData represents.
+ bool delivered = false;
+ };
+
+ // Vector with per channel data.
+ std::vector<ChannelData> channels;
+ };
+
+ // Returns (and forgets about) the data for the provided timestamp message
+ // showing when it was delivered to this node.
+ Message MatchingMessageFor(const Message &message);
+
+ // Queues up a single message into our message queue, and any nodes that this
+ // message is delivered to. Returns true if one was available, false
+ // otherwise.
+ bool Queue();
+
+ // Queues up data until we have at least one message with a timestamp >= t.
+ // Useful for triggering a remote node to read enough data to have the
+ // timestamp you care about available.
+ void QueueUntil(monotonic_clock::time_point t);
+
+ // Fills message_ with the contents of m.
+ void FillMessage(Message *m);
+
+ // The node merger to source messages from.
+ NodeMerger node_merger_;
+ // The buffer of messages for this node. These are not matched with any
+ // remote data.
+ std::deque<Message> messages_;
+ // The node index for the source node for each channel.
+ std::vector<size_t> source_node_;
+
+ // Vector per node. Not all nodes will have anything.
+ std::vector<NodeData> nodes_data_;
+
+ // Latest message to return.
+ TimestampedMessage message_;
+
+ // Tracks whether the first message points to message_, is nullptr (all
+ // done), or is invalid and needs to be updated.
+ enum class FirstMessage {
+ kNeedsUpdate,
+ kInMessage,
+ kNullptr,
+ };
+ FirstMessage first_message_ = FirstMessage::kNeedsUpdate;
+
+ // Timestamp of the last message returned. Used to make sure nothing goes
+ // backwards.
+ monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
+ // Time this node is queued up until. Used for caching.
+ monotonic_clock::time_point queued_until_ = monotonic_clock::min_time;
};
// Returns the node name with a trailing space, or an empty string if we are on
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 14d1de7..1d83466 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -5,6 +5,7 @@
#include "aos/events/logging/logfile_sorting.h"
#include "aos/events/logging/test_message_generated.h"
+#include "aos/flatbuffer_merge.h"
#include "aos/flatbuffers.h"
#include "aos/json_to_flatbuffer.h"
#include "aos/testing/tmpdir.h"
@@ -31,11 +32,9 @@
unlink(logfile.c_str());
const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m1 =
- JsonToSizedFlatbuffer<TestMessage>(
- R"({ "value": 1 })");
+ JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 1 })");
const aos::SizePrefixedFlatbufferDetachedBuffer<TestMessage> m2 =
- JsonToSizedFlatbuffer<TestMessage>(
- R"({ "value": 2 })");
+ JsonToSizedFlatbuffer<TestMessage>(R"({ "value": 2 })");
{
DetachedBufferWriter writer(logfile, std::make_unique<DummyEncoder>());
@@ -215,6 +214,896 @@
EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::max_time);
}
+// Tests Message's < and >= across timestamp, channel_index, and queue_index.
+TEST(MessageTest, Sorting) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+
+ Message m1{.channel_index = 0,
+ .queue_index = 0,
+ .timestamp = e + chrono::milliseconds(1),
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ Message m2{.channel_index = 0,
+ .queue_index = 0,
+ .timestamp = e + chrono::milliseconds(2),
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+
+ EXPECT_LT(m1, m2);
+ EXPECT_GE(m2, m1);
+
+ m1.timestamp = e;
+ m2.timestamp = e;
+
+ m1.channel_index = 1;
+ m2.channel_index = 2;
+
+ EXPECT_LT(m1, m2);
+ EXPECT_GE(m2, m1);
+
+ m1.channel_index = 0;
+ m2.channel_index = 0;
+ m1.queue_index = 0;
+ m2.queue_index = 1;
+
+ EXPECT_LT(m1, m2);
+ EXPECT_GE(m2, m1);
+}
+
+aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> MakeHeader(
+ const aos::FlatbufferDetachedBuffer<Configuration> &config,
+ const std::string_view json) {
+ flatbuffers::FlatBufferBuilder fbb;
+ flatbuffers::Offset<Configuration> config_offset =
+ aos::CopyFlatBuffer(config, &fbb);
+ LogFileHeader::Builder header_builder(fbb);
+ header_builder.add_configuration(config_offset);
+ fbb.Finish(header_builder.Finish());
+ aos::FlatbufferDetachedBuffer<LogFileHeader> config_header(fbb.Release());
+
+ aos::FlatbufferDetachedBuffer<LogFileHeader> header_updates(
+ JsonToFlatbuffer<LogFileHeader>(json));
+ CHECK(header_updates.Verify());
+ flatbuffers::FlatBufferBuilder fbb2;
+ fbb2.FinishSizePrefixed(
+ aos::MergeFlatBuffers(config_header, header_updates, &fbb2));
+ return fbb2.Release();
+}
+
+class SortingElementTest : public ::testing::Test {
+ public:
+ SortingElementTest()
+ : config_(JsonToFlatbuffer<Configuration>(
+ R"({
+ "channels": [
+ {
+ "name": "/a",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2"
+ },
+ {
+ "name": "pi3"
+ }
+ ]
+ },
+ {
+ "name": "/b",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1"
+ },
+ {
+ "name": "/c",
+ "type": "aos.logger.testing.TestMessage",
+ "source_node": "pi1"
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1"
+ },
+ {
+ "name": "pi2"
+ },
+ {
+ "name": "pi3"
+ }
+ ]
+}
+)")),
+ config0_(MakeHeader(config_, R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+ "parts_index": 0
+})")),
+ config1_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+ "parts_index": 0
+})")),
+ config2_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi2"
+ },
+ "logger_node": {
+ "name": "pi2"
+ },
+ "monotonic_start_time": 0,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
+ "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
+ "parts_index": 0
+})")),
+ config3_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 2000000,
+ "realtime_start_time": 1000000000,
+ "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+ "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
+ "parts_index": 0
+})")) {
+ unlink(logfile0_.c_str());
+ unlink(logfile1_.c_str());
+ unlink(logfile2_.c_str());
+ queue_index_.resize(kChannels);
+ }
+
+ protected:
+ static constexpr size_t kChannels = 3u;
+
+ flatbuffers::DetachedBuffer MakeLogMessage(
+ const aos::monotonic_clock::time_point monotonic_now, int channel_index,
+ int value) {
+ flatbuffers::FlatBufferBuilder message_fbb;
+ message_fbb.ForceDefaults(true);
+ TestMessage::Builder test_message_builder(message_fbb);
+ test_message_builder.add_value(value);
+ message_fbb.Finish(test_message_builder.Finish());
+
+ aos::Context context;
+ context.monotonic_event_time = monotonic_now;
+ context.realtime_event_time = aos::realtime_clock::epoch() +
+ chrono::seconds(1000) +
+ monotonic_now.time_since_epoch();
+ context.queue_index = queue_index_[channel_index];
+ context.size = message_fbb.GetSize();
+ context.data = message_fbb.GetBufferPointer();
+
+ ++queue_index_[channel_index];
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.FinishSizePrefixed(
+ PackMessage(&fbb, context, channel_index, LogType::kLogMessage));
+
+ return fbb.Release();
+ }
+
+ flatbuffers::DetachedBuffer MakeTimestampMessage(
+ const aos::monotonic_clock::time_point sender_monotonic_now,
+ int channel_index, chrono::nanoseconds receiver_monotonic_offset) {
+ aos::Context context;
+ context.monotonic_remote_time = sender_monotonic_now;
+ context.realtime_remote_time = aos::realtime_clock::epoch() +
+ chrono::seconds(1000) +
+ sender_monotonic_now.time_since_epoch();
+ context.remote_queue_index = queue_index_[channel_index] - 1;
+ context.monotonic_event_time =
+ sender_monotonic_now + receiver_monotonic_offset;
+ context.realtime_event_time =
+ aos::realtime_clock::epoch() + chrono::seconds(1000) +
+ context.monotonic_event_time.time_since_epoch();
+ context.queue_index = queue_index_[channel_index] - 1 + 100;
+ context.size = 0;
+ context.data = nullptr;
+
+ flatbuffers::FlatBufferBuilder fbb;
+ fbb.FinishSizePrefixed(PackMessage(&fbb, context, channel_index,
+ LogType::kLogDeliveryTimeOnly));
+ LOG(INFO) << aos::FlatbufferToJson(
+ aos::SizePrefixedFlatbufferSpan<MessageHeader>(
+ absl::Span<uint8_t>(fbb.GetBufferPointer(), fbb.GetSize())));
+
+ return fbb.Release();
+ }
+
+ const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
+ const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
+ const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
+
+ const aos::FlatbufferDetachedBuffer<Configuration> config_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
+
+ std::vector<uint32_t> queue_index_;
+};
+
+using LogPartsSorterTest = SortingElementTest;
+using LogPartsSorterDeathTest = LogPartsSorterTest;
+using NodeMergerTest = SortingElementTest;
+using TimestampMapperTest = SortingElementTest;
+
+// Tests that we can pull messages out of a log sorted in order.
+TEST_F(LogPartsSorterTest, Pull) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(config0_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_});
+
+ LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+ // Confirm that sorted_until() is still min_time before any message is popped.
+ // Peeking shouldn't change the sorted until time.
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+ std::deque<Message> output;
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(parts_sorter.Front() == nullptr);
+
+ EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1901));
+ EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+}
+
+// Tests that messages timestamped before the epoch are also pulled out in order.
+TEST_F(LogPartsSorterTest, WayBeforeStart) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(config0_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e - chrono::milliseconds(500), 0, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e - chrono::milliseconds(10), 2, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e - chrono::milliseconds(1000), 1, 0x105));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1901), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_});
+
+ LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+ // Confirm that sorted_until() is still min_time before any message is popped.
+ // Peeking shouldn't change the sorted until time.
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+
+ std::deque<Message> output;
+
+ for (monotonic_clock::time_point t :
+ {e + chrono::milliseconds(1900), e + chrono::milliseconds(1900),
+ e + chrono::milliseconds(1900), monotonic_clock::max_time,
+ monotonic_clock::max_time}) {
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ output.emplace_back(std::move(*parts_sorter.Front()));
+ parts_sorter.PopFront();
+ EXPECT_EQ(parts_sorter.sorted_until(), t);
+ }
+
+ ASSERT_TRUE(parts_sorter.Front() == nullptr);
+
+ EXPECT_EQ(output[0].timestamp, e - chrono::milliseconds(1000));
+ EXPECT_EQ(output[1].timestamp, e - chrono::milliseconds(500));
+ EXPECT_EQ(output[2].timestamp, e - chrono::milliseconds(10));
+ EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(1901));
+ EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(2000));
+}
+
+// Tests that messages too far out of order trigger death.
+TEST_F(LogPartsSorterDeathTest, Pull) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer(logfile0_, std::make_unique<DummyEncoder>());
+ writer.QueueSpan(config0_.span());
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 1, 0x105));
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2001), 0, 0x006));
+ // The following message is too far out of order and will trigger the CHECK.
+ writer.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1900), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_});
+
+ LogPartsSorter parts_sorter(parts[0].parts[0]);
+
+ // Confirm that sorted_until() is still min_time before any message is popped.
+ // Peeking shouldn't change the sorted until time.
+ EXPECT_EQ(parts_sorter.sorted_until(), monotonic_clock::min_time);
+ std::deque<Message> output;
+
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ parts_sorter.PopFront();
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ ASSERT_TRUE(parts_sorter.Front() != nullptr);
+ parts_sorter.PopFront();
+
+ EXPECT_DEATH({ parts_sorter.Front(); }, "Max out of order exceeded.");
+}
+
+// Tests that we can merge data from 2 separate files, including duplicate data.
+TEST_F(NodeMergerTest, TwoFileMerger) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config1_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1001), 1, 0x105));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1002), 1, 0x106));
+
+ // Make a duplicate!
+ SizePrefixedFlatbufferDetachedBuffer<MessageHeader> msg(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer0.QueueSpan(msg.span());
+ writer1.QueueSpan(msg.span());
+
+ writer1.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3002), 1, 0x107));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+ ASSERT_EQ(parts.size(), 1u);
+
+ NodeMerger merger(FilterPartsForNode(parts, "pi1"));
+
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+
+ std::deque<Message> output;
+
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
+ ASSERT_TRUE(merger.Front() != nullptr);
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
+
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(merger.Front() != nullptr);
+ output.emplace_back(std::move(*merger.Front()));
+ merger.PopFront();
+ EXPECT_EQ(merger.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(merger.Front() == nullptr);
+
+ EXPECT_EQ(output[0].timestamp, e + chrono::milliseconds(1000));
+ EXPECT_EQ(output[1].timestamp, e + chrono::milliseconds(1001));
+ EXPECT_EQ(output[2].timestamp, e + chrono::milliseconds(1002));
+ EXPECT_EQ(output[3].timestamp, e + chrono::milliseconds(2000));
+ EXPECT_EQ(output[4].timestamp, e + chrono::milliseconds(3000));
+ EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
+}
+
+// Tests that we can match timestamps on delivered messages.
+TEST_F(TimestampMapperTest, ReadNode0First) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+ EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+}
+
+// Tests that we can match timestamps on delivered messages. By doing this in
+// the reverse order, the second node needs to queue data up from the first node
+// to find the matching timestamp.
+TEST_F(TimestampMapperTest, ReadNode1First) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+ EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+}
+
+// Tests that we return just the timestamps if we couldn't find the data and the
+// missing data was at the beginning of the file.
+TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_FALSE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+}
+
+// Tests that we return just the timestamps if we couldn't find the data and the
+// missing data was at the end of the file.
+TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_FALSE(output1[2].data.Verify());
+ }
+}
+
+// Tests that we properly sort log files with duplicate timestamps.
+TEST_F(TimestampMapperTest, ReadSameTimestamp) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ for (int i = 0; i < 4; ++i) {
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ }
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_EQ(output1[3].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[3].data.Verify());
+ }
+}
+
+// Tests that TimestampMapper reports the start times from the log file headers.
+TEST_F(TimestampMapperTest, StartTime) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config1_.span());
+ DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
+ writer2.QueueSpan(config3_.span());
+ }
+
+ const std::vector<LogFile> parts =
+ SortParts({logfile0_, logfile1_, logfile2_});
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+ EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
+ EXPECT_EQ(mapper0.realtime_start_time(),
+ realtime_clock::time_point(chrono::seconds(1000)));
+}
+
} // namespace testing
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index b32c748..5a6bb8f 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -38,11 +38,12 @@
namespace {
// Helper to safely read a header, or CHECK.
SizePrefixedFlatbufferVector<LogFileHeader> MaybeReadHeaderOrDie(
- const std::vector<std::vector<std::string>> &filenames) {
- CHECK_GE(filenames.size(), 1u) << ": Empty filenames list";
- CHECK_GE(filenames[0].size(), 1u) << ": Empty filenames list";
+ const std::vector<LogFile> &log_files) {
+ CHECK_GE(log_files.size(), 1u) << ": Empty filenames list";
+ CHECK_GE(log_files[0].parts.size(), 1u) << ": Empty filenames list";
+ CHECK_GE(log_files[0].parts[0].parts.size(), 1u) << ": Empty filenames list";
std::optional<SizePrefixedFlatbufferVector<LogFileHeader>> result =
- ReadHeader(filenames[0][0]);
+ ReadHeader(log_files[0].parts[0].parts[0]);
CHECK(result);
return result.value();
}
@@ -717,24 +718,12 @@
LogReader::LogReader(std::string_view filename,
const Configuration *replay_configuration)
- : LogReader(std::vector<std::string>{std::string(filename)},
- replay_configuration) {}
+ : LogReader(SortParts({std::string(filename)}), replay_configuration) {}
-LogReader::LogReader(const std::vector<std::string> &filenames,
+LogReader::LogReader(std::vector<LogFile> log_files,
const Configuration *replay_configuration)
- : LogReader(std::vector<std::vector<std::string>>{filenames},
- replay_configuration) {}
-
-// TODO(austin): Make this the base and kill the others. This has much better
-// context for sorting.
-LogReader::LogReader(const std::vector<LogFile> &log_files,
- const Configuration *replay_configuration)
- : LogReader(ToLogReaderVector(log_files), replay_configuration) {}
-
-LogReader::LogReader(const std::vector<std::vector<std::string>> &filenames,
- const Configuration *replay_configuration)
- : filenames_(filenames),
- log_file_header_(MaybeReadHeaderOrDie(filenames)),
+ : log_files_(std::move(log_files)),
+ log_file_header_(MaybeReadHeaderOrDie(log_files_)),
replay_configuration_(replay_configuration) {
MakeRemappedConfig();
@@ -762,8 +751,8 @@
}
if (!configuration::MultiNode(configuration())) {
- states_.emplace_back(
- std::make_unique<State>(std::make_unique<ChannelMerger>(filenames)));
+ states_.emplace_back(std::make_unique<State>(
+ std::make_unique<TimestampMapper>(FilterPartsForNode(log_files_, ""))));
} else {
if (replay_configuration) {
CHECK_EQ(logged_configuration()->nodes()->size(),
@@ -851,8 +840,12 @@
for (const Node *node : configuration::GetNodes(configuration())) {
const size_t node_index =
configuration::GetNodeIndex(configuration(), node);
- states_[node_index] =
- std::make_unique<State>(std::make_unique<ChannelMerger>(filenames_));
+ std::vector<LogParts> filtered_parts = FilterPartsForNode(
+ log_files_, node != nullptr ? node->name()->string_view() : "");
+ states_[node_index] = std::make_unique<State>(
+ filtered_parts.size() == 0u
+ ? nullptr
+ : std::make_unique<TimestampMapper>(std::move(filtered_parts)));
State *state = states_[node_index].get();
state->set_event_loop(state->SetNodeEventLoopFactory(
event_loop_factory_->GetNodeEventLoopFactory(node)));
@@ -860,6 +853,20 @@
state->SetChannelCount(logged_configuration()->channels()->size());
}
+ for (const Node *node : configuration::GetNodes(configuration())) {
+ const size_t node_index =
+ configuration::GetNodeIndex(configuration(), node);
+ State *state = states_[node_index].get();
+ for (const Node *other_node : configuration::GetNodes(configuration())) {
+ const size_t other_node_index =
+ configuration::GetNodeIndex(configuration(), other_node);
+ State *other_state = states_[other_node_index].get();
+ if (other_state != state) {
+ state->AddPeer(other_state);
+ }
+ }
+ }
+
// Register after making all the State objects so we can build references
// between them.
for (const Node *node : configuration::GetNodes(configuration())) {
@@ -971,6 +978,9 @@
VLOG(1) << "Start time is " << state->monotonic_start_time() << " for node "
<< MaybeNodeName(state->event_loop()->node()) << "now "
<< state->monotonic_now();
+ if (state->monotonic_start_time() == monotonic_clock::min_time) {
+ continue;
+ }
// And start computing the start time on the distributed clock now that
// that works.
start_time = std::max(
@@ -1221,8 +1231,6 @@
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
- const bool has_data = state->SetNode();
-
for (size_t logged_channel_index = 0;
logged_channel_index < logged_configuration()->channels()->size();
++logged_channel_index) {
@@ -1267,7 +1275,7 @@
// If we didn't find any log files with data in them, we won't ever get a
// callback or be live. So skip the rest of the setup.
- if (!has_data) {
+ if (state->OldestMessageTime() == monotonic_clock::max_time) {
return;
}
@@ -1283,45 +1291,39 @@
}
return;
}
- TimestampMerger::DeliveryTimestamp channel_timestamp;
- int channel_index;
- SizePrefixedFlatbufferVector<MessageHeader> channel_data =
- SizePrefixedFlatbufferVector<MessageHeader>::Empty();
-
if (VLOG_IS_ON(1)) {
LogFit("Offset was");
}
bool update_time;
- std::tie(channel_timestamp, channel_index, channel_data) =
- state->PopOldest(&update_time);
+ TimestampedMessage timestamped_message = state->PopOldest(&update_time);
const monotonic_clock::time_point monotonic_now =
state->event_loop()->context().monotonic_event_time;
if (!FLAGS_skip_order_validation) {
- CHECK(monotonic_now == channel_timestamp.monotonic_event_time)
+ CHECK(monotonic_now == timestamped_message.monotonic_event_time)
<< ": " << FlatbufferToJson(state->event_loop()->node()) << " Now "
<< monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
+ << timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
- } else if (monotonic_now != channel_timestamp.monotonic_event_time) {
+ } else if (monotonic_now != timestamped_message.monotonic_event_time) {
LOG(WARNING) << "Check failed: monotonic_now == "
- "channel_timestamp.monotonic_event_time) ("
+ "timestamped_message.monotonic_event_time) ("
<< monotonic_now << " vs. "
- << channel_timestamp.monotonic_event_time
+ << timestamped_message.monotonic_event_time
<< "): " << FlatbufferToJson(state->event_loop()->node())
<< " Now " << monotonic_now << " trying to send "
- << channel_timestamp.monotonic_event_time << " failure "
+ << timestamped_message.monotonic_event_time << " failure "
<< state->DebugString();
}
- if (channel_timestamp.monotonic_event_time >
+ if (timestamped_message.monotonic_event_time >
state->monotonic_start_time() ||
event_loop_factory_ != nullptr) {
if ((!ignore_missing_data_ && !FLAGS_skip_missing_forwarding_entries &&
!state->at_end()) ||
- channel_data.message().data() != nullptr) {
- CHECK(channel_data.message().data() != nullptr)
+ timestamped_message.data.span().size() != 0u) {
+ CHECK_NE(timestamped_message.data.span().size(), 0u)
<< ": Got a message without data. Forwarding entry which was "
"not matched? Use --skip_missing_forwarding_entries to "
"ignore this.";
@@ -1331,28 +1333,38 @@
// destination node (this node). As a proxy, do this by making sure
// that time on the source node is past when the message was sent.
if (!FLAGS_skip_order_validation) {
- CHECK_LT(channel_timestamp.monotonic_remote_time,
- state->monotonic_remote_now(channel_index))
+ CHECK_LT(
+ timestamped_message.monotonic_remote_time,
+ state->monotonic_remote_now(timestamped_message.channel_index))
<< state->event_loop()->node()->name()->string_view() << " to "
- << state->remote_node(channel_index)->name()->string_view()
+ << state->remote_node(timestamped_message.channel_index)
+ ->name()
+ ->string_view()
<< " " << state->DebugString();
- } else if (channel_timestamp.monotonic_remote_time >=
- state->monotonic_remote_now(channel_index)) {
+ } else if (timestamped_message.monotonic_remote_time >=
+ state->monotonic_remote_now(
+ timestamped_message.channel_index)) {
LOG(WARNING)
- << "Check failed: channel_timestamp.monotonic_remote_time < "
- "state->monotonic_remote_now(channel_index) ("
- << channel_timestamp.monotonic_remote_time << " vs. "
- << state->monotonic_remote_now(channel_index) << ") "
- << state->event_loop()->node()->name()->string_view() << " to "
- << state->remote_node(channel_index)->name()->string_view()
- << " currently " << channel_timestamp.monotonic_event_time
+ << "Check failed: timestamped_message.monotonic_remote_time < "
+ "state->monotonic_remote_now(timestamped_message.channel_"
+ "index) ("
+ << timestamped_message.monotonic_remote_time << " vs. "
+ << state->monotonic_remote_now(
+ timestamped_message.channel_index)
+ << ") " << state->event_loop()->node()->name()->string_view()
+ << " to "
+ << state->remote_node(timestamped_message.channel_index)
+ ->name()
+ ->string_view()
+ << " currently " << timestamped_message.monotonic_event_time
<< " ("
<< state->ToDistributedClock(
- channel_timestamp.monotonic_event_time)
+ timestamped_message.monotonic_event_time)
<< ") remote event time "
- << channel_timestamp.monotonic_remote_time << " ("
+ << timestamped_message.monotonic_remote_time << " ("
<< state->RemoteToDistributedClock(
- channel_index, channel_timestamp.monotonic_remote_time)
+ timestamped_message.channel_index,
+ timestamped_message.monotonic_remote_time)
<< ") " << state->DebugString();
}
@@ -1362,12 +1374,12 @@
fprintf(
offset_fp_,
"# time_since_start, offset node 0, offset node 1, ...\n");
- first_time_ = channel_timestamp.realtime_event_time;
+ first_time_ = timestamped_message.realtime_event_time;
}
fprintf(offset_fp_, "%.9f",
std::chrono::duration_cast<std::chrono::duration<double>>(
- channel_timestamp.realtime_event_time - first_time_)
+ timestamped_message.realtime_event_time - first_time_)
.count());
for (int i = 1; i < time_offset_matrix_.rows(); ++i) {
fprintf(offset_fp_, ", %.9f",
@@ -1383,15 +1395,14 @@
}
// If we have access to the factory, use it to fix the realtime time.
- state->SetRealtimeOffset(channel_timestamp.monotonic_event_time,
- channel_timestamp.realtime_event_time);
+ state->SetRealtimeOffset(timestamped_message.monotonic_event_time,
+ timestamped_message.realtime_event_time);
VLOG(1) << MaybeNodeName(state->event_loop()->node()) << "Sending "
- << channel_timestamp.monotonic_event_time;
+ << timestamped_message.monotonic_event_time;
// TODO(austin): std::move channel_data in and make that efficient in
// simulation.
- state->Send(channel_index, channel_data.message().data()->Data(),
- channel_data.message().data()->size(), channel_timestamp);
+ state->Send(std::move(timestamped_message));
} else if (state->at_end() && !ignore_missing_data_) {
// We are at the end of the log file and found missing data. Finish
// reading the rest of the log file and call it quits. We don't want
@@ -1401,15 +1412,15 @@
state->PopOldest(&update_time_dummy);
}
} else {
- CHECK(channel_data.message().data() == nullptr) << ": Nullptr";
+ CHECK(timestamped_message.data.span().data() == nullptr) << ": Nullptr";
}
} else {
LOG(WARNING)
<< "Not sending data from before the start of the log file. "
- << channel_timestamp.monotonic_event_time.time_since_epoch().count()
+ << timestamped_message.monotonic_event_time.time_since_epoch().count()
<< " start " << monotonic_start_time().time_since_epoch().count()
<< " "
- << FlatbufferToJson(channel_data,
+ << FlatbufferToJson(timestamped_message.data,
{.multi_line = false, .max_vector_size = 100});
}
@@ -1742,8 +1753,14 @@
return remapped_channel;
}
-LogReader::State::State(std::unique_ptr<ChannelMerger> channel_merger)
- : channel_merger_(std::move(channel_merger)) {}
+LogReader::State::State(std::unique_ptr<TimestampMapper> timestamp_mapper)
+ : timestamp_mapper_(std::move(timestamp_mapper)) {}
+
+void LogReader::State::AddPeer(State *peer) {
+ if (timestamp_mapper_ && peer->timestamp_mapper_) {
+ timestamp_mapper_->AddPeer(peer->timestamp_mapper_.get());
+ }
+}
EventLoop *LogReader::State::SetNodeEventLoopFactory(
NodeEventLoopFactory *node_event_loop_factory) {
@@ -1783,22 +1800,20 @@
factory_channel_index_[logged_channel_index] = factory_channel_index;
}
-bool LogReader::State::Send(
- size_t channel_index, const void *data, size_t size,
- const TimestampMerger::DeliveryTimestamp &delivery_timestamp) {
- aos::RawSender *sender = channels_[channel_index].get();
+bool LogReader::State::Send(const TimestampedMessage &timestamped_message) {
+ aos::RawSender *sender = channels_[timestamped_message.channel_index].get();
uint32_t remote_queue_index = 0xffffffff;
- if (remote_timestamp_senders_[channel_index] != nullptr) {
- std::vector<SentTimestamp> *queue_index_map =
- CHECK_NOTNULL(CHECK_NOTNULL(channel_source_state_[channel_index])
- ->queue_index_map_[channel_index]
- .get());
+ if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
+ std::vector<SentTimestamp> *queue_index_map = CHECK_NOTNULL(
+ CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
+ ->queue_index_map_[timestamped_message.channel_index]
+ .get());
SentTimestamp search;
- search.monotonic_event_time = delivery_timestamp.monotonic_remote_time;
- search.realtime_event_time = delivery_timestamp.realtime_remote_time;
- search.queue_index = delivery_timestamp.remote_queue_index;
+ search.monotonic_event_time = timestamped_message.monotonic_remote_time;
+ search.realtime_event_time = timestamped_message.realtime_remote_time;
+ search.queue_index = timestamped_message.remote_queue_index;
// Find the sent time if available.
auto element = std::lower_bound(
@@ -1828,10 +1843,10 @@
// receive time.
if (element != queue_index_map->end()) {
CHECK_EQ(element->monotonic_event_time,
- delivery_timestamp.monotonic_remote_time);
+ timestamped_message.monotonic_remote_time);
CHECK_EQ(element->realtime_event_time,
- delivery_timestamp.realtime_remote_time);
- CHECK_EQ(element->queue_index, delivery_timestamp.remote_queue_index);
+ timestamped_message.realtime_remote_time);
+ CHECK_EQ(element->queue_index, timestamped_message.remote_queue_index);
remote_queue_index = element->actual_queue_index;
}
@@ -1839,27 +1854,32 @@
// Send! Use the replayed queue index here instead of the logged queue index
// for the remote queue index. This makes re-logging work.
- const bool sent =
- sender->Send(data, size, delivery_timestamp.monotonic_remote_time,
- delivery_timestamp.realtime_remote_time, remote_queue_index);
+ const bool sent = sender->Send(
+ timestamped_message.data.message().data()->Data(),
+ timestamped_message.data.message().data()->size(),
+ timestamped_message.monotonic_remote_time,
+ timestamped_message.realtime_remote_time, remote_queue_index);
if (!sent) return false;
- if (queue_index_map_[channel_index]) {
+ if (queue_index_map_[timestamped_message.channel_index]) {
SentTimestamp timestamp;
- timestamp.monotonic_event_time = delivery_timestamp.monotonic_event_time;
- timestamp.realtime_event_time = delivery_timestamp.realtime_event_time;
- timestamp.queue_index = delivery_timestamp.queue_index;
+ timestamp.monotonic_event_time = timestamped_message.monotonic_event_time;
+ timestamp.realtime_event_time = timestamped_message.realtime_event_time;
+ timestamp.queue_index = timestamped_message.queue_index;
timestamp.actual_queue_index = sender->sent_queue_index();
- queue_index_map_[channel_index]->emplace_back(timestamp);
- } else if (remote_timestamp_senders_[channel_index] != nullptr) {
+ queue_index_map_[timestamped_message.channel_index]->emplace_back(
+ timestamp);
+ } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
+ nullptr) {
aos::Sender<MessageHeader>::Builder builder =
- remote_timestamp_senders_[channel_index]->MakeBuilder();
+ remote_timestamp_senders_[timestamped_message.channel_index]
+ ->MakeBuilder();
logger::MessageHeader::Builder message_header_builder =
builder.MakeBuilder<logger::MessageHeader>();
message_header_builder.add_channel_index(
- factory_channel_index_[channel_index]);
+ factory_channel_index_[timestamped_message.channel_index]);
// Swap the remote and sent metrics. They are from the sender's
// perspective, not the receiver's perspective.
@@ -1870,9 +1890,9 @@
message_header_builder.add_queue_index(sender->sent_queue_index());
message_header_builder.add_monotonic_remote_time(
- delivery_timestamp.monotonic_remote_time.time_since_epoch().count());
+ timestamped_message.monotonic_remote_time.time_since_epoch().count());
message_header_builder.add_realtime_remote_time(
- delivery_timestamp.realtime_remote_time.time_since_epoch().count());
+ timestamped_message.realtime_remote_time.time_since_epoch().count());
message_header_builder.add_remote_queue_index(remote_queue_index);
@@ -1899,28 +1919,23 @@
return &(sender->second);
}
-std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
-LogReader::State::PopOldest(bool *update_time) {
+TimestampedMessage LogReader::State::PopOldest(bool *update_time) {
CHECK_GT(sorted_messages_.size(), 0u);
- std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>,
- message_bridge::NoncausalOffsetEstimator *>
+ std::tuple<TimestampedMessage, message_bridge::NoncausalOffsetEstimator *>
result = std::move(sorted_messages_.front());
VLOG(2) << MaybeNodeName(event_loop_->node()) << "PopOldest Popping "
<< std::get<0>(result).monotonic_event_time;
sorted_messages_.pop_front();
SeedSortedMessages();
- if (std::get<3>(result) != nullptr) {
- *update_time = std::get<3>(result)->Pop(
+ if (std::get<1>(result) != nullptr) {
+ *update_time = std::get<1>(result)->Pop(
event_loop_->node(), std::get<0>(result).monotonic_event_time);
} else {
*update_time = false;
}
- return std::make_tuple(std::get<0>(result), std::get<1>(result),
- std::move(std::get<2>(result)));
+ return std::move(std::get<0>(result));
}
monotonic_clock::time_point LogReader::State::OldestMessageTime() const {
@@ -1930,18 +1945,25 @@
return std::get<0>(sorted_messages_.front()).monotonic_event_time;
}
- return channel_merger_->OldestMessageTime();
+ TimestampedMessage *m =
+ timestamp_mapper_ ? timestamp_mapper_->Front() : nullptr;
+ if (m == nullptr) {
+ return monotonic_clock::max_time;
+ }
+ return m->monotonic_event_time;
}
void LogReader::State::SeedSortedMessages() {
+ if (!timestamp_mapper_) return;
const aos::monotonic_clock::time_point end_queue_time =
(sorted_messages_.size() > 0
? std::get<0>(sorted_messages_.front()).monotonic_event_time
- : channel_merger_->monotonic_start_time()) +
+ : timestamp_mapper_->monotonic_start_time()) +
std::chrono::seconds(2);
while (true) {
- if (channel_merger_->OldestMessageTime() == monotonic_clock::max_time) {
+ TimestampedMessage *m = timestamp_mapper_->Front();
+ if (m == nullptr) {
return;
}
if (sorted_messages_.size() > 0) {
@@ -1953,31 +1975,25 @@
}
}
- TimestampMerger::DeliveryTimestamp channel_timestamp;
- int channel_index;
- SizePrefixedFlatbufferVector<MessageHeader> channel_data =
- SizePrefixedFlatbufferVector<MessageHeader>::Empty();
-
message_bridge::NoncausalOffsetEstimator *filter = nullptr;
- std::tie(channel_timestamp, channel_index, channel_data) =
- channel_merger_->PopOldest();
+ TimestampedMessage timestamped_message = std::move(*m);
+ timestamp_mapper_->PopFront();
// Skip any messages without forwarding information.
- if (channel_timestamp.monotonic_remote_time != monotonic_clock::min_time) {
+ if (timestamped_message.monotonic_remote_time != monotonic_clock::min_time) {
// Got a forwarding timestamp!
- filter = filters_[channel_index];
+ filter = filters_[timestamped_message.channel_index];
CHECK(filter != nullptr);
// Call the correct method depending on if we are the forward or
// reverse direction here.
filter->Sample(event_loop_->node(),
- channel_timestamp.monotonic_event_time,
- channel_timestamp.monotonic_remote_time);
+ timestamped_message.monotonic_event_time,
+ timestamped_message.monotonic_remote_time);
}
- sorted_messages_.emplace_back(channel_timestamp, channel_index,
- std::move(channel_data), filter);
+ sorted_messages_.emplace_back(std::move(timestamped_message), filter);
}
}
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index f6a037b..f0f0a69 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -319,24 +319,10 @@
// pass it in here. It must provide all the channels that the original logged
// config did.
//
- // Log filenames are in the following format:
- //
- // {
- // {log1_part0, log1_part1, ...},
- // {log2}
- // }
- // The inner vector is a list of log file chunks which form up a log file.
- // The outer vector is a list of log files with subsets of the messages, or
- // messages from different nodes.
- //
- // If the outer vector isn't provided, it is assumed to be of size 1.
+ // The single file constructor calls SortParts internally.
LogReader(std::string_view filename,
const Configuration *replay_configuration = nullptr);
- LogReader(const std::vector<std::string> &filenames,
- const Configuration *replay_configuration = nullptr);
- LogReader(const std::vector<std::vector<std::string>> &filenames,
- const Configuration *replay_configuration = nullptr);
- LogReader(const std::vector<LogFile> &log_files,
+ LogReader(std::vector<LogFile> log_files,
const Configuration *replay_configuration = nullptr);
~LogReader();
@@ -450,7 +436,7 @@
: logged_configuration()->nodes()->size();
}
- const std::vector<std::vector<std::string>> filenames_;
+ const std::vector<LogFile> log_files_;
// This is *a* log file header used to provide the logged config. The rest of
// the header is likely distracting.
@@ -466,14 +452,15 @@
// State per node.
class State {
public:
- State(std::unique_ptr<ChannelMerger> channel_merger);
+ State(std::unique_ptr<TimestampMapper> timestamp_mapper);
+
+ // Connects up the timestamp mappers.
+ void AddPeer(State *peer);
// Returns the timestamps, channel_index, and message from a channel.
// update_time (will be) set to true when popping this message causes the
// filter to change the time offset estimation function.
- std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>>
- PopOldest(bool *update_time);
+ TimestampedMessage PopOldest(bool *update_time);
// Returns the monotonic time of the oldest message.
monotonic_clock::time_point OldestMessageTime() const;
@@ -484,10 +471,12 @@
// Returns the starting time for this node.
monotonic_clock::time_point monotonic_start_time() const {
- return channel_merger_->monotonic_start_time();
+ return timestamp_mapper_ ? timestamp_mapper_->monotonic_start_time()
+ : monotonic_clock::min_time;
}
realtime_clock::time_point realtime_start_time() const {
- return channel_merger_->realtime_start_time();
+ return timestamp_mapper_ ? timestamp_mapper_->realtime_start_time()
+ : realtime_clock::min_time;
}
// Sets the node event loop factory for replaying into a
@@ -555,10 +544,6 @@
return node_event_loop_factory_->monotonic_now();
}
- // Sets the node we will be merging as, and returns true if there is any
- // data on it.
- bool SetNode() { return channel_merger_->SetNode(event_loop_->node()); }
-
// Sets the number of channels.
void SetChannelCount(size_t count);
@@ -570,7 +555,9 @@
State *source_state);
// Returns if we have read all the messages from all the logs.
- bool at_end() const { return channel_merger_->at_end(); }
+ bool at_end() const {
+ return timestamp_mapper_ ? timestamp_mapper_->Front() == nullptr : true;
+ }
// Unregisters everything so we can destory the event loop.
void Deregister();
@@ -586,8 +573,7 @@
}
// Sends a buffer on the provided channel index.
- bool Send(size_t channel_index, const void *data, size_t size,
- const TimestampMerger::DeliveryTimestamp &delivery_timestamp);
+ bool Send(const TimestampedMessage &timestamped_message);
// Returns a debug string for the channel merger.
std::string DebugString() const {
@@ -599,22 +585,24 @@
<< "]: " << std::get<0>(message).monotonic_event_time << " "
<< configuration::StrippedChannelToString(
event_loop_->configuration()->channels()->Get(
- std::get<2>(message).message().channel_index()))
+ std::get<0>(message).channel_index))
<< "\n";
} else if (i == 7) {
messages << "...\n";
}
++i;
}
- return messages.str() + channel_merger_->DebugString();
+ if (!timestamp_mapper_) {
+ return messages.str();
+ }
+ return messages.str() + timestamp_mapper_->DebugString();
}
private:
// Log file.
- std::unique_ptr<ChannelMerger> channel_merger_;
+ std::unique_ptr<TimestampMapper> timestamp_mapper_;
- std::deque<std::tuple<TimestampMerger::DeliveryTimestamp, int,
- SizePrefixedFlatbufferVector<MessageHeader>,
+ std::deque<std::tuple<TimestampedMessage,
message_bridge::NoncausalOffsetEstimator *>>
sorted_messages_;
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 2dc02ef..a6385d3 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -279,8 +279,7 @@
// Even though it doesn't make any difference here, exercise the logic for
// passing in a separate config.
- LogReader reader(std::vector<std::string>{logfile0, logfile1},
- &config_.message());
+ LogReader reader(SortParts({logfile0, logfile1}), &config_.message());
// Confirm that we can remap logged channels to point to new buses.
reader.RemapLoggedChannel<aos::examples::Ping>("/test", "/original");
@@ -763,7 +762,7 @@
"/pi2/aos", "aos.message_bridge.Timestamp", 190)));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -939,7 +938,8 @@
}
)");
- EXPECT_DEATH(LogReader(structured_logfiles_, &extra_nodes_config.message()),
+ const std::vector<LogFile> sorted_parts = SortParts(logfiles_);
+ EXPECT_DEATH(LogReader(sorted_parts, &extra_nodes_config.message()),
"Log file and replay config need to have matching nodes lists.");
}
@@ -961,7 +961,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1101,7 +1101,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(400));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
@@ -1319,7 +1319,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
// Remap just on pi1.
reader.RemapLoggedChannel<aos::timing::Report>(
@@ -1385,7 +1385,7 @@
event_loop_factory_.RunFor(chrono::milliseconds(20000));
}
- LogReader reader(structured_logfiles_);
+ LogReader reader(SortParts(logfiles_));
SimulatedEventLoopFactory log_reader_factory(reader.configuration());
log_reader_factory.set_send_delay(chrono::microseconds(0));
diff --git a/aos/flatbuffer_merge.h b/aos/flatbuffer_merge.h
index d98a9eb..0c09b22 100644
--- a/aos/flatbuffer_merge.h
+++ b/aos/flatbuffer_merge.h
@@ -50,6 +50,15 @@
MergeFlatBuffers<T>(&fb1.message(), &fb2.message()));
}
+template <class T>
+inline flatbuffers::Offset<T> MergeFlatBuffers(
+ const aos::Flatbuffer<T> &fb1, const aos::Flatbuffer<T> &fb2,
+ flatbuffers::FlatBufferBuilder *fbb) {
+ return MergeFlatBuffers<T>(
+ reinterpret_cast<const flatbuffers::Table *>(&fb1.message()),
+ reinterpret_cast<const flatbuffers::Table *>(&fb2.message()), fbb);
+}
+
// Copies a flatbuffer by walking the tree and copying all the pieces. This
// converts DAGs to trees.
template <class T>
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 81a8cee..b751a4e 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -117,6 +117,9 @@
void Wipe() { memset(span().data(), 0, span().size()); }
bool Verify() const {
+ if (span().size() < 4u) {
+ return false;
+ }
flatbuffers::Verifier v(span().data(), span().size());
return v.VerifyTable(&message());
}
@@ -477,6 +480,37 @@
ResizeableBuffer data_;
};
+// Non-owning Span backed flatbuffer.
+template <typename T>
+class SizePrefixedFlatbufferSpan : public SizePrefixedFlatbuffer<T> {
+ public:
+ // Builds a flatbuffer pointing to the contents of a span.
+ SizePrefixedFlatbufferSpan(const absl::Span<const uint8_t> data)
+ : data_(data) {}
+ // Builds a Flatbuffer pointing to the contents of another flatbuffer.
+ SizePrefixedFlatbufferSpan(const SizePrefixedFlatbuffer<T> &other) {
+ data_ = other.span();
+ }
+
+ // Points to the data in the other flatbuffer.
+ SizePrefixedFlatbufferSpan &operator=(
+ const SizePrefixedFlatbuffer<T> &other) {
+ data_ = other.span();
+ return *this;
+ }
+
+ ~SizePrefixedFlatbufferSpan() override {}
+
+ absl::Span<uint8_t> span() override {
+ LOG(FATAL) << "Unimplemented";
+ return absl::Span<uint8_t>(nullptr, 0);
+ }
+ absl::Span<const uint8_t> span() const override { return data_; }
+
+ private:
+ absl::Span<const uint8_t> data_;
+};
+
inline flatbuffers::DetachedBuffer CopySpanAsDetachedBuffer(
absl::Span<const uint8_t> span) {
// Copy the data from the span.
diff --git a/aos/flatbuffers_test.cc b/aos/flatbuffers_test.cc
new file mode 100644
index 0000000..e3030f1
--- /dev/null
+++ b/aos/flatbuffers_test.cc
@@ -0,0 +1,25 @@
+#include "aos/flatbuffers.h"
+
+#include "gtest/gtest.h"
+
+#include "aos/json_to_flatbuffer.h"
+#include "aos/json_to_flatbuffer_generated.h"
+
+namespace aos {
+namespace testing {
+
+// Tests that Verify works.
+TEST(FlatbufferTest, Verify) {
+ FlatbufferDetachedBuffer<Configuration> fb =
+ JsonToFlatbuffer<Configuration>("{}");
+ FlatbufferSpan<Configuration> fb_span(fb);
+ EXPECT_TRUE(fb.Verify());
+ EXPECT_TRUE(fb_span.Verify());
+
+ // Now confirm it works on an empty flatbuffer.
+ FlatbufferSpan<Configuration> empty(absl::Span<const uint8_t>(nullptr, 0));
+ EXPECT_FALSE(empty.Verify());
+}
+
+} // namespace testing
+} // namespace aos
diff --git a/aos/json_tokenizer.cc b/aos/json_tokenizer.cc
index a3d804e..9403daa 100644
--- a/aos/json_tokenizer.cc
+++ b/aos/json_tokenizer.cc
@@ -270,7 +270,8 @@
ConsumeWhitespace();
if (!Consume(":")) {
- fprintf(stderr, "Error on line %d\n", linenumber_);
+ fprintf(stderr, "Error on line %d, expected ':', got '%c'\n",
+ linenumber_, Char());
return TokenType::kError;
}
diff --git a/aos/starter/starterd_lib.cc b/aos/starter/starterd_lib.cc
index ea7fdee..9d74089 100644
--- a/aos/starter/starterd_lib.cc
+++ b/aos/starter/starterd_lib.cc
@@ -406,8 +406,22 @@
if (config_msg_->has_applications()) {
const flatbuffers::Vector<flatbuffers::Offset<aos::Application>>
*applications = config_msg_->applications();
- for (const aos::Application *application : *applications) {
- AddApplication(application);
+
+ if (aos::configuration::MultiNode(config_msg_)) {
+ std::string_view current_node = event_loop_.node()->name()->string_view();
+ for (const aos::Application *application : *applications) {
+ CHECK(application->has_nodes());
+ for (const flatbuffers::String *node : *application->nodes()) {
+ if (node->string_view() == current_node) {
+ AddApplication(application);
+ break;
+ }
+ }
+ }
+ } else {
+ for (const aos::Application *application : *applications) {
+ AddApplication(application);
+ }
}
}
}
diff --git a/tools/bazel b/tools/bazel
index 36e88ae..21bb619 100755
--- a/tools/bazel
+++ b/tools/bazel
@@ -24,7 +24,7 @@
exec "${BAZEL_OVERRIDE}" "$@"
fi
-readonly VERSION="4.0.0rc2-202011211956+37a429ad12"
+readonly VERSION="4.0.0rc2-202012022031+a3c94ec2ed"
readonly DOWNLOAD_DIR="${HOME}/.cache/bazel"
# Directory to unpack bazel into. This must change whenever bazel changes.
diff --git a/y2020/BUILD b/y2020/BUILD
index 4643ae8..59642fe 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -210,6 +210,7 @@
src = "y2020_roborio.json",
flatbuffers = [
":setpoint_fbs",
+ "//aos/events/logging:logger_fbs",
"//aos/network:message_bridge_client_fbs",
"//aos/network:message_bridge_server_fbs",
"//aos/network:timestamp_fbs",
diff --git a/y2020/y2020_pi_template.json b/y2020/y2020_pi_template.json
index 4b04c6c..b504599 100644
--- a/y2020/y2020_pi_template.json
+++ b/y2020/y2020_pi_template.json
@@ -36,12 +36,16 @@
"source_node": "pi{{ NUM }}",
"frequency": 10,
"num_senders": 2,
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_nodes": ["roborio"],
"max_size": 200,
"destination_nodes": [
{
"name": "roborio",
"priority": 1,
- "time_to_live": 5000000
+ "time_to_live": 5000000,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"]
}
]
},
@@ -84,6 +88,28 @@
"max_size": 2000000
}
],
+ "applications": [
+ {
+ "name": "message_bridge_client",
+ "executable_name": "message_bridge_client.stripped",
+ "nodes": ["pi{{ NUM }}"]
+ },
+ {
+ "name": "message_bridge_server",
+ "executable_name": "message_bridge_server.stripped",
+ "nodes": ["pi{{ NUM }}"]
+ },
+ {
+ "name": "web_proxy",
+ "executable_name": "web_proxy_main.stripped",
+ "nodes": ["pi{{ NUM }}"]
+ },
+ {
+ "name": "camera_reader",
+ "executable_name": "camera_reader.stripped",
+ "nodes": ["pi{{ NUM }}"]
+ }
+ ],
"maps": [
{
"match": {
diff --git a/y2020/y2020_roborio.json b/y2020/y2020_roborio.json
index 0ab9664..703d560 100644
--- a/y2020/y2020_roborio.json
+++ b/y2020/y2020_roborio.json
@@ -11,33 +11,7 @@
"name": "/roborio/aos",
"type": "aos.RobotState",
"source_node": "roborio",
- "frequency": 200,
- "destination_nodes": [
- {
- "name": "pi1",
- "priority": 2,
- "timestamp_logger": "LOCAL_LOGGER",
- "time_to_live": 10000000
- },
- {
- "name": "pi2",
- "priority": 2,
- "timestamp_logger": "LOCAL_LOGGER",
- "time_to_live": 10000000
- },
- {
- "name": "pi3",
- "priority": 2,
- "timestamp_logger": "LOCAL_LOGGER",
- "time_to_live": 10000000
- },
- {
- "name": "pi4",
- "priority": 2,
- "timestamp_logger": "LOCAL_LOGGER",
- "time_to_live": 10000000
- }
- ]
+ "frequency": 200
},
{
"name": "/roborio/aos",
@@ -69,6 +43,30 @@
"num_senders": 2
},
{
+ "name": "/roborio/aos/remote_timestamps/pi1",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "roborio"
+ },
+ {
+ "name": "/roborio/aos/remote_timestamps/pi2",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "roborio"
+ },
+ {
+ "name": "/roborio/aos/remote_timestamps/pi3",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "roborio"
+ },
+ {
+ "name": "/roborio/aos/remote_timestamps/pi4",
+ "type": "aos.logger.MessageHeader",
+ "logger": "NOT_LOGGED",
+ "source_node": "roborio"
+ },
+ {
"name": "/roborio/aos",
"type": "aos.message_bridge.Timestamp",
"source_node": "roborio",
@@ -79,21 +77,29 @@
{
"name": "pi1",
"priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"],
"time_to_live": 5000000
},
{
"name": "pi2",
"priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"],
"time_to_live": 5000000
},
{
"name": "pi3",
"priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"],
"time_to_live": 5000000
},
{
"name": "pi4",
"priority": 1,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"],
"time_to_live": 5000000
}
]
@@ -177,16 +183,29 @@
{
"name": "pi1",
"priority": 5,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"],
"time_to_live": 5000000
},
{
"name": "pi2",
"priority": 5,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"],
"time_to_live": 5000000
},
{
"name": "pi3",
"priority": 5,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"],
+ "time_to_live": 5000000
+ },
+ {
+ "name": "pi4",
+ "priority": 5,
+ "timestamp_logger": "LOCAL_AND_REMOTE_LOGGER",
+ "timestamp_logger_nodes": ["roborio"],
"time_to_live": 5000000
}
]
@@ -240,10 +259,44 @@
],
"applications": [
{
- "name": "drivetrain"
+ "name": "drivetrain",
+ "executable_name": "drivetrain.stripped",
+ "nodes": ["roborio"]
},
{
- "name": "camera_reader"
+ "name": "superstructure",
+ "executable_name": "superstructure.stripped",
+ "nodes": ["roborio"]
+ },
+ {
+ "name": "joystick_reader",
+ "executable_name": "joystick_reader.stripped",
+ "nodes": ["roborio"]
+ },
+ {
+ "name": "wpilib_interface",
+ "executable_name": "wpilib_interface.stripped",
+ "nodes": ["roborio"]
+ },
+ {
+ "name": "autonomous_action",
+ "executable_name": "autonomous_action.stripped",
+ "nodes": ["roborio"]
+ },
+ {
+ "name": "message_bridge_client",
+ "executable_name": "message_bridge_client.stripped",
+ "nodes": ["roborio"]
+ },
+ {
+ "name": "message_bridge_server",
+ "executable_name": "message_bridge_server.stripped",
+ "nodes": ["roborio"]
+ },
+ {
+ "name": "logger",
+ "executable_name": "logger_main.stripped",
+ "nodes": ["roborio"]
}
],
"maps": [