Split MCAP channels into separate chunks

This significantly improves performance in Foxglove Studio: if you,
e.g., want to plot the memory usage for a given process, Studio no
longer has to *also* read every other message in the log.
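
As a rough reader-side sketch of why this helps (hypothetical,
simplified code, not part of this change; ChunkIndexEntry and
ChunksForChannel are made-up names): with one chunk per channel, a
reader that only cares about a single channel can pick its chunks out
of the chunk index instead of touching every chunk in the file.

  #include <cstdint>
  #include <map>
  #include <vector>

  struct ChunkIndexEntry {
    uint64_t chunk_offset;
    // Message-index offsets keyed by channel ID; with this change each
    // chunk contains exactly one channel, so this has a single entry.
    std::map<uint16_t, uint64_t> message_index_offsets;
  };

  // Returns the offsets of the chunks that contain messages on target_id.
  std::vector<uint64_t> ChunksForChannel(
      const std::vector<ChunkIndexEntry> &chunk_indices, uint16_t target_id) {
    std::vector<uint64_t> offsets;
    for (const ChunkIndexEntry &index : chunk_indices) {
      if (index.message_index_offsets.count(target_id) > 0) {
        offsets.push_back(index.chunk_offset);
      }
    }
    return offsets;
  }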

For higher-rate or larger messages (e.g., Pose or Map data), the
performance benefits are less substantial, since those were the
channels causing issues in the first place.

Change-Id: I956e4be244d3f21e25a3fd87e1ed7303fca704bb
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 4d4c1bd..818844b 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -122,8 +122,10 @@
 
 McapLogger::~McapLogger() {
   // If we have any data remaining, write one last chunk.
-  if (current_chunk_.tellp() > 0) {
-    WriteChunk();
+  for (auto &pair : current_chunks_) {
+    if (pair.second.data.tellp() > 0) {
+      WriteChunk(&pair.second);
+    }
   }
   WriteDataEnd();
 
@@ -190,16 +192,17 @@
       message_counts_[id] = 0;
       event_loop_->MakeRawWatcher(
           channel, [this, id, channel](const Context &context, const void *) {
-            WriteMessage(id, channel, context, &current_chunk_);
-            if (static_cast<uint64_t>(current_chunk_.tellp()) >
+            ChunkStatus *chunk = &current_chunks_[id];
+            WriteMessage(id, channel, context, chunk);
+            if (static_cast<uint64_t>(chunk->data.tellp()) >
                 FLAGS_mcap_chunk_size) {
-              WriteChunk();
+              WriteChunk(chunk);
             }
           });
       fetchers_[id] = event_loop_->MakeRawFetcher(channel);
       event_loop_->OnRun([this, id, channel]() {
         if (FLAGS_fetch && fetchers_[id]->Fetch()) {
-          WriteMessage(id, channel, fetchers_[id]->context(), &current_chunk_);
+          WriteMessage(id, channel, fetchers_[id]->context(), &current_chunks_[id]);
         }
       });
     }
@@ -215,7 +218,7 @@
       config_context.size = configuration_.span().size();
       config_context.data = configuration_.span().data();
       WriteMessage(configuration_id_, &configuration_channel_.message(),
-                   config_context, &current_chunk_);
+                   config_context, &current_chunks_[configuration_id_]);
     });
   }
 
@@ -366,7 +369,7 @@
 }
 
 void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
-                              const Context &context, std::ostream *output) {
+                              const Context &context, ChunkStatus *chunk) {
   CHECK_NOTNULL(context.data);
 
   message_counts_[channel_id]++;
@@ -377,12 +380,13 @@
     earliest_message_ =
         std::min(context.monotonic_event_time, earliest_message_.value());
   }
-  if (!earliest_chunk_message_.has_value()) {
-    earliest_chunk_message_ = context.monotonic_event_time;
+  if (!chunk->earliest_message.has_value()) {
+    chunk->earliest_message = context.monotonic_event_time;
   } else {
-    earliest_chunk_message_ =
-        std::min(context.monotonic_event_time, earliest_chunk_message_.value());
+    chunk->earliest_message =
+        std::min(context.monotonic_event_time, chunk->earliest_message.value());
   }
+  chunk->latest_message = context.monotonic_event_time;
   latest_message_ = context.monotonic_event_time;
 
   string_builder_.Reset();
@@ -420,11 +424,12 @@
   total_message_bytes_ += context.size;
   total_channel_bytes_[channel] += context.size;
 
-  message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
-      context.monotonic_event_time.time_since_epoch().count(),
-      output->tellp()));
+  chunk->message_indices[channel_id].push_back(
+      std::make_pair<uint64_t, uint64_t>(
+          context.monotonic_event_time.time_since_epoch().count(),
+          chunk->data.tellp()));
 
-  WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
+  WriteRecord(OpCode::kMessage, string_builder_.Result(), &chunk->data);
 }
 
 void McapLogger::WriteRecord(OpCode op, std::string_view record,
@@ -436,18 +441,20 @@
   *ostream << record;
 }
 
-void McapLogger::WriteChunk() {
+void McapLogger::WriteChunk(ChunkStatus *chunk) {
   string_builder_.Reset();
 
-  CHECK(earliest_chunk_message_.has_value());
+  CHECK(chunk->earliest_message.has_value());
   const uint64_t chunk_offset = output_.tellp();
   AppendInt64(&string_builder_,
-              earliest_chunk_message_->time_since_epoch().count());
-  AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+              chunk->earliest_message->time_since_epoch().count());
+  CHECK(chunk->latest_message.has_value());
+  AppendInt64(&string_builder_,
+              chunk->latest_message.value().time_since_epoch().count());
 
-  std::string chunk_records = current_chunk_.str();
+  std::string chunk_records = chunk->data.str();
   // Reset the chunk buffer.
-  current_chunk_.str("");
+  chunk->data.str("");
 
   const uint64_t records_size = chunk_records.size();
   // Uncompressed chunk size.
@@ -460,19 +467,19 @@
 
   std::map<uint16_t, uint64_t> index_offsets;
   const uint64_t message_index_start = output_.tellp();
-  for (const auto &indices : message_indices_) {
+  for (const auto &indices : chunk->message_indices) {
     index_offsets[indices.first] = output_.tellp();
     string_builder_.Reset();
     AppendInt16(&string_builder_, indices.first);
     AppendMessageIndices(&string_builder_, indices.second);
     WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
   }
-  message_indices_.clear();
+  chunk->message_indices.clear();
   chunk_indices_.push_back(ChunkIndex{
-      earliest_chunk_message_.value(), latest_message_, chunk_offset,
+      chunk->earliest_message.value(), chunk->latest_message.value(), chunk_offset,
       message_index_start - chunk_offset, records_size, index_offsets,
       static_cast<uint64_t>(output_.tellp()) - message_index_start});
-  earliest_chunk_message_.reset();
+  chunk->earliest_message.reset();
 }
 
 McapLogger::SummaryOffset McapLogger::WriteStatistics() {
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index d264c1e..267364a 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -97,6 +97,27 @@
     // together (note that they are required to be contiguous).
     uint64_t message_index_size;
   };
+  // Maintains the state of a single Chunk. To maximize read performance, we
+  // currently maintain separate chunks for each channel so that, to read a
+  // given channel, only data associated with that channel need be read.
+  struct ChunkStatus {
+    // Buffer containing serialized message data for the currently-being-built
+    // chunk.
+    std::stringstream data;
+    // Earliest message observed in this chunk.
+    std::optional<aos::monotonic_clock::time_point> earliest_message;
+    // Latest message observed in this chunk.
+    std::optional<aos::monotonic_clock::time_point> latest_message;
+    // MessageIndex's for each message. The std::map is indexed by channel ID.
+    // The vector is then a series of pairs of (timestamp, offset from start of
+    // data).
+    // Note that currently this will only ever have one entry, for the channel
+    // that this chunk corresponds to. However, the standard provides for there
+    // being more than one channel per chunk and so we still have some code that
+    // supports that.
+    std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
+        message_indices;
+  };
   enum class RegisterHandlers { kYes, kNo };
   // Helpers to write each type of relevant record.
   void WriteMagic();
@@ -108,8 +129,8 @@
                     const aos::Channel *channel,
                     std::string_view override_name = "");
   void WriteMessage(uint16_t channel_id, const Channel *channel,
-                    const Context &context, std::ostream *output);
-  void WriteChunk();
+                    const Context &context, ChunkStatus *chunk);
+  void WriteChunk(ChunkStatus *chunk);
 
   // The helpers for writing records which appear in the Summary section will
   // return SummaryOffset's so that they can be referenced in the SummaryOffset
@@ -144,26 +165,19 @@
   const CanonicalChannelNames canonical_channels_;
   size_t total_message_bytes_ = 0;
   std::map<const Channel *, size_t> total_channel_bytes_;
-  // Buffer containing serialized message data for the currently-being-built
-  // chunk.
-  std::stringstream current_chunk_;
   FastStringBuilder string_builder_;
 
   // Earliest message observed in this logfile.
   std::optional<aos::monotonic_clock::time_point> earliest_message_;
-  // Earliest message observed in the current chunk.
-  std::optional<aos::monotonic_clock::time_point> earliest_chunk_message_;
-  // Latest message observed.
-  aos::monotonic_clock::time_point latest_message_ =
-      aos::monotonic_clock::min_time;
+  // Latest message observed in this logfile.
+  aos::monotonic_clock::time_point latest_message_ =
+      aos::monotonic_clock::min_time;
   // Count of all messages on each channel, indexed by channel ID.
   std::map<uint16_t, uint64_t> message_counts_;
   std::map<uint16_t, std::unique_ptr<RawFetcher>> fetchers_;
-  // MessageIndex's for each message. The std::map is indexed by channel ID. The
-  // vector is then a series of pairs of (timestamp, offset from start of
-  // current_chunk_).
-  std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
-      message_indices_;
+  // All currently-being-built chunks. Indexed by channel ID. This is used to
+  // segregate channels into separate chunks to support more efficient reading.
+  std::map<uint16_t, ChunkStatus> current_chunks_;
   // ChunkIndex's for all fully written Chunks.
   std::vector<ChunkIndex> chunk_indices_;
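
To make the per-channel bookkeeping above concrete, here is a minimal
standalone sketch (hypothetical and simplified; MiniChunk and
AppendMessage are not part of the patch) of what ChunkStatus tracks:
each channel owns its own in-progress chunk, and each message records a
(timestamp, offset-into-chunk-data) pair in that chunk's message index
before the serialized record is appended to the chunk's data buffer.

  #include <cstdint>
  #include <map>
  #include <sstream>
  #include <string>
  #include <utility>
  #include <vector>

  // Simplified stand-in for ChunkStatus; the logger keeps one per channel.
  struct MiniChunk {
    std::stringstream data;
    // (timestamp, offset from start of data) pairs, keyed by channel ID.
    std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
        message_indices;
  };

  void AppendMessage(std::map<uint16_t, MiniChunk> *chunks,
                     uint16_t channel_id, uint64_t timestamp_ns,
                     const std::string &serialized) {
    // Each channel gets (and grows) its own chunk.
    MiniChunk &chunk = (*chunks)[channel_id];
    // Record where this message starts within the chunk's data buffer so a
    // reader can later seek straight to it.
    chunk.message_indices[channel_id].emplace_back(
        timestamp_ns, static_cast<uint64_t>(chunk.data.tellp()));
    chunk.data << serialized;
  }

In the actual change, WriteChunk(chunk) is then called once the chunk's
data grows past FLAGS_mcap_chunk_size, which flushes the buffer and
records a ChunkIndex entry for the finished chunk.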