Split MCAP channels into separate chunks
This significantly improves performance in Foxglove Studio by making it
so that if you, e.g., want to plot the memory usage for a given process,
Studio doesn't *also* have to read every other message in the log.

For the much higher rate/bigger messages (e.g., Pose or Map data), the
performance benefits are less substantial, since those channels already
dominated the read cost of the chunks they appeared in.
Change-Id: I956e4be244d3f21e25a3fd87e1ed7303fca704bb
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
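For reference, the per-channel chunk state introduced by this change
looks roughly like the sketch below. The field shapes are inferred from
the diff that follows; the actual ChunkStatus declaration (in the
corresponding header) may differ in detail:

    struct ChunkStatus {
      // Buffered, uncompressed records for the in-progress chunk.
      std::stringstream data;
      // Timestamp bounds of the messages currently in the chunk;
      // earliest_message is reset whenever the chunk is flushed.
      std::optional<aos::monotonic_clock::time_point> earliest_message;
      std::optional<aos::monotonic_clock::time_point> latest_message;
      // (timestamp, offset-within-chunk) pairs per channel ID, used to
      // emit MessageIndex records when the chunk is written out.
      std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
          message_indices;
    };
    // One in-progress chunk per channel, so that flushing one channel's
    // chunk never mixes in (or forces readers to decode) other channels.
    std::map<uint16_t, ChunkStatus> current_chunks_;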
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 4d4c1bd..818844b 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -122,8 +122,10 @@
McapLogger::~McapLogger() {
// If we have any data remaining, write one last chunk.
- if (current_chunk_.tellp() > 0) {
- WriteChunk();
+ for (auto &pair : current_chunks_) {
+ if (pair.second.data.tellp() > 0) {
+ WriteChunk(&pair.second);
+ }
}
WriteDataEnd();
@@ -190,16 +192,17 @@
message_counts_[id] = 0;
event_loop_->MakeRawWatcher(
channel, [this, id, channel](const Context &context, const void *) {
- WriteMessage(id, channel, context, &current_chunk_);
- if (static_cast<uint64_t>(current_chunk_.tellp()) >
+ ChunkStatus *chunk = &current_chunks_[id];
+ WriteMessage(id, channel, context, chunk);
+ if (static_cast<uint64_t>(chunk->data.tellp()) >
FLAGS_mcap_chunk_size) {
- WriteChunk();
+ WriteChunk(chunk);
}
});
fetchers_[id] = event_loop_->MakeRawFetcher(channel);
event_loop_->OnRun([this, id, channel]() {
if (FLAGS_fetch && fetchers_[id]->Fetch()) {
- WriteMessage(id, channel, fetchers_[id]->context(), &current_chunk_);
+ WriteMessage(id, channel, fetchers_[id]->context(), &current_chunks_[id]);
}
});
}
@@ -215,7 +218,7 @@
config_context.size = configuration_.span().size();
config_context.data = configuration_.span().data();
WriteMessage(configuration_id_, &configuration_channel_.message(),
- config_context, &current_chunk_);
+ config_context, &current_chunks_[configuration_id_]);
});
}
@@ -366,7 +369,7 @@
}
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
- const Context &context, std::ostream *output) {
+ const Context &context, ChunkStatus *chunk) {
CHECK_NOTNULL(context.data);
message_counts_[channel_id]++;
@@ -377,12 +380,13 @@
earliest_message_ =
std::min(context.monotonic_event_time, earliest_message_.value());
}
- if (!earliest_chunk_message_.has_value()) {
- earliest_chunk_message_ = context.monotonic_event_time;
+ if (!chunk->earliest_message.has_value()) {
+ chunk->earliest_message = context.monotonic_event_time;
} else {
- earliest_chunk_message_ =
- std::min(context.monotonic_event_time, earliest_chunk_message_.value());
+ chunk->earliest_message =
+ std::min(context.monotonic_event_time, chunk->earliest_message.value());
}
+ chunk->latest_message = context.monotonic_event_time;
latest_message_ = context.monotonic_event_time;
string_builder_.Reset();
@@ -420,11 +424,12 @@
total_message_bytes_ += context.size;
total_channel_bytes_[channel] += context.size;
- message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
- context.monotonic_event_time.time_since_epoch().count(),
- output->tellp()));
+ chunk->message_indices[channel_id].push_back(
+ std::make_pair<uint64_t, uint64_t>(
+ context.monotonic_event_time.time_since_epoch().count(),
+ chunk->data.tellp()));
- WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
+ WriteRecord(OpCode::kMessage, string_builder_.Result(), &chunk->data);
}
void McapLogger::WriteRecord(OpCode op, std::string_view record,
@@ -436,18 +441,20 @@
*ostream << record;
}
-void McapLogger::WriteChunk() {
+void McapLogger::WriteChunk(ChunkStatus *chunk) {
string_builder_.Reset();
- CHECK(earliest_chunk_message_.has_value());
+ CHECK(chunk->earliest_message.has_value());
const uint64_t chunk_offset = output_.tellp();
AppendInt64(&string_builder_,
- earliest_chunk_message_->time_since_epoch().count());
- AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+ chunk->earliest_message->time_since_epoch().count());
+ CHECK(chunk->latest_message.has_value());
+ AppendInt64(&string_builder_,
+ chunk->latest_message.value().time_since_epoch().count());
- std::string chunk_records = current_chunk_.str();
+ std::string chunk_records = chunk->data.str();
// Reset the chunk buffer.
- current_chunk_.str("");
+ chunk->data.str("");
const uint64_t records_size = chunk_records.size();
// Uncompressed chunk size.
@@ -460,19 +467,19 @@
std::map<uint16_t, uint64_t> index_offsets;
const uint64_t message_index_start = output_.tellp();
- for (const auto &indices : message_indices_) {
+ for (const auto &indices : chunk->message_indices) {
index_offsets[indices.first] = output_.tellp();
string_builder_.Reset();
AppendInt16(&string_builder_, indices.first);
AppendMessageIndices(&string_builder_, indices.second);
WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
}
- message_indices_.clear();
+ chunk->message_indices.clear();
chunk_indices_.push_back(ChunkIndex{
- earliest_chunk_message_.value(), latest_message_, chunk_offset,
+ chunk->earliest_message.value(), chunk->latest_message.value(), chunk_offset,
message_index_start - chunk_offset, records_size, index_offsets,
static_cast<uint64_t>(output_.tellp()) - message_index_start});
- earliest_chunk_message_.reset();
+ chunk->earliest_message.reset();
}
McapLogger::SummaryOffset McapLogger::WriteStatistics() {