Merge "Support indexing of MCAP files"
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 9ab4012..1cb9c32 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -4,6 +4,11 @@
 #include "single_include/nlohmann/json.hpp"
 
 namespace aos {
+namespace {
+// Chunk size threshold, in bytes: once exceeded, the chunk is written out.
+constexpr size_t kChunkSize = 10000000;
+}  // namespace
+
 nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
                                        JsonSchemaRecursion recursion_level) {
   nlohmann::json schema;
@@ -71,39 +76,100 @@
 }
 
 McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
-    : output_(output_path) {
+    : event_loop_(event_loop), output_(output_path) {
   event_loop->SkipTimingReport();
   event_loop->SkipAosLog();
   CHECK(output_);
   WriteMagic();
   WriteHeader();
-  uint16_t id = 1;
-  for (const Channel *channel : *event_loop->configuration()->channels()) {
-    if (!configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
-      continue;
-    }
-
-    WriteSchema(id, channel);
-
-    // Write out the channel entry that uses the schema (we just re-use the
-    // chema ID for the channel ID, since we aren't deduplicating schemas for
-    // channels that are of the same type).
-    WriteChannel(id, id, channel);
-
-    event_loop->MakeRawWatcher(
-        channel, [this, id, channel](const Context &context, const void *) {
-          WriteMessage(id, channel, context);
-        });
-    ++id;
-  }
+  // Write schemas/channels (repeated in the Summary section on destruction)
+  // and register per-channel watchers that buffer messages into chunks.
+  WriteSchemasAndChannels(RegisterHandlers::kYes);
 }
 
 McapLogger::~McapLogger() {
+  // Flush any messages still buffered in current_chunk_ as a final chunk.
+  if (current_chunk_.tellp() > 0) {
+    WriteChunk();
+  }
   WriteDataEnd();
-  WriteFooter();
+
+  // Now we enter the Summary section, where we write out all the channel/index
+  // information that readers need to be able to seek to arbitrary locations
+  // within the log.
+  const uint64_t summary_offset = output_.tellp();
+  const SummaryOffset chunk_indices_offset = WriteChunkIndices();
+  const SummaryOffset stats_offset = WriteStatistics();
+  // Schemas/Channels need to get reproduced in the summary section for random
+  // access reading.
+  const std::vector<SummaryOffset> offsets =
+      WriteSchemasAndChannels(RegisterHandlers::kNo);
+
+  // Next we have the summary offset section, which references the individual
+  // pieces of the summary section.
+  const uint64_t summary_offset_offset = output_.tellp();
+
+  // SummaryOffset records must all come last, immediately before the Footer.
+  WriteSummaryOffset(chunk_indices_offset);
+  WriteSummaryOffset(stats_offset);
+  for (const auto &offset : offsets) {
+    WriteSummaryOffset(offset);
+  }
+
+  // And finally, the footer which must itself reference the start of the
+  // summary and summary offset sections.
+  WriteFooter(summary_offset, summary_offset_offset);
   WriteMagic();
 }
 
+std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
+    RegisterHandlers register_handlers) {
+  // Every channel readable on this node gets a sequential ID starting at 1;
+  // that ID doubles as the ID of its (non-deduplicated) schema.
+  std::map<uint16_t, const Channel *> channels;
+  uint16_t next_id = 1;
+  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+    if (configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
+      channels[next_id] = channel;
+      ++next_id;
+    }
+  }
+
+  if (register_handlers == RegisterHandlers::kYes) {
+    for (const auto &entry : channels) {
+      const uint16_t id = entry.first;
+      const Channel *const channel = entry.second;
+      message_counts_[id] = 0;
+      // Buffer messages into current_chunk_, flushing it past kChunkSize.
+      event_loop_->MakeRawWatcher(
+          channel, [this, id, channel](const Context &context, const void *) {
+            WriteMessage(id, channel, context, &current_chunk_);
+            if (static_cast<uint64_t>(current_chunk_.tellp()) > kChunkSize) {
+              WriteChunk();
+            }
+          });
+    }
+  }
+
+  const uint64_t schema_offset = output_.tellp();
+  for (const auto &[id, channel] : channels) {
+    WriteSchema(id, channel);
+  }
+  const uint64_t channel_offset = output_.tellp();
+  for (const auto &[id, channel] : channels) {
+    // Channels re-use their ID as the schema ID, since schemas are not
+    // deduplicated across channels of the same type.
+    WriteChannel(id, id, channel);
+  }
+  const uint64_t end_offset = output_.tellp();
+  return {
+      SummaryOffset{OpCode::kSchema, schema_offset,
+                    channel_offset - schema_offset},
+      SummaryOffset{OpCode::kChannel, channel_offset,
+                    end_offset - channel_offset},
+  };
+}
+
 void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
 
 void McapLogger::WriteHeader() {
@@ -115,11 +181,12 @@
   WriteRecord(OpCode::kHeader, string_builder_.Result());
 }
 
