Add PartsMessageReader to read part files

This abstracts the multi-part reading logic out to simplify the new log
sorting code.

Change-Id: I83f673fea9f3b9b584eec58bd111da76b2fc2d85
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index f6412c3..57d1d9d 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -3,6 +3,7 @@
 #include <chrono>
 #include <string>
 
+#include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/test_message_generated.h"
 #include "aos/flatbuffers.h"
 #include "aos/json_to_flatbuffer.h"
@@ -90,6 +91,87 @@
   EXPECT_FALSE(reader.ReadMessage());
 }
 
+// Tests that a PartsMessageReader stitches two part files sharing a parts_uuid
+// (parts_index 0 and 1) back into one message stream.
+TEST(PartsMessageReaderTest, ReadWrite) {
+  const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
+  const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
+  unlink(logfile0.c_str());  // Delete any files already at these paths.
+  unlink(logfile1.c_str());
+
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
+      JsonToSizedFlatbuffer<LogFileHeader>(
+          R"({
+  "max_out_of_order_duration": 100000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+  "parts_index": 0
+})");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
+      JsonToSizedFlatbuffer<LogFileHeader>(
+          R"({
+  "max_out_of_order_duration": 200000000,
+  "monotonic_start_time": 0,
+  "realtime_start_time": 0,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+  "parts_index": 1
+})");
+
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
+
+  {  // Scoped so the writer is destroyed before the file is read back.
+    DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0.full_span());
+    writer.QueueSpan(m1.full_span());
+  }
+  {  // Second part: header config1 followed by message m2.
+    DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config1.full_span());
+    writer.QueueSpan(m2.full_span());
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
+
+  PartsMessageReader reader(parts[0].parts[0]);
+
+  EXPECT_EQ(reader.filename(), logfile0);
+
+  // Before any read, expect newest_timestamp() == min_time and config0's
+  // duration. After reading the first message, expect 1ns and still logfile0.
+  EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
+  EXPECT_TRUE(reader.ReadMessage());
+  EXPECT_EQ(reader.filename(), logfile0);
+  EXPECT_EQ(reader.newest_timestamp(),
+            monotonic_clock::time_point(chrono::nanoseconds(1)));
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
+
+  // The second message comes from logfile1, so filename and duration switch.
+  EXPECT_TRUE(reader.ReadMessage());
+  EXPECT_EQ(reader.filename(), logfile1);
+  EXPECT_EQ(reader.newest_timestamp(),
+            monotonic_clock::time_point(chrono::nanoseconds(2)));
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
+
+  // Out of messages: ReadMessage() is falsy and the reader stays on logfile1.
+  EXPECT_FALSE(reader.ReadMessage());
+  EXPECT_EQ(reader.filename(), logfile1);
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
+}
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos