Add PartsMessageReader to read part files

This abstracts the multi-part reading logic out to simplify the new log
sorting code.

Change-Id: I83f673fea9f3b9b584eec58bd111da76b2fc2d85
diff --git a/aos/events/logging/buffer_encoder.h b/aos/events/logging/buffer_encoder.h
index cc16fea..1eddd00 100644
--- a/aos/events/logging/buffer_encoder.h
+++ b/aos/events/logging/buffer_encoder.h
@@ -56,6 +56,11 @@
 // and queues it up as is.
 class DummyEncoder final : public DetachedBufferEncoder {
  public:
+  DummyEncoder() {}
+  DummyEncoder(const DummyEncoder &) = delete;
+  DummyEncoder(DummyEncoder &&other) = delete;
+  DummyEncoder &operator=(const DummyEncoder &) = delete;
+  DummyEncoder &operator=(DummyEncoder &&other) = delete;
   ~DummyEncoder() override = default;
 
   // No encoding happens, the raw data is queued up as is.
@@ -90,6 +95,10 @@
 class DummyDecoder final : public DataDecoder {
  public:
   explicit DummyDecoder(std::string_view filename);
+  DummyDecoder(const DummyDecoder &) = delete;
+  DummyDecoder(DummyDecoder &&other) = delete;
+  DummyDecoder &operator=(const DummyDecoder &) = delete;
+  DummyDecoder &operator=(DummyDecoder &&other) = delete;
   ~DummyDecoder() override;
 
   size_t Read(uint8_t *begin, uint8_t *end) final;
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 908e5e1..6b7a598 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -431,6 +431,32 @@
   return std::move(result);
 }
 
+PartsMessageReader::PartsMessageReader(LogParts log_parts)
+    : parts_(std::move(log_parts)), message_reader_(parts_.parts[0]) {}
+
+std::optional<FlatbufferVector<MessageHeader>>
+PartsMessageReader::ReadMessage() {
+  while (!done_) {
+    std::optional<FlatbufferVector<MessageHeader>> message =
+        message_reader_.ReadMessage();
+    if (message) {
+      newest_timestamp_ = message_reader_.newest_timestamp();
+      return message;
+    }
+    NextLog();
+  }
+  return std::nullopt;
+}
+
+void PartsMessageReader::NextLog() {
+  if (next_part_index_ == parts_.parts.size()) {
+    done_ = true;
+    return;
+  }
+  message_reader_ = MessageReader(parts_.parts[next_part_index_]);
+  ++next_part_index_;
+}
+
 SplitMessageReader::SplitMessageReader(
     const std::vector<std::string> &filenames)
     : filenames_(filenames),
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 60bd39c..12d7cac 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -18,6 +18,7 @@
 #include "aos/containers/resizeable_buffer.h"
 #include "aos/events/event_loop.h"
 #include "aos/events/logging/buffer_encoder.h"
+#include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/logger_generated.h"
 #include "aos/flatbuffers.h"
 #include "flatbuffers/flatbuffers.h"
@@ -206,7 +207,7 @@
   // Reads a chunk of data into data_.  Returns false if no data was read.
   bool ReadBlock();
 
-  const std::string filename_;
+  std::string filename_;
 
   // File reader and data decoder.
   std::unique_ptr<DataDecoder> decoder_;
@@ -270,6 +271,42 @@
   monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
 };
 
+// A class to seamlessly read messages from a list of part files.
+class PartsMessageReader {
+ public:
+  PartsMessageReader(LogParts log_parts);
+
+  std::string_view filename() const { return message_reader_.filename(); }
+
+  // Returns the minimum amount of data needed to queue up for sorting before
+  // we are guarenteed to not see data out of order.
+  std::chrono::nanoseconds max_out_of_order_duration() const {
+    return message_reader_.max_out_of_order_duration();
+  }
+
+  // Returns the newest timestamp read out of the log file.
+  monotonic_clock::time_point newest_timestamp() const {
+    return newest_timestamp_;
+  }
+
+  // Returns the next message if there is one, or nullopt if we have reached the
+  // end of all the files.
+  // Note: reading the next message may change the max_out_of_order_duration().
+  std::optional<FlatbufferVector<MessageHeader>> ReadMessage();
+
+ private:
+  // Opens the next log and updates message_reader_.  Sets done_ if there is
+  // nothing more to do.
+  void NextLog();
+
+  const LogParts parts_;
+  size_t next_part_index_ = 1u;
+  bool done_ = false;
+  MessageReader message_reader_;
+
+  monotonic_clock::time_point newest_timestamp_ = monotonic_clock::min_time;
+};
+
 class TimestampMerger;
 
 // A design requirement is that the relevant data for a channel is not more than
@@ -369,7 +406,7 @@
   monotonic_clock::time_point time_to_queue() const { return time_to_queue_; }
 
   // Returns the minimum amount of data needed to queue up for sorting before
