blob: 561a02bd18d409dc1eb72fd5bbb967175cdf5419 [file] [log] [blame]
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07001#include "aos/util/mcap_logger.h"
2
3#include "absl/strings/str_replace.h"
James Kuszmaulc31d7362022-05-27 14:20:04 -07004#include "aos/flatbuffer_merge.h"
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07005#include "single_include/nlohmann/json.hpp"
6
// Controls how much message data is buffered in memory before being flushed
// out as a single MCAP chunk record. Larger chunks mean fewer records (less
// framing overhead) at the cost of more memory and coarser seek granularity.
DEFINE_uint64(mcap_chunk_size, 10'000'000,
              "Size, in bytes, of individual MCAP chunks");
// When true, each channel's latest pre-existing message is fetched and written
// at startup, so state published before the logger started is still visible.
DEFINE_bool(fetch, false,
            "Whether to fetch most recent messages at start of logfile. Turn "
            "this on if there are, e.g., one-time messages sent before the "
            "start of the logfile that you need access to. Turn it off if you "
            "don't want to deal with having messages that have timestamps that "
            "may be arbitrarily far before any other interesting messages.");
James Kuszmaul5c56ed32022-03-30 15:10:07 -070015
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070016namespace aos {
James Kuszmaulb3fba252022-04-06 15:13:31 -070017
// Builds a JSON schema (https://json-schema.org/) object describing the given
// flatbuffer type, recursing into sub-tables/structs. recursion_level
// distinguishes the top-level call, which attaches the "$schema" metadata key,
// from nested calls, which produce bare sub-schemas.
nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
                                       JsonSchemaRecursion recursion_level) {
  nlohmann::json schema;
  if (recursion_level == JsonSchemaRecursion::kTopLevel) {
    schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
  }
  schema["type"] = "object";
  nlohmann::json properties;
  for (int index = 0; index < type.NumberFields(); ++index) {
    nlohmann::json field;
    const bool is_array = type.FieldIsRepeating(index);
    if (type.FieldIsSequence(index)) {
      // For sub-tables/structs, just recurse.
      nlohmann::json subtype = JsonSchemaForFlatbuffer(
          type.FieldType(index), JsonSchemaRecursion::kNested);
      if (is_array) {
        // Repeated sub-tables become an array whose items use the sub-schema.
        field["type"] = "array";
        field["items"] = subtype;
      } else {
        field = subtype;
      }
    } else {
      std::string elementary_type;
      switch (type.FieldElementaryType(index)) {
        // Every integer and floating-point flatbuffer type maps onto the
        // single JSON "number" type.
        case flatbuffers::ET_UTYPE:
        case flatbuffers::ET_CHAR:
        case flatbuffers::ET_UCHAR:
        case flatbuffers::ET_SHORT:
        case flatbuffers::ET_USHORT:
        case flatbuffers::ET_INT:
        case flatbuffers::ET_UINT:
        case flatbuffers::ET_LONG:
        case flatbuffers::ET_ULONG:
        case flatbuffers::ET_FLOAT:
        case flatbuffers::ET_DOUBLE:
          elementary_type = "number";
          break;
        case flatbuffers::ET_BOOL:
          elementary_type = "boolean";
          break;
        case flatbuffers::ET_STRING:
          elementary_type = "string";
          break;
        case flatbuffers::ET_SEQUENCE:
          // Enums serialize as their string names; any other sequence should
          // have been handled by the FieldIsSequence() branch above.
          if (type.FieldIsEnum(index)) {
            elementary_type = "string";
          } else {
            LOG(FATAL) << "Should not encounter any sequence fields here.";
          }
          break;
      }
      if (is_array) {
        field["type"] = "array";
        field["items"]["type"] = elementary_type;
      } else {
        field["type"] = elementary_type;
      }
    }
    // the nlohmann::json [] operator needs an actual string, not just a
    // string_view :(.
    properties[std::string(type.FieldName(index))] = field;
  }
  schema["properties"] = properties;
  return schema;
}
83
// Constructs the logger: opens the output file, writes the MCAP preamble
// (magic + header), and registers watchers on every readable channel so that
// messages start being captured once the event loop runs.
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
                       Serialization serialization)
    : event_loop_(event_loop),
      output_(output_path),
      serialization_(serialization) {
  // Suppress the event loop's own timing-report and AOS log channels so the
  // converter does not publish traffic of its own.
  event_loop->SkipTimingReport();
  event_loop->SkipAosLog();
  // Fail fast if the output file could not be opened.
  CHECK(output_);
  WriteMagic();
  WriteHeader();
  // Schemas and channels get written out both at the start and end of the file,
  // per the MCAP spec.
  WriteSchemasAndChannels(RegisterHandlers::kYes);
}
98
// Finalizes the MCAP file. The write order here is dictated by the MCAP
// layout: remaining chunk data, DataEnd, Summary section (chunk indices,
// statistics, repeated schemas/channels), SummaryOffset records, Footer, and
// the closing magic bytes.
McapLogger::~McapLogger() {
  // If we have any data remaining, write one last chunk.
  if (current_chunk_.tellp() > 0) {
    WriteChunk();
  }
  WriteDataEnd();

  // Now we enter the Summary section, where we write out all the channel/index
  // information that readers need to be able to seek to arbitrary locations
  // within the log.
  const uint64_t summary_offset = output_.tellp();
  const SummaryOffset chunk_indices_offset = WriteChunkIndices();
  const SummaryOffset stats_offset = WriteStatistics();
  // Schemas/Channels need to get reproduced in the summary section for random
  // access reading.
  const std::vector<SummaryOffset> offsets =
      WriteSchemasAndChannels(RegisterHandlers::kNo);

  // Next we have the summary offset section, which references the individual
  // pieces of the summary section.
  const uint64_t summary_offset_offset = output_.tellp();

  // SummaryOffsets must all be the final thing before the footer.
  WriteSummaryOffset(chunk_indices_offset);
  WriteSummaryOffset(stats_offset);
  for (const auto &offset : offsets) {
    WriteSummaryOffset(offset);
  }

  // And finally, the footer which must itself reference the start of the
  // summary and summary offset sections.
  WriteFooter(summary_offset, summary_offset_offset);
  WriteMagic();

  // TODO(james): Add compression. With flatbuffers messages that contain large
  // numbers of zeros (e.g., large grids or thresholded images) this can result
  // in massive savings.
  if (VLOG_IS_ON(2)) {
    // For debugging, print out how much space each channel is taking in the
    // overall log, sorted by byte count (ascending).
    LOG(INFO) << total_message_bytes_;
    std::vector<std::pair<size_t, const Channel *>> channel_bytes;
    for (const auto &pair : total_channel_bytes_) {
      channel_bytes.push_back(std::make_pair(pair.second, pair.first));
    }
    std::sort(channel_bytes.begin(), channel_bytes.end());
    for (const auto &pair : channel_bytes) {
      LOG(INFO) << configuration::StrippedChannelToString(pair.second) << ": "
                << static_cast<float>(pair.first) * 1e-6 << "MB "
                << static_cast<float>(pair.first) / total_message_bytes_
                << "\n";
    }
  }
}
153
// Writes out Schema and Channel records for every channel readable on this
// node, returning the summary offsets for the two record groups. When
// register_handlers is kYes (the start-of-file pass), also installs the
// watchers/fetchers that actually capture message data.
std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
    RegisterHandlers register_handlers) {
  // IDs are assigned by position in the configuration's channel list, starting
  // at 1; channels skipped below still consume an ID so that both the kYes and
  // kNo passes assign identical IDs.
  uint16_t id = 0;
  std::map<uint16_t, const Channel *> channels;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    ++id;
    if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
      continue;
    }
    channels[id] = channel;

    if (register_handlers == RegisterHandlers::kYes) {
      message_counts_[id] = 0;
      // Buffer each message into the current chunk, flushing the chunk once it
      // exceeds the configured size.
      event_loop_->MakeRawWatcher(
          channel, [this, id, channel](const Context &context, const void *) {
            WriteMessage(id, channel, context, &current_chunk_);
            if (static_cast<uint64_t>(current_chunk_.tellp()) >
                FLAGS_mcap_chunk_size) {
              WriteChunk();
            }
          });
      // Optionally capture the most recent pre-existing message on each
      // channel at startup (see --fetch).
      fetchers_[id] = event_loop_->MakeRawFetcher(channel);
      event_loop_->OnRun([this, id, channel]() {
        if (FLAGS_fetch && fetchers_[id]->Fetch()) {
          WriteMessage(id, channel, fetchers_[id]->context(), &current_chunk_);
        }
      });
    }
  }

  std::vector<SummaryOffset> offsets;

  const uint64_t schema_offset = output_.tellp();

  for (const auto &pair : channels) {
    WriteSchema(pair.first, pair.second);
  }

  const uint64_t channel_offset = output_.tellp();

  offsets.push_back(
      {OpCode::kSchema, schema_offset, channel_offset - schema_offset});

  for (const auto &pair : channels) {
    // Write out the channel entry that uses the schema (we just re-use
    // the schema ID for the channel ID, since we aren't deduplicating
    // schemas for channels that are of the same type).
    WriteChannel(pair.first, pair.first, pair.second);
  }

  offsets.push_back({OpCode::kChannel, channel_offset,
                     static_cast<uint64_t>(output_.tellp()) - channel_offset});
  return offsets;
}
208
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700209void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
210
// Writes the MCAP Header record: a "profile" string identifying the producing
// ecosystem, followed by a "library" string identifying this writer.
void McapLogger::WriteHeader() {
  string_builder_.Reset();
  // "profile"
  AppendString(&string_builder_, "x-aos");
  // "library"
  AppendString(&string_builder_, "AOS MCAP converter");
  WriteRecord(OpCode::kHeader, string_builder_.Result());
}
219
// Writes the Footer record, which points readers back at the start of the
// Summary and SummaryOffset sections so they can seek without scanning.
void McapLogger::WriteFooter(uint64_t summary_offset,
                             uint64_t summary_offset_offset) {
  string_builder_.Reset();
  AppendInt64(&string_builder_, summary_offset);
  AppendInt64(&string_builder_, summary_offset_offset);
  // CRC32 for the Summary section, which we don't bother populating.
  AppendInt32(&string_builder_, 0);
  WriteRecord(OpCode::kFooter, string_builder_.Result());
}
229
// Writes the DataEnd record, which terminates the data section of the file.
void McapLogger::WriteDataEnd() {
  string_builder_.Reset();
  // CRC32 for the data, which we are too lazy to calculate.
  AppendInt32(&string_builder_, 0);
  WriteRecord(OpCode::kDataEnd, string_builder_.Result());
}
236
// Writes a Schema record for the given channel. Depending on serialization_,
// the schema payload is either a JSON schema ("jsonschema" encoding) or the
// raw reflection.Schema flatbuffer ("flatbuffer" encoding).
void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
  CHECK(channel->has_schema());

  // Copy the schema out so we have a contiguous, owned buffer to embed.
  const FlatbufferDetachedBuffer<reflection::Schema> schema =
      CopyFlatBuffer(channel->schema());

  // Write out the schema (we don't bother deduplicating schema types):
  string_builder_.Reset();
  // Schema ID
  AppendInt16(&string_builder_, id);
  // Type name
  AppendString(&string_builder_, channel->type()->string_view());
  switch (serialization_) {
    case Serialization::kJson:
      // Encoding
      AppendString(&string_builder_, "jsonschema");
      // Actual schema itself
      AppendString(&string_builder_,
                   JsonSchemaForFlatbuffer({channel->schema()}).dump());
      break;
    case Serialization::kFlatbuffer:
      // Encoding
      AppendString(&string_builder_, "flatbuffer");
      // Actual schema itself
      AppendString(&string_builder_,
                   {reinterpret_cast<const char *>(schema.span().data()),
                    schema.span().size()});
      break;
  }
  WriteRecord(OpCode::kSchema, string_builder_.Result());
}
268
269void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
270 const aos::Channel *channel) {
271 string_builder_.Reset();
272 // Channel ID
273 AppendInt16(&string_builder_, id);
274 // Schema ID
275 AppendInt16(&string_builder_, schema_id);
276 // Topic name
277 AppendString(&string_builder_,
278 absl::StrCat(channel->name()->string_view(), " ",
279 channel->type()->string_view()));
280 // Encoding
James Kuszmaulc31d7362022-05-27 14:20:04 -0700281 switch (serialization_) {
282 case Serialization::kJson:
283 AppendString(&string_builder_, "json");
284 break;
285 case Serialization::kFlatbuffer:
286 AppendString(&string_builder_, "flatbuffer");
287 break;
288 }
289
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700290 // Metadata (technically supposed to be a Map<string, string>)
291 AppendString(&string_builder_, "");
292 WriteRecord(OpCode::kChannel, string_builder_.Result());
293}
294
// Serializes a single message into *output (normally the in-memory chunk
// buffer), updating all bookkeeping: per-channel counts, earliest/latest
// timestamps (overall and per-chunk), byte totals, and the message index used
// to build MessageIndex records when the chunk is flushed.
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
                              const Context &context, std::ostream *output) {
  CHECK_NOTNULL(context.data);

  message_counts_[channel_id]++;

  // Track the earliest message ever seen, for the Statistics record.
  if (!earliest_message_.has_value()) {
    earliest_message_ = context.monotonic_event_time;
  } else {
    earliest_message_ =
        std::min(context.monotonic_event_time, earliest_message_.value());
  }
  // Track the earliest message within the current chunk, for its ChunkIndex.
  if (!earliest_chunk_message_.has_value()) {
    earliest_chunk_message_ = context.monotonic_event_time;
  } else {
    earliest_chunk_message_ =
        std::min(context.monotonic_event_time, earliest_chunk_message_.value());
  }
  latest_message_ = context.monotonic_event_time;

  string_builder_.Reset();
  // Channel ID
  AppendInt16(&string_builder_, channel_id);
  // Queue Index
  AppendInt32(&string_builder_, context.queue_index);
  // Log time, and publish time. Since we don't log a logged time, just use
  // published time.
  // TODO(james): If we use this for multi-node logfiles, use distributed clock.
  AppendInt64(&string_builder_,
              context.monotonic_event_time.time_since_epoch().count());
  // Note: Foxglove Studio doesn't appear to actually support using publish time
  // right now.
  AppendInt64(&string_builder_,
              context.monotonic_event_time.time_since_epoch().count());

  // Refuse to write corrupted data; a bad flatbuffer would poison the file.
  CHECK(flatbuffers::Verify(*channel->schema(),
                            *channel->schema()->root_table(),
                            static_cast<const uint8_t *>(context.data),
                            static_cast<size_t>(context.size)))
      << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
      << channel->type()->c_str();

  // Message payload: either a JSON rendering or the raw flatbuffer bytes.
  switch (serialization_) {
    case Serialization::kJson:
      aos::FlatbufferToJson(&string_builder_, channel->schema(),
                            static_cast<const uint8_t *>(context.data));
      break;
    case Serialization::kFlatbuffer:
      string_builder_.Append(
          {static_cast<const char *>(context.data), context.size});
      break;
  }
  total_message_bytes_ += context.size;
  total_channel_bytes_[channel] += context.size;

  // Record (timestamp, offset-within-chunk) so WriteChunk() can emit
  // MessageIndex records; output->tellp() is the position before this record.
  message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
      context.monotonic_event_time.time_since_epoch().count(),
      output->tellp()));

  WriteRecord(OpCode::kMessage, string_builder_.Result(), output);
}
356
James Kuszmaulb3fba252022-04-06 15:13:31 -0700357void McapLogger::WriteRecord(OpCode op, std::string_view record,
358 std::ostream *ostream) {
359 ostream->put(static_cast<char>(op));
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700360 uint64_t record_length = record.size();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700361 ostream->write(reinterpret_cast<const char *>(&record_length),
362 sizeof(record_length));
363 *ostream << record;
364}
365
// Flushes the buffered messages in current_chunk_ as one Chunk record,
// followed by one MessageIndex record per channel, and records a ChunkIndex
// entry so the Summary section can point readers at this chunk.
void McapLogger::WriteChunk() {
  string_builder_.Reset();

  // WriteMessage() must have populated this before any chunk is flushed.
  CHECK(earliest_chunk_message_.has_value());
  const uint64_t chunk_offset = output_.tellp();
  // Chunk start/end message times.
  AppendInt64(&string_builder_,
              earliest_chunk_message_->time_since_epoch().count());
  AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());

  std::string chunk_records = current_chunk_.str();
  // Reset the chunk buffer.
  current_chunk_.str("");

  const uint64_t records_size = chunk_records.size();
  // Uncompressed chunk size.
  AppendInt64(&string_builder_, records_size);
  // Uncompressed CRC (unpopulated).
  AppendInt32(&string_builder_, 0);
  // Compression: empty string means no compression.
  AppendString(&string_builder_, "");
  AppendBytes(&string_builder_, chunk_records);
  WriteRecord(OpCode::kChunk, string_builder_.Result());

  // One MessageIndex record per channel, each mapping message timestamps to
  // offsets within the chunk; remember each record's file offset.
  std::map<uint16_t, uint64_t> index_offsets;
  const uint64_t message_index_start = output_.tellp();
  for (const auto &indices : message_indices_) {
    index_offsets[indices.first] = output_.tellp();
    string_builder_.Reset();
    AppendInt16(&string_builder_, indices.first);
    AppendMessageIndices(&string_builder_, indices.second);
    WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
  }
  message_indices_.clear();
  // Stash everything WriteChunkIndices() will need at shutdown.
  chunk_indices_.push_back(ChunkIndex{
      earliest_chunk_message_.value(), latest_message_, chunk_offset,
      message_index_start - chunk_offset, records_size, index_offsets,
      static_cast<uint64_t>(output_.tellp()) - message_index_start});
  earliest_chunk_message_.reset();
}
404
405McapLogger::SummaryOffset McapLogger::WriteStatistics() {
406 const uint64_t stats_offset = output_.tellp();
407 const uint64_t message_count = std::accumulate(
408 message_counts_.begin(), message_counts_.end(), 0,
409 [](const uint64_t &count, const std::pair<uint16_t, uint64_t> &val) {
410 return count + val.second;
411 });
412 string_builder_.Reset();
413 AppendInt64(&string_builder_, message_count);
414 // Schema count.
415 AppendInt16(&string_builder_, message_counts_.size());
416 // Channel count.
417 AppendInt32(&string_builder_, message_counts_.size());
418 // Attachment count.
419 AppendInt32(&string_builder_, 0);
420 // Metadata count.
421 AppendInt32(&string_builder_, 0);
422 // Chunk count.
423 AppendInt32(&string_builder_, chunk_indices_.size());
424 // Earliest & latest message times.
425 AppendInt64(&string_builder_, earliest_message_->time_since_epoch().count());
426 AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
427 // Per-channel message counts.
428 AppendChannelMap(&string_builder_, message_counts_);
429 WriteRecord(OpCode::kStatistics, string_builder_.Result());
430 return {OpCode::kStatistics, stats_offset,
431 static_cast<uint64_t>(output_.tellp()) - stats_offset};
432}
433
// Writes one ChunkIndex record per chunk accumulated during logging, enabling
// random access; returns the span of the records for the SummaryOffset
// section. Field order follows the MCAP ChunkIndex layout.
McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
  const uint64_t index_offset = output_.tellp();
  for (const ChunkIndex &index : chunk_indices_) {
    string_builder_.Reset();
    AppendInt64(&string_builder_, index.start_time.time_since_epoch().count());
    AppendInt64(&string_builder_, index.end_time.time_since_epoch().count());
    AppendInt64(&string_builder_, index.offset);
    AppendInt64(&string_builder_, index.chunk_size);
    AppendChannelMap(&string_builder_, index.message_index_offsets);
    AppendInt64(&string_builder_, index.message_index_size);
    // Compression used.
    AppendString(&string_builder_, "");
    // Compressed and uncompressed records size. These are identical because we
    // do not compress chunks.
    AppendInt64(&string_builder_, index.records_size);
    AppendInt64(&string_builder_, index.records_size);
    WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
  }
  return {OpCode::kChunkIndex, index_offset,
          static_cast<uint64_t>(output_.tellp()) - index_offset};
}
454
// Writes a SummaryOffset record pointing at one group of records (identified
// by opcode) within the Summary section.
void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
  string_builder_.Reset();
  string_builder_.AppendChar(static_cast<char>(offset.op_code));
  AppendInt64(&string_builder_, offset.offset);
  AppendInt64(&string_builder_, offset.size);
  WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
}
462
463void McapLogger::AppendString(FastStringBuilder *builder,
464 std::string_view string) {
465 AppendInt32(builder, string.size());
466 builder->Append(string);
467}
468
James Kuszmaulb3fba252022-04-06 15:13:31 -0700469void McapLogger::AppendBytes(FastStringBuilder *builder,
470 std::string_view bytes) {
471 AppendInt64(builder, bytes.size());
472 builder->Append(bytes);
473}
474
namespace {
// Appends the raw bytes of an integer to the builder, in host byte order
// (little-endian on all platforms this targets — note: no byte-swapping is
// performed here).
template <typename T>
static void AppendInt(FastStringBuilder *builder, T val) {
  builder->Append(
      std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
}
// Serializes a map (or vector-of-pairs) as MCAP expects: a 4-byte total byte
// length followed by the raw key/value pairs. Assumes both key and value are
// fixed-size integer types.
template <typename T>
void AppendMap(FastStringBuilder *builder, const T &map) {
  AppendInt<uint32_t>(
      builder, map.size() * (sizeof(typename T::value_type::first_type) +
                             sizeof(typename T::value_type::second_type)));
  for (const auto &pair : map) {
    AppendInt(builder, pair.first);
    AppendInt(builder, pair.second);
  }
}
}  // namespace
492
// Appends a channel-ID -> uint64 map (e.g. per-channel counts or offsets) in
// MCAP map form; thin wrapper over the templated AppendMap helper above.
void McapLogger::AppendChannelMap(FastStringBuilder *builder,
                                  const std::map<uint16_t, uint64_t> &map) {
  AppendMap(builder, map);
}
497
// Appends the (timestamp, offset) pairs for a MessageIndex record; thin
// wrapper over the templated AppendMap helper above.
void McapLogger::AppendMessageIndices(
    FastStringBuilder *builder,
    const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
  AppendMap(builder, messages);
}
503
// Fixed-width integer appenders; thin wrappers over the templated AppendInt,
// which writes the raw bytes in host byte order.
void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
  AppendInt(builder, val);
}

void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
  AppendInt(builder, val);
}

void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
  AppendInt(builder, val);
}
515} // namespace aos