Support indexing of MCAP files
Tested manually against Foxglove Studio and the `mcap doctor` tool.
Future work is to build the upstream `mcap` tool
(https://github.com/foxglove/mcap/tree/main/go/cli/mcap), which
includes a format validator, so that the generated files can be
validated automatically.
Change-Id: I05bb0e7487d76ac95e309545aceffc0304de1f1d
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index 15d11db..dcacb68 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -26,6 +26,8 @@
// Generates an MCAP file, per the specification at
// https://github.com/foxglove/mcap/tree/main/docs/specification
+// This currently generates an uncompressed logfile with full message indexing
+// available, to be able to support Foxglove fully.
class McapLogger {
public:
McapLogger(EventLoop *event_loop, const std::string &output_path);
@@ -38,30 +40,109 @@
kSchema = 0x03,
kChannel = 0x04,
kMessage = 0x05,
+ kChunk = 0x06,
+ kMessageIndex = 0x07,
+ kChunkIndex = 0x08,
+ kAttachment = 0x09,
+ kAttachmentIndex = 0x0A,
+ kStatistics = 0x0B,
+ kMetadata = 0x0C,
+ kMetadataIndex = 0x0D,
+ kSummaryOffset = 0x0E,
kDataEnd = 0x0F,
};
+ // Stores information associated with a SummaryOffset entry (an offset to the
+ // start of a section within the Summary section, which allows readers to
+ // quickly find all the indices/channel definitions/etc. for a given log).
+ struct SummaryOffset {
+ OpCode op_code;
+ // Offset from the start of the file.
+ uint64_t offset;
+ // Total length of the section, in bytes.
+ uint64_t size;
+ };
+ // Information needed to build the ChunkIndex entry for a single Chunk.
+ struct ChunkIndex {
+ // Earliest and latest message times within the Chunk being referenced.
+ aos::monotonic_clock::time_point start_time;
+ aos::monotonic_clock::time_point end_time;
+ // Offset from the start of the file to the start of the relevant Chunk.
+ uint64_t offset;
+ // Total size of the Chunk, in bytes.
+ uint64_t chunk_size;
+ // Total size of the records portion of the Chunk, in bytes.
+ uint64_t records_size;
+ // Mapping of channel IDs to the MessageIndex entry for that channel within
+ // the referenced Chunk. Each MessageIndex is referenced by its offset from
+ // the start of the file.
+ std::map<uint16_t, uint64_t> message_index_offsets;
+ // Total size, in bytes, of all the MessageIndex entries for this Chunk
+ // together (note that they are required to be contiguous).
+ uint64_t message_index_size;
+ };
+ enum class RegisterHandlers { kYes, kNo };
// Helpers to write each type of relevant record.
void WriteMagic();
void WriteHeader();
- void WriteFooter();
+ void WriteFooter(uint64_t summary_offset, uint64_t summary_offset_offset);
void WriteDataEnd();
void WriteSchema(const uint16_t id, const aos::Channel *channel);
void WriteChannel(const uint16_t id, const uint16_t schema_id,
const aos::Channel *channel);
void WriteMessage(uint16_t channel_id, const Channel *channel,
- const Context &context);
- void WriteConfig();
+ const Context &context, std::ostream *output);
+ void WriteChunk();
+
+ // The helpers for writing records which appear in the Summary section will
+ // return SummaryOffset's so that they can be referenced in the SummaryOffset
+ // section.
+ SummaryOffset WriteChunkIndices();
+ SummaryOffset WriteStatistics();
+ std::vector<SummaryOffset> WriteSchemasAndChannels(
+ RegisterHandlers register_handlers);
+ void WriteSummaryOffset(const SummaryOffset &offset);
// Writes an MCAP record to the output file.
- void WriteRecord(OpCode op, std::string_view record);
- // Adds an MCAP-spec string/fixed-size integer to a buffer.
+ void WriteRecord(OpCode op, std::string_view record, std::ostream *ostream);
+ void WriteRecord(OpCode op, std::string_view record) {
+ WriteRecord(op, record, &output_);  // Forwards with output_ as the stream.
+ }
+ // Adds an MCAP-spec string/byte-array/map/array of pairs/fixed-size integer
+ // to a buffer.
static void AppendString(FastStringBuilder *builder, std::string_view string);
+ static void AppendBytes(FastStringBuilder *builder, std::string_view bytes);
+ static void AppendChannelMap(FastStringBuilder *builder,
+ const std::map<uint16_t, uint64_t> &map);
+ static void AppendMessageIndices(
+ FastStringBuilder *builder,
+ const std::vector<std::pair<uint64_t, uint64_t>> &messages);
static void AppendInt16(FastStringBuilder *builder, uint16_t val);
static void AppendInt32(FastStringBuilder *builder, uint32_t val);
static void AppendInt64(FastStringBuilder *builder, uint64_t val);
+ aos::EventLoop *event_loop_;
std::ofstream output_;
+ // Buffer containing serialized message data for the currently-being-built
+ // chunk.
+ std::stringstream current_chunk_;
FastStringBuilder string_builder_;
+
+ // Earliest message observed in this logfile.
+ std::optional<aos::monotonic_clock::time_point> earliest_message_;
+ // Earliest message observed in the current chunk.
+ std::optional<aos::monotonic_clock::time_point> earliest_chunk_message_;
+ // Latest message observed.
+ aos::monotonic_clock::time_point latest_message_ =
+ aos::monotonic_clock::min_time;
+ // Count of all messages on each channel, indexed by channel ID.
+ std::map<uint16_t, uint64_t> message_counts_;
+ // MessageIndex's for each message. The std::map is indexed by channel ID. The
+ // vector is then a series of pairs of (timestamp, offset from start of
+ // current_chunk_).
+ std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
+ message_indices_;
+ // ChunkIndex's for all fully written Chunks.
+ std::vector<ChunkIndex> chunk_indices_;
};
} // namespace aos
#endif // AOS_UTIL_MCAP_LOGGER_H_