-void McapLogger::WriteFooter() {
+void McapLogger::WriteFooter(uint64_t summary_offset,
+                             uint64_t summary_offset_offset) {
   string_builder_.Reset();
-  // Offsets and CRC32 for summary section, which we don't populate.
-  AppendInt64(&string_builder_, 0);
-  AppendInt64(&string_builder_, 0);
+  AppendInt64(&string_builder_, summary_offset);  // Summary start.
+  AppendInt64(&string_builder_, summary_offset_offset);  // SummaryOffset start.
+  // CRC32 of the Summary section; not computed, so written as 0.
   AppendInt32(&string_builder_, 0);
   WriteRecord(OpCode::kFooter, string_builder_.Result());
 }
@@ -167,9 +234,19 @@
 }
 
 void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
-                              const Context &context) {
+                              const Context &context, std::ostream *output) {
   CHECK_NOTNULL(context.data);
 
+  message_counts_[channel_id]++;
+
+  if (!earliest_message_.has_value()) {
+    earliest_message_ = context.monotonic_event_time;
+  }
+  if (!earliest_chunk_message_.has_value()) {
+    earliest_chunk_message_ = context.monotonic_event_time;
+  }
+  latest_message_ = context.monotonic_event_time;
+
   string_builder_.Reset();
   // Channel ID
   AppendInt16(&string_builder_, channel_id);
@@ -193,15 +270,117 @@
   aos::FlatbufferToJson(&string_builder_, channel->schema(),
                         static_cast<const uint8_t *>(context.data));
 
-  WriteRecord(OpCode::kMessage, string_builder_.Result());
+  message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
+      context.monotonic_event_time.time_since_epoch().count(),
+      output->tellp()));
+
+  WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
 }
 
