blob: cdac9283b65df17def2c4f346c99a88aa200413a [file] [log] [blame]
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07001#include "aos/util/mcap_logger.h"
2
3#include "absl/strings/str_replace.h"
James Kuszmaul5ab990d2022-11-07 16:35:49 -08004#include "lz4/lz4.h"
5#include "lz4/lz4frame.h"
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07006#include "single_include/nlohmann/json.hpp"
7
Philipp Schrader790cb542023-07-05 21:06:52 -07008#include "aos/configuration_schema.h"
9#include "aos/flatbuffer_merge.h"
10
// Target size for an individual MCAP chunk; a channel's chunk is flushed to
// disk once its buffered messages exceed this many bytes.
DEFINE_uint64(mcap_chunk_size, 10'000'000,
              "Size, in bytes, of individual MCAP chunks");
// Whether to write the latest pre-existing message on each channel at startup.
DEFINE_bool(fetch, false,
            "Whether to fetch most recent messages at start of logfile. Turn "
            "this on if there are, e.g., one-time messages sent before the "
            "start of the logfile that you need access to. Turn it off if you "
            "don't want to deal with having messages that have timestamps that "
            "may be arbitrarily far before any other interesting messages.");
James Kuszmaul5c56ed32022-03-30 15:10:07 -070019
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070020namespace aos {
James Kuszmaulb3fba252022-04-06 15:13:31 -070021
// Generates a JSON schema (https://json-schema.org/) description of the given
// flatbuffer type. Called recursively for sub-tables/structs; only the
// top-level call stamps the "$schema" version marker.
nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
                                       JsonSchemaRecursion recursion_level) {
  nlohmann::json schema;
  if (recursion_level == JsonSchemaRecursion::kTopLevel) {
    schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
  }
  schema["type"] = "object";
  nlohmann::json properties;
  for (int index = 0; index < type.NumberFields(); ++index) {
    nlohmann::json field;
    const bool is_array = type.FieldIsRepeating(index);
    if (type.FieldIsSequence(index)) {
      // For sub-tables/structs, just recurse.
      nlohmann::json subtype = JsonSchemaForFlatbuffer(
          type.FieldType(index), JsonSchemaRecursion::kNested);
      if (is_array) {
        field["type"] = "array";
        field["items"] = subtype;
      } else {
        field = subtype;
      }
    } else {
      // Scalar/string fields map onto the three JSON primitive types.
      std::string elementary_type;
      switch (type.FieldElementaryType(index)) {
        case flatbuffers::ET_UTYPE:
        case flatbuffers::ET_CHAR:
        case flatbuffers::ET_UCHAR:
        case flatbuffers::ET_SHORT:
        case flatbuffers::ET_USHORT:
        case flatbuffers::ET_INT:
        case flatbuffers::ET_UINT:
        case flatbuffers::ET_LONG:
        case flatbuffers::ET_ULONG:
        case flatbuffers::ET_FLOAT:
        case flatbuffers::ET_DOUBLE:
          elementary_type = "number";
          break;
        case flatbuffers::ET_BOOL:
          elementary_type = "boolean";
          break;
        case flatbuffers::ET_STRING:
          elementary_type = "string";
          break;
        case flatbuffers::ET_SEQUENCE:
          // Enums serialize as their string names; any other sequence should
          // have been handled by the FieldIsSequence() branch above.
          if (type.FieldIsEnum(index)) {
            elementary_type = "string";
          } else {
            LOG(FATAL) << "Should not encounter any sequence fields here.";
          }
          break;
      }
      if (is_array) {
        field["type"] = "array";
        field["items"]["type"] = elementary_type;
      } else {
        field["type"] = elementary_type;
      }
    }
    // the nlohmann::json [] operator needs an actual string, not just a
    // string_view :(.
    properties[std::string(type.FieldName(index))] = field;
  }
  schema["properties"] = properties;
  return schema;
}
87
James Kuszmaul1e418f62023-02-26 14:40:20 -080088std::string ShortenedChannelName(const aos::Configuration *config,
89 const aos::Channel *channel,
90 std::string_view application_name,
91 const aos::Node *node) {
92 std::set<std::string> names =
93 configuration::GetChannelAliases(config, channel, application_name, node);
94 std::string_view shortest_name;
95 for (const std::string &name : names) {
96 if (shortest_name.empty() || name.size() < shortest_name.size()) {
97 shortest_name = name;
98 }
99 }
100 return std::string(shortest_name);
101}
102
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800103namespace {
104std::string_view CompressionName(McapLogger::Compression compression) {
105 switch (compression) {
106 case McapLogger::Compression::kNone:
107 return "";
108 case McapLogger::Compression::kLz4:
109 return "lz4";
110 }
111 LOG(FATAL) << "Unreachable.";
112}
113} // namespace
114
// Sets up the logger: opens the output file, builds the synthetic
// "configuration" channel, and writes the MCAP preamble (magic, header, and
// the initial schema/channel records) immediately.
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
                       Serialization serialization,
                       CanonicalChannelNames canonical_channels,
                       Compression compression)
    : event_loop_(event_loop),
      output_(output_path),
      serialization_(serialization),
      canonical_channels_(canonical_channels),
      compression_(compression),
      // Immediately-invoked lambda so the flatbuffer can be built and
      // detached in a single member-initializer expression.
      configuration_channel_([]() {
        // Set up a fake Channel for providing the configuration in the MCAP
        // file. This is included for convenience so that consumers of the MCAP
        // file can actually dereference things like the channel indices in AOS
        // timing reports.
        flatbuffers::FlatBufferBuilder fbb;
        flatbuffers::Offset<flatbuffers::String> name_offset =
            fbb.CreateString("");
        flatbuffers::Offset<flatbuffers::String> type_offset =
            fbb.CreateString("aos.Configuration");
        flatbuffers::Offset<reflection::Schema> schema_offset =
            aos::CopyFlatBuffer(
                aos::FlatbufferSpan<reflection::Schema>(ConfigurationSchema()),
                &fbb);
        Channel::Builder channel(fbb);
        channel.add_name(name_offset);
        channel.add_type(type_offset);
        channel.add_schema(schema_offset);
        fbb.Finish(channel.Finish());
        return fbb.Release();
      }()),
      configuration_(CopyFlatBuffer(event_loop_->configuration())) {
  // This logger watches every channel itself; don't generate the usual
  // per-application timing report or AOS log messages.
  event_loop->SkipTimingReport();
  event_loop->SkipAosLog();
  CHECK(output_);
  WriteMagic();
  WriteHeader();
  // Schemas and channels get written out both at the start and end of the file,
  // per the MCAP spec.
  WriteSchemasAndChannels(RegisterHandlers::kYes);
}
155
// Finalizes the file: flushes remaining chunks, then writes the DataEnd,
// Summary, SummaryOffset, Footer, and trailing magic sections in the order
// the MCAP spec requires.
McapLogger::~McapLogger() {
  // If we have any data remaining, write one last chunk.
  for (auto &pair : current_chunks_) {
    if (pair.second.data.tellp() > 0) {
      WriteChunk(&pair.second);
    }
  }
  WriteDataEnd();

  // Now we enter the Summary section, where we write out all the channel/index
  // information that readers need to be able to seek to arbitrary locations
  // within the log.
  const uint64_t summary_offset = output_.tellp();
  const SummaryOffset chunk_indices_offset = WriteChunkIndices();
  const SummaryOffset stats_offset = WriteStatistics();
  // Schemas/Channels need to get reproduced in the summary section for random
  // access reading.
  const std::vector<SummaryOffset> offsets =
      WriteSchemasAndChannels(RegisterHandlers::kNo);

  // Next we have the summary offset section, which references the individual
  // pieces of the summary section.
  const uint64_t summary_offset_offset = output_.tellp();

  // SummaryOffsets must all be the final thing before the footer.
  WriteSummaryOffset(chunk_indices_offset);
  WriteSummaryOffset(stats_offset);
  for (const auto &offset : offsets) {
    WriteSummaryOffset(offset);
  }

  // And finally, the footer which must itself reference the start of the
  // summary and summary offset sections.
  WriteFooter(summary_offset, summary_offset_offset);
  WriteMagic();

  // Note: the per-channel sizes reported below are uncompressed message
  // sizes; with chunk compression enabled the on-disk footprint will differ.
  if (VLOG_IS_ON(2)) {
    // For debugging, print out how much space each channel is taking in the
    // overall log.
    LOG(INFO) << total_message_bytes_;
    std::vector<std::pair<size_t, const Channel *>> channel_bytes;
    for (const auto &pair : total_channel_bytes_) {
      channel_bytes.push_back(std::make_pair(pair.second, pair.first));
    }
    std::sort(channel_bytes.begin(), channel_bytes.end());
    for (const auto &pair : channel_bytes) {
      LOG(INFO) << configuration::StrippedChannelToString(pair.second) << ": "
                << static_cast<float>(pair.first) * 1e-6 << "MB "
                << static_cast<float>(pair.first) / total_message_bytes_
                << "\n";
    }
  }
}
212
// Writes one Schema and one Channel record per loggable channel (plus the
// special configuration channel) and returns the summary offsets of the two
// record groups. When register_handlers is kYes, this also installs the
// watchers/fetchers that feed messages into per-channel chunks.
std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
    RegisterHandlers register_handlers) {
  // Note: id is incremented for *every* configuration channel, including ones
  // that are skipped below, so a channel's ID matches its (1-based) position
  // in the configuration.
  uint16_t id = 0;
  std::map<uint16_t, const Channel *> channels;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    ++id;
    // Only log channels that are actually readable on this node.
    if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
      continue;
    }
    channels[id] = channel;

    if (register_handlers == RegisterHandlers::kYes) {
      message_counts_[id] = 0;
      // Buffer each message into the channel's chunk; flush the chunk once it
      // exceeds the configured size.
      event_loop_->MakeRawWatcher(
          channel, [this, id, channel](const Context &context, const void *) {
            ChunkStatus *chunk = &current_chunks_[id];
            WriteMessage(id, channel, context, chunk);
            if (static_cast<uint64_t>(chunk->data.tellp()) >
                FLAGS_mcap_chunk_size) {
              WriteChunk(chunk);
            }
          });
      // If --fetch is set, also record the most recent pre-start message on
      // each channel once the event loop starts running.
      fetchers_[id] = event_loop_->MakeRawFetcher(channel);
      event_loop_->OnRun([this, id, channel]() {
        if (FLAGS_fetch && fetchers_[id]->Fetch()) {
          WriteMessage(id, channel, fetchers_[id]->context(),
                       &current_chunks_[id]);
        }
      });
    }
  }

  // Manually add in a special /configuration channel.
  if (register_handlers == RegisterHandlers::kYes) {
    configuration_id_ = ++id;
  }

  std::vector<SummaryOffset> offsets;

  const uint64_t schema_offset = output_.tellp();

  for (const auto &pair : channels) {
    WriteSchema(pair.first, pair.second);
  }

  WriteSchema(configuration_id_, &configuration_channel_.message());

  const uint64_t channel_offset = output_.tellp();

  offsets.push_back(
      {OpCode::kSchema, schema_offset, channel_offset - schema_offset});

  for (const auto &pair : channels) {
    // Write out the channel entry that uses the schema (we just re-use
    // the schema ID for the channel ID, since we aren't deduplicating
    // schemas for channels that are of the same type).
    WriteChannel(pair.first, pair.first, pair.second);
  }

  // Provide the configuration message on a special channel that is just named
  // "configuration", which is guaranteed not to conflict with existing
  // channels under our current naming scheme (since our current scheme will,
  // at a minimum, put a space between the name/type of a channel).
  WriteChannel(configuration_id_, configuration_id_,
               &configuration_channel_.message(), "configuration");

  offsets.push_back({OpCode::kChannel, channel_offset,
                     static_cast<uint64_t>(output_.tellp()) - channel_offset});
  return offsets;
}
283
James Kuszmaulbed2af02023-01-28 15:57:24 -0800284void McapLogger::WriteConfigurationMessage() {
285 Context config_context;
286 config_context.monotonic_event_time = event_loop_->monotonic_now();
287 config_context.queue_index = 0;
288 config_context.size = configuration_.span().size();
289 config_context.data = configuration_.span().data();
290 // Avoid infinite recursion...
291 wrote_configuration_ = true;
292 WriteMessage(configuration_id_, &configuration_channel_.message(),
293 config_context, &current_chunks_[configuration_id_]);
294}
295
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700296void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
297
// Writes the MCAP Header record identifying the profile and producing library.
void McapLogger::WriteHeader() {
  string_builder_.Reset();
  // "profile"
  AppendString(&string_builder_, "x-aos");
  // "library"
  AppendString(&string_builder_, "AOS MCAP converter");
  WriteRecord(OpCode::kHeader, string_builder_.Result());
}
306
// Writes the Footer record, which points readers back at the Summary and
// Summary Offset sections.
void McapLogger::WriteFooter(uint64_t summary_offset,
                             uint64_t summary_offset_offset) {
  string_builder_.Reset();
  AppendInt64(&string_builder_, summary_offset);
  AppendInt64(&string_builder_, summary_offset_offset);
  // CRC32 for the Summary section, which we don't bother populating.
  AppendInt32(&string_builder_, 0);
  WriteRecord(OpCode::kFooter, string_builder_.Result());
}
316
// Writes the DataEnd record marking the boundary between the data and
// summary sections.
void McapLogger::WriteDataEnd() {
  string_builder_.Reset();
  // CRC32 for the data, which we are too lazy to calculate.
  AppendInt32(&string_builder_, 0);
  WriteRecord(OpCode::kDataEnd, string_builder_.Result());
}
323
// Writes a Schema record for the given channel under the given ID. The schema
// payload is either a JSON schema or the raw reflection.Schema flatbuffer,
// depending on the configured serialization.
void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
  CHECK(channel->has_schema());

  // Copy the schema so its raw bytes can be appended for the flatbuffer case.
  const FlatbufferDetachedBuffer<reflection::Schema> schema =
      RecursiveCopyFlatBuffer(channel->schema());

  // Write out the schema (we don't bother deduplicating schema types):
  string_builder_.Reset();
  // Schema ID
  AppendInt16(&string_builder_, id);
  // Type name
  AppendString(&string_builder_, channel->type()->string_view());
  switch (serialization_) {
    case Serialization::kJson:
      // Encoding
      AppendString(&string_builder_, "jsonschema");
      // Actual schema itself
      AppendString(&string_builder_,
                   JsonSchemaForFlatbuffer({channel->schema()}).dump());
      break;
    case Serialization::kFlatbuffer:
      // Encoding
      AppendString(&string_builder_, "flatbuffer");
      // Actual schema itself
      AppendString(&string_builder_,
                   {reinterpret_cast<const char *>(schema.span().data()),
                    schema.span().size()});
      break;
  }
  WriteRecord(OpCode::kSchema, string_builder_.Result());
}
355
// Writes a Channel record tying a topic name to a schema ID. The topic name
// defaults to "<channel name> <channel type>" (possibly with a shortened
// alias), unless override_name is provided (used for the special
// "configuration" channel).
void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
                              const aos::Channel *channel,
                              std::string_view override_name) {
  string_builder_.Reset();
  // Channel ID
  AppendInt16(&string_builder_, id);
  // Schema ID
  AppendInt16(&string_builder_, schema_id);
  // Topic name
  std::string topic_name(override_name);
  if (topic_name.empty()) {
    switch (canonical_channels_) {
      case CanonicalChannelNames::kCanonical:
        topic_name = absl::StrCat(channel->name()->string_view(), " ",
                                  channel->type()->string_view());
        break;
      case CanonicalChannelNames::kShortened: {
        // Use the shortest alias this channel can be referenced by, to keep
        // topic names readable in downstream tools.
        const std::string shortest_name =
            ShortenedChannelName(event_loop_->configuration(), channel,
                                 event_loop_->name(), event_loop_->node());
        if (shortest_name != channel->name()->string_view()) {
          VLOG(1) << "Shortening " << channel->name()->string_view() << " "
                  << channel->type()->string_view() << " to " << shortest_name;
        }
        topic_name =
            absl::StrCat(shortest_name, " ", channel->type()->string_view());
        break;
      }
    }
  }
  AppendString(&string_builder_, topic_name);
  // Encoding
  switch (serialization_) {
    case Serialization::kJson:
      AppendString(&string_builder_, "json");
      break;
    case Serialization::kFlatbuffer:
      AppendString(&string_builder_, "flatbuffer");
      break;
  }

  // Metadata (technically supposed to be a Map<string, string>)
  AppendString(&string_builder_, "");
  WriteRecord(OpCode::kChannel, string_builder_.Result());
}
401
// Serializes one message into the given channel's chunk buffer, updating the
// global and per-chunk statistics (message counts, time ranges, byte totals)
// used later by the summary section.
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
                              const Context &context, ChunkStatus *chunk) {
  // Lazily emit the configuration message before the first real message so it
  // appears in the data section exactly once (guarded by wrote_configuration_).
  if (!wrote_configuration_) {
    WriteConfigurationMessage();
  }
  CHECK_NOTNULL(context.data);

  message_counts_[channel_id]++;

  // Track earliest/latest event times both globally (for Statistics) and per
  // chunk (for the Chunk and ChunkIndex records).
  if (!earliest_message_.has_value()) {
    earliest_message_ = context.monotonic_event_time;
  } else {
    earliest_message_ =
        std::min(context.monotonic_event_time, earliest_message_.value());
  }
  if (!chunk->earliest_message.has_value()) {
    chunk->earliest_message = context.monotonic_event_time;
  } else {
    chunk->earliest_message =
        std::min(context.monotonic_event_time, chunk->earliest_message.value());
  }
  chunk->latest_message = context.monotonic_event_time;
  latest_message_ = context.monotonic_event_time;

  string_builder_.Reset();
  // Channel ID
  AppendInt16(&string_builder_, channel_id);
  // Queue Index
  AppendInt32(&string_builder_, context.queue_index);
  // Log time, and publish time. Since we don't log a logged time, just use
  // published time.
  // TODO(james): If we use this for multi-node logfiles, use distributed clock.
  AppendInt64(&string_builder_,
              context.monotonic_event_time.time_since_epoch().count());
  // Note: Foxglove Studio doesn't appear to actually support using publish time
  // right now.
  AppendInt64(&string_builder_,
              context.monotonic_event_time.time_since_epoch().count());

  // Validate the flatbuffer now so corruption is caught at log time rather
  // than surfacing later as an unreadable log.
  CHECK(flatbuffers::Verify(*channel->schema(),
                            *channel->schema()->root_table(),
                            static_cast<const uint8_t *>(context.data),
                            static_cast<size_t>(context.size)))
      << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
      << channel->type()->c_str();

  switch (serialization_) {
    case Serialization::kJson:
      aos::FlatbufferToJson(&string_builder_, channel->schema(),
                            static_cast<const uint8_t *>(context.data));
      break;
    case Serialization::kFlatbuffer:
      string_builder_.Append(
          {static_cast<const char *>(context.data), context.size});
      break;
  }
  total_message_bytes_ += context.size;
  total_channel_bytes_[channel] += context.size;

  // Record (timestamp, offset-within-chunk) *before* appending the record, so
  // the MessageIndex entries written by WriteChunk() point at this message.
  chunk->message_indices[channel_id].push_back(
      std::make_pair<uint64_t, uint64_t>(
          context.monotonic_event_time.time_since_epoch().count(),
          chunk->data.tellp()));

  WriteRecord(OpCode::kMessage, string_builder_.Result(), &chunk->data);
}
468
James Kuszmaulb3fba252022-04-06 15:13:31 -0700469void McapLogger::WriteRecord(OpCode op, std::string_view record,
470 std::ostream *ostream) {
471 ostream->put(static_cast<char>(op));
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700472 uint64_t record_length = record.size();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700473 ostream->write(reinterpret_cast<const char *>(&record_length),
474 sizeof(record_length));
475 *ostream << record;
476}
477
// Flushes the buffered messages in chunk to the output as a Chunk record
// (optionally LZ4-compressed), followed by one MessageIndex record per
// channel, and appends a ChunkIndex entry for the summary section. Resets the
// chunk's buffer and bookkeeping for reuse.
void McapLogger::WriteChunk(ChunkStatus *chunk) {
  string_builder_.Reset();

  // The chunk record starts with the time range of the contained messages.
  CHECK(chunk->earliest_message.has_value());
  const uint64_t chunk_offset = output_.tellp();
  AppendInt64(&string_builder_,
              chunk->earliest_message->time_since_epoch().count());
  CHECK(chunk->latest_message.has_value());
  AppendInt64(&string_builder_,
              chunk->latest_message.value().time_since_epoch().count());

  std::string chunk_records = chunk->data.str();
  // Reset the chunk buffer.
  chunk->data.str("");

  const uint64_t records_size = chunk_records.size();
  // Uncompressed chunk size.
  AppendInt64(&string_builder_, records_size);
  // Uncompressed CRC (unpopulated).
  AppendInt32(&string_builder_, 0);
  // Compression
  AppendString(&string_builder_, CompressionName(compression_));
  uint64_t records_size_compressed = records_size;
  switch (compression_) {
    case Compression::kNone:
      AppendBytes(&string_builder_, chunk_records);
      break;
    case Compression::kLz4: {
      // Default preferences.
      LZ4F_preferences_t *lz4_preferences = nullptr;
      // Size the scratch buffer for the worst case; the buffer is a member so
      // it is reused across chunks and only ever grows.
      const uint64_t max_size =
          LZ4F_compressFrameBound(records_size, lz4_preferences);
      CHECK_NE(0u, max_size);
      if (max_size > compression_buffer_.size()) {
        compression_buffer_.resize(max_size);
      }
      records_size_compressed = LZ4F_compressFrame(
          compression_buffer_.data(), compression_buffer_.size(),
          reinterpret_cast<const char *>(chunk_records.data()),
          chunk_records.size(), lz4_preferences);
      CHECK(!LZ4F_isError(records_size_compressed));
      AppendBytes(&string_builder_,
                  {reinterpret_cast<const char *>(compression_buffer_.data()),
                   static_cast<size_t>(records_size_compressed)});
      break;
    }
  }
  WriteRecord(OpCode::kChunk, string_builder_.Result());

  // Immediately after the chunk, write one MessageIndex record per channel,
  // remembering each record's file offset for the ChunkIndex below.
  std::map<uint16_t, uint64_t> index_offsets;
  const uint64_t message_index_start = output_.tellp();
  for (const auto &indices : chunk->message_indices) {
    index_offsets[indices.first] = output_.tellp();
    string_builder_.Reset();
    AppendInt16(&string_builder_, indices.first);
    AppendMessageIndices(&string_builder_, indices.second);
    WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
  }
  chunk->message_indices.clear();
  chunk_indices_.push_back(ChunkIndex{
      .start_time = chunk->earliest_message.value(),
      .end_time = chunk->latest_message.value(),
      .offset = chunk_offset,
      .chunk_size = message_index_start - chunk_offset,
      .records_size = records_size,
      .records_size_compressed = records_size_compressed,
      .message_index_offsets = index_offsets,
      .message_index_size =
          static_cast<uint64_t>(output_.tellp()) - message_index_start,
      .compression = compression_});
  chunk->earliest_message.reset();
}
550
551McapLogger::SummaryOffset McapLogger::WriteStatistics() {
552 const uint64_t stats_offset = output_.tellp();
553 const uint64_t message_count = std::accumulate(
554 message_counts_.begin(), message_counts_.end(), 0,
555 [](const uint64_t &count, const std::pair<uint16_t, uint64_t> &val) {
556 return count + val.second;
557 });
558 string_builder_.Reset();
559 AppendInt64(&string_builder_, message_count);
560 // Schema count.
561 AppendInt16(&string_builder_, message_counts_.size());
562 // Channel count.
563 AppendInt32(&string_builder_, message_counts_.size());
564 // Attachment count.
565 AppendInt32(&string_builder_, 0);
566 // Metadata count.
567 AppendInt32(&string_builder_, 0);
568 // Chunk count.
569 AppendInt32(&string_builder_, chunk_indices_.size());
570 // Earliest & latest message times.
571 AppendInt64(&string_builder_, earliest_message_->time_since_epoch().count());
572 AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
573 // Per-channel message counts.
574 AppendChannelMap(&string_builder_, message_counts_);
575 WriteRecord(OpCode::kStatistics, string_builder_.Result());
576 return {OpCode::kStatistics, stats_offset,
577 static_cast<uint64_t>(output_.tellp()) - stats_offset};
578}
579
// Writes one ChunkIndex summary record per chunk previously flushed by
// WriteChunk(), and returns the summary offset entry for the group.
McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
  const uint64_t index_offset = output_.tellp();
  for (const ChunkIndex &index : chunk_indices_) {
    string_builder_.Reset();
    AppendInt64(&string_builder_, index.start_time.time_since_epoch().count());
    AppendInt64(&string_builder_, index.end_time.time_since_epoch().count());
    AppendInt64(&string_builder_, index.offset);
    AppendInt64(&string_builder_, index.chunk_size);
    AppendChannelMap(&string_builder_, index.message_index_offsets);
    AppendInt64(&string_builder_, index.message_index_size);
    // Compression used.
    AppendString(&string_builder_, CompressionName(index.compression));
    // Compressed and uncompressed records size.
    AppendInt64(&string_builder_, index.records_size_compressed);
    AppendInt64(&string_builder_, index.records_size);
    WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
  }
  return {OpCode::kChunkIndex, index_offset,
          static_cast<uint64_t>(output_.tellp()) - index_offset};
}
600
// Writes a SummaryOffset record pointing at one group of summary records.
void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
  string_builder_.Reset();
  string_builder_.AppendChar(static_cast<char>(offset.op_code));
  AppendInt64(&string_builder_, offset.offset);
  AppendInt64(&string_builder_, offset.size);
  WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
}
608
// Appends a string with a 4-byte length prefix (MCAP string encoding).
void McapLogger::AppendString(FastStringBuilder *builder,
                              std::string_view string) {
  AppendInt32(builder, string.size());
  builder->Append(string);
}
614
// Appends a byte blob with an 8-byte length prefix (used for chunk payloads).
void McapLogger::AppendBytes(FastStringBuilder *builder,
                             std::string_view bytes) {
  AppendInt64(builder, bytes.size());
  builder->Append(bytes);
}
620
namespace {
// Appends the in-memory byte representation of val to the builder.
// NOTE(review): this assumes the host byte order matches what MCAP expects
// (little-endian) — confirm on any big-endian target.
template <typename T>
static void AppendInt(FastStringBuilder *builder, T val) {
  builder->Append(
      std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
}
// Appends a map (or vector of pairs) as a 4-byte total byte-length prefix
// followed by the packed key/value entries.
template <typename T>
void AppendMap(FastStringBuilder *builder, const T &map) {
  AppendInt<uint32_t>(
      builder, map.size() * (sizeof(typename T::value_type::first_type) +
                             sizeof(typename T::value_type::second_type)));
  for (const auto &pair : map) {
    AppendInt(builder, pair.first);
    AppendInt(builder, pair.second);
  }
}
}  // namespace
638
// Appends a channel-ID -> count/offset map in MCAP map encoding.
void McapLogger::AppendChannelMap(FastStringBuilder *builder,
                                  const std::map<uint16_t, uint64_t> &map) {
  AppendMap(builder, map);
}
643
// Appends (timestamp, chunk offset) pairs for a MessageIndex record.
void McapLogger::AppendMessageIndices(
    FastStringBuilder *builder,
    const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
  AppendMap(builder, messages);
}
649
// Appends a 16-bit integer in wire format.
void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
  AppendInt(builder, val);
}

// Appends a 32-bit integer in wire format.
void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
  AppendInt(builder, val);
}

// Appends a 64-bit integer in wire format.
void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
  AppendInt(builder, val);
}
661} // namespace aos