Support indexing of MCAP files

Tested manually against Foxglove Studio and the `mcap doctor` tool.

Future work is to build the upstream `mcap` tool
(https://github.com/foxglove/mcap/tree/main/go/cli/mcap), which
includes a format validator, so that output can be tested automatically.

Change-Id: I05bb0e7487d76ac95e309545aceffc0304de1f1d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index 15d11db..dcacb68 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -26,6 +26,8 @@
 
 // Generates an MCAP file, per the specification at
 // https://github.com/foxglove/mcap/tree/main/docs/specification
+// This currently generates an uncompressed logfile with full message indexing
+// available, to be able to support Foxglove fully.
 class McapLogger {
  public:
   McapLogger(EventLoop *event_loop, const std::string &output_path);
@@ -38,30 +40,109 @@
     kSchema = 0x03,
     kChannel = 0x04,
     kMessage = 0x05,
+    kChunk = 0x06,
+    kMessageIndex = 0x07,
+    kChunkIndex = 0x08,
+    kAttachment = 0x09,
+    kAttachmentIndex = 0x0A,
+    kStatistics = 0x0B,
+    kMetadata = 0x0C,
+    kMetadataIndex = 0x0D,
+    kSummaryOffset = 0x0E,
     kDataEnd = 0x0F,
   };
+  // Stores information associated with a SummaryOffset entry (an offset to the
+  // start of a section within the Summary section, which allows readers to
+  // quickly find all the indices/channel definitions/etc. for a given log).
+  struct SummaryOffset {
+    OpCode op_code;
+    // Offset of the start of the section from the start of the file.
+    uint64_t offset;
+    // Total length of the section, in bytes.
+    uint64_t size;
+  };
+  // Information needed to build the ChunkIndex entry for a single Chunk.
+  struct ChunkIndex {
+    // Earliest and latest message times within the Chunk being referenced.
+    aos::monotonic_clock::time_point start_time;
+    aos::monotonic_clock::time_point end_time;
+    // Offset from the start of the file to the start of the relevant Chunk.
+    uint64_t offset;
+    // Total size of the Chunk, in bytes.
+    uint64_t chunk_size;
+    // Total size of the records portion of the Chunk, in bytes.
+    uint64_t records_size;
+    // Mapping from each channel ID to the MessageIndex entry for that channel
+    // within the referenced Chunk. Each MessageIndex is located by its offset
+    // from the start of the file.
+    std::map<uint16_t, uint64_t> message_index_offsets;
+    // Total size, in bytes, of all the MessageIndex entries for this Chunk
+    // together (note that they are required to be contiguous).
+    uint64_t message_index_size;
+  };
+  enum class RegisterHandlers { kYes, kNo };
   // Helpers to write each type of relevant record.
   void WriteMagic();
   void WriteHeader();
-  void WriteFooter();
+  void WriteFooter(uint64_t summary_offset, uint64_t summary_offset_offset);
   void WriteDataEnd();
   void WriteSchema(const uint16_t id, const aos::Channel *channel);
   void WriteChannel(const uint16_t id, const uint16_t schema_id,
                     const aos::Channel *channel);
   void WriteMessage(uint16_t channel_id, const Channel *channel,
-                    const Context &context);
-  void WriteConfig();
+                    const Context &context, std::ostream *output);
+  void WriteChunk();
+
+  // The helpers for writing records which appear in the Summary section will
+  // return SummaryOffset's so that they can be referenced in the SummaryOffset
+  // section.
+  SummaryOffset WriteChunkIndices();
+  SummaryOffset WriteStatistics();
+  std::vector<SummaryOffset> WriteSchemasAndChannels(
+      RegisterHandlers register_handlers);
+  void WriteSummaryOffset(const SummaryOffset &offset);
 
   // Writes an MCAP record to the output file.
-  void WriteRecord(OpCode op, std::string_view record);
-  // Adds an MCAP-spec string/fixed-size integer to a buffer.
+  void WriteRecord(OpCode op, std::string_view record, std::ostream *ostream);
+  void WriteRecord(OpCode op, std::string_view record) {
+    WriteRecord(op, record, &output_);  // Forward to the file stream.
+  }
+  // Adds an MCAP-spec string/byte-array/map/array of pairs/fixed-size integer
+  // to a buffer.
   static void AppendString(FastStringBuilder *builder, std::string_view string);
+  static void AppendBytes(FastStringBuilder *builder, std::string_view bytes);
+  static void AppendChannelMap(FastStringBuilder *builder,
+                               const std::map<uint16_t, uint64_t> &map);
+  static void AppendMessageIndices(
+      FastStringBuilder *builder,
+      const std::vector<std::pair<uint64_t, uint64_t>> &messages);
   static void AppendInt16(FastStringBuilder *builder, uint16_t val);
   static void AppendInt32(FastStringBuilder *builder, uint32_t val);
   static void AppendInt64(FastStringBuilder *builder, uint64_t val);
 
+  aos::EventLoop *event_loop_;
   std::ofstream output_;
+  // Buffer containing serialized message data for the currently-being-built
+  // chunk.
+  std::stringstream current_chunk_;
   FastStringBuilder string_builder_;
+
+  // Earliest message observed in this logfile.
+  std::optional<aos::monotonic_clock::time_point> earliest_message_;
+  // Earliest message observed in the current chunk.
+  std::optional<aos::monotonic_clock::time_point> earliest_chunk_message_;
+  // Latest message observed.
+  aos::monotonic_clock::time_point latest_message_ =
+      aos::monotonic_clock::min_time;
+  // Count of all messages on each channel, indexed by channel ID.
+  std::map<uint16_t, uint64_t> message_counts_;
+  // MessageIndex's for each message. The std::map is indexed by channel ID. The
+  // vector is then a series of pairs of (timestamp, offset from start of
+  // current_chunk_).
+  std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
+      message_indices_;
+  // ChunkIndex's for all fully written Chunks.
+  std::vector<ChunkIndex> chunk_indices_;
 };
 }  // namespace aos
 #endif  // AOS_UTIL_MCAP_LOGGER_H_