-void McapLogger::WriteRecord(OpCode op, std::string_view record) {
-  output_.put(static_cast<char>(op));
+void McapLogger::WriteRecord(OpCode op, std::string_view record,
+                             std::ostream *ostream) {
+  ostream->put(static_cast<char>(op));  // 1-byte opcode.
   uint64_t record_length = record.size();
-  output_.write(reinterpret_cast<const char *>(&record_length),
-                sizeof(record_length));
-  output_ << record;
+  ostream->write(reinterpret_cast<const char *>(&record_length),
+                 sizeof(record_length));  // uint64 length, host byte order.
+  *ostream << record;  // Record body.
+}
+
+// Writes out current_chunk_ as a Chunk record followed by its MessageIndexes.
+void McapLogger::WriteChunk() {
+  CHECK(earliest_chunk_message_.has_value());
+  string_builder_.Reset();
+  const uint64_t chunk_offset = output_.tellp();
+  AppendInt64(&string_builder_,
+              earliest_chunk_message_->time_since_epoch().count());
+  AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+  // Take the buffered records and reset the buffer for the next chunk.
+  const std::string chunk_records = current_chunk_.str();
+  current_chunk_.str("");
+  const uint64_t records_size = chunk_records.size();
+  // Uncompressed size, uncompressed CRC (unpopulated), compression ("" for
+  // none), then the length-prefixed records themselves.
+  AppendInt64(&string_builder_, records_size);
+  AppendInt32(&string_builder_, 0);
+  AppendString(&string_builder_, "");
+  AppendBytes(&string_builder_, chunk_records);
+  WriteRecord(OpCode::kChunk, string_builder_.Result());
+
+  // Write one MessageIndex record per channel with messages in this chunk.
+  std::map<uint16_t, uint64_t> index_offsets;
+  const uint64_t message_index_start = output_.tellp();
+  for (const auto &[channel_id, indices] : message_indices_) {
+    index_offsets[channel_id] = output_.tellp();
+    string_builder_.Reset();
+    AppendInt16(&string_builder_, channel_id);
+    AppendMessageIndices(&string_builder_, indices);
+    WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
+  }
+  message_indices_.clear();
+  chunk_indices_.push_back(ChunkIndex{
+      earliest_chunk_message_.value(), latest_message_, chunk_offset,
+      message_index_start - chunk_offset, records_size,
+      std::move(index_offsets),
+      static_cast<uint64_t>(output_.tellp()) - message_index_start});
+  earliest_chunk_message_.reset();
+}
+
+McapLogger::SummaryOffset McapLogger::WriteStatistics() {
+  const uint64_t stats_offset = output_.tellp();
+  // Seed with uint64_t{0}: an int seed would make accumulate sum in int.
+  const uint64_t message_count = std::accumulate(
+      message_counts_.begin(), message_counts_.end(), uint64_t{0},
+      [](uint64_t count, const auto &val) { return count + val.second; });
+  string_builder_.Reset();
+  AppendInt64(&string_builder_, message_count);
+  // Schema and channel counts (one schema per channel).
+  AppendInt16(&string_builder_, message_counts_.size());
+  AppendInt32(&string_builder_, message_counts_.size());
+  // Attachment and metadata counts (we write neither).
+  AppendInt32(&string_builder_, 0);
+  AppendInt32(&string_builder_, 0);
+  // Chunk count.
+  AppendInt32(&string_builder_, chunk_indices_.size());
+  // Earliest & latest message times, or 0 if no messages were logged.
+  const bool no_messages = !earliest_message_.has_value();
+  AppendInt64(&string_builder_,
+              no_messages ? 0 : earliest_message_->time_since_epoch().count());
+  AppendInt64(&string_builder_,
+              no_messages ? 0 : latest_message_.time_since_epoch().count());
+  // Per-channel message counts.
+  AppendChannelMap(&string_builder_, message_counts_);
+  WriteRecord(OpCode::kStatistics, string_builder_.Result());
+  return {OpCode::kStatistics, stats_offset,
+          static_cast<uint64_t>(output_.tellp()) - stats_offset};
+}
+
+McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
+  const uint64_t section_start = output_.tellp();
+  FastStringBuilder *const builder = &string_builder_;
+  for (const ChunkIndex &chunk : chunk_indices_) {
+    builder->Reset();
+    AppendInt64(builder, chunk.start_time.time_since_epoch().count());
+    AppendInt64(builder, chunk.end_time.time_since_epoch().count());
+    AppendInt64(builder, chunk.offset);
+    AppendInt64(builder, chunk.chunk_size);
+    AppendChannelMap(builder, chunk.message_index_offsets);
+    AppendInt64(builder, chunk.message_index_size);
+    AppendString(builder, "");  // Compression: none.
+    // Compressed and uncompressed sizes match since records are uncompressed.
+    AppendInt64(builder, chunk.records_size);
+    AppendInt64(builder, chunk.records_size);
+    WriteRecord(OpCode::kChunkIndex, builder->Result());
+  }
+  const uint64_t section_end = output_.tellp();
+  return {OpCode::kChunkIndex, section_start, section_end - section_start};
+}
+
+void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
+  string_builder_.Reset();
+  string_builder_.AppendChar(static_cast<char>(offset.op_code));  // Opcode.
+  AppendInt64(&string_builder_, offset.offset);  // Section start offset.
+  AppendInt64(&string_builder_, offset.size);  // Section size, in bytes.
+  WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
 }
 
 void McapLogger::AppendString(FastStringBuilder *builder,
@@ -210,14 +389,41 @@
   builder->Append(string);
 }
 
