blob: 818844bdad00f3a5c6396de891869aa39289689b [file] [log] [blame]
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07001#include "aos/util/mcap_logger.h"
2
3#include "absl/strings/str_replace.h"
James Kuszmaule4aa01d2022-06-28 14:09:02 -07004#include "aos/configuration_schema.h"
James Kuszmaulc31d7362022-05-27 14:20:04 -07005#include "aos/flatbuffer_merge.h"
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07006#include "single_include/nlohmann/json.hpp"
7
James Kuszmaulc31d7362022-05-27 14:20:04 -07008DEFINE_uint64(mcap_chunk_size, 10'000'000,
James Kuszmaul5c56ed32022-03-30 15:10:07 -07009 "Size, in bytes, of individual MCAP chunks");
James Kuszmaulc31d7362022-05-27 14:20:04 -070010DEFINE_bool(fetch, false,
11 "Whether to fetch most recent messages at start of logfile. Turn "
12 "this on if there are, e.g., one-time messages sent before the "
13 "start of the logfile that you need access to. Turn it off if you "
14 "don't want to deal with having messages that have timestamps that "
15 "may be arbitrarily far before any other interesting messages.");
James Kuszmaul5c56ed32022-03-30 15:10:07 -070016
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070017namespace aos {
James Kuszmaulb3fba252022-04-06 15:13:31 -070018
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070019nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
20 JsonSchemaRecursion recursion_level) {
21 nlohmann::json schema;
22 if (recursion_level == JsonSchemaRecursion::kTopLevel) {
23 schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
24 }
25 schema["type"] = "object";
26 nlohmann::json properties;
27 for (int index = 0; index < type.NumberFields(); ++index) {
28 nlohmann::json field;
29 const bool is_array = type.FieldIsRepeating(index);
30 if (type.FieldIsSequence(index)) {
31 // For sub-tables/structs, just recurse.
32 nlohmann::json subtype = JsonSchemaForFlatbuffer(
33 type.FieldType(index), JsonSchemaRecursion::kNested);
34 if (is_array) {
35 field["type"] = "array";
36 field["items"] = subtype;
37 } else {
38 field = subtype;
39 }
40 } else {
41 std::string elementary_type;
42 switch (type.FieldElementaryType(index)) {
43 case flatbuffers::ET_UTYPE:
44 case flatbuffers::ET_CHAR:
45 case flatbuffers::ET_UCHAR:
46 case flatbuffers::ET_SHORT:
47 case flatbuffers::ET_USHORT:
48 case flatbuffers::ET_INT:
49 case flatbuffers::ET_UINT:
50 case flatbuffers::ET_LONG:
51 case flatbuffers::ET_ULONG:
52 case flatbuffers::ET_FLOAT:
53 case flatbuffers::ET_DOUBLE:
54 elementary_type = "number";
55 break;
56 case flatbuffers::ET_BOOL:
57 elementary_type = "boolean";
58 break;
59 case flatbuffers::ET_STRING:
60 elementary_type = "string";
61 break;
62 case flatbuffers::ET_SEQUENCE:
63 if (type.FieldIsEnum(index)) {
64 elementary_type = "string";
65 } else {
66 LOG(FATAL) << "Should not encounter any sequence fields here.";
67 }
68 break;
69 }
70 if (is_array) {
71 field["type"] = "array";
72 field["items"]["type"] = elementary_type;
73 } else {
74 field["type"] = elementary_type;
75 }
76 }
77 // the nlohmann::json [] operator needs an actual string, not just a
78 // string_view :(.
79 properties[std::string(type.FieldName(index))] = field;
80 }
81 schema["properties"] = properties;
82 return schema;
83}
84
James Kuszmaulc31d7362022-05-27 14:20:04 -070085McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
James Kuszmaul9f607c62022-10-27 17:01:55 -070086 Serialization serialization, CanonicalChannelNames canonical_channels)
James Kuszmaulc31d7362022-05-27 14:20:04 -070087 : event_loop_(event_loop),
88 output_(output_path),
James Kuszmaule4aa01d2022-06-28 14:09:02 -070089 serialization_(serialization),
James Kuszmaul9f607c62022-10-27 17:01:55 -070090 canonical_channels_(canonical_channels),
James Kuszmaule4aa01d2022-06-28 14:09:02 -070091 configuration_channel_([]() {
92 // Setup a fake Channel for providing the configuration in the MCAP
93 // file. This is included for convenience so that consumers of the MCAP
94 // file can actually dereference things like the channel indices in AOS
95 // timing reports.
96 flatbuffers::FlatBufferBuilder fbb;
97 flatbuffers::Offset<flatbuffers::String> name_offset =
98 fbb.CreateString("");
99 flatbuffers::Offset<flatbuffers::String> type_offset =
100 fbb.CreateString("aos.Configuration");
101 flatbuffers::Offset<reflection::Schema> schema_offset =
102 aos::CopyFlatBuffer(
103 aos::FlatbufferSpan<reflection::Schema>(ConfigurationSchema()),
104 &fbb);
105 Channel::Builder channel(fbb);
106 channel.add_name(name_offset);
107 channel.add_type(type_offset);
108 channel.add_schema(schema_offset);
109 fbb.Finish(channel.Finish());
110 return fbb.Release();
111 }()),
112 configuration_(CopyFlatBuffer(event_loop_->configuration())) {
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700113 event_loop->SkipTimingReport();
114 event_loop->SkipAosLog();
115 CHECK(output_);
116 WriteMagic();
117 WriteHeader();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700118 // Schemas and channels get written out both at the start and end of the file,
119 // per the MCAP spec.
120 WriteSchemasAndChannels(RegisterHandlers::kYes);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700121}
122
123McapLogger::~McapLogger() {
James Kuszmaulb3fba252022-04-06 15:13:31 -0700124 // If we have any data remaining, write one last chunk.
James Kuszmaul36a25f42022-10-28 10:18:00 -0700125 for (auto &pair : current_chunks_) {
126 if (pair.second.data.tellp() > 0) {
127 WriteChunk(&pair.second);
128 }
James Kuszmaulb3fba252022-04-06 15:13:31 -0700129 }
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700130 WriteDataEnd();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700131
132 // Now we enter the Summary section, where we write out all the channel/index
133 // information that readers need to be able to seek to arbitrary locations
134 // within the log.
135 const uint64_t summary_offset = output_.tellp();
136 const SummaryOffset chunk_indices_offset = WriteChunkIndices();
137 const SummaryOffset stats_offset = WriteStatistics();
138 // Schemas/Channels need to get reproduced in the summary section for random
139 // access reading.
140 const std::vector<SummaryOffset> offsets =
141 WriteSchemasAndChannels(RegisterHandlers::kNo);
142
143 // Next we have the summary offset section, which references the individual
144 // pieces of the summary section.
145 const uint64_t summary_offset_offset = output_.tellp();
146
147 // SummarytOffset's must all be the final thing before the footer.
148 WriteSummaryOffset(chunk_indices_offset);
149 WriteSummaryOffset(stats_offset);
150 for (const auto &offset : offsets) {
151 WriteSummaryOffset(offset);
152 }
153
154 // And finally, the footer which must itself reference the start of the
155 // summary and summary offset sections.
156 WriteFooter(summary_offset, summary_offset_offset);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700157 WriteMagic();
James Kuszmaulc31d7362022-05-27 14:20:04 -0700158
159 // TODO(james): Add compression. With flatbuffers messages that contain large
160 // numbers of zeros (e.g., large grids or thresholded images) this can result
161 // in massive savings.
162 if (VLOG_IS_ON(2)) {
163 // For debugging, print out how much space each channel is taking in the
164 // overall log.
165 LOG(INFO) << total_message_bytes_;
166 std::vector<std::pair<size_t, const Channel *>> channel_bytes;
167 for (const auto &pair : total_channel_bytes_) {
168 channel_bytes.push_back(std::make_pair(pair.second, pair.first));
169 }
170 std::sort(channel_bytes.begin(), channel_bytes.end());
171 for (const auto &pair : channel_bytes) {
172 LOG(INFO) << configuration::StrippedChannelToString(pair.second) << ": "
173 << static_cast<float>(pair.first) * 1e-6 << "MB "
174 << static_cast<float>(pair.first) / total_message_bytes_
175 << "\n";
176 }
177 }
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700178}
179
James Kuszmaulb3fba252022-04-06 15:13:31 -0700180std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
181 RegisterHandlers register_handlers) {
James Kuszmaulc31d7362022-05-27 14:20:04 -0700182 uint16_t id = 0;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700183 std::map<uint16_t, const Channel *> channels;
184 for (const Channel *channel : *event_loop_->configuration()->channels()) {
James Kuszmaulc31d7362022-05-27 14:20:04 -0700185 ++id;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700186 if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
187 continue;
188 }
189 channels[id] = channel;
190
191 if (register_handlers == RegisterHandlers::kYes) {
192 message_counts_[id] = 0;
193 event_loop_->MakeRawWatcher(
194 channel, [this, id, channel](const Context &context, const void *) {
James Kuszmaul36a25f42022-10-28 10:18:00 -0700195 ChunkStatus *chunk = &current_chunks_[id];
196 WriteMessage(id, channel, context, chunk);
197 if (static_cast<uint64_t>(chunk->data.tellp()) >
James Kuszmaul5c56ed32022-03-30 15:10:07 -0700198 FLAGS_mcap_chunk_size) {
James Kuszmaul36a25f42022-10-28 10:18:00 -0700199 WriteChunk(chunk);
James Kuszmaulb3fba252022-04-06 15:13:31 -0700200 }
201 });
James Kuszmaulc31d7362022-05-27 14:20:04 -0700202 fetchers_[id] = event_loop_->MakeRawFetcher(channel);
203 event_loop_->OnRun([this, id, channel]() {
204 if (FLAGS_fetch && fetchers_[id]->Fetch()) {
James Kuszmaul36a25f42022-10-28 10:18:00 -0700205 WriteMessage(id, channel, fetchers_[id]->context(), &current_chunks_[id]);
James Kuszmaulc31d7362022-05-27 14:20:04 -0700206 }
207 });
James Kuszmaulb3fba252022-04-06 15:13:31 -0700208 }
James Kuszmaulb3fba252022-04-06 15:13:31 -0700209 }
210
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700211 // Manually add in a special /configuration channel.
212 if (register_handlers == RegisterHandlers::kYes) {
213 configuration_id_ = ++id;
214 event_loop_->OnRun([this]() {
215 Context config_context;
216 config_context.monotonic_event_time = event_loop_->monotonic_now();
217 config_context.queue_index = 0;
218 config_context.size = configuration_.span().size();
219 config_context.data = configuration_.span().data();
220 WriteMessage(configuration_id_, &configuration_channel_.message(),
James Kuszmaul36a25f42022-10-28 10:18:00 -0700221 config_context, &current_chunks_[configuration_id_]);
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700222 });
223 }
224
James Kuszmaulb3fba252022-04-06 15:13:31 -0700225 std::vector<SummaryOffset> offsets;
226
227 const uint64_t schema_offset = output_.tellp();
228
229 for (const auto &pair : channels) {
230 WriteSchema(pair.first, pair.second);
231 }
232
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700233 WriteSchema(configuration_id_, &configuration_channel_.message());
234
James Kuszmaulb3fba252022-04-06 15:13:31 -0700235 const uint64_t channel_offset = output_.tellp();
236
237 offsets.push_back(
238 {OpCode::kSchema, schema_offset, channel_offset - schema_offset});
239
240 for (const auto &pair : channels) {
241 // Write out the channel entry that uses the schema (we just re-use
242 // the schema ID for the channel ID, since we aren't deduplicating
243 // schemas for channels that are of the same type).
244 WriteChannel(pair.first, pair.first, pair.second);
245 }
246
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700247 // Provide the configuration message on a special channel that is just named
248 // "configuration", which is guaranteed not to conflict with existing under
249 // our current naming scheme (since our current scheme will, at a minimum, put
250 // a space between the name/type of a channel).
251 WriteChannel(configuration_id_, configuration_id_,
252 &configuration_channel_.message(), "configuration");
253
James Kuszmaulb3fba252022-04-06 15:13:31 -0700254 offsets.push_back({OpCode::kChannel, channel_offset,
255 static_cast<uint64_t>(output_.tellp()) - channel_offset});
256 return offsets;
257}
258
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700259void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
260
261void McapLogger::WriteHeader() {
262 string_builder_.Reset();
263 // "profile"
264 AppendString(&string_builder_, "x-aos");
265 // "library"
266 AppendString(&string_builder_, "AOS MCAP converter");
267 WriteRecord(OpCode::kHeader, string_builder_.Result());
268}
269
James Kuszmaulb3fba252022-04-06 15:13:31 -0700270void McapLogger::WriteFooter(uint64_t summary_offset,
271 uint64_t summary_offset_offset) {
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700272 string_builder_.Reset();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700273 AppendInt64(&string_builder_, summary_offset);
274 AppendInt64(&string_builder_, summary_offset_offset);
275 // CRC32 for the Summary section, which we don't bother populating.
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700276 AppendInt32(&string_builder_, 0);
277 WriteRecord(OpCode::kFooter, string_builder_.Result());
278}
279
280void McapLogger::WriteDataEnd() {
281 string_builder_.Reset();
282 // CRC32 for the data, which we are too lazy to calculate.
283 AppendInt32(&string_builder_, 0);
284 WriteRecord(OpCode::kDataEnd, string_builder_.Result());
285}
286
287void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
288 CHECK(channel->has_schema());
James Kuszmaulc31d7362022-05-27 14:20:04 -0700289
290 const FlatbufferDetachedBuffer<reflection::Schema> schema =
291 CopyFlatBuffer(channel->schema());
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700292
293 // Write out the schema (we don't bother deduplicating schema types):
294 string_builder_.Reset();
295 // Schema ID
296 AppendInt16(&string_builder_, id);
297 // Type name
298 AppendString(&string_builder_, channel->type()->string_view());
James Kuszmaulc31d7362022-05-27 14:20:04 -0700299 switch (serialization_) {
300 case Serialization::kJson:
301 // Encoding
302 AppendString(&string_builder_, "jsonschema");
303 // Actual schema itself
304 AppendString(&string_builder_,
305 JsonSchemaForFlatbuffer({channel->schema()}).dump());
306 break;
307 case Serialization::kFlatbuffer:
308 // Encoding
309 AppendString(&string_builder_, "flatbuffer");
310 // Actual schema itself
311 AppendString(&string_builder_,
312 {reinterpret_cast<const char *>(schema.span().data()),
313 schema.span().size()});
314 break;
315 }
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700316 WriteRecord(OpCode::kSchema, string_builder_.Result());
317}
318
319void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700320 const aos::Channel *channel,
321 std::string_view override_name) {
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700322 string_builder_.Reset();
323 // Channel ID
324 AppendInt16(&string_builder_, id);
325 // Schema ID
326 AppendInt16(&string_builder_, schema_id);
327 // Topic name
James Kuszmaul9f607c62022-10-27 17:01:55 -0700328 std::string topic_name(override_name);
329 if (topic_name.empty()) {
330 switch (canonical_channels_) {
331 case CanonicalChannelNames::kCanonical:
332 topic_name = absl::StrCat(channel->name()->string_view(), " ",
333 channel->type()->string_view());
334 break;
335 case CanonicalChannelNames::kShortened: {
336 std::set<std::string> names = configuration::GetChannelAliases(
337 event_loop_->configuration(), channel, event_loop_->name(),
338 event_loop_->node());
339 std::string_view shortest_name;
340 for (const std::string &name : names) {
341 if (shortest_name.empty() || name.size() < shortest_name.size()) {
342 shortest_name = name;
343 }
344 }
345 if (shortest_name != channel->name()->string_view()) {
346 VLOG(1) << "Shortening " << channel->name()->string_view() << " "
347 << channel->type()->string_view() << " to " << shortest_name;
348 }
349 topic_name = absl::StrCat(shortest_name, " ",
350 channel->type()->string_view());
351 break;
352 }
353 }
354 }
355 AppendString(&string_builder_, topic_name);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700356 // Encoding
James Kuszmaulc31d7362022-05-27 14:20:04 -0700357 switch (serialization_) {
358 case Serialization::kJson:
359 AppendString(&string_builder_, "json");
360 break;
361 case Serialization::kFlatbuffer:
362 AppendString(&string_builder_, "flatbuffer");
363 break;
364 }
365
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700366 // Metadata (technically supposed to be a Map<string, string>)
367 AppendString(&string_builder_, "");
368 WriteRecord(OpCode::kChannel, string_builder_.Result());
369}
370
371void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
James Kuszmaul36a25f42022-10-28 10:18:00 -0700372 const Context &context, ChunkStatus *chunk) {
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700373 CHECK_NOTNULL(context.data);
374
James Kuszmaulb3fba252022-04-06 15:13:31 -0700375 message_counts_[channel_id]++;
376
377 if (!earliest_message_.has_value()) {
378 earliest_message_ = context.monotonic_event_time;
James Kuszmaulc31d7362022-05-27 14:20:04 -0700379 } else {
380 earliest_message_ =
381 std::min(context.monotonic_event_time, earliest_message_.value());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700382 }
James Kuszmaul36a25f42022-10-28 10:18:00 -0700383 if (!chunk->earliest_message.has_value()) {
384 chunk->earliest_message = context.monotonic_event_time;
James Kuszmaulc31d7362022-05-27 14:20:04 -0700385 } else {
James Kuszmaul36a25f42022-10-28 10:18:00 -0700386 chunk->earliest_message =
387 std::min(context.monotonic_event_time, chunk->earliest_message.value());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700388 }
James Kuszmaul36a25f42022-10-28 10:18:00 -0700389 chunk->latest_message = context.monotonic_event_time;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700390 latest_message_ = context.monotonic_event_time;
391
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700392 string_builder_.Reset();
393 // Channel ID
394 AppendInt16(&string_builder_, channel_id);
395 // Queue Index
396 AppendInt32(&string_builder_, context.queue_index);
397 // Log time, and publish time. Since we don't log a logged time, just use
398 // published time.
399 // TODO(james): If we use this for multi-node logfiles, use distributed clock.
400 AppendInt64(&string_builder_,
401 context.monotonic_event_time.time_since_epoch().count());
James Kuszmaulc31d7362022-05-27 14:20:04 -0700402 // Note: Foxglove Studio doesn't appear to actually support using publish time
403 // right now.
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700404 AppendInt64(&string_builder_,
405 context.monotonic_event_time.time_since_epoch().count());
406
407 CHECK(flatbuffers::Verify(*channel->schema(),
408 *channel->schema()->root_table(),
409 static_cast<const uint8_t *>(context.data),
410 static_cast<size_t>(context.size)))
411 << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
412 << channel->type()->c_str();
413
James Kuszmaulc31d7362022-05-27 14:20:04 -0700414 switch (serialization_) {
415 case Serialization::kJson:
416 aos::FlatbufferToJson(&string_builder_, channel->schema(),
417 static_cast<const uint8_t *>(context.data));
418 break;
419 case Serialization::kFlatbuffer:
420 string_builder_.Append(
421 {static_cast<const char *>(context.data), context.size});
422 break;
423 }
424 total_message_bytes_ += context.size;
425 total_channel_bytes_[channel] += context.size;
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700426
James Kuszmaul36a25f42022-10-28 10:18:00 -0700427 chunk->message_indices[channel_id].push_back(
428 std::make_pair<uint64_t, uint64_t>(
429 context.monotonic_event_time.time_since_epoch().count(),
430 chunk->data.tellp()));
James Kuszmaulb3fba252022-04-06 15:13:31 -0700431
James Kuszmaul36a25f42022-10-28 10:18:00 -0700432 WriteRecord(OpCode::kMessage, string_builder_.Result(), &chunk->data);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700433}
434
James Kuszmaulb3fba252022-04-06 15:13:31 -0700435void McapLogger::WriteRecord(OpCode op, std::string_view record,
436 std::ostream *ostream) {
437 ostream->put(static_cast<char>(op));
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700438 uint64_t record_length = record.size();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700439 ostream->write(reinterpret_cast<const char *>(&record_length),
440 sizeof(record_length));
441 *ostream << record;
442}
443
James Kuszmaul36a25f42022-10-28 10:18:00 -0700444void McapLogger::WriteChunk(ChunkStatus *chunk) {
James Kuszmaulb3fba252022-04-06 15:13:31 -0700445 string_builder_.Reset();
446
James Kuszmaul36a25f42022-10-28 10:18:00 -0700447 CHECK(chunk->earliest_message.has_value());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700448 const uint64_t chunk_offset = output_.tellp();
449 AppendInt64(&string_builder_,
James Kuszmaul36a25f42022-10-28 10:18:00 -0700450 chunk->earliest_message->time_since_epoch().count());
451 CHECK(chunk->latest_message.has_value());
452 AppendInt64(&string_builder_,
453 chunk->latest_message.value().time_since_epoch().count());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700454
James Kuszmaul36a25f42022-10-28 10:18:00 -0700455 std::string chunk_records = chunk->data.str();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700456 // Reset the chunk buffer.
James Kuszmaul36a25f42022-10-28 10:18:00 -0700457 chunk->data.str("");
James Kuszmaulb3fba252022-04-06 15:13:31 -0700458
459 const uint64_t records_size = chunk_records.size();
460 // Uncompressed chunk size.
461 AppendInt64(&string_builder_, records_size);
462 // Uncompressed CRC (unpopulated).
463 AppendInt32(&string_builder_, 0);
464 AppendString(&string_builder_, "");
465 AppendBytes(&string_builder_, chunk_records);
466 WriteRecord(OpCode::kChunk, string_builder_.Result());
467
468 std::map<uint16_t, uint64_t> index_offsets;
469 const uint64_t message_index_start = output_.tellp();
James Kuszmaul36a25f42022-10-28 10:18:00 -0700470 for (const auto &indices : chunk->message_indices) {
James Kuszmaulb3fba252022-04-06 15:13:31 -0700471 index_offsets[indices.first] = output_.tellp();
472 string_builder_.Reset();
473 AppendInt16(&string_builder_, indices.first);
474 AppendMessageIndices(&string_builder_, indices.second);
475 WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
476 }
James Kuszmaul36a25f42022-10-28 10:18:00 -0700477 chunk->message_indices.clear();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700478 chunk_indices_.push_back(ChunkIndex{
James Kuszmaul36a25f42022-10-28 10:18:00 -0700479 chunk->earliest_message.value(), chunk->latest_message.value(), chunk_offset,
James Kuszmaulb3fba252022-04-06 15:13:31 -0700480 message_index_start - chunk_offset, records_size, index_offsets,
481 static_cast<uint64_t>(output_.tellp()) - message_index_start});
James Kuszmaul36a25f42022-10-28 10:18:00 -0700482 chunk->earliest_message.reset();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700483}
484
485McapLogger::SummaryOffset McapLogger::WriteStatistics() {
486 const uint64_t stats_offset = output_.tellp();
487 const uint64_t message_count = std::accumulate(
488 message_counts_.begin(), message_counts_.end(), 0,
489 [](const uint64_t &count, const std::pair<uint16_t, uint64_t> &val) {
490 return count + val.second;
491 });
492 string_builder_.Reset();
493 AppendInt64(&string_builder_, message_count);
494 // Schema count.
495 AppendInt16(&string_builder_, message_counts_.size());
496 // Channel count.
497 AppendInt32(&string_builder_, message_counts_.size());
498 // Attachment count.
499 AppendInt32(&string_builder_, 0);
500 // Metadata count.
501 AppendInt32(&string_builder_, 0);
502 // Chunk count.
503 AppendInt32(&string_builder_, chunk_indices_.size());
504 // Earliest & latest message times.
505 AppendInt64(&string_builder_, earliest_message_->time_since_epoch().count());
506 AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
507 // Per-channel message counts.
508 AppendChannelMap(&string_builder_, message_counts_);
509 WriteRecord(OpCode::kStatistics, string_builder_.Result());
510 return {OpCode::kStatistics, stats_offset,
511 static_cast<uint64_t>(output_.tellp()) - stats_offset};
512}
513
514McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
515 const uint64_t index_offset = output_.tellp();
516 for (const ChunkIndex &index : chunk_indices_) {
517 string_builder_.Reset();
518 AppendInt64(&string_builder_, index.start_time.time_since_epoch().count());
519 AppendInt64(&string_builder_, index.end_time.time_since_epoch().count());
520 AppendInt64(&string_builder_, index.offset);
521 AppendInt64(&string_builder_, index.chunk_size);
522 AppendChannelMap(&string_builder_, index.message_index_offsets);
523 AppendInt64(&string_builder_, index.message_index_size);
524 // Compression used.
525 AppendString(&string_builder_, "");
526 // Compressed and uncompressed records size.
527 AppendInt64(&string_builder_, index.records_size);
528 AppendInt64(&string_builder_, index.records_size);
529 WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
530 }
531 return {OpCode::kChunkIndex, index_offset,
532 static_cast<uint64_t>(output_.tellp()) - index_offset};
533}
534
535void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
536 string_builder_.Reset();
537 string_builder_.AppendChar(static_cast<char>(offset.op_code));
538 AppendInt64(&string_builder_, offset.offset);
539 AppendInt64(&string_builder_, offset.size);
540 WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700541}
542
543void McapLogger::AppendString(FastStringBuilder *builder,
544 std::string_view string) {
545 AppendInt32(builder, string.size());
546 builder->Append(string);
547}
548
James Kuszmaulb3fba252022-04-06 15:13:31 -0700549void McapLogger::AppendBytes(FastStringBuilder *builder,
550 std::string_view bytes) {
551 AppendInt64(builder, bytes.size());
552 builder->Append(bytes);
553}
554
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700555namespace {
556template <typename T>
557static void AppendInt(FastStringBuilder *builder, T val) {
558 builder->Append(
559 std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
560}
James Kuszmaulb3fba252022-04-06 15:13:31 -0700561template <typename T>
562void AppendMap(FastStringBuilder *builder, const T &map) {
563 AppendInt<uint32_t>(
564 builder, map.size() * (sizeof(typename T::value_type::first_type) +
565 sizeof(typename T::value_type::second_type)));
566 for (const auto &pair : map) {
567 AppendInt(builder, pair.first);
568 AppendInt(builder, pair.second);
569 }
570}
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700571} // namespace
572
James Kuszmaulb3fba252022-04-06 15:13:31 -0700573void McapLogger::AppendChannelMap(FastStringBuilder *builder,
574 const std::map<uint16_t, uint64_t> &map) {
575 AppendMap(builder, map);
576}
577
578void McapLogger::AppendMessageIndices(
579 FastStringBuilder *builder,
580 const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
581 AppendMap(builder, messages);
582}
583
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700584void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
585 AppendInt(builder, val);
586}
587
588void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
589 AppendInt(builder, val);
590}
591
592void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
593 AppendInt(builder, val);
594}
595} // namespace aos