Handle replaying logs with dropped forwarded messages
If messages are forwarded from a to b unreliably, some of them may be
dropped, and we will end up with data log files with non-contiguous queue
indices. Add a test for handling this behavior, and when it occurs, fall
back to a simpler (linear) search algorithm.
Change-Id: I4b3e9724e2b527ccde6fd2cdd315eb6ab52fa47c
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 98527dc..3f85cc7 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -863,16 +863,6 @@
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
- // The algorithm below is constant time with some assumptions. We need there
- // to be no missing messages in the data stream. This also assumes a queue
- // hasn't wrapped. That is conservative, but should let us get started.
- //
- // TODO(austin): We can break these assumptions pretty easily once we have a
- // need.
- CHECK_EQ(
- data_queue->back().queue_index - data_queue->front().queue_index + 1u,
- data_queue->size());
-
if (remote_queue_index < data_queue->front().queue_index ||
remote_queue_index > data_queue->back().queue_index) {
return Message{
@@ -882,21 +872,53 @@
.data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
}
- // Pull the data out and confirm that the timestamps match as expected.
- Message result = std::move(
- (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
- CHECK_EQ(result.timestamp, monotonic_remote_time)
- << ": Queue index matches, but timestamp doesn't. Please investigate!";
- CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
- result.data.message().realtime_sent_time())),
- realtime_remote_time)
- << ": Queue index matches, but timestamp doesn't. Please investigate!";
- // Now drop the data off the front. We have deduplicated timestamps, so we
- // are done. And all the data is in order.
- data_queue->erase(data_queue->begin(),
- data_queue->begin() + (1 + remote_queue_index -
- data_queue->front().queue_index));
- return result;
+ // The indexed lookup below is constant time, but only works when no messages
+ // are missing from the data stream and a queue hasn't wrapped. That is
+ // conservative; when it doesn't hold, fall back to a linear search instead.
+ if (data_queue->back().queue_index - data_queue->front().queue_index + 1u ==
+ data_queue->size()) {
+ // Pull the data out and confirm that the timestamps match as expected.
+ Message result = std::move(
+ (*data_queue)[remote_queue_index - data_queue->front().queue_index]);
+
+ CHECK_EQ(result.timestamp, monotonic_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
+ result.data.message().realtime_sent_time())),
+ realtime_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ // Now drop the data off the front. We have deduplicated timestamps, so we
+ // are done. And all the data is in order.
+ data_queue->erase(data_queue->begin(),
+ data_queue->begin() + (1 + remote_queue_index -
+ data_queue->front().queue_index));
+ return result;
+ } else {
+ auto it = std::find_if(data_queue->begin(), data_queue->end(),
+ [remote_queue_index](const Message &m) {
+ return m.queue_index == remote_queue_index;
+ });
+ if (it == data_queue->end()) {
+ return Message{
+ .channel_index = message.channel_index,
+ .queue_index = remote_queue_index,
+ .timestamp = monotonic_remote_time,
+ .data = SizePrefixedFlatbufferVector<MessageHeader>::Empty()};
+ }
+
+ Message result = std::move(*it);
+
+ CHECK_EQ(result.timestamp, monotonic_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please investigate!";
+ CHECK_EQ(realtime_clock::time_point(std::chrono::nanoseconds(
+ result.data.message().realtime_sent_time())),
+ realtime_remote_time)
+ << ": Queue index matches, but timestamp doesn't. Please investigate!";
+
+ data_queue->erase(it);
+
+ return result;
+ }
}
void TimestampMapper::QueueUntil(monotonic_clock::time_point t) {