Merge "Support indexing of MCAP files"
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 9ab4012..1cb9c32 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -4,6 +4,11 @@
#include "single_include/nlohmann/json.hpp"
namespace aos {
+namespace {
+// Threshold, in bytes of buffered record data, past which the chunk that is
+// currently being built gets flushed and a fresh chunk is started.
+constexpr size_t kChunkSize = 10'000'000;
+}  // namespace
+
nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
JsonSchemaRecursion recursion_level) {
nlohmann::json schema;
@@ -71,39 +76,100 @@
}
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
- : output_(output_path) {
+ : event_loop_(event_loop), output_(output_path) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
CHECK(output_);  // The output stream must be usable before anything is written.
WriteMagic();
WriteHeader();
- uint16_t id = 1;
- for (const Channel *channel : *event_loop->configuration()->channels()) {
- if (!configuration::ChannelIsReadableOnNode(channel, event_loop->node())) {
- continue;
- }
-
- WriteSchema(id, channel);
-
- // Write out the channel entry that uses the schema (we just re-use the
- // chema ID for the channel ID, since we aren't deduplicating schemas for
- // channels that are of the same type).
- WriteChannel(id, id, channel);
-
- event_loop->MakeRawWatcher(
- channel, [this, id, channel](const Context &context, const void *) {
- WriteMessage(id, channel, context);
- });
- ++id;
- }
+ // Write the schemas/channels up front (the destructor repeats them in the
+ // summary section) and, via kYes, register a watcher on every logged channel.
+ WriteSchemasAndChannels(RegisterHandlers::kYes);
}
McapLogger::~McapLogger() {  // Finishes the file: last chunk, DataEnd, summary, footer, magic.
+ // If we have any data remaining, write one last chunk.
+ if (current_chunk_.tellp() > 0) {
+ WriteChunk();
+ }
WriteDataEnd();
- WriteFooter();
+
+ // Now we enter the Summary section, where we write out all the channel/index
+ // information that readers need to be able to seek to arbitrary locations
+ // within the log.
+ const uint64_t summary_offset = output_.tellp();
+ const SummaryOffset chunk_indices_offset = WriteChunkIndices();
+ const SummaryOffset stats_offset = WriteStatistics();
+ // Schemas/Channels need to get reproduced in the summary section for random
+ // access reading.
+ const std::vector<SummaryOffset> offsets =
+ WriteSchemasAndChannels(RegisterHandlers::kNo);
+
+ // Next we have the summary offset section, which references the individual
+ // pieces of the summary section.
+ const uint64_t summary_offset_offset = output_.tellp();
+
+ // The SummaryOffset records must all be the final thing before the footer.
+ WriteSummaryOffset(chunk_indices_offset);
+ WriteSummaryOffset(stats_offset);
+ for (const auto &offset : offsets) {
+ WriteSummaryOffset(offset);
+ }
+
+ // And finally, the footer which must itself reference the start of the
+ // summary and summary offset sections.
+ WriteFooter(summary_offset, summary_offset_offset);
WriteMagic();
}
+// Writes one Schema record and then one Channel record for every channel that
+// is readable on this node, at the current position of output_.
+//
+// Channel IDs are handed out sequentially starting from 1, in configuration
+// order, counting only the readable channels. Each channel's schema ID is the
+// same number as its channel ID.
+//
+// When register_handlers is kYes this also zeroes the per-channel message
+// count and registers a raw watcher that serializes every incoming message
+// into current_chunk_, flushing the chunk once it has grown past kChunkSize.
+//
+// Returns the SummaryOffset for the group of Schema records followed by the
+// SummaryOffset for the group of Channel records that were just written.
+std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
+    RegisterHandlers register_handlers) {
+  uint16_t id = 1;
+  std::map<uint16_t, const Channel *> channels;
+  for (const Channel *channel : *event_loop_->configuration()->channels()) {
+    if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
+      continue;
+    }
+    channels[id] = channel;
+
+    if (register_handlers == RegisterHandlers::kYes) {
+      message_counts_[id] = 0;
+      event_loop_->MakeRawWatcher(
+          channel, [this, id, channel](const Context &context, const void *) {
+            // Messages are buffered in the in-memory chunk rather than being
+            // written straight to the file.
+            WriteMessage(id, channel, context, &current_chunk_);
+            if (static_cast<uint64_t>(current_chunk_.tellp()) > kChunkSize) {
+              WriteChunk();
+            }
+          });
+    }
+    ++id;
+  }
+
+  std::vector<SummaryOffset> offsets;
+
+  // All of the schemas are written contiguously so that they can be described
+  // by a single SummaryOffset.
+  const uint64_t schema_offset = output_.tellp();
+  for (const auto &[channel_id, channel] : channels) {
+    WriteSchema(channel_id, channel);
+  }
+
+  const uint64_t channel_offset = output_.tellp();
+  offsets.push_back(
+      {OpCode::kSchema, schema_offset, channel_offset - schema_offset});
+
+  for (const auto &[channel_id, channel] : channels) {
+    // Write out the channel entry that uses the schema (we just re-use
+    // the schema ID for the channel ID, since we aren't deduplicating
+    // schemas for channels that are of the same type).
+    WriteChannel(channel_id, channel_id, channel);
+  }
+
+  offsets.push_back({OpCode::kChannel, channel_offset,
+                     static_cast<uint64_t>(output_.tellp()) - channel_offset});
+  return offsets;
+}
+
void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }  // Streams the literal magic byte sequence to output_.
void McapLogger::WriteHeader() {
@@ -115,11 +181,12 @@
WriteRecord(OpCode::kHeader, string_builder_.Result());
}
-void McapLogger::WriteFooter() {
+void McapLogger::WriteFooter(uint64_t summary_offset,
+ uint64_t summary_offset_offset) {
string_builder_.Reset();
- // Offsets and CRC32 for summary section, which we don't populate.
- AppendInt64(&string_builder_, 0);
- AppendInt64(&string_builder_, 0);
+ AppendInt64(&string_builder_, summary_offset);  // File offset where the summary section starts.
+ AppendInt64(&string_builder_, summary_offset_offset);  // File offset where the SummaryOffset records start.
+ // CRC32 of the Summary section; left as 0 because we do not compute it.
AppendInt32(&string_builder_, 0);
WriteRecord(OpCode::kFooter, string_builder_.Result());
}
@@ -167,9 +234,19 @@
}
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
- const Context &context) {
+ const Context &context, std::ostream *output) {
CHECK_NOTNULL(context.data);
+ message_counts_[channel_id]++;
+
+ if (!earliest_message_.has_value()) {
+ earliest_message_ = context.monotonic_event_time;
+ }
+ if (!earliest_chunk_message_.has_value()) {
+ earliest_chunk_message_ = context.monotonic_event_time;
+ }
+ latest_message_ = context.monotonic_event_time;
+
string_builder_.Reset();
// Channel ID
AppendInt16(&string_builder_, channel_id);
@@ -193,15 +270,117 @@
aos::FlatbufferToJson(&string_builder_, channel->schema(),
static_cast<const uint8_t *>(context.data));
- WriteRecord(OpCode::kMessage, string_builder_.Result());
+ message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
+ context.monotonic_event_time.time_since_epoch().count(),
+ output->tellp()));
+
+ WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
}
-void McapLogger::WriteRecord(OpCode op, std::string_view record) {
- output_.put(static_cast<char>(op));
+void McapLogger::WriteRecord(OpCode op, std::string_view record,
+ std::ostream *ostream) {
+ ostream->put(static_cast<char>(op));  // One-byte opcode comes first.
uint64_t record_length = record.size();
- output_.write(reinterpret_cast<const char *>(&record_length),
- sizeof(record_length));
- output_ << record;
+ ostream->write(reinterpret_cast<const char *>(&record_length),
+ sizeof(record_length));  // 8-byte length, copied out in host byte order.
+ *ostream << record;  // Finally the record payload itself.
+}
+
+// Flushes the records buffered in current_chunk_ to output_ as one Chunk
+// record, follows it with a MessageIndex record per channel that has messages
+// in the chunk, and remembers a ChunkIndex entry for the summary section.
+void McapLogger::WriteChunk() {
+  // A chunk is only written once at least one message has been buffered.
+  CHECK(earliest_chunk_message_.has_value());
+  const uint64_t chunk_start = output_.tellp();
+
+  // Copy the buffered records out and empty the buffer for the next chunk.
+  const std::string buffered_records = current_chunk_.str();
+  current_chunk_.str("");
+  const uint64_t records_size = buffered_records.size();
+
+  string_builder_.Reset();
+  // Earliest and latest message times covered by this chunk.
+  AppendInt64(&string_builder_,
+              earliest_chunk_message_->time_since_epoch().count());
+  AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
+  // Uncompressed chunk size.
+  AppendInt64(&string_builder_, records_size);
+  // Uncompressed CRC (unpopulated).
+  AppendInt32(&string_builder_, 0);
+  // Compression name; empty since the records are stored as-is.
+  AppendString(&string_builder_, "");
+  AppendBytes(&string_builder_, buffered_records);
+  WriteRecord(OpCode::kChunk, string_builder_.Result());
+
+  // The MessageIndex records are written back to back right after the chunk.
+  const uint64_t message_index_start = output_.tellp();
+  std::map<uint16_t, uint64_t> index_offsets;
+  for (const auto &[channel_id, entries] : message_indices_) {
+    index_offsets[channel_id] = output_.tellp();
+    string_builder_.Reset();
+    AppendInt16(&string_builder_, channel_id);
+    AppendMessageIndices(&string_builder_, entries);
+    WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
+  }
+  const uint64_t message_index_end = output_.tellp();
+  message_indices_.clear();
+
+  chunk_indices_.push_back(ChunkIndex{
+      earliest_chunk_message_.value(), latest_message_, chunk_start,
+      message_index_start - chunk_start, records_size, index_offsets,
+      message_index_end - message_index_start});
+  // The next chunk starts without an earliest message.
+  earliest_chunk_message_.reset();
+}
+
+// Writes the Statistics record (total and per-channel message counts, record
+// counts and the overall message time range) at the current position of
+// output_.
+//
+// Returns the SummaryOffset describing the record that was written.
+McapLogger::SummaryOffset McapLogger::WriteStatistics() {
+  const uint64_t stats_offset = output_.tellp();
+
+  // Sum with a 64-bit accumulator. Seeding std::accumulate with a plain `0`
+  // would make the running total an int and truncate it on every step.
+  uint64_t message_count = 0;
+  for (const auto &channel_count : message_counts_) {
+    message_count += channel_count.second;
+  }
+
+  // earliest_message_ is only set once a message has been logged, so it must
+  // not be dereferenced unconditionally. With no messages both times are
+  // written as zero, which is what the MCAP specification asks for.
+  const bool have_messages = earliest_message_.has_value();
+  const uint64_t message_start_time =
+      have_messages ? earliest_message_->time_since_epoch().count() : 0;
+  const uint64_t message_end_time =
+      have_messages ? latest_message_.time_since_epoch().count() : 0;
+
+  string_builder_.Reset();
+  AppendInt64(&string_builder_, message_count);
+  // Schema count (one schema is written per channel).
+  AppendInt16(&string_builder_, message_counts_.size());
+  // Channel count.
+  AppendInt32(&string_builder_, message_counts_.size());
+  // Attachment count.
+  AppendInt32(&string_builder_, 0);
+  // Metadata count.
+  AppendInt32(&string_builder_, 0);
+  // Chunk count.
+  AppendInt32(&string_builder_, chunk_indices_.size());
+  // Earliest & latest message times.
+  AppendInt64(&string_builder_, message_start_time);
+  AppendInt64(&string_builder_, message_end_time);
+  // Per-channel message counts.
+  AppendChannelMap(&string_builder_, message_counts_);
+  WriteRecord(OpCode::kStatistics, string_builder_.Result());
+  return {OpCode::kStatistics, stats_offset,
+          static_cast<uint64_t>(output_.tellp()) - stats_offset};
+}
+
+// Writes one ChunkIndex record for every entry in chunk_indices_ and returns
+// the SummaryOffset that covers the whole group of records.
+McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
+  const uint64_t group_start = output_.tellp();
+
+  for (const ChunkIndex &chunk : chunk_indices_) {
+    const auto first_stamp = chunk.start_time.time_since_epoch().count();
+    const auto last_stamp = chunk.end_time.time_since_epoch().count();
+
+    string_builder_.Reset();
+    AppendInt64(&string_builder_, first_stamp);
+    AppendInt64(&string_builder_, last_stamp);
+    AppendInt64(&string_builder_, chunk.offset);
+    AppendInt64(&string_builder_, chunk.chunk_size);
+    AppendChannelMap(&string_builder_, chunk.message_index_offsets);
+    AppendInt64(&string_builder_, chunk.message_index_size);
+    // The compression name is written empty, and the same records size is
+    // reported as both the compressed and the uncompressed size.
+    AppendString(&string_builder_, "");
+    AppendInt64(&string_builder_, chunk.records_size);
+    AppendInt64(&string_builder_, chunk.records_size);
+    WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
+  }
+
+  const uint64_t group_end = output_.tellp();
+  return {OpCode::kChunkIndex, group_start, group_end - group_start};
+}
+
+// Writes a single SummaryOffset record: the opcode of the group it describes,
+// then the group's file offset and its length in bytes.
+void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
+  const char group_opcode = static_cast<char>(offset.op_code);
+
+  string_builder_.Reset();
+  string_builder_.AppendChar(group_opcode);
+  AppendInt64(&string_builder_, offset.offset);
+  AppendInt64(&string_builder_, offset.size);
+
+  WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
}
void McapLogger::AppendString(FastStringBuilder *builder,
@@ -210,14 +389,41 @@
builder->Append(string);
}
+// Appends a length-prefixed byte array to the builder: the 64-bit byte count
+// first, then the bytes themselves.
+void McapLogger::AppendBytes(FastStringBuilder *builder,
+                             std::string_view bytes) {
+  const uint64_t byte_count = bytes.size();
+  AppendInt64(builder, byte_count);
+  builder->Append(bytes);
+}
+
namespace {
template <typename T>
static void AppendInt(FastStringBuilder *builder, T val) {  // Appends the sizeof(T) raw bytes of val.
builder->Append(
std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));  // Bytes are taken as laid out in memory (host byte order).
}
+// Serializes a container of (key, value) pairs: a 32-bit total byte length of
+// all the entries first, then every key immediately followed by its value.
+template <typename T>
+void AppendMap(FastStringBuilder *builder, const T &map) {
+  using Entry = typename T::value_type;
+  constexpr size_t kEntrySize = sizeof(typename Entry::first_type) +
+                                sizeof(typename Entry::second_type);
+  AppendInt<uint32_t>(builder, map.size() * kEntrySize);
+  for (const auto &[key, value] : map) {
+    AppendInt(builder, key);
+    AppendInt(builder, value);
+  }
+}
} // namespace
+void McapLogger::AppendChannelMap(FastStringBuilder *builder,
+ const std::map<uint16_t, uint64_t> &map) {
+ AppendMap(builder, map);  // Byte-length prefix, then each (channel ID, value) entry in key order.
+}
+
+void McapLogger::AppendMessageIndices(
+ FastStringBuilder *builder,
+ const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
+ AppendMap(builder, messages);  // Byte-length prefix, then each pair in vector order.
+}
+
void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
AppendInt(builder, val);  // Forwards to the file-local AppendInt<T> template.
}
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index 15d11db..dcacb68 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -26,6 +26,8 @@
// Generates an MCAP file, per the specification at
// https://github.com/foxglove/mcap/tree/main/docs/specification
+// This currently generates an uncompressed logfile with full message indexing
+// available, to be able to support Foxglove fully.
class McapLogger {
public:
McapLogger(EventLoop *event_loop, const std::string &output_path);
@@ -38,30 +40,109 @@
kSchema = 0x03,
kChannel = 0x04,
kMessage = 0x05,
+ kChunk = 0x06,
+ kMessageIndex = 0x07,
+ kChunkIndex = 0x08,
+ kAttachment = 0x09,
+ kAttachmentIndex = 0x0A,
+ kStatistics = 0x0B,
+ kMetadata = 0x0C,
+ kMetadataIndex = 0x0D,
+ kSummaryOffset = 0x0E,
kDataEnd = 0x0F,
};
+ // Describes one group of same-opcode records within the Summary section; each
+ // is written out as a SummaryOffset record, which allows readers to quickly
+ // find all the indices/channel definitions/etc. for a given log.
+ struct SummaryOffset {
+ OpCode op_code;  // Opcode of the records that make up the group.
+ // Offset of the start of the group, from the start of the file.
+ uint64_t offset;
+ // Total length of the group, in bytes.
+ uint64_t size;
+ };
+ // Information needed to build a ChunkIndex entry; one is kept per Chunk.
+ struct ChunkIndex {
+ // Earliest and latest message times within the Chunk being referenced.
+ aos::monotonic_clock::time_point start_time;
+ aos::monotonic_clock::time_point end_time;
+ // Offset from the start of the file to the start of the relevant Chunk.
+ uint64_t offset;
+ // Size in bytes of the Chunk record itself (up to its first MessageIndex).
+ uint64_t chunk_size;
+ // Total size of the records portion of the Chunk, in bytes.
+ uint64_t records_size;
+ // Mapping of channel IDs to the MessageIndex entry for that channel within
+ // the referenced Chunk. The MessageIndex is referenced by an offset from
+ // the start of the file.
+ std::map<uint16_t, uint64_t> message_index_offsets;
+ // Total size, in bytes, of all the MessageIndex entries for this Chunk
+ // together (note that they are required to be contiguous).
+ uint64_t message_index_size;
+ };
+ enum class RegisterHandlers { kYes, kNo };  // Whether WriteSchemasAndChannels() also registers the channel watchers.
// Helpers to write each type of relevant record.
void WriteMagic();
void WriteHeader();
- void WriteFooter();
+ void WriteFooter(uint64_t summary_offset, uint64_t summary_offset_offset);
void WriteDataEnd();
void WriteSchema(const uint16_t id, const aos::Channel *channel);
void WriteChannel(const uint16_t id, const uint16_t schema_id,
const aos::Channel *channel);
void WriteMessage(uint16_t channel_id, const Channel *channel,
- const Context &context);
- void WriteConfig();
+ const Context &context, std::ostream *output);
+ void WriteChunk();
+
+ // The helpers for writing records which appear in the Summary section will
+ // return SummaryOffset's so that they can be referenced in the SummaryOffset
+ // section.
+ SummaryOffset WriteChunkIndices();
+ SummaryOffset WriteStatistics();
+ std::vector<SummaryOffset> WriteSchemasAndChannels(
+ RegisterHandlers register_handlers);
+ void WriteSummaryOffset(const SummaryOffset &offset);
// Writes an MCAP record to the output file.
- void WriteRecord(OpCode op, std::string_view record);
- // Adds an MCAP-spec string/fixed-size integer to a buffer.
+ void WriteRecord(OpCode op, std::string_view record, std::ostream *ostream);
+ void WriteRecord(OpCode op, std::string_view record) {
+ WriteRecord(op, record, &output_);  // Convenience overload: writes straight to the output file.
+ }
+ // Adds an MCAP-spec string/byte-array/map/array of pairs/fixed-size integer
+ // to a buffer.
static void AppendString(FastStringBuilder *builder, std::string_view string);
+ static void AppendBytes(FastStringBuilder *builder, std::string_view bytes);
+ static void AppendChannelMap(FastStringBuilder *builder,
+ const std::map<uint16_t, uint64_t> &map);
+ static void AppendMessageIndices(
+ FastStringBuilder *builder,
+ const std::vector<std::pair<uint64_t, uint64_t>> &messages);
static void AppendInt16(FastStringBuilder *builder, uint16_t val);
static void AppendInt32(FastStringBuilder *builder, uint32_t val);
static void AppendInt64(FastStringBuilder *builder, uint64_t val);
+ aos::EventLoop *event_loop_;
std::ofstream output_;
+ // Buffer containing serialized message data for the currently-being-built
+ // chunk.
+ std::stringstream current_chunk_;
FastStringBuilder string_builder_;
+
+ // Earliest message observed in this logfile.
+ std::optional<aos::monotonic_clock::time_point> earliest_message_;
+ // Earliest message observed in the current chunk.
+ std::optional<aos::monotonic_clock::time_point> earliest_chunk_message_;
+ // Latest message observed.
+ aos::monotonic_clock::time_point latest_message_ =
+ aos::monotonic_clock::min_time;
+ // Count of all messages on each channel, indexed by channel ID.
+ std::map<uint16_t, uint64_t> message_counts_;
+ // MessageIndex's for each message. The std::map is indexed by channel ID. The
+ // vector is then a series of pairs of (timestamp, offset from start of
+ // current_chunk_).
+ std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
+ message_indices_;
+ // ChunkIndex's for all fully written Chunks.
+ std::vector<ChunkIndex> chunk_indices_;
};
} // namespace aos
#endif // AOS_UTIL_MCAP_LOGGER_H_