// blob: 70a13282a66b3c45c7ce7ac96e2b8eff206e9ed3
#ifndef AOS_UTIL_MCAP_LOGGER_H_
#define AOS_UTIL_MCAP_LOGGER_H_

#include <cstddef>
#include <cstdint>
#include <fstream>
#include <map>
#include <memory>
#include <optional>
#include <ostream>
#include <sstream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "aos/configuration_generated.h"
#include "aos/events/event_loop.h"
#include "aos/fast_string_builder.h"
#include "aos/flatbuffer_utils.h"
#include "single_include/nlohmann/json.hpp"
9
10namespace aos {
11
12// Produces a JSON Schema (https://json-schema.org/) for a given flatbuffer
13// type. If recursion_level is set, will include a $schema attribute indicating
14// the schema definition being used (this is used to allow for recursion).
15//
16// Note that this is pretty bare-bones, so, e.g., we don't distinguish between
17// structs and tables when generating the JSON schema, so we don't bother to
18// mark struct fields as required.
19enum class JsonSchemaRecursion {
20 kTopLevel,
21 kNested,
22};
23nlohmann::json JsonSchemaForFlatbuffer(
24 const FlatbufferType &type,
25 JsonSchemaRecursion recursion_level = JsonSchemaRecursion::kTopLevel);
26
// Generates an MCAP file, per the specification at
// https://github.com/foxglove/mcap/tree/main/docs/specification
// The logfile is written with full message indexing available, to be able to
// support Foxglove fully. Chunks are written either uncompressed or
// LZ4-compressed, as selected by the Compression argument to the constructor.
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070031class McapLogger {
32 public:
James Kuszmaulc31d7362022-05-27 14:20:04 -070033 // Whether to serialize the messages into the MCAP file as JSON or
34 // flatbuffers.
35 enum class Serialization {
36 kJson,
37 kFlatbuffer,
38 };
James Kuszmaul9f607c62022-10-27 17:01:55 -070039 // Whether to attempt to shorten channel names.
40 enum class CanonicalChannelNames {
41 // Just use the full, unambiguous, channel names.
42 kCanonical,
43 // Use GetChannelAliases() to determine the shortest possible name for the
44 // channel for the current node, and use that in the MCAP file. This makes
45 // it so that the channels in the resulting file are more likely to match
46 // the channel names that are used in "real" applications.
47 kShortened,
48 };
James Kuszmaul5ab990d2022-11-07 16:35:49 -080049 // Chunk compression to use in the MCAP file.
50 enum class Compression {
51 kNone,
52 kLz4,
53 };
James Kuszmaulc31d7362022-05-27 14:20:04 -070054 McapLogger(EventLoop *event_loop, const std::string &output_path,
James Kuszmaul5ab990d2022-11-07 16:35:49 -080055 Serialization serialization,
56 CanonicalChannelNames canonical_channels, Compression compression);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070057 ~McapLogger();
58
59 private:
60 enum class OpCode {
61 kHeader = 0x01,
62 kFooter = 0x02,
63 kSchema = 0x03,
64 kChannel = 0x04,
65 kMessage = 0x05,
James Kuszmaulb3fba252022-04-06 15:13:31 -070066 kChunk = 0x06,
67 kMessageIndex = 0x07,
68 kChunkIndex = 0x08,
69 kAttachment = 0x09,
70 kAttachmentIndex = 0x0A,
71 kStatistics = 0x0B,
72 kMetadata = 0x0C,
73 kMetadataIndex = 0x0D,
74 kSummaryOffset = 0x0E,
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070075 kDataEnd = 0x0F,
76 };
James Kuszmaulb3fba252022-04-06 15:13:31 -070077 // Stores information associated with a SummaryOffset entry (an offset to the
78 // start of a section within Summary section, which allows readers to quickly
79 // find all the indices/channel definitions/etc. for a given log).
80 struct SummaryOffset {
81 OpCode op_code;
82 // Offset from the start of the file.
83 uint64_t offset;
84 // Total length of the section, in bytes.
85 uint64_t size;
86 };
87 // Information needed to build a ChunkIndex entry.
88 struct ChunkIndex {
89 // Earliest and latest message times within the Chunk being referenced.
90 aos::monotonic_clock::time_point start_time;
91 aos::monotonic_clock::time_point end_time;
92 // Offset from the start of the file to the start of the relevant Chunk.
93 uint64_t offset;
94 // Total size of the Chunk, in bytes.
95 uint64_t chunk_size;
James Kuszmaul5ab990d2022-11-07 16:35:49 -080096 // Total uncompressed size of the records portion of the Chunk, in bytes.
James Kuszmaulb3fba252022-04-06 15:13:31 -070097 uint64_t records_size;
James Kuszmaul5ab990d2022-11-07 16:35:49 -080098 // Total size of the records portion of the Chunk, when compressed
99 uint64_t records_size_compressed;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700100 // Mapping of channel IDs to the MessageIndex entry for that channel within
101 // the referenced Chunk. The MessageIndex is referenced by an offset from
102 // the start of the file.
103 std::map<uint16_t, uint64_t> message_index_offsets;
104 // Total size, in bytes, of all the MessageIndex entries for this Chunk
105 // together (note that they are required to be contiguous).
106 uint64_t message_index_size;
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800107 // Compression used in this Chunk.
108 Compression compression;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700109 };
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800110 // Maintains the state of a single Chunk. In order to maximize read
111 // performance, we currently maintain separate chunks for each channel so
112 // that, in order to read a given channel, only data associated with that
113 // channel nead be read.
James Kuszmaul36a25f42022-10-28 10:18:00 -0700114 struct ChunkStatus {
115 // Buffer containing serialized message data for the currently-being-built
116 // chunk.
117 std::stringstream data;
118 // Earliest message observed in this chunk.
119 std::optional<aos::monotonic_clock::time_point> earliest_message;
120 // Latest message observed in this chunk.
121 std::optional<aos::monotonic_clock::time_point> latest_message;
122 // MessageIndex's for each message. The std::map is indexed by channel ID.
123 // The vector is then a series of pairs of (timestamp, offset from start of
124 // data).
125 // Note that currently this will only ever have one entry, for the channel
126 // that this chunk corresponds to. However, the standard provides for there
127 // being more than one channel per chunk and so we still have some code that
128 // supports that.
129 std::map<uint16_t, std::vector<std::pair<uint64_t, uint64_t>>>
130 message_indices;
131 };
James Kuszmaulb3fba252022-04-06 15:13:31 -0700132 enum class RegisterHandlers { kYes, kNo };
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700133 // Helpers to write each type of relevant record.
134 void WriteMagic();
135 void WriteHeader();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700136 void WriteFooter(uint64_t summary_offset, uint64_t summary_offset_offset);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700137 void WriteDataEnd();
138 void WriteSchema(const uint16_t id, const aos::Channel *channel);
139 void WriteChannel(const uint16_t id, const uint16_t schema_id,
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700140 const aos::Channel *channel,
141 std::string_view override_name = "");
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700142 void WriteMessage(uint16_t channel_id, const Channel *channel,
James Kuszmaul36a25f42022-10-28 10:18:00 -0700143 const Context &context, ChunkStatus *chunk);
144 void WriteChunk(ChunkStatus *chunk);
James Kuszmaulb3fba252022-04-06 15:13:31 -0700145
146 // The helpers for writing records which appear in the Summary section will
147 // return SummaryOffset's so that they can be referenced in the SummaryOffset
148 // section.
149 SummaryOffset WriteChunkIndices();
150 SummaryOffset WriteStatistics();
151 std::vector<SummaryOffset> WriteSchemasAndChannels(
152 RegisterHandlers register_handlers);
153 void WriteSummaryOffset(const SummaryOffset &offset);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700154
155 // Writes an MCAP record to the output file.
James Kuszmaulb3fba252022-04-06 15:13:31 -0700156 void WriteRecord(OpCode op, std::string_view record, std::ostream *ostream);
157 void WriteRecord(OpCode op, std::string_view record) {
158 WriteRecord(op, record, &output_);
159 }
160 // Adds an MCAP-spec string/byte-array/map/array of pairs/fixed-size integer
161 // to a buffer.
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700162 static void AppendString(FastStringBuilder *builder, std::string_view string);
James Kuszmaulb3fba252022-04-06 15:13:31 -0700163 static void AppendBytes(FastStringBuilder *builder, std::string_view bytes);
164 static void AppendChannelMap(FastStringBuilder *builder,
165 const std::map<uint16_t, uint64_t> &map);
166 static void AppendMessageIndices(
167 FastStringBuilder *builder,
168 const std::vector<std::pair<uint64_t, uint64_t>> &messages);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700169 static void AppendInt16(FastStringBuilder *builder, uint16_t val);
170 static void AppendInt32(FastStringBuilder *builder, uint32_t val);
171 static void AppendInt64(FastStringBuilder *builder, uint64_t val);
172
James Kuszmaulb3fba252022-04-06 15:13:31 -0700173 aos::EventLoop *event_loop_;
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700174 std::ofstream output_;
James Kuszmaulc31d7362022-05-27 14:20:04 -0700175 const Serialization serialization_;
James Kuszmaul9f607c62022-10-27 17:01:55 -0700176 const CanonicalChannelNames canonical_channels_;
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800177 const Compression compression_;
James Kuszmaulc31d7362022-05-27 14:20:04 -0700178 size_t total_message_bytes_ = 0;
179 std::map<const Channel *, size_t> total_channel_bytes_;
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700180 FastStringBuilder string_builder_;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700181
182 // Earliest message observed in this logfile.
183 std::optional<aos::monotonic_clock::time_point> earliest_message_;
James Kuszmaul36a25f42022-10-28 10:18:00 -0700184 // Latest message observed in this logfile.
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800185 aos::monotonic_clock::time_point latest_message_ =
186 aos::monotonic_clock::min_time;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700187 // Count of all messages on each channel, indexed by channel ID.
188 std::map<uint16_t, uint64_t> message_counts_;
James Kuszmaulc31d7362022-05-27 14:20:04 -0700189 std::map<uint16_t, std::unique_ptr<RawFetcher>> fetchers_;
James Kuszmaul36a25f42022-10-28 10:18:00 -0700190 // All currently-being-built chunks. Indexed by channel ID. This is used to
191 // segregate channels into separate chunks to support more efficient reading.
192 std::map<uint16_t, ChunkStatus> current_chunks_;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700193 // ChunkIndex's for all fully written Chunks.
194 std::vector<ChunkIndex> chunk_indices_;
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700195
196 // Metadata associated with the fake "configuration" channel that we create in
197 // order to ensure that foxglove extensions/users have access to the full
198 // configuration.
199 uint16_t configuration_id_ = 0;
200 FlatbufferDetachedBuffer<Channel> configuration_channel_;
201 FlatbufferDetachedBuffer<Configuration> configuration_;
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800202
203 // Memory buffer to use for compressing data.
204 std::vector<uint8_t> compression_buffer_;
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700205};
206} // namespace aos
207#endif // AOS_UTIL_MCAP_LOGGER_H_