Support indexing of MCAP files

Tested manually against Foxglove Studio and the `mcap doctor` tool.

Future work is to build the upstream `mcap` tool
(https://github.com/foxglove/mcap/tree/main/go/cli/mcap), which
includes a format validator, so that the output can be validated automatically.

Change-Id: I05bb0e7487d76ac95e309545aceffc0304de1f1d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 9ab4012..1cb9c32 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -4,6 +4,11 @@
 #include "single_include/nlohmann/json.hpp"
 
 namespace aos {
+namespace {
+// Chunks are written out once their buffered records exceed this many bytes.
+constexpr size_t kChunkSize = 10000000;
+}  // namespace
+
 nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
                                        JsonSchemaRecursion recursion_level) {
   nlohmann::json schema;
@@ -71,39 +76,100 @@
 }
 
 McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
-    : output_(output_path) {
+    : event_loop_(event_loop), output_(output_path) {
   event_loop->SkipTimingReport();
   event_loop->SkipAosLog();
   CHECK(output_);
   WriteMagic();
   WriteHeader();
-  uint16_t id = 1;
-  for (const Channel *channel : *event_loop->configuration()->channels()) {
-    if (!configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
-      continue;
-    }
-
-    WriteSchema(id, channel);
-
-    // Write out the channel entry that uses the schema (we just re-use the
-    // chema ID for the channel ID, since we aren't deduplicating schemas for
-    // channels that are of the same type).
-    WriteChannel(id, id, channel);
-
-    event_loop->MakeRawWatcher(
-        channel, [this, id, channel](const Context &context, const void *) {
-          WriteMessage(id, channel, context);
-        });
-    ++id;
-  }
+  // Schemas and channels get written out both at the start and end of the file,
+  // per the MCAP spec.
+  WriteSchemasAndChannels(RegisterHandlers::kYes);
 }
 
 McapLogger::~McapLogger() {
+  // If we have any data remaining, write one last chunk.
+  if (current_chunk_.tellp() > 0) {
+    WriteChunk();
+  }
   WriteDataEnd();
-  WriteFooter();
+
+  // Now we enter the Summary section, where we write out all the channel/index
+  // information that readers need to be able to seek to arbitrary locations
+  // within the log.
+  const uint64_t summary_offset = output_.tellp();
+  const SummaryOffset chunk_indices_offset = WriteChunkIndices();
+  const SummaryOffset stats_offset = WriteStatistics();
+  // Schemas/Channels need to get reproduced in the summary section for random
+  // access reading.
+  const std::vector<SummaryOffset> offsets =
+      WriteSchemasAndChannels(RegisterHandlers::kNo);
+
+  // Next we have the summary offset section, which references the individual
+  // pieces of the summary section.
+  const uint64_t summary_offset_offset = output_.tellp();
+
+  // SummaryOffset records must be the last thing written before the footer.
+  WriteSummaryOffset(chunk_indices_offset);
+  WriteSummaryOffset(stats_offset);
+  for (const auto &offset : offsets) {
+    WriteSummaryOffset(offset);
+  }
+
+  // And finally, the footer which must itself reference the start of the
+  // summary and summary offset sections.
+  WriteFooter(summary_offset, summary_offset_offset);
   WriteMagic();
 }
 
+std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
+    RegisterHandlers register_handlers) {
+  uint16_t id = 1;  // IDs follow config order and must match across both calls.
+  std::map<uint16_t, const Channel *> channels;
+  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+    if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
+      continue;
+    }
+    channels[id] = channel;
+
+    if (register_handlers == RegisterHandlers::kYes) {
+      message_counts_[id] = 0;
+      event_loop_->MakeRawWatcher(
+          channel, [this, id, channel](const Context &context, const void *) {
+            WriteMessage(id, channel, context, &current_chunk_);
+            if (static_cast<uint64_t>(current_chunk_.tellp()) > kChunkSize) {
+              WriteChunk();
+            }
+          });
+    }
+    ++id;
+  }
+
+  std::vector<SummaryOffset> offsets;
+
+  const uint64_t schema_offset = output_.tellp();
+
+  for (const auto &pair : channels) {
+    WriteSchema(pair.first, pair.second);
+  }
+
+  const uint64_t channel_offset = output_.tellp();
+
+  offsets.push_back(
+      {OpCode::kSchema, schema_offset, channel_offset - schema_offset});
+
+  for (const auto &pair : channels) {
+    // Write out the channel entry that uses the schema (we just re-use
+    // the schema ID for the channel ID, since we aren't deduplicating
+    // schemas for channels that are of the same type).
+    WriteChannel(pair.first, pair.first, pair.second);
+  }
+
+  offsets.push_back({OpCode::kChannel, channel_offset,
+                     static_cast<uint64_t>(output_.tellp()) - channel_offset});
+  return offsets;
+}
+
 void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
 
 void McapLogger::WriteHeader() {
@@ -115,11 +181,12 @@
   WriteRecord(OpCode::kHeader, string_builder_.Result());
 }
 
