Compress the remote timestamp matching queue
The queue_index_map_ was saving a timestamp for every message ever sent.
This is used to match the queue index that a message was replayed with
to the queue index that is put back in the RemoteMessage to be
re-logged. This ends up being pretty expensive in terms of memory and
wasn't being cleaned out.
Change-Id: I1d675dec2d782e457304b6f443ebd6c0eac77789
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 9da790a..fbea092 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1876,7 +1876,7 @@
if (remote_timestamp_sender != nullptr) {
source_state->queue_index_map_[logged_channel_index] =
- std::make_unique<std::vector<State::SentTimestamp>>();
+ std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
}
}
@@ -1888,35 +1888,39 @@
uint32_t remote_queue_index = 0xffffffff;
if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
- std::vector<SentTimestamp> *queue_index_map = CHECK_NOTNULL(
+ std::vector<ContiguousSentTimestamp> *queue_index_map = CHECK_NOTNULL(
CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
->queue_index_map_[timestamped_message.channel_index]
.get());
- SentTimestamp search;
+ struct SentTimestamp {
+ monotonic_clock::time_point monotonic_event_time;
+ uint32_t queue_index;
+ } search;
+
search.monotonic_event_time = timestamped_message.monotonic_remote_time;
- search.realtime_event_time = timestamped_message.realtime_remote_time;
search.queue_index = timestamped_message.remote_queue_index;
// Find the sent time if available.
auto element = std::lower_bound(
queue_index_map->begin(), queue_index_map->end(), search,
- [](SentTimestamp a, SentTimestamp b) {
- if (b.monotonic_event_time < a.monotonic_event_time) {
- return false;
- }
- if (b.monotonic_event_time > a.monotonic_event_time) {
+ [](ContiguousSentTimestamp a, SentTimestamp b) {
+ if (a.ending_monotonic_event_time < b.monotonic_event_time) {
return true;
}
-
- if (b.queue_index < a.queue_index) {
+ if (a.starting_monotonic_event_time > b.monotonic_event_time) {
return false;
}
- if (b.queue_index > a.queue_index) {
+
+ if (a.ending_queue_index < b.queue_index) {
return true;
}
+ if (a.starting_queue_index >= b.queue_index) {
+ return false;
+ }
- CHECK_EQ(a.realtime_event_time, b.realtime_event_time);
+ // If it isn't clearly below or above, it is below. Since we return
+ // the last element <, this will return a match.
return false;
});
@@ -1925,13 +1929,20 @@
// other node isn't done yet. So there is no send time, but there is a
// receive time.
if (element != queue_index_map->end()) {
- CHECK_EQ(element->monotonic_event_time,
- timestamped_message.monotonic_remote_time);
- CHECK_EQ(element->realtime_event_time,
- timestamped_message.realtime_remote_time);
- CHECK_EQ(element->queue_index, timestamped_message.remote_queue_index);
+ CHECK_GE(timestamped_message.monotonic_remote_time,
+ element->starting_monotonic_event_time);
+ CHECK_LE(timestamped_message.monotonic_remote_time,
+ element->ending_monotonic_event_time);
+ CHECK_GE(timestamped_message.remote_queue_index,
+ element->starting_queue_index);
+ CHECK_LE(timestamped_message.remote_queue_index,
+ element->ending_queue_index);
- remote_queue_index = element->actual_queue_index;
+ remote_queue_index = timestamped_message.remote_queue_index +
+ element->actual_queue_index -
+ element->starting_queue_index;
+ } else {
+ VLOG(1) << "No timestamp match in the map.";
}
}
@@ -1945,13 +1956,44 @@
if (!sent) return false;
if (queue_index_map_[timestamped_message.channel_index]) {
- SentTimestamp timestamp;
- timestamp.monotonic_event_time = timestamped_message.monotonic_event_time;
- timestamp.realtime_event_time = timestamped_message.realtime_event_time;
- timestamp.queue_index = timestamped_message.queue_index;
- timestamp.actual_queue_index = sender->sent_queue_index();
- queue_index_map_[timestamped_message.channel_index]->emplace_back(
- timestamp);
+ if (queue_index_map_[timestamped_message.channel_index]->empty()) {
+ // Nothing here, start a range with 0 length.
+ ContiguousSentTimestamp timestamp;
+ timestamp.starting_monotonic_event_time =
+ timestamp.ending_monotonic_event_time =
+ timestamped_message.monotonic_event_time;
+ timestamp.starting_queue_index = timestamp.ending_queue_index =
+ timestamped_message.queue_index;
+ timestamp.actual_queue_index = sender->sent_queue_index();
+ queue_index_map_[timestamped_message.channel_index]->emplace_back(
+ timestamp);
+ } else {
+ // We've got something. See if the next timestamp is still contiguous. If
+ // so, grow it.
+ ContiguousSentTimestamp *back =
+ &queue_index_map_[timestamped_message.channel_index]->back();
+ if ((back->starting_queue_index - back->actual_queue_index) ==
+ (timestamped_message.queue_index - sender->sent_queue_index())) {
+ back->ending_queue_index = timestamped_message.queue_index;
+ back->ending_monotonic_event_time =
+ timestamped_message.monotonic_event_time;
+ } else {
+ // Otherwise, make a new one.
+ ContiguousSentTimestamp timestamp;
+ timestamp.starting_monotonic_event_time =
+ timestamp.ending_monotonic_event_time =
+ timestamped_message.monotonic_event_time;
+ timestamp.starting_queue_index = timestamp.ending_queue_index =
+ timestamped_message.queue_index;
+ timestamp.actual_queue_index = sender->sent_queue_index();
+ queue_index_map_[timestamped_message.channel_index]->emplace_back(
+ timestamp);
+ }
+ }
+
+ // TODO(austin): Should we prune the map? On a many day log, I only saw the
+ // queue index diverge a couple of elements, which would be a very small
+ // map.
} else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
nullptr) {
flatbuffers::FlatBufferBuilder fbb;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e3f5380..9ec4011 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -688,23 +688,34 @@
// sending out MessageHeaders.
std::vector<int> factory_channel_index_;
- struct SentTimestamp {
- monotonic_clock::time_point monotonic_event_time =
+ struct ContiguousSentTimestamp {
+ // Most timestamps make it through the network, so it saves a ton of
+ // memory and CPU to store the start and end, and search for valid ranges.
+ // For one of the logs I looked at, we had 2 ranges for 4 days.
+ //
+ // Save monotonic times as well to help if a queue index ever wraps. Odds
+ // are very low, but doesn't hurt.
+ //
+ // The starting time and matching queue index.
+ monotonic_clock::time_point starting_monotonic_event_time =
monotonic_clock::min_time;
- realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
- uint32_t queue_index = 0xffffffff;
+ uint32_t starting_queue_index = 0xffffffff;
- // The queue index that this message *actually* was sent with.
+ // Ending time and queue index.
+ monotonic_clock::time_point ending_monotonic_event_time =
+ monotonic_clock::max_time;
+ uint32_t ending_queue_index = 0xffffffff;
+
+ // The queue index that the first message was *actually* sent with. The
+ // queue indices are assumed to be contiguous through this range.
uint32_t actual_queue_index = 0xffffffff;
};
// Stores all the timestamps that have been sent on this channel. This is
// only done for channels which are forwarded and on the node which
- // initially sends the message.
- //
- // TODO(austin): This whole concept is a hack. We should be able to
- // associate state with the message as it gets sorted and recover it.
- std::vector<std::unique_ptr<std::vector<SentTimestamp>>> queue_index_map_;
+ // initially sends the message. Compress using ranges and offsets.
+ std::vector<std::unique_ptr<std::vector<ContiguousSentTimestamp>>>
+ queue_index_map_;
// Factory (if we are in sim) that this loop was created on.
NodeEventLoopFactory *node_event_loop_factory_ = nullptr;