Read all timestamps into RAM where possible when reading logs

There are cases where we fail to buffer far enough in the future to
solve the timestamp problem, or where we end up choosing to buffer until
the end of time to solve the timestamp problem.  These both result in
log reading failures either through CHECKs or OOMs.

Timestamps are tiny.  The existence of timestamp_extractor proves that
we can store the whole timestamp problem in memory relatively easily.
When timestamps are stored in separate files, let's just load them at
the start.

This also adds flags to force this behavior on and off.  When a log with
data and timestamps mixed in it is found, and the flag forces it on, we
will read the data files twice to extract the timestamps the first time.

I'd like to add more tests to logfile_utils_test to test this all
explicitly, but the multinode_logger tests appear to do a really good
job already.

Change-Id: I38e23836afa980e3e3a839125e78e132066e2c90
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index a0abcd8..ea56d8d 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -779,7 +779,10 @@
   LogFilesContainer log_files(parts);
   ASSERT_EQ(parts.size(), 1u);
 
-  PartsMerger merger("pi1", 0, log_files);
+  PartsMerger merger(
+      log_files.SelectParts("pi1", 0,
+                            {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
+                             StoredDataType::REMOTE_TIMESTAMPS}));
 
   EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
 
@@ -881,7 +884,10 @@
   LogFilesContainer log_files(parts);
   ASSERT_EQ(parts.size(), 1u);
 
-  PartsMerger merger("pi1", 0, log_files);
+  PartsMerger merger(
+      log_files.SelectParts("pi1", 0,
+                            {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
+                             StoredDataType::REMOTE_TIMESTAMPS}));
 
   EXPECT_EQ(merger.sorted_until(), monotonic_clock::min_time);
 
@@ -949,11 +955,13 @@
 
   size_t mapper0_count = 0;
 
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1094,13 +1102,15 @@
   // messages due to the channel filter callbacks used
   size_t mapper0_count = 0;
 
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   mapper0.set_replay_channels_callback(
       [&](const TimestampedMessage &) -> bool { return mapper0_count != 2; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
   mapper1.set_replay_channels_callback(
@@ -1231,11 +1241,13 @@
   ASSERT_EQ(parts.size(), 1u);
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1353,11 +1365,13 @@
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1473,11 +1487,13 @@
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1557,11 +1573,13 @@
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1642,11 +1660,13 @@
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1717,11 +1737,13 @@
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1781,7 +1803,8 @@
   LogFilesContainer log_files(parts);
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
 
@@ -1823,7 +1846,8 @@
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -1897,11 +1921,13 @@
   ASSERT_EQ(parts[1].logger_node, "pi2");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -2154,7 +2180,9 @@
   ASSERT_EQ(parts.size(), 1u);
   ASSERT_EQ(parts[0].parts.size(), 2u);
 
-  BootMerger merger("pi2", log_files);
+  BootMerger merger("pi2", log_files,
+                    {StoredDataType::DATA, StoredDataType::TIMESTAMPS,
+                     StoredDataType::REMOTE_TIMESTAMPS});
 
   EXPECT_EQ(merger.node(), 1u);
 
@@ -2338,11 +2366,13 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });
 
@@ -2557,11 +2587,13 @@
   ASSERT_EQ(parts[0].logger_node, "pi1");
 
   size_t mapper0_count = 0;
-  TimestampMapper mapper0("pi1", log_files);
+  TimestampMapper mapper0("pi1", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper0.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper0_count; });
   size_t mapper1_count = 0;
-  TimestampMapper mapper1("pi2", log_files);
+  TimestampMapper mapper1("pi2", log_files,
+                          TimestampQueueStrategy::kQueueTogether);
   mapper1.set_timestamp_callback(
       [&](TimestampedMessage *) { ++mapper1_count; });