Compress the remote timestamp matching queue

The queue_index_map_ was saving a timestamp for every message ever sent.
This is used to match the queue index that a message was replayed with
to the queue index that is put back in the RemoteMessage to be
re-logged.  This ends up being pretty expensive in terms of memory and
wasn't being cleaned out.

Change-Id: I1d675dec2d782e457304b6f443ebd6c0eac77789
diff --git a/aos/events/logging/logger.cc b/aos/events/logging/logger.cc
index 9da790a..fbea092 100644
--- a/aos/events/logging/logger.cc
+++ b/aos/events/logging/logger.cc
@@ -1876,7 +1876,7 @@
 
     if (remote_timestamp_sender != nullptr) {
       source_state->queue_index_map_[logged_channel_index] =
-          std::make_unique<std::vector<State::SentTimestamp>>();
+          std::make_unique<std::vector<State::ContiguousSentTimestamp>>();
     }
   }
 
@@ -1888,35 +1888,39 @@
   uint32_t remote_queue_index = 0xffffffff;
 
   if (remote_timestamp_senders_[timestamped_message.channel_index] != nullptr) {
-    std::vector<SentTimestamp> *queue_index_map = CHECK_NOTNULL(
+    std::vector<ContiguousSentTimestamp> *queue_index_map = CHECK_NOTNULL(
         CHECK_NOTNULL(channel_source_state_[timestamped_message.channel_index])
             ->queue_index_map_[timestamped_message.channel_index]
             .get());
 
-    SentTimestamp search;
+    struct SentTimestamp {
+      monotonic_clock::time_point monotonic_event_time;
+      uint32_t queue_index;
+    } search;
+
     search.monotonic_event_time = timestamped_message.monotonic_remote_time;
-    search.realtime_event_time = timestamped_message.realtime_remote_time;
     search.queue_index = timestamped_message.remote_queue_index;
 
     // Find the sent time if available.
     auto element = std::lower_bound(
         queue_index_map->begin(), queue_index_map->end(), search,
-        [](SentTimestamp a, SentTimestamp b) {
-          if (b.monotonic_event_time < a.monotonic_event_time) {
-            return false;
-          }
-          if (b.monotonic_event_time > a.monotonic_event_time) {
+        [](ContiguousSentTimestamp a, SentTimestamp b) {
+          if (a.ending_monotonic_event_time < b.monotonic_event_time) {
             return true;
           }
-
-          if (b.queue_index < a.queue_index) {
+          if (a.starting_monotonic_event_time > b.monotonic_event_time) {
             return false;
           }
-          if (b.queue_index > a.queue_index) {
+
+          if (a.ending_queue_index < b.queue_index) {
             return true;
           }
+          if (a.starting_queue_index >= b.queue_index) {
+            return false;
+          }
 
-          CHECK_EQ(a.realtime_event_time, b.realtime_event_time);
+          // If it isn't clearly below or above, it is below.  Since we return
+          // the last element <, this will return a match.
           return false;
         });
 
@@ -1925,13 +1929,20 @@
     // other node isn't done yet.  So there is no send time, but there is a
     // receive time.
     if (element != queue_index_map->end()) {
-      CHECK_EQ(element->monotonic_event_time,
-               timestamped_message.monotonic_remote_time);
-      CHECK_EQ(element->realtime_event_time,
-               timestamped_message.realtime_remote_time);
-      CHECK_EQ(element->queue_index, timestamped_message.remote_queue_index);
+      CHECK_GE(timestamped_message.monotonic_remote_time,
+               element->starting_monotonic_event_time);
+      CHECK_LE(timestamped_message.monotonic_remote_time,
+               element->ending_monotonic_event_time);
+      CHECK_GE(timestamped_message.remote_queue_index,
+               element->starting_queue_index);
+      CHECK_LE(timestamped_message.remote_queue_index,
+               element->ending_queue_index);
 
-      remote_queue_index = element->actual_queue_index;
+      remote_queue_index = timestamped_message.remote_queue_index +
+                           element->actual_queue_index -
+                           element->starting_queue_index;
+    } else {
+      VLOG(1) << "No timestamp match in the map.";
     }
   }
 
@@ -1945,13 +1956,44 @@
   if (!sent) return false;
 
   if (queue_index_map_[timestamped_message.channel_index]) {
-    SentTimestamp timestamp;
-    timestamp.monotonic_event_time = timestamped_message.monotonic_event_time;
-    timestamp.realtime_event_time = timestamped_message.realtime_event_time;
-    timestamp.queue_index = timestamped_message.queue_index;
-    timestamp.actual_queue_index = sender->sent_queue_index();
-    queue_index_map_[timestamped_message.channel_index]->emplace_back(
-        timestamp);
+    if (queue_index_map_[timestamped_message.channel_index]->empty()) {
+      // Nothing here, start a range with 0 length.
+      ContiguousSentTimestamp timestamp;
+      timestamp.starting_monotonic_event_time =
+          timestamp.ending_monotonic_event_time =
+              timestamped_message.monotonic_event_time;
+      timestamp.starting_queue_index = timestamp.ending_queue_index =
+          timestamped_message.queue_index;
+      timestamp.actual_queue_index = sender->sent_queue_index();
+      queue_index_map_[timestamped_message.channel_index]->emplace_back(
+          timestamp);
+    } else {
+      // We've got something.  See if the next timestamp is still contiguous. If
+      // so, grow it.
+      ContiguousSentTimestamp *back =
+          &queue_index_map_[timestamped_message.channel_index]->back();
+      if ((back->starting_queue_index - back->actual_queue_index) ==
+          (timestamped_message.queue_index - sender->sent_queue_index())) {
+        back->ending_queue_index = timestamped_message.queue_index;
+        back->ending_monotonic_event_time =
+            timestamped_message.monotonic_event_time;
+      } else {
+        // Otherwise, make a new one.
+        ContiguousSentTimestamp timestamp;
+        timestamp.starting_monotonic_event_time =
+            timestamp.ending_monotonic_event_time =
+                timestamped_message.monotonic_event_time;
+        timestamp.starting_queue_index = timestamp.ending_queue_index =
+            timestamped_message.queue_index;
+        timestamp.actual_queue_index = sender->sent_queue_index();
+        queue_index_map_[timestamped_message.channel_index]->emplace_back(
+            timestamp);
+      }
+    }
+
+    // TODO(austin): Should we prune the map?  On a many day log, I only saw the
+    // queue index diverge a couple of elements, which would be a very small
+    // map.
   } else if (remote_timestamp_senders_[timestamped_message.channel_index] !=
              nullptr) {
     flatbuffers::FlatBufferBuilder fbb;
diff --git a/aos/events/logging/logger.h b/aos/events/logging/logger.h
index e3f5380..9ec4011 100644
--- a/aos/events/logging/logger.h
+++ b/aos/events/logging/logger.h
@@ -688,23 +688,34 @@
     // sending out MessageHeaders.
     std::vector<int> factory_channel_index_;
 
-    struct SentTimestamp {
-      monotonic_clock::time_point monotonic_event_time =
+    struct ContiguousSentTimestamp {
+      // Most timestamps make it through the network, so it saves a ton of
+      // memory and CPU to store the start and end, and search for valid ranges.
+      // For one of the logs I looked at, we had 2 ranges for 4 days.
+      //
+      // Save monotonic times as well to help if a queue index ever wraps.  Odds
+      // are very low, but doesn't hurt.
+      //
+      // The starting time and matching queue index.
+      monotonic_clock::time_point starting_monotonic_event_time =
           monotonic_clock::min_time;
-      realtime_clock::time_point realtime_event_time = realtime_clock::min_time;
-      uint32_t queue_index = 0xffffffff;
+      uint32_t starting_queue_index = 0xffffffff;
 
-      // The queue index that this message *actually* was sent with.
+      // Ending time and queue index.
+      monotonic_clock::time_point ending_monotonic_event_time =
+          monotonic_clock::max_time;
+      uint32_t ending_queue_index = 0xffffffff;
+
+      // The queue index that the first message was *actually* sent with.  The
+      // queue indices are assumed to be contiguous through this range.
       uint32_t actual_queue_index = 0xffffffff;
     };
 
     // Stores all the timestamps that have been sent on this channel.  This is
     // only done for channels which are forwarded and on the node which
-    // initially sends the message.
-    //
-    // TODO(austin): This whole concept is a hack.  We should be able to
-    // associate state with the message as it gets sorted and recover it.
-    std::vector<std::unique_ptr<std::vector<SentTimestamp>>> queue_index_map_;
+    // initially sends the message.  Compress using ranges and offsets.
+    std::vector<std::unique_ptr<std::vector<ContiguousSentTimestamp>>>
+        queue_index_map_;
 
     // Factory (if we are in sim) that this loop was created on.
     NodeEventLoopFactory *node_event_loop_factory_ = nullptr;