-void McapLogger::WriteFooter() {
+void McapLogger::WriteFooter(uint64_t summary_offset,
+                             uint64_t summary_offset_offset) {
   string_builder_.Reset();
-  // Offsets and CRC32 for summary section, which we don't populate.
-  AppendInt64(&string_builder_, 0);
-  AppendInt64(&string_builder_, 0);
+  AppendInt64(&string_builder_, summary_offset);  // Start of Summary section.
+  AppendInt64(&string_builder_, summary_offset_offset);  // Summary Offsets.
+  // CRC32 of the Summary section; always written as 0 (not computed).
   AppendInt32(&string_builder_, 0);
   WriteRecord(OpCode::kFooter, string_builder_.Result());
 }
@@ -167,9 +234,19 @@
 }
 
 void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
-                              const Context &context) {
+                              const Context &context, std::ostream *output) {
   CHECK_NOTNULL(context.data);
 
+  message_counts_[channel_id]++;  // Reported in the Statistics record.
+
+  if (!earliest_message_.has_value()) {  // Set once, on the first message.
+    earliest_message_ = context.monotonic_event_time;
+  }
+  if (!earliest_chunk_message_.has_value()) {  // Reset by WriteChunk().
+    earliest_chunk_message_ = context.monotonic_event_time;
+  }
+  latest_message_ = context.monotonic_event_time;
+
   string_builder_.Reset();
   // Channel ID
   AppendInt16(&string_builder_, channel_id);
@@ -193,15 +270,117 @@
   aos::FlatbufferToJson(&string_builder_, channel->schema(),
                         static_cast<const uint8_t *>(context.data));
 
-  WriteRecord(OpCode::kMessage, string_builder_.Result());
+  message_indices_[channel_id].emplace_back(
+      context.monotonic_event_time.time_since_epoch().count(),
+      static_cast<uint64_t>(output->tellp()));  // Offset within *output.
+
+  WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
 }
 
