blob: 111d78445ea38a2e04125432ed9a9014ee8dd283 [file] [log] [blame]
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07001#include "aos/util/mcap_logger.h"
2
3#include "absl/strings/str_replace.h"
James Kuszmaule4aa01d2022-06-28 14:09:02 -07004#include "aos/configuration_schema.h"
James Kuszmaulc31d7362022-05-27 14:20:04 -07005#include "aos/flatbuffer_merge.h"
James Kuszmaul5ab990d2022-11-07 16:35:49 -08006#include "lz4/lz4.h"
7#include "lz4/lz4frame.h"
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07008#include "single_include/nlohmann/json.hpp"
9
James Kuszmaulc31d7362022-05-27 14:20:04 -070010DEFINE_uint64(mcap_chunk_size, 10'000'000,
James Kuszmaul5c56ed32022-03-30 15:10:07 -070011 "Size, in bytes, of individual MCAP chunks");
James Kuszmaulc31d7362022-05-27 14:20:04 -070012DEFINE_bool(fetch, false,
13 "Whether to fetch most recent messages at start of logfile. Turn "
14 "this on if there are, e.g., one-time messages sent before the "
15 "start of the logfile that you need access to. Turn it off if you "
16 "don't want to deal with having messages that have timestamps that "
17 "may be arbitrarily far before any other interesting messages.");
James Kuszmaul5c56ed32022-03-30 15:10:07 -070018
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070019namespace aos {
James Kuszmaulb3fba252022-04-06 15:13:31 -070020
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070021nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
22 JsonSchemaRecursion recursion_level) {
23 nlohmann::json schema;
24 if (recursion_level == JsonSchemaRecursion::kTopLevel) {
25 schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
26 }
27 schema["type"] = "object";
28 nlohmann::json properties;
29 for (int index = 0; index < type.NumberFields(); ++index) {
30 nlohmann::json field;
31 const bool is_array = type.FieldIsRepeating(index);
32 if (type.FieldIsSequence(index)) {
33 // For sub-tables/structs, just recurse.
34 nlohmann::json subtype = JsonSchemaForFlatbuffer(
35 type.FieldType(index), JsonSchemaRecursion::kNested);
36 if (is_array) {
37 field["type"] = "array";
38 field["items"] = subtype;
39 } else {
40 field = subtype;
41 }
42 } else {
43 std::string elementary_type;
44 switch (type.FieldElementaryType(index)) {
45 case flatbuffers::ET_UTYPE:
46 case flatbuffers::ET_CHAR:
47 case flatbuffers::ET_UCHAR:
48 case flatbuffers::ET_SHORT:
49 case flatbuffers::ET_USHORT:
50 case flatbuffers::ET_INT:
51 case flatbuffers::ET_UINT:
52 case flatbuffers::ET_LONG:
53 case flatbuffers::ET_ULONG:
54 case flatbuffers::ET_FLOAT:
55 case flatbuffers::ET_DOUBLE:
56 elementary_type = "number";
57 break;
58 case flatbuffers::ET_BOOL:
59 elementary_type = "boolean";
60 break;
61 case flatbuffers::ET_STRING:
62 elementary_type = "string";
63 break;
64 case flatbuffers::ET_SEQUENCE:
65 if (type.FieldIsEnum(index)) {
66 elementary_type = "string";
67 } else {
68 LOG(FATAL) << "Should not encounter any sequence fields here.";
69 }
70 break;
71 }
72 if (is_array) {
73 field["type"] = "array";
74 field["items"]["type"] = elementary_type;
75 } else {
76 field["type"] = elementary_type;
77 }
78 }
79 // the nlohmann::json [] operator needs an actual string, not just a
80 // string_view :(.
81 properties[std::string(type.FieldName(index))] = field;
82 }
83 schema["properties"] = properties;
84 return schema;
85}
86
James Kuszmaul1e418f62023-02-26 14:40:20 -080087std::string ShortenedChannelName(const aos::Configuration *config,
88 const aos::Channel *channel,
89 std::string_view application_name,
90 const aos::Node *node) {
91 std::set<std::string> names =
92 configuration::GetChannelAliases(config, channel, application_name, node);
93 std::string_view shortest_name;
94 for (const std::string &name : names) {
95 if (shortest_name.empty() || name.size() < shortest_name.size()) {
96 shortest_name = name;
97 }
98 }
99 return std::string(shortest_name);
100}
101
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800102namespace {
103std::string_view CompressionName(McapLogger::Compression compression) {
104 switch (compression) {
105 case McapLogger::Compression::kNone:
106 return "";
107 case McapLogger::Compression::kLz4:
108 return "lz4";
109 }
110 LOG(FATAL) << "Unreachable.";
111}
112} // namespace
113
James Kuszmaulc31d7362022-05-27 14:20:04 -0700114McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800115 Serialization serialization,
116 CanonicalChannelNames canonical_channels,
117 Compression compression)
James Kuszmaulc31d7362022-05-27 14:20:04 -0700118 : event_loop_(event_loop),
119 output_(output_path),
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700120 serialization_(serialization),
James Kuszmaul9f607c62022-10-27 17:01:55 -0700121 canonical_channels_(canonical_channels),
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800122 compression_(compression),
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700123 configuration_channel_([]() {
124 // Setup a fake Channel for providing the configuration in the MCAP
125 // file. This is included for convenience so that consumers of the MCAP
126 // file can actually dereference things like the channel indices in AOS
127 // timing reports.
128 flatbuffers::FlatBufferBuilder fbb;
129 flatbuffers::Offset<flatbuffers::String> name_offset =
130 fbb.CreateString("");
131 flatbuffers::Offset<flatbuffers::String> type_offset =
132 fbb.CreateString("aos.Configuration");
133 flatbuffers::Offset<reflection::Schema> schema_offset =
134 aos::CopyFlatBuffer(
135 aos::FlatbufferSpan<reflection::Schema>(ConfigurationSchema()),
136 &fbb);
137 Channel::Builder channel(fbb);
138 channel.add_name(name_offset);
139 channel.add_type(type_offset);
140 channel.add_schema(schema_offset);
141 fbb.Finish(channel.Finish());
142 return fbb.Release();
143 }()),
144 configuration_(CopyFlatBuffer(event_loop_->configuration())) {
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700145 event_loop->SkipTimingReport();
146 event_loop->SkipAosLog();
147 CHECK(output_);
148 WriteMagic();
149 WriteHeader();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700150 // Schemas and channels get written out both at the start and end of the file,
151 // per the MCAP spec.
152 WriteSchemasAndChannels(RegisterHandlers::kYes);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700153}
154
// Finalizes the MCAP file: flushes pending chunks, then writes the Data End,
// Summary, Summary Offset, and Footer sections followed by the closing magic.
McapLogger::~McapLogger() {
  // If we have any data remaining, write one last chunk.
  for (auto &pair : current_chunks_) {
    if (pair.second.data.tellp() > 0) {
      WriteChunk(&pair.second);
    }
  }
  WriteDataEnd();

  // Now we enter the Summary section, where we write out all the channel/index
  // information that readers need to be able to seek to arbitrary locations
  // within the log.
  const uint64_t summary_offset = output_.tellp();
  const SummaryOffset chunk_indices_offset = WriteChunkIndices();
  const SummaryOffset stats_offset = WriteStatistics();
  // Schemas/Channels need to get reproduced in the summary section for random
  // access reading.
  const std::vector<SummaryOffset> offsets =
      WriteSchemasAndChannels(RegisterHandlers::kNo);

  // Next we have the summary offset section, which references the individual
  // pieces of the summary section.
  const uint64_t summary_offset_offset = output_.tellp();

  // SummaryOffset records must all be the final thing before the footer.
  WriteSummaryOffset(chunk_indices_offset);
  WriteSummaryOffset(stats_offset);
  for (const auto &offset : offsets) {
    WriteSummaryOffset(offset);
  }

  // And finally, the footer which must itself reference the start of the
  // summary and summary offset sections.
  WriteFooter(summary_offset, summary_offset_offset);
  WriteMagic();

  // Note: chunk compression is available via the Compression constructor
  // option; with flatbuffers messages that contain large numbers of zeros
  // (e.g., large grids or thresholded images) it can result in massive
  // savings.
  if (VLOG_IS_ON(2)) {
    // For debugging, print out how much space each channel is taking in the
    // overall log.
    LOG(INFO) << total_message_bytes_;
    std::vector<std::pair<size_t, const Channel *>> channel_bytes;
    for (const auto &pair : total_channel_bytes_) {
      channel_bytes.push_back(std::make_pair(pair.second, pair.first));
    }
    std::sort(channel_bytes.begin(), channel_bytes.end());
    for (const auto &pair : channel_bytes) {
      LOG(INFO) << configuration::StrippedChannelToString(pair.second) << ": "
                << static_cast<float>(pair.first) * 1e-6 << "MB "
                << static_cast<float>(pair.first) / total_message_bytes_
                << "\n";
    }
  }
}
211
James Kuszmaulb3fba252022-04-06 15:13:31 -0700212std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
213 RegisterHandlers register_handlers) {
James Kuszmaulc31d7362022-05-27 14:20:04 -0700214 uint16_t id = 0;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700215 std::map<uint16_t, const Channel *> channels;
216 for (const Channel *channel : *event_loop_->configuration()->channels()) {
James Kuszmaulc31d7362022-05-27 14:20:04 -0700217 ++id;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700218 if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
219 continue;
220 }
221 channels[id] = channel;
222
223 if (register_handlers == RegisterHandlers::kYes) {
224 message_counts_[id] = 0;
225 event_loop_->MakeRawWatcher(
226 channel, [this, id, channel](const Context &context, const void *) {
James Kuszmaul36a25f42022-10-28 10:18:00 -0700227 ChunkStatus *chunk = &current_chunks_[id];
228 WriteMessage(id, channel, context, chunk);
229 if (static_cast<uint64_t>(chunk->data.tellp()) >
James Kuszmaul5c56ed32022-03-30 15:10:07 -0700230 FLAGS_mcap_chunk_size) {
James Kuszmaul36a25f42022-10-28 10:18:00 -0700231 WriteChunk(chunk);
James Kuszmaulb3fba252022-04-06 15:13:31 -0700232 }
233 });
James Kuszmaulc31d7362022-05-27 14:20:04 -0700234 fetchers_[id] = event_loop_->MakeRawFetcher(channel);
235 event_loop_->OnRun([this, id, channel]() {
236 if (FLAGS_fetch && fetchers_[id]->Fetch()) {
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800237 WriteMessage(id, channel, fetchers_[id]->context(),
238 &current_chunks_[id]);
James Kuszmaulc31d7362022-05-27 14:20:04 -0700239 }
240 });
James Kuszmaulb3fba252022-04-06 15:13:31 -0700241 }
James Kuszmaulb3fba252022-04-06 15:13:31 -0700242 }
243
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700244 // Manually add in a special /configuration channel.
245 if (register_handlers == RegisterHandlers::kYes) {
246 configuration_id_ = ++id;
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700247 }
248
James Kuszmaulb3fba252022-04-06 15:13:31 -0700249 std::vector<SummaryOffset> offsets;
250
251 const uint64_t schema_offset = output_.tellp();
252
253 for (const auto &pair : channels) {
254 WriteSchema(pair.first, pair.second);
255 }
256
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700257 WriteSchema(configuration_id_, &configuration_channel_.message());
258
James Kuszmaulb3fba252022-04-06 15:13:31 -0700259 const uint64_t channel_offset = output_.tellp();
260
261 offsets.push_back(
262 {OpCode::kSchema, schema_offset, channel_offset - schema_offset});
263
264 for (const auto &pair : channels) {
265 // Write out the channel entry that uses the schema (we just re-use
266 // the schema ID for the channel ID, since we aren't deduplicating
267 // schemas for channels that are of the same type).
268 WriteChannel(pair.first, pair.first, pair.second);
269 }
270
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700271 // Provide the configuration message on a special channel that is just named
272 // "configuration", which is guaranteed not to conflict with existing under
273 // our current naming scheme (since our current scheme will, at a minimum, put
274 // a space between the name/type of a channel).
275 WriteChannel(configuration_id_, configuration_id_,
276 &configuration_channel_.message(), "configuration");
277
James Kuszmaulb3fba252022-04-06 15:13:31 -0700278 offsets.push_back({OpCode::kChannel, channel_offset,
279 static_cast<uint64_t>(output_.tellp()) - channel_offset});
280 return offsets;
281}
282
James Kuszmaulbed2af02023-01-28 15:57:24 -0800283void McapLogger::WriteConfigurationMessage() {
284 Context config_context;
285 config_context.monotonic_event_time = event_loop_->monotonic_now();
286 config_context.queue_index = 0;
287 config_context.size = configuration_.span().size();
288 config_context.data = configuration_.span().data();
289 // Avoid infinite recursion...
290 wrote_configuration_ = true;
291 WriteMessage(configuration_id_, &configuration_channel_.message(),
292 config_context, &current_chunks_[configuration_id_]);
293}
294
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700295void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
296
// Writes the MCAP Header record: a profile string identifying the producer
// flavor and a library string identifying the writing software.
void McapLogger::WriteHeader() {
  string_builder_.Reset();
  // "profile"
  AppendString(&string_builder_, "x-aos");
  // "library"
  AppendString(&string_builder_, "AOS MCAP converter");
  WriteRecord(OpCode::kHeader, string_builder_.Result());
}
305
James Kuszmaulb3fba252022-04-06 15:13:31 -0700306void McapLogger::WriteFooter(uint64_t summary_offset,
307 uint64_t summary_offset_offset) {
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700308 string_builder_.Reset();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700309 AppendInt64(&string_builder_, summary_offset);
310 AppendInt64(&string_builder_, summary_offset_offset);
311 // CRC32 for the Summary section, which we don't bother populating.
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700312 AppendInt32(&string_builder_, 0);
313 WriteRecord(OpCode::kFooter, string_builder_.Result());
314}
315
// Writes the Data End record that terminates the data section of the file.
void McapLogger::WriteDataEnd() {
  string_builder_.Reset();
  // CRC32 for the data, which we are too lazy to calculate.
  AppendInt32(&string_builder_, 0);
  WriteRecord(OpCode::kDataEnd, string_builder_.Result());
}
322
// Writes a Schema record for `channel` under the given schema `id`. The
// schema payload is either a JSON schema or the raw reflection flatbuffer,
// depending on the configured serialization.
void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
  CHECK(channel->has_schema());

  // Copy the schema so we have a contiguous, detached buffer to embed in the
  // flatbuffer-serialization case below.
  const FlatbufferDetachedBuffer<reflection::Schema> schema =
      RecursiveCopyFlatBuffer(channel->schema());

  // Write out the schema (we don't bother deduplicating schema types):
  string_builder_.Reset();
  // Schema ID
  AppendInt16(&string_builder_, id);
  // Type name
  AppendString(&string_builder_, channel->type()->string_view());
  switch (serialization_) {
    case Serialization::kJson:
      // Encoding
      AppendString(&string_builder_, "jsonschema");
      // Actual schema itself
      AppendString(&string_builder_,
                   JsonSchemaForFlatbuffer({channel->schema()}).dump());
      break;
    case Serialization::kFlatbuffer:
      // Encoding
      AppendString(&string_builder_, "flatbuffer");
      // Actual schema itself
      AppendString(&string_builder_,
                   {reinterpret_cast<const char *>(schema.span().data()),
                    schema.span().size()});
      break;
  }
  WriteRecord(OpCode::kSchema, string_builder_.Result());
}
354
// Writes a Channel record tying channel `id` to `schema_id`. The topic name
// is `override_name` when provided; otherwise it is "<name> <type>", using
// either the canonical channel name or the shortest available alias depending
// on the canonical_channels_ setting.
void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
                              const aos::Channel *channel,
                              std::string_view override_name) {
  string_builder_.Reset();
  // Channel ID
  AppendInt16(&string_builder_, id);
  // Schema ID
  AppendInt16(&string_builder_, schema_id);
  // Topic name
  std::string topic_name(override_name);
  if (topic_name.empty()) {
    switch (canonical_channels_) {
      case CanonicalChannelNames::kCanonical:
        topic_name = absl::StrCat(channel->name()->string_view(), " ",
                                  channel->type()->string_view());
        break;
      case CanonicalChannelNames::kShortened: {
        const std::string shortest_name =
            ShortenedChannelName(event_loop_->configuration(), channel,
                                 event_loop_->name(), event_loop_->node());
        if (shortest_name != channel->name()->string_view()) {
          VLOG(1) << "Shortening " << channel->name()->string_view() << " "
                  << channel->type()->string_view() << " to " << shortest_name;
        }
        topic_name =
            absl::StrCat(shortest_name, " ", channel->type()->string_view());
        break;
      }
    }
  }
  AppendString(&string_builder_, topic_name);
  // Encoding
  switch (serialization_) {
    case Serialization::kJson:
      AppendString(&string_builder_, "json");
      break;
    case Serialization::kFlatbuffer:
      AppendString(&string_builder_, "flatbuffer");
      break;
  }

  // Metadata (technically supposed to be a Map<string, string>)
  AppendString(&string_builder_, "");
  WriteRecord(OpCode::kChannel, string_builder_.Result());
}
400
// Serializes one message into `chunk` as an MCAP Message record, updating
// per-file and per-chunk time bounds, message counts, byte totals, and the
// per-channel message index used later by WriteChunk(). Also lazily writes
// the configuration message before the first real message.
void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
                              const Context &context, ChunkStatus *chunk) {
  if (!wrote_configuration_) {
    WriteConfigurationMessage();
  }
  CHECK_NOTNULL(context.data);

  message_counts_[channel_id]++;

  // Track the earliest/latest message times across the whole file...
  if (!earliest_message_.has_value()) {
    earliest_message_ = context.monotonic_event_time;
  } else {
    earliest_message_ =
        std::min(context.monotonic_event_time, earliest_message_.value());
  }
  // ...and within this chunk (needed for the Chunk and ChunkIndex records).
  if (!chunk->earliest_message.has_value()) {
    chunk->earliest_message = context.monotonic_event_time;
  } else {
    chunk->earliest_message =
        std::min(context.monotonic_event_time, chunk->earliest_message.value());
  }
  chunk->latest_message = context.monotonic_event_time;
  latest_message_ = context.monotonic_event_time;

  string_builder_.Reset();
  // Channel ID
  AppendInt16(&string_builder_, channel_id);
  // Queue Index
  AppendInt32(&string_builder_, context.queue_index);
  // Log time, and publish time. Since we don't log a logged time, just use
  // published time.
  // TODO(james): If we use this for multi-node logfiles, use distributed clock.
  AppendInt64(&string_builder_,
              context.monotonic_event_time.time_since_epoch().count());
  // Note: Foxglove Studio doesn't appear to actually support using publish time
  // right now.
  AppendInt64(&string_builder_,
              context.monotonic_event_time.time_since_epoch().count());

  // Refuse to log anything that doesn't verify as a valid flatbuffer; a
  // corrupted message would poison the file for downstream readers.
  CHECK(flatbuffers::Verify(*channel->schema(),
                            *channel->schema()->root_table(),
                            static_cast<const uint8_t *>(context.data),
                            static_cast<size_t>(context.size)))
      << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
      << channel->type()->c_str();

  switch (serialization_) {
    case Serialization::kJson:
      aos::FlatbufferToJson(&string_builder_, channel->schema(),
                            static_cast<const uint8_t *>(context.data));
      break;
    case Serialization::kFlatbuffer:
      string_builder_.Append(
          {static_cast<const char *>(context.data), context.size});
      break;
  }
  total_message_bytes_ += context.size;
  total_channel_bytes_[channel] += context.size;

  // Record (timestamp, offset-within-chunk) so WriteChunk() can emit
  // MessageIndex records for this channel.
  chunk->message_indices[channel_id].push_back(
      std::make_pair<uint64_t, uint64_t>(
          context.monotonic_event_time.time_since_epoch().count(),
          chunk->data.tellp()));

  WriteRecord(OpCode::kMessage, string_builder_.Result(), &chunk->data);
}
467
James Kuszmaulb3fba252022-04-06 15:13:31 -0700468void McapLogger::WriteRecord(OpCode op, std::string_view record,
469 std::ostream *ostream) {
470 ostream->put(static_cast<char>(op));
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700471 uint64_t record_length = record.size();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700472 ostream->write(reinterpret_cast<const char *>(&record_length),
473 sizeof(record_length));
474 *ostream << record;
475}
476
James Kuszmaul36a25f42022-10-28 10:18:00 -0700477void McapLogger::WriteChunk(ChunkStatus *chunk) {
James Kuszmaulb3fba252022-04-06 15:13:31 -0700478 string_builder_.Reset();
479
James Kuszmaul36a25f42022-10-28 10:18:00 -0700480 CHECK(chunk->earliest_message.has_value());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700481 const uint64_t chunk_offset = output_.tellp();
482 AppendInt64(&string_builder_,
James Kuszmaul36a25f42022-10-28 10:18:00 -0700483 chunk->earliest_message->time_since_epoch().count());
484 CHECK(chunk->latest_message.has_value());
485 AppendInt64(&string_builder_,
486 chunk->latest_message.value().time_since_epoch().count());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700487
James Kuszmaul36a25f42022-10-28 10:18:00 -0700488 std::string chunk_records = chunk->data.str();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700489 // Reset the chunk buffer.
James Kuszmaul36a25f42022-10-28 10:18:00 -0700490 chunk->data.str("");
James Kuszmaulb3fba252022-04-06 15:13:31 -0700491
492 const uint64_t records_size = chunk_records.size();
493 // Uncompressed chunk size.
494 AppendInt64(&string_builder_, records_size);
495 // Uncompressed CRC (unpopulated).
496 AppendInt32(&string_builder_, 0);
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800497 // Compression
498 AppendString(&string_builder_, CompressionName(compression_));
499 uint64_t records_size_compressed = records_size;
500 switch (compression_) {
501 case Compression::kNone:
502 AppendBytes(&string_builder_, chunk_records);
503 break;
504 case Compression::kLz4: {
505 // Default preferences.
506 LZ4F_preferences_t *lz4_preferences = nullptr;
507 const uint64_t max_size =
508 LZ4F_compressFrameBound(records_size, lz4_preferences);
509 CHECK_NE(0u, max_size);
510 if (max_size > compression_buffer_.size()) {
511 compression_buffer_.resize(max_size);
512 }
513 records_size_compressed = LZ4F_compressFrame(
514 compression_buffer_.data(), compression_buffer_.size(),
515 reinterpret_cast<const char *>(chunk_records.data()),
516 chunk_records.size(), lz4_preferences);
517 CHECK(!LZ4F_isError(records_size_compressed));
518 AppendBytes(&string_builder_,
519 {reinterpret_cast<const char *>(compression_buffer_.data()),
520 static_cast<size_t>(records_size_compressed)});
521 break;
522 }
523 }
James Kuszmaulb3fba252022-04-06 15:13:31 -0700524 WriteRecord(OpCode::kChunk, string_builder_.Result());
525
526 std::map<uint16_t, uint64_t> index_offsets;
527 const uint64_t message_index_start = output_.tellp();
James Kuszmaul36a25f42022-10-28 10:18:00 -0700528 for (const auto &indices : chunk->message_indices) {
James Kuszmaulb3fba252022-04-06 15:13:31 -0700529 index_offsets[indices.first] = output_.tellp();
530 string_builder_.Reset();
531 AppendInt16(&string_builder_, indices.first);
532 AppendMessageIndices(&string_builder_, indices.second);
533 WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
534 }
James Kuszmaul36a25f42022-10-28 10:18:00 -0700535 chunk->message_indices.clear();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700536 chunk_indices_.push_back(ChunkIndex{
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800537 .start_time = chunk->earliest_message.value(),
538 .end_time = chunk->latest_message.value(),
539 .offset = chunk_offset,
540 .chunk_size = message_index_start - chunk_offset,
541 .records_size = records_size,
542 .records_size_compressed = records_size_compressed,
543 .message_index_offsets = index_offsets,
544 .message_index_size =
545 static_cast<uint64_t>(output_.tellp()) - message_index_start,
546 .compression = compression_});
James Kuszmaul36a25f42022-10-28 10:18:00 -0700547 chunk->earliest_message.reset();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700548}
549
550McapLogger::SummaryOffset McapLogger::WriteStatistics() {
551 const uint64_t stats_offset = output_.tellp();
552 const uint64_t message_count = std::accumulate(
553 message_counts_.begin(), message_counts_.end(), 0,
554 [](const uint64_t &count, const std::pair<uint16_t, uint64_t> &val) {
555 return count + val.second;
556 });
557 string_builder_.Reset();
558 AppendInt64(&string_builder_, message_count);
559 // Schema count.
560 AppendInt16(&string_builder_, message_counts_.size());
561 // Channel count.
562 AppendInt32(&string_builder_, message_counts_.size());
563 // Attachment count.
564 AppendInt32(&string_builder_, 0);
565 // Metadata count.
566 AppendInt32(&string_builder_, 0);
567 // Chunk count.
568 AppendInt32(&string_builder_, chunk_indices_.size());
569 // Earliest & latest message times.
570 AppendInt64(&string_builder_, earliest_message_->time_since_epoch().count());
571 AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
572 // Per-channel message counts.
573 AppendChannelMap(&string_builder_, message_counts_);
574 WriteRecord(OpCode::kStatistics, string_builder_.Result());
575 return {OpCode::kStatistics, stats_offset,
576 static_cast<uint64_t>(output_.tellp()) - stats_offset};
577}
578
// Writes one ChunkIndex record per chunk flushed during logging, enabling
// readers to seek directly to chunks by time range. Returns the summary
// offset covering all of the ChunkIndex records.
McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
  const uint64_t index_offset = output_.tellp();
  for (const ChunkIndex &index : chunk_indices_) {
    string_builder_.Reset();
    AppendInt64(&string_builder_, index.start_time.time_since_epoch().count());
    AppendInt64(&string_builder_, index.end_time.time_since_epoch().count());
    AppendInt64(&string_builder_, index.offset);
    AppendInt64(&string_builder_, index.chunk_size);
    AppendChannelMap(&string_builder_, index.message_index_offsets);
    AppendInt64(&string_builder_, index.message_index_size);
    // Compression used.
    AppendString(&string_builder_, CompressionName(index.compression));
    // Compressed and uncompressed records size.
    AppendInt64(&string_builder_, index.records_size_compressed);
    AppendInt64(&string_builder_, index.records_size);
    WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
  }
  return {OpCode::kChunkIndex, index_offset,
          static_cast<uint64_t>(output_.tellp()) - index_offset};
}
599
// Writes one SummaryOffset record pointing at a group of records (identified
// by opcode) within the Summary section.
void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
  string_builder_.Reset();
  string_builder_.AppendChar(static_cast<char>(offset.op_code));
  AppendInt64(&string_builder_, offset.offset);
  AppendInt64(&string_builder_, offset.size);
  WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
}
607
// Appends an MCAP "string": a 4-byte length prefix followed by the bytes.
void McapLogger::AppendString(FastStringBuilder *builder,
                              std::string_view string) {
  AppendInt32(builder, string.size());
  builder->Append(string);
}
613
James Kuszmaulb3fba252022-04-06 15:13:31 -0700614void McapLogger::AppendBytes(FastStringBuilder *builder,
615 std::string_view bytes) {
616 AppendInt64(builder, bytes.size());
617 builder->Append(bytes);
618}
619
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700620namespace {
621template <typename T>
622static void AppendInt(FastStringBuilder *builder, T val) {
623 builder->Append(
624 std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
625}
James Kuszmaulb3fba252022-04-06 15:13:31 -0700626template <typename T>
627void AppendMap(FastStringBuilder *builder, const T &map) {
628 AppendInt<uint32_t>(
629 builder, map.size() * (sizeof(typename T::value_type::first_type) +
630 sizeof(typename T::value_type::second_type)));
631 for (const auto &pair : map) {
632 AppendInt(builder, pair.first);
633 AppendInt(builder, pair.second);
634 }
635}
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700636} // namespace
637
James Kuszmaulb3fba252022-04-06 15:13:31 -0700638void McapLogger::AppendChannelMap(FastStringBuilder *builder,
639 const std::map<uint16_t, uint64_t> &map) {
640 AppendMap(builder, map);
641}
642
// Appends the (timestamp, chunk-offset) pairs for a MessageIndex record.
void McapLogger::AppendMessageIndices(
    FastStringBuilder *builder,
    const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
  AppendMap(builder, messages);
}
648
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700649void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
650 AppendInt(builder, val);
651}
652
653void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
654 AppendInt(builder, val);
655}
656
657void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
658 AppendInt(builder, val);
659}
660} // namespace aos