+void McapLogger::AppendBytes(FastStringBuilder *builder,
+                             std::string_view bytes) {
+  AppendInt64(builder, bytes.size());  // uint64 length prefix...
+  builder->Append(bytes);  // ...followed by the raw bytes.
+}
+
 namespace {
 template <typename T>
 static void AppendInt(FastStringBuilder *builder, T val) {
   builder->Append(
       std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
 }
+template <typename T>
+void AppendMap(FastStringBuilder *builder, const T &map) {
+  constexpr size_t kEntrySize = sizeof(typename T::value_type::first_type) +
+                                sizeof(typename T::value_type::second_type);
+  AppendInt<uint32_t>(builder, map.size() * kEntrySize);
+  for (const auto &[key, value] : map) {
+    AppendInt(builder, key);
+    AppendInt(builder, value);
+  }
+}
 }  // namespace
 
+void McapLogger::AppendChannelMap(FastStringBuilder *builder,
+                                  const std::map<uint16_t, uint64_t> &map) {
+  AppendMap(builder, map);  // Channel ID -> uint64 value, in key order.
+}
+
+void McapLogger::AppendMessageIndices(
+    FastStringBuilder *builder,
+    const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
+  AppendMap(builder, messages);  // (timestamp, offset) pairs, in order.
+}
+
 void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
   AppendInt(builder, val);
 }
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index 15d11db..dcacb68 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -26,6 +26,8 @@
 
 // Generates an MCAP file, per the specification at
 // https://github.com/foxglove/mcap/tree/main/docs/specification
+// This currently generates an uncompressed logfile with full message indexing
+// available, to be able to support Foxglove fully.
 class McapLogger {
  public:
   McapLogger(EventLoop *event_loop, const std::string &output_path);
@@ -38,30 +40,109 @@
     kSchema = 0x03,
     kChannel = 0x04,
     kMessage = 0x05,
+    kChunk = 0x06,
+    kMessageIndex = 0x07,
+    kChunkIndex = 0x08,
+    kAttachment = 0x09,
+    kAttachmentIndex = 0x0A,
+    kStatistics = 0x0B,
+    kMetadata = 0x0C,
+    kMetadataIndex = 0x0D,
+    kSummaryOffset = 0x0E,
     kDataEnd = 0x0F,
   };
+  // Stores information associated with a SummaryOffset entry (an offset to the
+  // start of a section within the Summary section, which allows readers to
+  // quickly find all the indices/channel definitions/etc. for a given log).
+  struct SummaryOffset {
+    OpCode op_code{};
+    // Offset from the start of the file.
+    uint64_t offset{};
+    // Total length of the section, in bytes.
+    uint64_t size{};
+  };
+  // Information needed to build a ChunkIndex entry.
+  struct ChunkIndex {
+    // Earliest and latest message times within the Chunk being referenced.
+    aos::monotonic_clock::time_point start_time;
+    aos::monotonic_clock::time_point end_time;
+    // Offset from the start of the file to the start of the relevant Chunk.
+    uint64_t offset{};
+    // Size of the whole Chunk record (incl. opcode and length), in bytes.
+    uint64_t chunk_size{};
+    // Total size of the records portion of the Chunk, in bytes.
+    uint64_t records_size{};
+    // Mapping of channel IDs to the MessageIndex entry for that channel within
+    // the referenced Chunk. The MessageIndex is referenced by an offset from
+    // the start of the file.
+    std::map<uint16_t, uint64_t> message_index_offsets;
+    // Total size, in bytes, of all the MessageIndex entries for this Chunk
+    // together (note that they are required to be contiguous).
+    uint64_t message_index_size{};
+  };
+  enum class RegisterHandlers { kYes, kNo };  // Whether to add watchers.
   // Helpers to write each type of relevant record.
   void WriteMagic();
   void WriteHeader();