-void McapLogger::WriteRecord(OpCode op, std::string_view record) {
-  output_.put(static_cast<char>(op));
+void McapLogger::WriteRecord(OpCode op, std::string_view record,
+                             std::ostream *ostream) {
+  ostream->put(static_cast<char>(op));
   uint64_t record_length = record.size();
-  output_.write(reinterpret_cast<const char *>(&record_length),
-                sizeof(record_length));
-  output_ << record;
+  ostream->write(reinterpret_cast<const char *>(&record_length),
+                 sizeof(record_length));
+  *ostream << record;
+}
+
+void McapLogger::WriteChunk() {
+  string_builder_.Reset();
+
+  CHECK(earliest_chunk_message_.has_value());
+  const uint64_t chunk_offset = output_.tellp();
+  AppendInt64(&string_builder_,
+              earliest_chunk_message_->time_since_epoch().count());
+  AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+
+  std::string chunk_records = current_chunk_.str();
+  // Reset the chunk buffer.
+  current_chunk_.str("");
+
+  const uint64_t records_size = chunk_records.size();
+  // Uncompressed chunk size.
+  AppendInt64(&string_builder_, records_size);
+  // Uncompressed CRC (unpopulated).
+  AppendInt32(&string_builder_, 0);
+  AppendString(&string_builder_, "");  // Compression: none.
+  AppendBytes(&string_builder_, chunk_records);
+  WriteRecord(OpCode::kChunk, string_builder_.Result());
+
+  std::map<uint16_t, uint64_t> index_offsets;
+  const uint64_t message_index_start = output_.tellp();
+  for (const auto &indices : message_indices_) {
+    index_offsets[indices.first] = output_.tellp();
+    string_builder_.Reset();
+    AppendInt16(&string_builder_, indices.first);
+    AppendMessageIndices(&string_builder_, indices.second);
+    WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
+  }
+  message_indices_.clear();
+  chunk_indices_.push_back(ChunkIndex{
+      earliest_chunk_message_.value(), latest_message_, chunk_offset,
+      message_index_start - chunk_offset, records_size, index_offsets,
+      static_cast<uint64_t>(output_.tellp()) - message_index_start});
+  earliest_chunk_message_.reset();
+}
+
+McapLogger::SummaryOffset McapLogger::WriteStatistics() {
+  const uint64_t stats_offset = output_.tellp();
+  // Seed with uint64_t{0}: a plain 0 would make std::accumulate sum in int.
+  const uint64_t message_count = std::accumulate(
+      message_counts_.begin(), message_counts_.end(), uint64_t{0},
+      [](uint64_t count, const auto &val) { return count + val.second; });
+  string_builder_.Reset();
+  AppendInt64(&string_builder_, message_count);
+  // Schema count.
+  AppendInt16(&string_builder_, message_counts_.size());
+  // Channel count.
+  AppendInt32(&string_builder_, message_counts_.size());
+  // Attachment count.
+  AppendInt32(&string_builder_, 0);
+  // Metadata count.
+  AppendInt32(&string_builder_, 0);
+  // Chunk count.
+  AppendInt32(&string_builder_, chunk_indices_.size());
+  // Earliest & latest message times (equal if nothing was ever logged).
+  const auto start_time = earliest_message_.value_or(latest_message_);
+  AppendInt64(&string_builder_, start_time.time_since_epoch().count());
+  AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+  // Per-channel message counts.
+  AppendChannelMap(&string_builder_, message_counts_);
+  WriteRecord(OpCode::kStatistics, string_builder_.Result());
+  return {OpCode::kStatistics, stats_offset,
+          static_cast<uint64_t>(output_.tellp()) - stats_offset};
+}
+
+McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
+  const uint64_t index_offset = output_.tellp();
+  for (const ChunkIndex &index : chunk_indices_) {
+    string_builder_.Reset();
+    AppendInt64(&string_builder_, index.start_time.time_since_epoch().count());
+    AppendInt64(&string_builder_, index.end_time.time_since_epoch().count());
+    AppendInt64(&string_builder_, index.offset);
+    AppendInt64(&string_builder_, index.chunk_size);
+    AppendChannelMap(&string_builder_, index.message_index_offsets);
+    AppendInt64(&string_builder_, index.message_index_size);
+    // Compression used.
+    AppendString(&string_builder_, "");
+    // Compressed and uncompressed records size.
+    AppendInt64(&string_builder_, index.records_size);
+    AppendInt64(&string_builder_, index.records_size);
+    WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
+  }
+  return {OpCode::kChunkIndex, index_offset,
+          static_cast<uint64_t>(output_.tellp()) - index_offset};
+}
+
+void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
+  string_builder_.Reset();
+  string_builder_.AppendChar(static_cast<char>(offset.op_code));
+  AppendInt64(&string_builder_, offset.offset);
+  AppendInt64(&string_builder_, offset.size);
+  WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
 }
 
 void McapLogger::AppendString(FastStringBuilder *builder,
@@ -210,14 +389,41 @@
   builder->Append(string);
 }
 
+void McapLogger::AppendBytes(FastStringBuilder *builder,
+                             std::string_view bytes) {
+  AppendInt64(builder, bytes.size());  // Length prefix precedes the bytes.
+  builder->Append(bytes);
+}
+
 namespace {
 template <typename T>
 static void AppendInt(FastStringBuilder *builder, T val) {
   builder->Append(
       std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
 }
+template <typename T>  // T: container whose value_type is a std::pair.
+void AppendMap(FastStringBuilder *builder, const T &map) {
+  AppendInt<uint32_t>(  // Prefix is the size in bytes, not the entry count.
+      builder, map.size() * (sizeof(typename T::value_type::first_type) +
+                             sizeof(typename T::value_type::second_type)));
+  for (const auto &pair : map) {
+    AppendInt(builder, pair.first);
+    AppendInt(builder, pair.second);
+  }
+}
 }  // namespace
 
+void McapLogger::AppendChannelMap(FastStringBuilder *builder,
+                                  const std::map<uint16_t, uint64_t> &map) {
+  AppendMap(builder, map);
+}
+
+void McapLogger::AppendMessageIndices(
+    FastStringBuilder *builder,
+    const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
+  AppendMap(builder, messages);  // Same layout as a map: byte length + pairs.
+}
+
 void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
   AppendInt(builder, val);
 }