Support indexing of MCAP files
Tested manually against Foxglove Studio and the `mcap doctor` tool.
Future work is to build the upstream `mcap` tool
(https://github.com/foxglove/mcap/tree/main/go/cli/mcap), which
includes a format validator, so that the output can be validated automatically.
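Until that is wired up, checking a log by hand looks like this (the
output path here is hypothetical):

  $ mcap doctor /tmp/log.mcap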
Change-Id: I05bb0e7487d76ac95e309545aceffc0304de1f1d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 9ab4012..1cb9c32 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -4,6 +4,11 @@
#include "single_include/nlohmann/json.hpp"
namespace aos {
+namespace {
+// Amount of data to allow in each chunk before creating a new chunk.
+constexpr size_t kChunkSize = 10'000'000;
+} // namespace
+
nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
JsonSchemaRecursion recursion_level) {
nlohmann::json schema;
@@ -71,39 +76,100 @@
}
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
- : output_(output_path) {
+ : event_loop_(event_loop), output_(output_path) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
CHECK(output_);
WriteMagic();
WriteHeader();
- uint16_t id = 1;
- for (const Channel *channel : *event_loop->configuration()->channels()) {
- if (!configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
- continue;
- }
-
- WriteSchema(id, channel);
-
- // Write out the channel entry that uses the schema (we just re-use the
- // chema ID for the channel ID, since we aren't deduplicating schemas for
- // channels that are of the same type).
- WriteChannel(id, id, channel);
-
- event_loop->MakeRawWatcher(
- channel, [this, id, channel](const Context &context, const void *) {
- WriteMessage(id, channel, context);
- });
- ++id;
- }
+ // Schemas and channels get written out both at the start and end of the file,
+ // per the MCAP spec.
+ WriteSchemasAndChannels(RegisterHandlers::kYes);
}
McapLogger::~McapLogger() {
+ // If we have any data remaining, write one last chunk.
+ if (current_chunk_.tellp() > 0) {
+ WriteChunk();
+ }
WriteDataEnd();
- WriteFooter();
+
+ // Now we enter the Summary section, where we write out all the channel/index
+ // information that readers need to be able to seek to arbitrary locations
+ // within the log.
+ const uint64_t summary_offset = output_.tellp();
+ const SummaryOffset chunk_indices_offset = WriteChunkIndices();
+ const SummaryOffset stats_offset = WriteStatistics();
+ // Schemas/Channels need to get reproduced in the summary section for random
+ // access reading.
+ const std::vector<SummaryOffset> offsets =
+ WriteSchemasAndChannels(RegisterHandlers::kNo);
+
+ // Next we have the summary offset section, which references the individual
+ // pieces of the summary section.
+ const uint64_t summary_offset_offset = output_.tellp();
+
+ // SummaryOffsets must all be the final thing before the footer.
+ WriteSummaryOffset(chunk_indices_offset);
+ WriteSummaryOffset(stats_offset);
+ for (const auto &offset : offsets) {
+ WriteSummaryOffset(offset);
+ }
+
+ // And finally, the footer, which must itself reference the start of the
+ // summary and summary offset sections.
+ WriteFooter(summary_offset, summary_offset_offset);
WriteMagic();
}
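+// For reference, the overall layout of the file this produces is roughly
+// (a sketch of this logger's output, not the normative spec layout):
+//
+//   Magic, Header, Schema..., Channel...,
+//   [Chunk, MessageIndex...] repeated,
+//   DataEnd,
+//   ChunkIndex..., Statistics, Schema..., Channel...,  <- summary section
+//   SummaryOffset...,  <- summary offset section
+//   Footer, Magic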
+std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
+ RegisterHandlers register_handlers) {
+ uint16_t id = 1;
+ std::map<uint16_t, const Channel *> channels;
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
+ continue;
+ }
+ channels[id] = channel;
+
+ if (register_handlers == RegisterHandlers::kYes) {
+ message_counts_[id] = 0;
+ event_loop_->MakeRawWatcher(
+ channel, [this, id, channel](const Context &context, const void *) {
+ WriteMessage(id, channel, context, &current_chunk_);
+ if (static_cast<uint64_t>(current_chunk_.tellp()) > kChunkSize) {
+ WriteChunk();
+ }
+ });
+ }
+ ++id;
+ }
+
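+ // Write all the schemas as one contiguous run of records and then all the
+ // channels as another, so that each group can be described by a single
+ // SummaryOffset (group opcode, group start, group length) below.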
+ std::vector<SummaryOffset> offsets;
+
+ const uint64_t schema_offset = output_.tellp();
+
+ for (const auto &pair : channels) {
+ WriteSchema(pair.first, pair.second);
+ }
+
+ const uint64_t channel_offset = output_.tellp();
+
+ offsets.push_back(
+ {OpCode::kSchema, schema_offset, channel_offset - schema_offset});
+
+ for (const auto &pair : channels) {
+ // Write out the channel entry that uses the schema (we just re-use
+ // the schema ID for the channel ID, since we aren't deduplicating
+ // schemas for channels that are of the same type).
+ WriteChannel(pair.first, pair.first, pair.second);
+ }
+
+ offsets.push_back({OpCode::kChannel, channel_offset,
+ static_cast<uint64_t>(output_.tellp()) - channel_offset});
+ return offsets;
+}
+
void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
void McapLogger::WriteHeader() {
@@ -115,11 +181,12 @@
WriteRecord(OpCode::kHeader, string_builder_.Result());
}
-void McapLogger::WriteFooter() {
+void McapLogger::WriteFooter(uint64_t summary_offset,
+ uint64_t summary_offset_offset) {
string_builder_.Reset();
- // Offsets and CRC32 for summary section, which we don't populate.
- AppendInt64(&string_builder_, 0);
- AppendInt64(&string_builder_, 0);
+ AppendInt64(&string_builder_, summary_offset);
+ AppendInt64(&string_builder_, summary_offset_offset);
+ // CRC32 for the Summary section, which we don't bother populating.
AppendInt32(&string_builder_, 0);
WriteRecord(OpCode::kFooter, string_builder_.Result());
}
@@ -167,9 +234,19 @@
}
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
- const Context &context) {
+ const Context &context, std::ostream *output) {
CHECK_NOTNULL(context.data);
+ message_counts_[channel_id]++;
+
+ if (!earliest_message_.has_value()) {
+ earliest_message_ = context.monotonic_event_time;
+ }
+ if (!earliest_chunk_message_.has_value()) {
+ earliest_chunk_message_ = context.monotonic_event_time;
+ }
+ latest_message_ = context.monotonic_event_time;
+
string_builder_.Reset();
// Channel ID
AppendInt16(&string_builder_, channel_id);
@@ -193,15 +270,117 @@
aos::FlatbufferToJson(&string_builder_, channel->schema(),
static_cast<const uint8_t *>(context.data));
- WriteRecord(OpCode::kMessage, string_builder_.Result());
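+ // Remember (log time, offset) for this message so that WriteChunk() can
+ // emit a MessageIndex record for it; the offset is relative to the start
+ // of the current chunk's uncompressed records, since `output` is the
+ // chunk buffer here.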
+ message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
+ context.monotonic_event_time.time_since_epoch().count(),
+ output->tellp()));
+
+ WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
}
-void McapLogger::WriteRecord(OpCode op, std::string_view record) {
- output_.put(static_cast<char>(op));
+void McapLogger::WriteRecord(OpCode op, std::string_view record,
+ std::ostream *ostream) {
+ ostream->put(static_cast<char>(op));
uint64_t record_length = record.size();
- output_.write(reinterpret_cast<const char *>(&record_length),
- sizeof(record_length));
- output_ << record;
+ ostream->write(reinterpret_cast<const char *>(&record_length),
+ sizeof(record_length));
+ *ostream << record;
+}
+
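+// All MCAP records share the framing that WriteRecord() produces:
+//
+//   opcode (1 byte) | record length (8 bytes) | payload (length bytes)
+//
+// Note that the reinterpret_cast-based integer writers assume a
+// little-endian host, which matches what the MCAP spec requires on the
+// wire. e.g., the Footer above frames as its opcode, a length of 20, and
+// then the two uint64 offsets plus the uint32 CRC.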
+void McapLogger::WriteChunk() {
+ string_builder_.Reset();
+
+ CHECK(earliest_chunk_message_.has_value());
+ const uint64_t chunk_offset = output_.tellp();
+ AppendInt64(&string_builder_,
+ earliest_chunk_message_->time_since_epoch().count());
+ AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+
+ std::string chunk_records = current_chunk_.str();
+ // Reset the chunk buffer.
+ current_chunk_.str("");
+
+ const uint64_t records_size = chunk_records.size();
+ // Uncompressed chunk size.
+ AppendInt64(&string_builder_, records_size);
+ // Uncompressed CRC (unpopulated).
+ AppendInt32(&string_builder_, 0);
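+ // Compression algorithm; the empty string means the records are
+ // uncompressed.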
+ AppendString(&string_builder_, "");
+ AppendBytes(&string_builder_, chunk_records);
+ WriteRecord(OpCode::kChunk, string_builder_.Result());
+
+ std::map<uint16_t, uint64_t> index_offsets;
+ const uint64_t message_index_start = output_.tellp();
+ for (const auto &indices : message_indices_) {
+ index_offsets[indices.first] = output_.tellp();
+ string_builder_.Reset();
+ AppendInt16(&string_builder_, indices.first);
+ AppendMessageIndices(&string_builder_, indices.second);
+ WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
+ }
+ message_indices_.clear();
+ chunk_indices_.push_back(ChunkIndex{
+ earliest_chunk_message_.value(), latest_message_, chunk_offset,
+ message_index_start - chunk_offset, records_size, index_offsets,
+ static_cast<uint64_t>(output_.tellp()) - message_index_start});
+ earliest_chunk_message_.reset();
+}
+
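+// The net effect of WriteChunk() above is a Chunk record followed
+// immediately by one MessageIndex record per channel that had messages in
+// the chunk; chunk_offset points at the Chunk, message_index_start at the
+// first MessageIndex, and both are saved in a ChunkIndex entry so that the
+// summary section can point readers back here.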
+McapLogger::SummaryOffset McapLogger::WriteStatistics() {
+ const uint64_t stats_offset = output_.tellp();
+ const uint64_t message_count = std::accumulate(
+ message_counts_.begin(), message_counts_.end(), uint64_t{0},
+ [](const uint64_t &count, const std::pair<uint16_t, uint64_t> &val) {
+ return count + val.second;
+ });
+ string_builder_.Reset();
+ AppendInt64(&string_builder_, message_count);
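+ // We don't deduplicate schemas across channels of the same type, so the
+ // schema and channel counts are both just the number of channels.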
+ // Schema count.
+ AppendInt16(&string_builder_, message_counts_.size());
+ // Channel count.
+ AppendInt32(&string_builder_, message_counts_.size());
+ // Attachment count.
+ AppendInt32(&string_builder_, 0);
+ // Metadata count.
+ AppendInt32(&string_builder_, 0);
+ // Chunk count.
+ AppendInt32(&string_builder_, chunk_indices_.size());
+ // Earliest & latest message times.
+ CHECK(earliest_message_.has_value());
+ AppendInt64(&string_builder_, earliest_message_->time_since_epoch().count());
+ AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+ // Per-channel message counts.
+ AppendChannelMap(&string_builder_, message_counts_);
+ WriteRecord(OpCode::kStatistics, string_builder_.Result());
+ return {OpCode::kStatistics, stats_offset,
+ static_cast<uint64_t>(output_.tellp()) - stats_offset};
+}
+
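+// A reader looking for messages in a given time range can scan these
+// ChunkIndex records out of the summary, pick the chunks whose
+// [start_time, end_time] ranges overlap the query, and seek straight to
+// them rather than parsing the entire data section.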
+McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
+ const uint64_t index_offset = output_.tellp();
+ for (const ChunkIndex &index : chunk_indices_) {
+ string_builder_.Reset();
+ AppendInt64(&string_builder_, index.start_time.time_since_epoch().count());
+ AppendInt64(&string_builder_, index.end_time.time_since_epoch().count());
+ AppendInt64(&string_builder_, index.offset);
+ AppendInt64(&string_builder_, index.chunk_size);
+ AppendChannelMap(&string_builder_, index.message_index_offsets);
+ AppendInt64(&string_builder_, index.message_index_size);
+ // Compression used.
+ AppendString(&string_builder_, "");
+ // Compressed and uncompressed records size.
+ AppendInt64(&string_builder_, index.records_size);
+ AppendInt64(&string_builder_, index.records_size);
+ WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
+ }
+ return {OpCode::kChunkIndex, index_offset,
+ static_cast<uint64_t>(output_.tellp()) - index_offset};
+}
+
+void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
+ string_builder_.Reset();
+ string_builder_.AppendChar(static_cast<char>(offset.op_code));
+ AppendInt64(&string_builder_, offset.offset);
+ AppendInt64(&string_builder_, offset.size);
+ WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
}
void McapLogger::AppendString(FastStringBuilder *builder,
@@ -210,14 +389,41 @@
builder->Append(string);
}
+void McapLogger::AppendBytes(FastStringBuilder *builder,
+ std::string_view bytes) {
+ AppendInt64(builder, bytes.size());
+ builder->Append(bytes);
+}
+
namespace {
template <typename T>
static void AppendInt(FastStringBuilder *builder, T val) {
builder->Append(
std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
}
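+// Serializes a map as MCAP expects: a uint32 byte-length prefix followed by
+// the packed key/value pairs. e.g., a std::map<uint16_t, uint64_t> with two
+// entries gets a prefix of 2 * (2 + 8) = 20, then those 20 bytes.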
+template <typename T>
+void AppendMap(FastStringBuilder *builder, const T &map) {
+ AppendInt<uint32_t>(
+ builder, map.size() * (sizeof(typename T::value_type::first_type) +
+ sizeof(typename T::value_type::second_type)));
+ for (const auto &pair : map) {
+ AppendInt(builder, pair.first);
+ AppendInt(builder, pair.second);
+ }
+}
} // namespace
+void McapLogger::AppendChannelMap(FastStringBuilder *builder,
+ const std::map<uint16_t, uint64_t> &map) {
+ AppendMap(builder, map);
+}
+
+void McapLogger::AppendMessageIndices(
+ FastStringBuilder *builder,
+ const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
+ AppendMap(builder, messages);
+}
+
void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
AppendInt(builder, val);
}