-  // ware guarenteed to not see data out of order.
+  // we are guarenteed to not see data out of order.
   std::chrono::nanoseconds max_out_of_order_duration() const {
     return message_reader_->max_out_of_order_duration();
   }
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index f6412c3..57d1d9d 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -3,6 +3,7 @@
 #include <chrono>
 #include <string>
 
+#include "aos/events/logging/logfile_sorting.h"
 #include "aos/events/logging/test_message_generated.h"
 #include "aos/flatbuffers.h"
 #include "aos/json_to_flatbuffer.h"
@@ -90,6 +91,87 @@
   EXPECT_FALSE(reader.ReadMessage());
 }
 
+// Tests that we can transparently re-assemble part files with a
+// PartsMessageReader.
+TEST(PartsMessageReaderTest, ReadWrite) {
+  const std::string logfile0 = aos::testing::TestTmpDir() + "/log0.bfbs";
+  const std::string logfile1 = aos::testing::TestTmpDir() + "/log1.bfbs";
+  unlink(logfile0.c_str());
+  unlink(logfile1.c_str());
+
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config0 =
+      JsonToSizedFlatbuffer<LogFileHeader>(
+          R"({
+  "max_out_of_order_duration": 100000000,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+  "parts_index": 0
+})");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<LogFileHeader> config1 =
+      JsonToSizedFlatbuffer<LogFileHeader>(
+          R"({
+  "max_out_of_order_duration": 200000000,
+  "monotonic_start_time": 0,
+  "realtime_start_time": 0,
+  "log_event_uuid": "30ef1283-81d7-4004-8c36-1c162dbcb2b2",
+  "parts_uuid": "2a05d725-5d5c-4c0b-af42-88de2f3c3876",
+  "parts_index": 1
+})");
+
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m1 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 1 })");
+  const aos::SizePrefixedFlatbufferDetachedBuffer<MessageHeader> m2 =
+      JsonToSizedFlatbuffer<MessageHeader>(
+          R"({ "channel_index": 0, "monotonic_sent_time": 2 })");
+
+  {
+    DetachedBufferWriter writer(logfile0, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config0.full_span());
+    writer.QueueSpan(m1.full_span());
+  }
+  {
+    DetachedBufferWriter writer(logfile1, std::make_unique<DummyEncoder>());
+    writer.QueueSpan(config1.full_span());
+    writer.QueueSpan(m2.full_span());
+  }
+
+  const std::vector<LogFile> parts = SortParts({logfile0, logfile1});
+
+  PartsMessageReader reader(parts[0].parts[0]);
+
+  EXPECT_EQ(reader.filename(), logfile0);
+
+  // Confirm that the timestamps track, and the filename also updates.
+  // Read the first message.
+  EXPECT_EQ(reader.newest_timestamp(), monotonic_clock::min_time);
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
+  EXPECT_TRUE(reader.ReadMessage());
+  EXPECT_EQ(reader.filename(), logfile0);
+  EXPECT_EQ(reader.newest_timestamp(),
+            monotonic_clock::time_point(chrono::nanoseconds(1)));
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config0.message().max_out_of_order_duration()));
+
+  // Read the second message.
+  EXPECT_TRUE(reader.ReadMessage());
+  EXPECT_EQ(reader.filename(), logfile1);
+  EXPECT_EQ(reader.newest_timestamp(),
+            monotonic_clock::time_point(chrono::nanoseconds(2)));
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
+
+  // And then confirm that reading again returns no message.
+  EXPECT_FALSE(reader.ReadMessage());
+  EXPECT_EQ(reader.filename(), logfile1);
+  EXPECT_EQ(
+      reader.max_out_of_order_duration(),
+      std::chrono::nanoseconds(config1.message().max_out_of_order_duration()));
+}
 }  // namespace testing
 }  // namespace logger
 }  // namespace aos
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index d7080a9..8caca1b 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -16,6 +16,10 @@
  public:
   // Initializes the LZMA stream and encoder.
   explicit LzmaEncoder(uint32_t compression_preset);
+  LzmaEncoder(const LzmaEncoder &) = delete;
+  LzmaEncoder(LzmaEncoder &&other) = delete;
+  LzmaEncoder &operator=(const LzmaEncoder &) = delete;
+  LzmaEncoder &operator=(LzmaEncoder &&other) = delete;
   // Gracefully shuts down the encoder.
   ~LzmaEncoder() final;
 
@@ -45,6 +49,10 @@
 class LzmaDecoder final : public DataDecoder {
  public:
   explicit LzmaDecoder(std::string_view filename);
+  LzmaDecoder(const LzmaDecoder &) = delete;
+  LzmaDecoder(LzmaDecoder &&other) = delete;
+  LzmaDecoder &operator=(const LzmaDecoder &) = delete;
+  LzmaDecoder &operator=(LzmaDecoder &&other) = delete;
   ~LzmaDecoder();
 
   size_t Read(uint8_t *begin, uint8_t *end) final;