-  void WriteFooter();
+  void WriteFooter(uint64_t summary_offset, uint64_t summary_offset_offset);
   void WriteDataEnd();
   void WriteSchema(const uint16_t id, const aos::Channel *channel);
   void WriteChannel(const uint16_t id, const uint16_t schema_id,
                     const aos::Channel *channel);
   void WriteMessage(uint16_t channel_id, const Channel *channel,
-                    const Context &context);
-  void WriteConfig();
+                    const Context &context, std::ostream *output);
+  void WriteChunk();  // Flushes current_chunk_ to output_.
+
+  // The helpers for writing records that appear in the Summary section return
+  // SummaryOffset structs describing what they wrote, so that each section can
+  // be referenced from the SummaryOffset section.
+  SummaryOffset WriteChunkIndices();
+  SummaryOffset WriteStatistics();
+  std::vector<SummaryOffset> WriteSchemasAndChannels(
+      RegisterHandlers register_handlers);
+  void WriteSummaryOffset(const SummaryOffset &offset);
 
   // Writes an MCAP record to the output file.
-  void WriteRecord(OpCode op, std::string_view record);
-  // Adds an MCAP-spec string/fixed-size integer to a buffer.
+  void WriteRecord(OpCode op, std::string_view record, std::ostream *ostream);
+  void WriteRecord(OpCode op, std::string_view record) {
+    WriteRecord(op, record, &output_);  // Writes to output_.
+  }
+  // Append an MCAP-spec string, byte array, map, array of pairs, or fixed-size
+  // integer (copied in host byte order) to a buffer.
   static void AppendString(FastStringBuilder *builder, std::string_view string);
+  static void AppendBytes(FastStringBuilder *builder, std::string_view bytes);
+  static void AppendChannelMap(FastStringBuilder *builder,
+                               const std::map<uint16_t, uint64_t> &map);
+  static void AppendMessageIndices(
+      FastStringBuilder *builder,
+      const std::vector<std::pair<uint64_t, uint64_t>> &messages);
   static void AppendInt16(FastStringBuilder *builder, uint16_t val);
   static void AppendInt32(FastStringBuilder *builder, uint32_t val);
   static void AppendInt64(FastStringBuilder *builder, uint64_t val);
 
+  aos::EventLoop *event_loop_;  // Provides the channel list and watchers.
   std::ofstream output_;
+  // Buffer containing serialized message data for the currently-being-built
+  // chunk.
+  std::stringstream current_chunk_;
   FastStringBuilder string_builder_;
+
+  // Earliest message time in this logfile; unset until the first message.
+  std::optional<aos::monotonic_clock::time_point> earliest_message_;
+  // Earliest message time in the current chunk; reset after each WriteChunk().
+  std::optional<aos::monotonic_clock::time_point> earliest_chunk_message_;
+  // Latest message time observed; min_time until the first message.
+  aos::monotonic_clock::time_point latest_message_ =
+      aos::monotonic_clock::min_time;
+  // Count of all messages on each channel, indexed by channel ID.
+  std::map<uint16_t, uint64_t> message_counts_;
+  // MessageIndex entries for messages in the current chunk, keyed by channel
+  // ID. Each vector holds (timestamp, offset from start of current_chunk_)
+  // pairs; the map is cleared whenever a chunk is written.
+  std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
+      message_indices_;
+  // ChunkIndex entries for every Chunk written so far.
+  std::vector<ChunkIndex> chunk_indices_;
 };
 }  // namespace aos
 #endif  // AOS_UTIL_MCAP_LOGGER_H_