Add PeekMessage/ConsumeMessage to SpanReader

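Some log files have duplicate headers at the front of them.  Since those
files still contain useful data, add a flag which enables support for
detecting the duplicate headers and recovering from them.  PeekMessage and
ConsumeMessage let a caller inspect the next message before deciding
whether to consume it.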

Change-Id: I30a85c7023b71dfa8ecce63fb8288565d5a01737
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index f008ac6..f32b337 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -325,7 +325,7 @@
   }
 }
 
-absl::Span<const uint8_t> SpanReader::ReadMessage() {
+absl::Span<const uint8_t> SpanReader::PeekMessage() {
   // Make sure we have enough for the size.
   if (data_.size() - consumed_data_ < sizeof(flatbuffers::uoffset_t)) {
     if (!ReadBlock()) {
@@ -355,11 +355,23 @@
   // And return it, consuming the data.
   const uint8_t *data_ptr = data_.data() + consumed_data_;
 
-  consumed_data_ += data_size;
-
   return absl::Span<const uint8_t>(data_ptr, data_size);
 }
 
+void SpanReader::ConsumeMessage() {
+  consumed_data_ +=
+      flatbuffers::GetPrefixedSize(data_.data() + consumed_data_) +
+      sizeof(flatbuffers::uoffset_t);
+}
+
+absl::Span<const uint8_t> SpanReader::ReadMessage() {
+  absl::Span<const uint8_t> result = PeekMessage();
+  if (result != absl::Span<const uint8_t>()) {
+    ConsumeMessage();
+  }
+  return result;
+}
+
 bool SpanReader::ReadBlock() {
   // This is the amount of data we grab at a time. Doing larger chunks minimizes
   // syscalls and helps decompressors batch things more efficiently.
diff --git a/aos/events/logging/logfile_utils.h b/aos/events/logging/logfile_utils.h
index 4bb481d..ee13a7c 100644
--- a/aos/events/logging/logfile_utils.h
+++ b/aos/events/logging/logfile_utils.h
@@ -211,10 +211,19 @@
 
   std::string_view filename() const { return filename_; }
 
-  // Returns a span with the data for a message from the log file, excluding
-  // the size.
+  // Returns a span with the data for the next message from the log file,
+  // including the size.  The result is only guaranteed to be valid until
+  // ReadMessage() or PeekMessage() is called again.
   absl::Span<const uint8_t> ReadMessage();
 
+  // Returns a span with the data for the next message without consuming it.
+  // Multiple calls to PeekMessage return the same data.  ReadMessage or
+  // ConsumeMessage must be called to get the next message.
+  absl::Span<const uint8_t> PeekMessage();
+  // Consumes the message so the next call to ReadMessage or PeekMessage returns
+  // new data.  This does not invalidate the data.
+  void ConsumeMessage();
+
  private:
   // TODO(austin): Optimization:
   //   Allocate the 256k blocks like we do today.  But, refcount them with
diff --git a/aos/events/logging/logfile_utils_test.cc b/aos/events/logging/logfile_utils_test.cc
index 90a716e..4973d02 100644
--- a/aos/events/logging/logfile_utils_test.cc
+++ b/aos/events/logging/logfile_utils_test.cc
@@ -45,8 +45,11 @@
   SpanReader reader(logfile);
 
   EXPECT_EQ(reader.filename(), logfile);
+  EXPECT_EQ(reader.PeekMessage(), m1.span());
+  EXPECT_EQ(reader.PeekMessage(), m1.span());
   EXPECT_EQ(reader.ReadMessage(), m1.span());
   EXPECT_EQ(reader.ReadMessage(), m2.span());
+  EXPECT_EQ(reader.PeekMessage(), absl::Span<const uint8_t>());
   EXPECT_EQ(reader.ReadMessage(), absl::Span<const uint8_t>());
 }