Add TimestampMapper to match timestamps with data
This finishes the main sorting code for
https://docs.google.com/document/d/1RZ6ZlADRUHmwiFOOmXA87FHPLFuN-7mS7tbFCwguZDE/edit#
This adds an object which buffers the sorted data for a node and also
buffers the data needed to match each timestamp we find. Buffered data
does not time out yet, but only data that *could* be forwarded gets
buffered.
We also impose a number of restrictions here to simplify the logic.
The plan is to relax them as we run into them rather than solving
everything up front, which also makes it easier to add tests as we go.
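For reference, a rough usage sketch of the new API, mirroring the new
tests in logfile_utils_test.cc. The wrapper function and filenames below
are placeholders, and "pi1"/"pi2" come from the test config; SortParts,
FilterPartsForNode, TimestampMapper, AddPeer, Front, and PopFront are
the pieces added or touched by this change:

  #include "aos/events/logging/logfile_sorting.h"
  #include "aos/events/logging/logfile_utils.h"
  #include "glog/logging.h"

  // Placeholder wrapper showing the expected flow for replaying pi1.
  void ReplayNode0(const std::vector<std::string> &filenames) {
    using namespace aos::logger;
    // Sort all the parts, then split them out per node.
    const std::vector<LogFile> parts = SortParts(filenames);
    TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
    TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
    // Peers let timestamps logged on one node find the matching data
    // logged on the other node.
    mapper0.AddPeer(&mapper1);
    mapper1.AddPeer(&mapper0);
    while (TimestampedMessage *m = mapper0.Front()) {
      // Timestamps whose data is missing come back with an empty
      // flatbuffer, so check Verify() before touching the data.
      if (m->data.Verify()) {
        LOG(INFO) << *m;
      }
      mapper0.PopFront();
    }
  }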
Change-Id: Idbc515e5594bc031139c7b994aaa71826ff68c0a
diff --git a/aos/events/logging/logfile_sorting.cc b/aos/events/logging/logfile_sorting.cc
index 08a230c..168230e 100644
--- a/aos/events/logging/logfile_sorting.cc
+++ b/aos/events/logging/logfile_sorting.cc
@@ -257,6 +257,33 @@
return result;
}
+std::vector<std::string> FindNodes(const std::vector<LogFile> &parts) {
+ std::set<std::string> nodes;
+ for (const LogFile &log_file : parts) {
+ for (const LogParts& part : log_file.parts) {
+ nodes.insert(part.node);
+ }
+ }
+ std::vector<std::string> node_list;
+ while (!nodes.empty()) {
+ node_list.emplace_back(std::move(nodes.extract(nodes.begin()).value()));
+ }
+ return node_list;
+}
+
+std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
+ std::string_view node) {
+ std::vector<LogParts> result;
+ for (const LogFile &log_file : parts) {
+ for (const LogParts& part : log_file.parts) {
+ if (part.node == node) {
+ result.emplace_back(part);
+ }
+ }
+ }
+ return result;
+}
+
std::ostream &operator<<(std::ostream &stream, const LogFile &file) {
stream << "{";
if (!file.log_event_uuid.empty()) {
diff --git a/aos/events/logging/logfile_sorting.h b/aos/events/logging/logfile_sorting.h
index 0d2a0fb..94cb771 100644
--- a/aos/events/logging/logfile_sorting.h
+++ b/aos/events/logging/logfile_sorting.h
@@ -56,6 +56,12 @@
// Takes a bunch of parts and sorts them based on part_uuid and part_index.
std::vector<LogFile> SortParts(const std::vector<std::string> &parts);
+// Finds all the nodes which have parts logged from their point of view.
+std::vector<std::string> FindNodes(const std::vector<LogFile> &parts);
+// Finds all the parts which are from the point of view of a single node.
+std::vector<LogParts> FilterPartsForNode(const std::vector<LogFile> &parts,
+ std::string_view node);
+
} // namespace logger
} // namespace aos
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 4474c7e..2886a8e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -490,11 +490,36 @@
std::ostream &operator<<(std::ostream &os, const Message &m) {
os << "{.channel_index=" << m.channel_index
- << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp
- << ", .data="
- << aos::FlatbufferToJson(m.data,
- {.multi_line = false, .max_vector_size = 1})
- << "}";
+ << ", .queue_index=" << m.queue_index << ", .timestamp=" << m.timestamp;
+ if (m.data.Verify()) {
+ os << ", .data="
+ << aos::FlatbufferToJson(m.data,
+ {.multi_line = false, .max_vector_size = 1});
+ }
+ os << "}";
+ return os;
+}
+
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m) {
+ os << "{.channel_index=" << m.channel_index
+ << ", .queue_index=" << m.queue_index
+ << ", .monotonic_event_time=" << m.monotonic_event_time
+ << ", .realtime_event_time=" << m.realtime_event_time;
+ if (m.remote_queue_index != 0xffffffff) {
+ os << ", .remote_queue_index=" << m.remote_queue_index;
+ }
+ if (m.monotonic_remote_time != monotonic_clock::min_time) {
+ os << ", .monotonic_remote_time=" << m.monotonic_remote_time;
+ }
+ if (m.realtime_remote_time != realtime_clock::min_time) {
+ os << ", .realtime_remote_time=" << m.realtime_remote_time;
+ }
+ if (m.data.Verify()) {
+ os << ", .data="
+ << aos::FlatbufferToJson(m.data,
+ {.multi_line = false, .max_vector_size = 1});
+ }
+ os << "}";
return os;
}
@@ -559,8 +584,28 @@
return ss.str();
}
-NodeMerger::NodeMerger(std::vector<std::unique_ptr<LogPartsSorter>> parts)
- : parts_sorters_(std::move(parts)) {}
+NodeMerger::NodeMerger(std::vector<LogParts> parts) {
+ CHECK_GE(parts.size(), 1u);
+ const std::string part0_node = parts[0].node;
+ for (size_t i = 1; i < parts.size(); ++i) {
+ CHECK_EQ(part0_node, parts[i].node) << ": Can't merge different nodes.";
+ }
+ for (LogParts &part : parts) {
+ parts_sorters_.emplace_back(std::move(part));
+ }
+
+ node_ = configuration::GetNodeIndex(log_file_header()->configuration(),
+ part0_node);
+
+ monotonic_start_time_ = monotonic_clock::max_time;
+ realtime_start_time_ = realtime_clock::max_time;
+ for (const LogPartsSorter &parts_sorter : parts_sorters_) {
+ if (parts_sorter.monotonic_start_time() < monotonic_start_time_) {
+ monotonic_start_time_ = parts_sorter.monotonic_start_time();
+ realtime_start_time_ = parts_sorter.realtime_start_time();
+ }
+ }
+}
Message *NodeMerger::Front() {
// Return the current Front if we have one, otherwise go compute one.
@@ -572,23 +617,23 @@
// duplicates.
Message *oldest = nullptr;
sorted_until_ = monotonic_clock::max_time;
- for (std::unique_ptr<LogPartsSorter> &parts_sorter : parts_sorters_) {
- Message *m = parts_sorter->Front();
+ for (LogPartsSorter &parts_sorter : parts_sorters_) {
+ Message *m = parts_sorter.Front();
if (!m) {
- sorted_until_ = std::min(sorted_until_, parts_sorter->sorted_until());
+ sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
continue;
}
if (oldest == nullptr || *m < *oldest) {
oldest = m;
- current_ = parts_sorter.get();
+ current_ = &parts_sorter;
} else if (*m == *oldest) {
// Found a duplicate. It doesn't matter which one we return. It is
// easiest to just drop the new one.
- parts_sorter->PopFront();
+ parts_sorter.PopFront();
}
// PopFront may change this, so compute it down here.
- sorted_until_ = std::min(sorted_until_, parts_sorter->sorted_until());
+ sorted_until_ = std::min(sorted_until_, parts_sorter.sorted_until());
}
// Return the oldest message found. This will be nullptr if nothing was
@@ -602,6 +647,306 @@
current_ = nullptr;
}
+TimestampMapper::TimestampMapper(std::vector<LogParts> parts)
+ : node_merger_(std::move(parts)),
+ node_(node_merger_.node()),
+ message_{.channel_index = 0xffffffff,
+ .queue_index = 0xffffffff,
+ .monotonic_event_time = monotonic_clock::min_time,
+ .realtime_event_time = realtime_clock::min_time,
+ .remote_queue_index = 0xffffffff,
+ .monotonic_remote_time = monotonic_clock::min_time,
+ .realtime_remote_time = realtime_clock::min_time,
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()} {
+ const Configuration *config = log_file_header()->configuration();
+ // Only fill out nodes_data_ if there are nodes. Otherwise everything gets
+ // pretty simple.
+ if (configuration::MultiNode(config)) {
+ nodes_data_.resize(config->nodes()->size());
+ const Node *my_node = config->nodes()->Get(node());
+ for (size_t node_index = 0; node_index < nodes_data_.size(); ++node_index) {
+ const Node *node = config->nodes()->Get(node_index);
+ NodeData *node_data = &nodes_data_[node_index];
+ node_data->channels.resize(config->channels()->size());
+ // We should save the channel if it is delivered to the node represented
+ // by the NodeData, but not sent by that node. That combo means it is
+ // forwarded.
+ size_t channel_index = 0;
+ node_data->any_delivered = false;
+ for (const Channel *channel : *config->channels()) {
+ node_data->channels[channel_index].delivered =
+ configuration::ChannelIsReadableOnNode(channel, node) &&
+ configuration::ChannelIsSendableOnNode(channel, my_node);
+ node_data->any_delivered = node_data->any_delivered ||
+ node_data->channels[channel_index].delivered;
+ ++channel_index;
+ }
+ }
+
+ for (const Channel *channel : *config->channels()) {
+ source_node_.emplace_back(configuration::GetNodeIndex(
+ config, channel->source_node()->string_view()));
+ }
+ }
+}
+
+void TimestampMapper::AddPeer(TimestampMapper *timestamp_mapper) {
+ CHECK(configuration::MultiNode(log_file_header()->configuration()));
+ CHECK_NE(timestamp_mapper->node(), node());
+ CHECK_LT(timestamp_mapper->node(), nodes_data_.size());
+
+ NodeData *node_data = &nodes_data_[timestamp_mapper->node()];
+ // Only set it if this node delivers to the peer timestamp_mapper. Otherwise
+ // we could needlessly save data.
+ if (node_data->any_delivered) {
+ LOG(INFO) << "Registering on node " << node() << " for peer node "
+ << timestamp_mapper->node();
+ CHECK(timestamp_mapper->nodes_data_[node()].peer == nullptr);
+
+ timestamp_mapper->nodes_data_[node()].peer = this;
+ }
+}
+
+void TimestampMapper::FillMessage(Message *m) {
+ message_ = {
+ .channel_index = m->channel_index,
+ .queue_index = m->queue_index,
+ .monotonic_event_time = m->timestamp,
+ .realtime_event_time = aos::realtime_clock::time_point(
+ std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+ .remote_queue_index = 0xffffffff,
+ .monotonic_remote_time = monotonic_clock::min_time,
+ .realtime_remote_time = realtime_clock::min_time,
+ .data = std::move(m->data)};
+}
+
+TimestampedMessage *TimestampMapper::Front() {
+ // No need to fetch anything new. A previous message still exists.
+ switch (first_message_) {
+ case FirstMessage::kNeedsUpdate:
+ break;
+ case FirstMessage::kInMessage:
+ return &message_;
+ case FirstMessage::kNullptr:
+ return nullptr;
+ }
+
+ if (nodes_data_.empty()) {
+ // Simple path. We are single node, so there are no timestamps to match!
+ CHECK_EQ(messages_.size(), 0u);
+ Message *m = node_merger_.Front();
+ if (!m) {
+ first_message_ = FirstMessage::kNullptr;
+ return nullptr;
+ }
+ // Fill in message_ so we have a place to associate remote timestamps, and
+ // return it.
+ FillMessage(m);
+
+ CHECK_GE(message_.monotonic_event_time, last_message_time_);
+ last_message_time_ = message_.monotonic_event_time;
+ first_message_ = FirstMessage::kInMessage;
+ return &message_;
+ }
+
+  // Messages need to flow through Queue() so that any which are delivered to
+  // other nodes get buffered for matching.  Reuse that flow by just adding
+  // the new message to messages_ and continuing below.
+ if (messages_.empty()) {
+ if (!Queue()) {
+ // Found nothing to add, we are out of data!
+ first_message_ = FirstMessage::kNullptr;
+ return nullptr;
+ }
+
+ // Now that it has been added (and cannibalized), forget about it upstream.
+ node_merger_.PopFront();
+ }
+
+ Message *m = &(messages_.front());
+
+ if (source_node_[m->channel_index] == node()) {
+ // From us, just forward it on, filling the remote data in as invalid.
+ FillMessage(m);
+ CHECK_GE(message_.monotonic_event_time, last_message_time_);
+ last_message_time_ = message_.monotonic_event_time;
+ first_message_ = FirstMessage::kInMessage;
+ return &message_;
+ } else {
+ // Got a timestamp, find the matching remote data, match it, and return it.
+ Message data = MatchingMessageFor(*m);
+
+ // Return the data from the remote. The local message only has timestamp
+ // info which isn't relevant anymore once extracted.
+ message_ = {
+ .channel_index = m->channel_index,
+ .queue_index = m->queue_index,
+ .monotonic_event_time = m->timestamp,
+ .realtime_event_time = aos::realtime_clock::time_point(
+ std::chrono::nanoseconds(m->data.message().realtime_sent_time())),
+ .remote_queue_index = m->data.message().remote_queue_index(),
+ .monotonic_remote_time =
+ monotonic_clock::time_point(std::chrono::nanoseconds(
+ m->data.message().monotonic_remote_time())),
+ .realtime_remote_time = realtime_clock::time_point(
+ std::chrono::nanoseconds(m->data.message().realtime_remote_time())),
+ .data = std::move(data.data)};
+ CHECK_GE(message_.monotonic_event_time, last_message_time_);
+ last_message_time_ = message_.monotonic_event_time;
+ first_message_ = FirstMessage::kInMessage;
+ return &message_;
+ }
+}
+
+void TimestampMapper::PopFront() {
+ CHECK(first_message_ != FirstMessage::kNeedsUpdate);
+ first_message_ = FirstMessage::kNeedsUpdate;
+
+ if (nodes_data_.empty()) {
+ // We are thin wrapper around node_merger. Call it directly.
+ node_merger_.PopFront();
+ } else {
+ // Since messages_ holds the data, drop it.
+ messages_.pop_front();
+ }
+}
+
+Message TimestampMapper::MatchingMessageFor(const Message &message) {
+ TimestampMapper *peer =
+ CHECK_NOTNULL(nodes_data_[source_node_[message.channel_index]].peer);
+ // The queue which will have the matching data, if available.
+ std::deque<Message> *data_queue =
+ &peer->nodes_data_[node()].channels[message.channel_index].messages;
+
+ // Figure out what queue index we are looking for.
+ CHECK(message.data.message().has_remote_queue_index());
+ const uint32_t remote_queue_index =
+ message.data.message().remote_queue_index();
+
+ CHECK(message.data.message().has_monotonic_remote_time());
+ CHECK(message.data.message().has_realtime_remote_time());
+
+ const monotonic_clock::time_point monotonic_remote_time(
+ std::chrono::nanoseconds(message.data.message().monotonic_remote_time()));
+ const realtime_clock::time_point realtime_remote_time(
+ std::chrono::nanoseconds(message.data.message().realtime_remote_time()));
+
+ peer->QueueUntil(monotonic_remote_time);
+
+ if (data_queue->empty()) {
+ return Message{
+ .channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ }
+
+ // The algorithm below is constant time with some assumptions. We need there
+ // to be no missing messages in the data stream. This also assumes a queue
+ // hasn't wrapped. That is conservative, but should let us get started.
+ //
+ // TODO(austin): We can break these assumptions pretty easily once we have a
+ // need.
+ CHECK_EQ(
+ data_queue->back().queue_index - data_queue->front().queue_index + 1u,
+ data_queue->size());
+
+ if (remote_queue_index < data_queue->front().queue_index ||
+ remote_queue_index > data_queue->back().queue_index) {
+ return Message{
+ .channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ }
+
+ // Pull the data out and confirm that the timestamps match as expected.
+ Message result = std::move(
+ (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
+ CHECK_EQ(result.timestamp, monotonic_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
+ result.data.message().realtime_sent_time())),
+ realtime_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ // Now drop the data off the front. We have deduplicated timestamps, so we
+ // are done. And all the data is in order.
+ data_queue->erase(data_queue->begin(),
+ data_queue->begin() + (1 + remote_queue_index -
+ data_queue->front().queue_index));
+ return result;
+}
+
+void TimestampMapper::QueueUntil(monotonic_clock::time_point t) {
+ if (queued_until_ > t) {
+ return;
+ }
+ while (true) {
+ if (!messages_.empty() && messages_.back().timestamp > t) {
+ queued_until_ = std::max(queued_until_, messages_.back().timestamp);
+ return;
+ }
+
+ if (!Queue()) {
+ // Found nothing to add, we are out of data!
+ queued_until_ = monotonic_clock::max_time;
+ return;
+ }
+
+ // Now that it has been added (and cannibalized), forget about it upstream.
+ node_merger_.PopFront();
+ }
+}
+
+bool TimestampMapper::Queue() {
+ Message *m = node_merger_.Front();
+ if (m == nullptr) {
+ return false;
+ }
+ for (NodeData &node_data : nodes_data_) {
+ if (!node_data.any_delivered) continue;
+ if (node_data.channels[m->channel_index].delivered) {
+ // TODO(austin): This copies the data... Probably not worth stressing
+ // about yet.
+ // TODO(austin): Bound how big this can get. We tend not to send massive
+ // data, so we can probably ignore this for a bit.
+ node_data.channels[m->channel_index].messages.emplace_back(*m);
+ }
+ }
+
+ messages_.emplace_back(std::move(*m));
+ return true;
+}
+
+std::string TimestampMapper::DebugString() const {
+ std::stringstream ss;
+ ss << "node " << node() << " [\n";
+ for (const Message &message : messages_) {
+ ss << " " << message << "\n";
+ }
+ ss << "] queued_until " << queued_until_;
+ for (const NodeData &ns : nodes_data_) {
+ if (ns.peer == nullptr) continue;
+ ss << "\nnode " << ns.peer->node() << " remote_data [\n";
+ size_t channel_index = 0;
+ for (const NodeData::ChannelData &channel_data :
+ ns.peer->nodes_data_[node()].channels) {
+ if (channel_data.messages.empty()) {
+ continue;
+ }
+
+ ss << " channel " << channel_index << " [\n";
+ for (const Message &m : channel_data.messages) {
+ ss << " " << m << "\n";
+ }
+ ss << " ]\n";
+ ++channel_index;
+ }
+ ss << "] queued_until " << ns.peer->queued_until_;
+ }
+ return ss.str();
+}
+
SplitMessageReader::SplitMessageReader(
const std::vector<std::string> &filenames)
: filenames_(filenames),
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index d0afdc9..4a19209 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -281,6 +281,9 @@
std::string_view filename() const { return message_reader_.filename(); }
+ // Returns the LogParts that holds the filenames we are reading.
+ const LogParts &parts() const { return parts_; }
+
const LogFileHeader *log_file_header() const {
return message_reader_.log_file_header();
}
@@ -332,6 +335,25 @@
std::ostream &operator<<(std::ostream &os, const Message &m);
+// Structure to hold a full message and all the timestamps, which may or may not
+// have been sent from a remote node. The remote_queue_index will be invalid if
+// this message is from the point of view of the node which sent it.
+struct TimestampedMessage {
+ uint32_t channel_index = 0xffffffff;
+
+ uint32_t queue_index = 0xffffffff;
+ monotonic_clock::time_point monotonic_event_time = monotonic_clock::min_time;
+ realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
+
+ uint32_t remote_queue_index = 0xffffffff;
+ monotonic_clock::time_point monotonic_remote_time = monotonic_clock::min_time;
+ realtime_clock::time_point realtime_remote_time = realtime_clock::min_time;
+
+ SizePrefixedFlatbufferVector<MessageHeader> data;
+};
+
+std::ostream &operator<<(std::ostream &os, const TimestampedMessage &m);
+
// Class to sort the resulting messages from a PartsMessageReader.
class LogPartsSorter {
public:
@@ -346,6 +368,13 @@
return parts_message_reader_.log_file_header();
}
+ monotonic_clock::time_point monotonic_start_time() const {
+ return parts_message_reader_.parts().monotonic_start_time;
+ }
+ realtime_clock::time_point realtime_start_time() const {
+ return parts_message_reader_.parts().realtime_start_time;
+ }
+
// The time this data is sorted until.
monotonic_clock::time_point sorted_until() const { return sorted_until_; }
@@ -374,12 +403,22 @@
// instances.
class NodeMerger {
public:
- NodeMerger(std::vector<std::unique_ptr<LogPartsSorter>> parts);
+ NodeMerger(std::vector<LogParts> parts);
+
+ // Node index in the configuration of this node.
+ int node() const { return node_; }
// The log file header for one of the log files.
const LogFileHeader *log_file_header() const {
CHECK(!parts_sorters_.empty());
- return parts_sorters_[0]->log_file_header();
+ return parts_sorters_[0].log_file_header();
+ }
+
+ monotonic_clock::time_point monotonic_start_time() const {
+ return monotonic_start_time_;
+ }
+ realtime_clock::time_point realtime_start_time() const {
+ return realtime_start_time_;
}
// The time this data is sorted until.
@@ -394,12 +433,143 @@
private:
// Unsorted list of all parts sorters.
- std::vector<std::unique_ptr<LogPartsSorter>> parts_sorters_;
+ std::vector<LogPartsSorter> parts_sorters_;
// Pointer to the parts sorter holding the current Front message if one
// exists, or nullptr if a new one needs to be found.
LogPartsSorter *current_ = nullptr;
// Cached sorted_until value.
aos::monotonic_clock::time_point sorted_until_ = monotonic_clock::min_time;
+
+ // Cached node.
+ int node_;
+
+ realtime_clock::time_point realtime_start_time_ = realtime_clock::max_time;
+ monotonic_clock::time_point monotonic_start_time_ = monotonic_clock::max_time;
+};
+
+// Class to match timestamps with the corresponding data from other nodes.
+class TimestampMapper {
+ public:
+  TimestampMapper(std::vector<LogParts> parts);
+
+ // Copying and moving will mess up the internal raw pointers. Just don't do
+ // it.
+ TimestampMapper(TimestampMapper const &) = delete;
+ TimestampMapper(TimestampMapper &&) = delete;
+ void operator=(TimestampMapper const &) = delete;
+ void operator=(TimestampMapper &&) = delete;
+
+ // TODO(austin): It would be super helpful to provide a way to queue up to
+ // time X without matching timestamps, and to then be able to pull the
+ // timestamps out of this queue. This lets us bootstrap time estimation
+ // without exploding memory usage worst case.
+
+ // Returns a log file header for this node.
+ const LogFileHeader *log_file_header() const {
+ return node_merger_.log_file_header();
+ }
+
+ // Returns which node this is sorting for.
+ size_t node() const { return node_; }
+
+ // The start time of this log.
+ monotonic_clock::time_point monotonic_start_time() const {
+ return node_merger_.monotonic_start_time();
+ }
+ realtime_clock::time_point realtime_start_time() const {
+ return node_merger_.realtime_start_time();
+ }
+
+ // Uses timestamp_mapper as the peer for its node. Only one mapper may be set
+ // for each node. Peers are used to look up the data for timestamps on this
+ // node.
+ void AddPeer(TimestampMapper *timestamp_mapper);
+
+ // Time that we are sorted until internally.
+ monotonic_clock::time_point sorted_until() const {
+ return node_merger_.sorted_until();
+ }
+
+ // Returns the next message for this node.
+ TimestampedMessage *Front();
+ // Pops the next message. Front must be called first.
+ void PopFront();
+
+ // Returns debug information about this node.
+ std::string DebugString() const;
+
+ private:
+ // The state for a remote node. This holds the data that needs to be matched
+ // with the remote node's timestamps.
+ struct NodeData {
+    // True if we should save data here.  This is the case if any of the
+    // delivered bools below are true.
+ bool any_delivered = false;
+
+ // Peer pointer. This node is only to be considered if a peer is set.
+ TimestampMapper *peer = nullptr;
+
+ struct ChannelData {
+ // Deque per channel. This contains the data from the outside
+ // TimestampMapper node which is relevant for the node this NodeData
+ // points to.
+ std::deque<Message> messages;
+ // Bool tracking per channel if a message is delivered to the node this
+ // NodeData represents.
+ bool delivered = false;
+ };
+
+ // Vector with per channel data.
+ std::vector<ChannelData> channels;
+ };
+
+ // Returns (and forgets about) the data for the provided timestamp message
+ // showing when it was delivered to this node.
+ Message MatchingMessageFor(const Message &message);
+
+ // Queues up a single message into our message queue, and any nodes that this
+ // message is delivered to. Returns true if one was available, false
+ // otherwise.
+ bool Queue();
+
+ // Queues up data until we have at least one message >= to time t.
+ // Useful for triggering a remote node to read enough data to have the
+ // timestamp you care about available.
+ void QueueUntil(monotonic_clock::time_point t);
+
+ // Fills message_ with the contents of m.
+ void FillMessage(Message *m);
+
+ // The node merger to source messages from.
+ NodeMerger node_merger_;
+ // Our node.
+ const size_t node_;
+ // The buffer of messages for this node. These are not matched with any
+ // remote data.
+ std::deque<Message> messages_;
+ // The node index for the source node for each channel.
+ std::vector<size_t> source_node_;
+
+ // Vector per node. Not all nodes will have anything.
+ std::vector<NodeData> nodes_data_;
+
+ // Latest message to return.
+ TimestampedMessage message_;
+
+ // Tracks if the first message points to message_, nullptr (all done), or is
+ // invalid.
+ enum class FirstMessage {
+ kNeedsUpdate,
+ kInMessage,
+ kNullptr,
+ };
+ FirstMessage first_message_ = FirstMessage::kNeedsUpdate;
+
+ // Timestamp of the last message returned. Used to make sure nothing goes
+ // backwards.
+ monotonic_clock::time_point last_message_time_ = monotonic_clock::min_time;
+ // Time this node is queued up until. Used for caching.
+ monotonic_clock::time_point queued_until_ = monotonic_clock::min_time;
};
class TimestampMerger;
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 94ae25b..3bed06d 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -320,7 +320,7 @@
"logger_node": {
"name": "pi1"
},
- "monotonic_start_time": 0,
+ "monotonic_start_time": 1000000,
"realtime_start_time": 1000000000000,
"log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
"parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
@@ -336,14 +336,47 @@
"logger_node": {
"name": "pi1"
},
+ "monotonic_start_time": 1000000,
+ "realtime_start_time": 1000000000000,
+ "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+ "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+ "parts_index": 0
+})")),
+ config2_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi2"
+ },
+ "logger_node": {
+ "name": "pi2"
+ },
"monotonic_start_time": 0,
"realtime_start_time": 1000000000000,
- "log_event_uuid": "d4724d35-a6c6-4a30-8a94-d192f4c18260",
- "parts_uuid": "bafe9f8e-7dea-4bd9-95f5-3d8390e49208",
+ "log_event_uuid": "cb89a1ce-c4b6-4747-a647-051f09ac888c",
+ "parts_uuid": "e6bff6c6-757f-4675-90d8-3bfb642870e6",
+ "parts_index": 0
+})")),
+ config3_(MakeHeader(config_,
+ R"({
+ /* 100ms */
+ "max_out_of_order_duration": 100000000,
+ "node": {
+ "name": "pi1"
+ },
+ "logger_node": {
+ "name": "pi1"
+ },
+ "monotonic_start_time": 2000000,
+ "realtime_start_time": 1000000000,
+ "log_event_uuid": "cb26b86a-473e-4f74-8403-50eb92ed60ad",
+ "parts_uuid": "1f098701-949f-4392-81f9-be463e2d7bd4",
"parts_index": 0
})")) {
unlink(logfile0_.c_str());
unlink(logfile1_.c_str());
+ unlink(logfile2_.c_str());
queue_index_.resize(kChannels);
}
@@ -407,10 +440,13 @@
const std::string logfile0_ = aos::testing::TestTmpDir() + "/log0.bfbs";
const std::string logfile1_ = aos::testing::TestTmpDir() + "/log1.bfbs";
+ const std::string logfile2_ = aos::testing::TestTmpDir() + "/log2.bfbs";
const aos::FlatbufferDetachedBuffer<Configuration> config_;
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0_;
const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config2_;
+ const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config3_;
std::vector<uint32_t> queue_index_;
};
@@ -418,6 +454,7 @@
using LogPartsSorterTest = SortingElementTest;
using LogPartsSorterDeathTest = LogPartsSorterTest;
using NodeMergerTest = SortingElementTest;
+using TimestampMapperTest = SortingElementTest;
// Tests that we can pull messages out of a log sorted in order.
TEST_F(LogPartsSorterTest, Pull) {
@@ -538,15 +575,9 @@
}
const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
- ASSERT_EQ(parts.size(), 2u);
+ ASSERT_EQ(parts.size(), 1u);
- std::vector<std::unique_ptr<LogPartsSorter>> parts_sorters;
- parts_sorters.emplace_back(
- std::make_unique<LogPartsSorter>(parts[0].parts[0]));
- parts_sorters.emplace_back(
- std::make_unique<LogPartsSorter>(parts[1].parts[0]));
-
- NodeMerger merger(std::move(parts_sorters));
+ NodeMerger merger(FilterPartsForNode(parts, "pi1"));
EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
@@ -595,6 +626,437 @@
EXPECT_EQ(output[5].timestamp, e + chrono::milliseconds(3002));
}
+// Tests that we can match timestamps on delivered messages.
+TEST_F(TimestampMapperTest, ReadNode0First) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), e + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+ EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+}
+
+// Tests that we can match timestamps on delivered messages. By doing this in
+// the reverse order, the second node needs to queue data up from the first node
+// to find the matching timestamp.
+TEST_F(TimestampMapperTest, ReadNode1First) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+
+ {
+ std::deque<TimestampedMessage> output0;
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() != nullptr);
+ output0.emplace_back(std::move(*mapper0.Front()));
+ mapper0.PopFront();
+ EXPECT_EQ(mapper0.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper0.Front() == nullptr);
+
+ EXPECT_EQ(output0[0].monotonic_event_time, e + chrono::milliseconds(1000));
+ EXPECT_TRUE(output0[0].data.Verify());
+ EXPECT_EQ(output0[1].monotonic_event_time, e + chrono::milliseconds(2000));
+ EXPECT_TRUE(output0[1].data.Verify());
+ EXPECT_EQ(output0[2].monotonic_event_time, e + chrono::milliseconds(3000));
+ EXPECT_TRUE(output0[2].data.Verify());
+ }
+}
+
+// Tests that we return just the timestamps if we couldn't find the data and the
+// missing data was at the beginning of the file.
+TEST_F(TimestampMapperTest, ReadMissingDataBefore) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005);
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_FALSE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ }
+}
+
+// Tests that we return just the timestamps if we couldn't find the data and the
+// missing data was at the end of the file.
+TEST_F(TimestampMapperTest, ReadMissingDataAfter) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x007);
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(1900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(),
+ e + chrono::seconds(100) + chrono::milliseconds(2900));
+
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ EXPECT_EQ(mapper1.sorted_until(), monotonic_clock::max_time);
+
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_FALSE(output1[2].data.Verify());
+ }
+}
+
+// Tests that we properly sort log files with duplicate timestamps.
+TEST_F(TimestampMapperTest, ReadSameTimestamp) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config2_.span());
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(1000), 0, 0x005));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(1000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x006));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(2000), 0, 0x007));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(2000), 0, chrono::seconds(100)));
+
+ writer0.QueueSizedFlatbuffer(
+ MakeLogMessage(e + chrono::milliseconds(3000), 0, 0x008));
+ writer1.QueueSizedFlatbuffer(MakeTimestampMessage(
+ e + chrono::milliseconds(3000), 0, chrono::seconds(100)));
+ }
+
+ const std::vector<LogFile> parts = SortParts({logfile0_, logfile1_});
+
+ ASSERT_EQ(parts[0].logger_node, "pi1");
+ ASSERT_EQ(parts[1].logger_node, "pi2");
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+ TimestampMapper mapper1(FilterPartsForNode(parts, "pi2"));
+
+ mapper0.AddPeer(&mapper1);
+ mapper1.AddPeer(&mapper0);
+
+ {
+ SCOPED_TRACE("Trying node1 now");
+ std::deque<TimestampedMessage> output1;
+
+ for (int i = 0; i < 4; ++i) {
+ ASSERT_TRUE(mapper1.Front() != nullptr);
+ output1.emplace_back(std::move(*mapper1.Front()));
+ mapper1.PopFront();
+ }
+ ASSERT_TRUE(mapper1.Front() == nullptr);
+
+ EXPECT_EQ(output1[0].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(1000));
+ EXPECT_TRUE(output1[0].data.Verify());
+ EXPECT_EQ(output1[1].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[1].data.Verify());
+ EXPECT_EQ(output1[2].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(2000));
+ EXPECT_TRUE(output1[2].data.Verify());
+ EXPECT_EQ(output1[3].monotonic_event_time,
+ e + chrono::seconds(100) + chrono::milliseconds(3000));
+ EXPECT_TRUE(output1[3].data.Verify());
+ }
+}
+
+// Tests that we pick the earliest start time from all of a node's parts.
+TEST_F(TimestampMapperTest, StartTime) {
+ const aos::monotonic_clock::time_point e = monotonic_clock::epoch();
+ {
+ DetachedBufferWriter writer0(logfile0_, std::make_unique<DummyEncoder>());
+ writer0.QueueSpan(config0_.span());
+ DetachedBufferWriter writer1(logfile1_, std::make_unique<DummyEncoder>());
+ writer1.QueueSpan(config1_.span());
+ DetachedBufferWriter writer2(logfile2_, std::make_unique<DummyEncoder>());
+ writer2.QueueSpan(config3_.span());
+ }
+
+ const std::vector<LogFile> parts =
+ SortParts({logfile0_, logfile1_, logfile2_});
+
+ TimestampMapper mapper0(FilterPartsForNode(parts, "pi1"));
+
+ EXPECT_EQ(mapper0.monotonic_start_time(), e + chrono::milliseconds(1));
+ EXPECT_EQ(mapper0.realtime_start_time(),
+ realtime_clock::time_point(chrono::seconds(1000)));
+}
+
} // namespace testing
} // namespace logger
} // namespace aos