#include "aos/util/mcap_logger.h"

#include "absl/strings/str_replace.h"
#include "single_include/nlohmann/json.hpp"

namespace aos {
namespace {
// Amount of data to allow in each chunk before creating a new chunk.
constexpr size_t kChunkSize = 10000000;
}  // namespace

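// Generates a JSON schema (https://json-schema.org/) describing the provided
// flatbuffer type. As a rough sketch of the output (ignoring the top-level
// "$schema" key), a table with a single scalar field "x" becomes something
// like:
//   {"type": "object", "properties": {"x": {"type": "number"}}}
// All scalar flatbuffer types map to "number", bools to "boolean", strings
// and enums to "string", and sub-tables/structs recurse.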
nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
                                       JsonSchemaRecursion recursion_level) {
  nlohmann::json schema;
  if (recursion_level == JsonSchemaRecursion::kTopLevel) {
    schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
  }
  schema["type"] = "object";
  nlohmann::json properties;
  for (int index = 0; index < type.NumberFields(); ++index) {
    nlohmann::json field;
    const bool is_array = type.FieldIsRepeating(index);
    if (type.FieldIsSequence(index)) {
      // For sub-tables/structs, just recurse.
      nlohmann::json subtype = JsonSchemaForFlatbuffer(
          type.FieldType(index), JsonSchemaRecursion::kNested);
      if (is_array) {
        field["type"] = "array";
        field["items"] = subtype;
      } else {
        field = subtype;
      }
    } else {
      std::string elementary_type;
      switch (type.FieldElementaryType(index)) {
        case flatbuffers::ET_UTYPE:
        case flatbuffers::ET_CHAR:
        case flatbuffers::ET_UCHAR:
        case flatbuffers::ET_SHORT:
        case flatbuffers::ET_USHORT:
        case flatbuffers::ET_INT:
        case flatbuffers::ET_UINT:
        case flatbuffers::ET_LONG:
        case flatbuffers::ET_ULONG:
        case flatbuffers::ET_FLOAT:
        case flatbuffers::ET_DOUBLE:
          elementary_type = "number";
          break;
        case flatbuffers::ET_BOOL:
          elementary_type = "boolean";
          break;
        case flatbuffers::ET_STRING:
          elementary_type = "string";
          break;
        case flatbuffers::ET_SEQUENCE:
          if (type.FieldIsEnum(index)) {
            elementary_type = "string";
          } else {
            LOG(FATAL) << "Should not encounter any sequence fields here.";
          }
          break;
      }
      if (is_array) {
        field["type"] = "array";
        field["items"]["type"] = elementary_type;
      } else {
        field["type"] = elementary_type;
      }
    }
    // the nlohmann::json [] operator needs an actual string, not just a
    // string_view :(.
    properties[std::string(type.FieldName(index))] = field;
  }
  schema["properties"] = properties;
  return schema;
}

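// A minimal usage sketch (hypothetical driver code; the event loop setup is
// not part of this file):
//
//   aos::ShmEventLoop event_loop(&config.message());
//   aos::McapLogger logger(&event_loop, "/tmp/log.mcap");
//   event_loop.Run();
//
// Messages are buffered into chunks as they arrive; the summary section and
// footer are written when the logger is destroyed.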
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
    : event_loop_(event_loop), output_(output_path) {
  event_loop->SkipTimingReport();
  event_loop->SkipAosLog();
  CHECK(output_);
  WriteMagic();
  WriteHeader();
  // Schemas and channels get written out both at the start and end of the
  // file, per the MCAP spec.
  WriteSchemasAndChannels(RegisterHandlers::kYes);
}

McapLogger::~McapLogger() {
  // If we have any data remaining, write one last chunk.
  if (current_chunk_.tellp() > 0) {
    WriteChunk();
  }
  WriteDataEnd();

  // Now we enter the Summary section, where we write out all the channel/index
  // information that readers need to be able to seek to arbitrary locations
  // within the log.
  const uint64_t summary_offset = output_.tellp();
  const SummaryOffset chunk_indices_offset = WriteChunkIndices();
  const SummaryOffset stats_offset = WriteStatistics();
  // Schemas/Channels need to get reproduced in the summary section for random
  // access reading.
  const std::vector<SummaryOffset> offsets =
      WriteSchemasAndChannels(RegisterHandlers::kNo);

  // Next we have the summary offset section, which references the individual
  // pieces of the summary section.
  const uint64_t summary_offset_offset = output_.tellp();

  // SummaryOffsets must all be the final thing before the footer.
  WriteSummaryOffset(chunk_indices_offset);
  WriteSummaryOffset(stats_offset);
  for (const auto &offset : offsets) {
    WriteSummaryOffset(offset);
  }

  // And finally, the footer, which must itself reference the start of the
  // summary and summary offset sections.
  WriteFooter(summary_offset, summary_offset_offset);
  WriteMagic();
}

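// Writes Schema and Channel records for every channel that is readable on
// this node. When register_handlers is kYes, also registers a watcher on each
// channel that logs incoming messages into the current chunk. Returns the
// offsets/sizes of the Schema and Channel record groups, for use in the
// summary offset section.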
std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
    RegisterHandlers register_handlers) {
  uint16_t id = 1;
  std::map<uint16_t, const Channel *> channels;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
      continue;
    }
    channels[id] = channel;

    if (register_handlers == RegisterHandlers::kYes) {
      message_counts_[id] = 0;
      event_loop_->MakeRawWatcher(
          channel, [this, id, channel](const Context &context, const void *) {
            WriteMessage(id, channel, context, &current_chunk_);
            if (static_cast<uint64_t>(current_chunk_.tellp()) > kChunkSize) {
              WriteChunk();
            }
          });
    }
    ++id;
  }

  std::vector<SummaryOffset> offsets;

  const uint64_t schema_offset = output_.tellp();

  for (const auto &pair : channels) {
    WriteSchema(pair.first, pair.second);
  }

  const uint64_t channel_offset = output_.tellp();

  offsets.push_back(
      {OpCode::kSchema, schema_offset, channel_offset - schema_offset});

  for (const auto &pair : channels) {
    // Write out the channel entry that uses the schema (we just re-use
    // the schema ID for the channel ID, since we aren't deduplicating
    // schemas for channels that are of the same type).
    WriteChannel(pair.first, pair.first, pair.second);
  }

  offsets.push_back({OpCode::kChannel, channel_offset,
                     static_cast<uint64_t>(output_.tellp()) - channel_offset});
  return offsets;
}

void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }

void McapLogger::WriteHeader() {
  string_builder_.Reset();
  // "profile"
  AppendString(&string_builder_, "x-aos");
  // "library"
  AppendString(&string_builder_, "AOS MCAP converter");
  WriteRecord(OpCode::kHeader, string_builder_.Result());
}

void McapLogger::WriteFooter(uint64_t summary_offset,
                             uint64_t summary_offset_offset) {
  string_builder_.Reset();
  AppendInt64(&string_builder_, summary_offset);
  AppendInt64(&string_builder_, summary_offset_offset);
  // CRC32 for the Summary section, which we don't bother populating.
  AppendInt32(&string_builder_, 0);
  WriteRecord(OpCode::kFooter, string_builder_.Result());
}

void McapLogger::WriteDataEnd() {
  string_builder_.Reset();
  // CRC32 for the data, which we are too lazy to calculate.
  AppendInt32(&string_builder_, 0);
  WriteRecord(OpCode::kDataEnd, string_builder_.Result());
}

void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
  CHECK(channel->has_schema());
  std::string schema = JsonSchemaForFlatbuffer({channel->schema()}).dump();

  // Write out the schema (we don't bother deduplicating schema types):
  string_builder_.Reset();
  // Schema ID
  AppendInt16(&string_builder_, id);
  // Type name
  AppendString(&string_builder_, channel->type()->string_view());
  // Encoding
  AppendString(&string_builder_, "jsonschema");
  // Actual schema itself
  AppendString(&string_builder_, schema);
  WriteRecord(OpCode::kSchema, string_builder_.Result());
}

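// Writes an MCAP Channel record. The topic name is the AOS channel name
// followed by the type, so channels that share a name but carry different
// types map to distinct topics.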
void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
                              const aos::Channel *channel) {
  string_builder_.Reset();
  // Channel ID
  AppendInt16(&string_builder_, id);
  // Schema ID
  AppendInt16(&string_builder_, schema_id);
  // Topic name
  AppendString(&string_builder_,
               absl::StrCat(channel->name()->string_view(), " ",
                            channel->type()->string_view()));
  // Encoding
  AppendString(&string_builder_, "json");
  // Metadata (technically supposed to be a Map<string, string>)
  AppendString(&string_builder_, "");
  WriteRecord(OpCode::kChannel, string_builder_.Result());
}

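// Writes a single message as an MCAP Message record into *output (normally
// the current chunk buffer), rendering the flatbuffer as JSON. Also updates
// the bookkeeping (message counts, time bounds, and message indices) used
// later for the chunk and summary sections.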
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
                              const Context &context, std::ostream *output) {
  CHECK_NOTNULL(context.data);

  message_counts_[channel_id]++;

  if (!earliest_message_.has_value()) {
    earliest_message_ = context.monotonic_event_time;
  }
  if (!earliest_chunk_message_.has_value()) {
    earliest_chunk_message_ = context.monotonic_event_time;
  }
  latest_message_ = context.monotonic_event_time;

  string_builder_.Reset();
  // Channel ID
  AppendInt16(&string_builder_, channel_id);
  // Queue Index
  AppendInt32(&string_builder_, context.queue_index);
  // Log time and publish time. Since we don't track a separate logged time,
  // just use the published time for both.
  // TODO(james): If we use this for multi-node logfiles, use the distributed
  // clock.
  AppendInt64(&string_builder_,
              context.monotonic_event_time.time_since_epoch().count());
  AppendInt64(&string_builder_,
              context.monotonic_event_time.time_since_epoch().count());

  CHECK(flatbuffers::Verify(*channel->schema(),
                            *channel->schema()->root_table(),
                            static_cast<const uint8_t *>(context.data),
                            static_cast<size_t>(context.size)))
      << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
      << channel->type()->c_str();

  aos::FlatbufferToJson(&string_builder_, channel->schema(),
                        static_cast<const uint8_t *>(context.data));

  message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
      context.monotonic_event_time.time_since_epoch().count(),
      output->tellp()));

  WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
}

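// Writes a single MCAP record: a one-byte opcode, the 8-byte record length
// (written from the host's in-memory representation, which matches MCAP's
// little-endian format on little-endian machines), and then the payload.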
void McapLogger::WriteRecord(OpCode op, std::string_view record,
                             std::ostream *ostream) {
  ostream->put(static_cast<char>(op));
  uint64_t record_length = record.size();
  ostream->write(reinterpret_cast<const char *>(&record_length),
                 sizeof(record_length));
  *ostream << record;
}

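// Flushes the currently buffered messages out as a Chunk record, followed by
// one MessageIndex record per channel, then records a ChunkIndex entry so
// that the summary section can point readers back at this chunk.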
void McapLogger::WriteChunk() {
  string_builder_.Reset();

  CHECK(earliest_chunk_message_.has_value());
  const uint64_t chunk_offset = output_.tellp();
  AppendInt64(&string_builder_,
              earliest_chunk_message_->time_since_epoch().count());
  AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());

  std::string chunk_records = current_chunk_.str();
  // Reset the chunk buffer.
  current_chunk_.str("");

  const uint64_t records_size = chunk_records.size();
  // Uncompressed chunk size.
  AppendInt64(&string_builder_, records_size);
  // Uncompressed CRC (unpopulated).
  AppendInt32(&string_builder_, 0);
  // Compression used (none).
  AppendString(&string_builder_, "");
  AppendBytes(&string_builder_, chunk_records);
  WriteRecord(OpCode::kChunk, string_builder_.Result());

  std::map<uint16_t, uint64_t> index_offsets;
  const uint64_t message_index_start = output_.tellp();
  for (const auto &indices : message_indices_) {
    index_offsets[indices.first] = output_.tellp();
    string_builder_.Reset();
    AppendInt16(&string_builder_, indices.first);
    AppendMessageIndices(&string_builder_, indices.second);
    WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
  }
  message_indices_.clear();
  chunk_indices_.push_back(ChunkIndex{
      earliest_chunk_message_.value(), latest_message_, chunk_offset,
      message_index_start - chunk_offset, records_size, index_offsets,
      static_cast<uint64_t>(output_.tellp()) - message_index_start});
  earliest_chunk_message_.reset();
}

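// Writes the Statistics record summarizing the log: total message count,
// schema/channel/chunk counts, the overall time range, and per-channel
// message counts. Returns its summary offset for the summary offset section.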
McapLogger::SummaryOffset McapLogger::WriteStatistics() {
  const uint64_t stats_offset = output_.tellp();
  const uint64_t message_count = std::accumulate(
      message_counts_.begin(), message_counts_.end(), uint64_t{0},
      [](const uint64_t &count, const std::pair<uint16_t, uint64_t> &val) {
        return count + val.second;
      });
  string_builder_.Reset();
  AppendInt64(&string_builder_, message_count);
  // Schema count.
  AppendInt16(&string_builder_, message_counts_.size());
  // Channel count.
  AppendInt32(&string_builder_, message_counts_.size());
  // Attachment count.
  AppendInt32(&string_builder_, 0);
  // Metadata count.
  AppendInt32(&string_builder_, 0);
  // Chunk count.
  AppendInt32(&string_builder_, chunk_indices_.size());
  // Earliest & latest message times.
  AppendInt64(&string_builder_, earliest_message_->time_since_epoch().count());
  AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
  // Per-channel message counts.
  AppendChannelMap(&string_builder_, message_counts_);
  WriteRecord(OpCode::kStatistics, string_builder_.Result());
  return {OpCode::kStatistics, stats_offset,
          static_cast<uint64_t>(output_.tellp()) - stats_offset};
}

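// Writes one ChunkIndex record per chunk, providing the time range, offset,
// and message-index information needed for random access into the log.
// Returns the summary offset covering all of the ChunkIndex records.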
McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
  const uint64_t index_offset = output_.tellp();
  for (const ChunkIndex &index : chunk_indices_) {
    string_builder_.Reset();
    AppendInt64(&string_builder_, index.start_time.time_since_epoch().count());
    AppendInt64(&string_builder_, index.end_time.time_since_epoch().count());
    AppendInt64(&string_builder_, index.offset);
    AppendInt64(&string_builder_, index.chunk_size);
    AppendChannelMap(&string_builder_, index.message_index_offsets);
    AppendInt64(&string_builder_, index.message_index_size);
    // Compression used.
    AppendString(&string_builder_, "");
    // Compressed and uncompressed records size.
    AppendInt64(&string_builder_, index.records_size);
    AppendInt64(&string_builder_, index.records_size);
    WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
  }
  return {OpCode::kChunkIndex, index_offset,
          static_cast<uint64_t>(output_.tellp()) - index_offset};
}

void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
  string_builder_.Reset();
  string_builder_.AppendChar(static_cast<char>(offset.op_code));
  AppendInt64(&string_builder_, offset.offset);
  AppendInt64(&string_builder_, offset.size);
  WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
}

void McapLogger::AppendString(FastStringBuilder *builder,
                              std::string_view string) {
  AppendInt32(builder, string.size());
  builder->Append(string);
}

void McapLogger::AppendBytes(FastStringBuilder *builder,
                             std::string_view bytes) {
  AppendInt64(builder, bytes.size());
  builder->Append(bytes);
}

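// Helpers for serializing integers and maps. These copy the raw in-memory
// representation of the value, which matches the MCAP wire format only on a
// little-endian host; a big-endian host would need byte swapping here.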
namespace {
template <typename T>
static void AppendInt(FastStringBuilder *builder, T val) {
  builder->Append(
      std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
}
template <typename T>
void AppendMap(FastStringBuilder *builder, const T &map) {
  AppendInt<uint32_t>(
      builder, map.size() * (sizeof(typename T::value_type::first_type) +
                             sizeof(typename T::value_type::second_type)));
  for (const auto &pair : map) {
    AppendInt(builder, pair.first);
    AppendInt(builder, pair.second);
  }
}
}  // namespace

void McapLogger::AppendChannelMap(FastStringBuilder *builder,
                                  const std::map<uint16_t, uint64_t> &map) {
  AppendMap(builder, map);
}

void McapLogger::AppendMessageIndices(
    FastStringBuilder *builder,
    const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
  AppendMap(builder, messages);
}

void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
  AppendInt(builder, val);
}

void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
  AppendInt(builder, val);
}

void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
  AppendInt(builder, val);
}
}  // namespace aos