blob: 0dbb88eec85322435f59cbecdc130665a1526a1e [file] [log] [blame]
James Kuszmaul4ed5fb12022-03-22 15:20:04 -07001#include "aos/util/mcap_logger.h"
2
Stephan Pleinesb1177672024-05-27 17:48:32 -07003#include <algorithm>
4#include <chrono>
5#include <numeric>
6#include <ostream>
7#include <set>
8
9#include "absl/strings/str_cat.h"
10#include "absl/types/span.h"
11#include "flatbuffers/buffer.h"
12#include "flatbuffers/detached_buffer.h"
13#include "flatbuffers/flatbuffer_builder.h"
14#include "flatbuffers/reflection.h"
15#include "flatbuffers/reflection_generated.h"
16#include "flatbuffers/string.h"
17#include "flatbuffers/vector.h"
18#include "gflags/gflags.h"
19#include "glog/logging.h"
20#include "glog/vlog_is_on.h"
James Kuszmaul5ab990d2022-11-07 16:35:49 -080021#include "lz4/lz4frame.h"
Adam Snaider13d48d92023-08-03 12:20:15 -070022#include "nlohmann/json.hpp"
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070023
Stephan Pleinesb1177672024-05-27 17:48:32 -070024#include "aos/configuration.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070025#include "aos/configuration_schema.h"
26#include "aos/flatbuffer_merge.h"
Stephan Pleinesb1177672024-05-27 17:48:32 -070027#include "aos/json_to_flatbuffer.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070028
// A channel's chunk is flushed (see the watcher registered in
// WriteSchemasAndChannels) once its buffered, uncompressed record data grows
// beyond this many bytes. Chunks are tracked per channel.
DEFINE_uint64(mcap_chunk_size, 10'000'000,
              "Size, in bytes, of individual MCAP chunks");
// When set, each channel's fetcher is polled once from OnRun and the message
// it returns (if any) is logged before live traffic starts arriving.
DEFINE_bool(fetch, false,
            "Whether to fetch most recent messages at start of logfile. Turn "
            "this on if there are, e.g., one-time messages sent before the "
            "start of the logfile that you need access to. Turn it off if you "
            "don't want to deal with having messages that have timestamps that "
            "may be arbitrarily far before any other interesting messages.");
James Kuszmaul5c56ed32022-03-30 15:10:07 -070037
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070038namespace aos {
James Kuszmaulb3fba252022-04-06 15:13:31 -070039
James Kuszmaul4ed5fb12022-03-22 15:20:04 -070040nlohmann::json JsonSchemaForFlatbuffer(const FlatbufferType &type,
41 JsonSchemaRecursion recursion_level) {
42 nlohmann::json schema;
43 if (recursion_level == JsonSchemaRecursion::kTopLevel) {
44 schema["$schema"] = "https://json-schema.org/draft/2020-12/schema";
45 }
46 schema["type"] = "object";
47 nlohmann::json properties;
48 for (int index = 0; index < type.NumberFields(); ++index) {
49 nlohmann::json field;
50 const bool is_array = type.FieldIsRepeating(index);
51 if (type.FieldIsSequence(index)) {
52 // For sub-tables/structs, just recurse.
53 nlohmann::json subtype = JsonSchemaForFlatbuffer(
54 type.FieldType(index), JsonSchemaRecursion::kNested);
55 if (is_array) {
56 field["type"] = "array";
57 field["items"] = subtype;
58 } else {
59 field = subtype;
60 }
61 } else {
62 std::string elementary_type;
63 switch (type.FieldElementaryType(index)) {
64 case flatbuffers::ET_UTYPE:
65 case flatbuffers::ET_CHAR:
66 case flatbuffers::ET_UCHAR:
67 case flatbuffers::ET_SHORT:
68 case flatbuffers::ET_USHORT:
69 case flatbuffers::ET_INT:
70 case flatbuffers::ET_UINT:
71 case flatbuffers::ET_LONG:
72 case flatbuffers::ET_ULONG:
73 case flatbuffers::ET_FLOAT:
74 case flatbuffers::ET_DOUBLE:
75 elementary_type = "number";
76 break;
77 case flatbuffers::ET_BOOL:
78 elementary_type = "boolean";
79 break;
80 case flatbuffers::ET_STRING:
81 elementary_type = "string";
82 break;
83 case flatbuffers::ET_SEQUENCE:
84 if (type.FieldIsEnum(index)) {
85 elementary_type = "string";
86 } else {
87 LOG(FATAL) << "Should not encounter any sequence fields here.";
88 }
89 break;
90 }
91 if (is_array) {
92 field["type"] = "array";
93 field["items"]["type"] = elementary_type;
94 } else {
95 field["type"] = elementary_type;
96 }
97 }
98 // the nlohmann::json [] operator needs an actual string, not just a
99 // string_view :(.
100 properties[std::string(type.FieldName(index))] = field;
101 }
102 schema["properties"] = properties;
103 return schema;
104}
105
James Kuszmaul1e418f62023-02-26 14:40:20 -0800106std::string ShortenedChannelName(const aos::Configuration *config,
107 const aos::Channel *channel,
108 std::string_view application_name,
109 const aos::Node *node) {
110 std::set<std::string> names =
111 configuration::GetChannelAliases(config, channel, application_name, node);
112 std::string_view shortest_name;
113 for (const std::string &name : names) {
114 if (shortest_name.empty() || name.size() < shortest_name.size()) {
115 shortest_name = name;
116 }
117 }
118 return std::string(shortest_name);
119}
120
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800121namespace {
122std::string_view CompressionName(McapLogger::Compression compression) {
123 switch (compression) {
124 case McapLogger::Compression::kNone:
125 return "";
126 case McapLogger::Compression::kLz4:
127 return "lz4";
128 }
129 LOG(FATAL) << "Unreachable.";
130}
131} // namespace
132
// Creates a logger writing to `output_path`. Construction immediately writes
// the MCAP magic, the Header record and an initial set of Schema/Channel
// records. The per-channel watchers/fetchers are registered as part of
// WriteSchemasAndChannels(RegisterHandlers::kYes).
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
                       Serialization serialization,
                       CanonicalChannelNames canonical_channels,
                       Compression compression)
    : event_loop_(event_loop),
      output_(output_path),
      serialization_(serialization),
      canonical_channels_(canonical_channels),
      compression_(compression),
      configuration_channel_([]() {
        // Set up a fake Channel for providing the configuration in the MCAP
        // file. This is included for convenience so that consumers of the MCAP
        // file can actually dereference things like the channel indices in AOS
        // timing reports.
        flatbuffers::FlatBufferBuilder fbb;
        flatbuffers::Offset<flatbuffers::String> name_offset =
            fbb.CreateString("");
        flatbuffers::Offset<flatbuffers::String> type_offset =
            fbb.CreateString("aos.Configuration");
        flatbuffers::Offset<reflection::Schema> schema_offset =
            aos::CopyFlatBuffer(
                aos::FlatbufferSpan<reflection::Schema>(ConfigurationSchema()),
                &fbb);
        Channel::Builder channel(fbb);
        channel.add_name(name_offset);
        channel.add_type(type_offset);
        channel.add_schema(schema_offset);
        fbb.Finish(channel.Finish());
        return fbb.Release();
      }()),
      // NOTE(review): this reads event_loop_, so event_loop_ must be declared
      // before configuration_ in the header -- confirm if reordering members.
      configuration_(CopyFlatBuffer(event_loop_->configuration())) {
  // Opt this event loop out of timing reports and AOS logging (semantics are
  // defined by EventLoop; presumably so the logger does not log itself).
  event_loop->SkipTimingReport();
  event_loop->SkipAosLog();
  // Fail fast if the output stream is not usable (e.g. the file could not be
  // opened).
  CHECK(output_);
  WriteMagic();
  WriteHeader();
  // Schemas and channels get written out both at the start and end of the file,
  // per the MCAP spec.
  WriteSchemasAndChannels(RegisterHandlers::kYes);
}
173
174McapLogger::~McapLogger() {
James Kuszmaulb3fba252022-04-06 15:13:31 -0700175 // If we have any data remaining, write one last chunk.
James Kuszmaul36a25f42022-10-28 10:18:00 -0700176 for (auto &pair : current_chunks_) {
177 if (pair.second.data.tellp() > 0) {
178 WriteChunk(&pair.second);
179 }
James Kuszmaulb3fba252022-04-06 15:13:31 -0700180 }
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700181 WriteDataEnd();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700182
183 // Now we enter the Summary section, where we write out all the channel/index
184 // information that readers need to be able to seek to arbitrary locations
185 // within the log.
186 const uint64_t summary_offset = output_.tellp();
187 const SummaryOffset chunk_indices_offset = WriteChunkIndices();
188 const SummaryOffset stats_offset = WriteStatistics();
189 // Schemas/Channels need to get reproduced in the summary section for random
190 // access reading.
191 const std::vector<SummaryOffset> offsets =
192 WriteSchemasAndChannels(RegisterHandlers::kNo);
193
194 // Next we have the summary offset section, which references the individual
195 // pieces of the summary section.
196 const uint64_t summary_offset_offset = output_.tellp();
197
198 // SummarytOffset's must all be the final thing before the footer.
199 WriteSummaryOffset(chunk_indices_offset);
200 WriteSummaryOffset(stats_offset);
201 for (const auto &offset : offsets) {
202 WriteSummaryOffset(offset);
203 }
204
205 // And finally, the footer which must itself reference the start of the
206 // summary and summary offset sections.
207 WriteFooter(summary_offset, summary_offset_offset);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700208 WriteMagic();
James Kuszmaulc31d7362022-05-27 14:20:04 -0700209
210 // TODO(james): Add compression. With flatbuffers messages that contain large
211 // numbers of zeros (e.g., large grids or thresholded images) this can result
212 // in massive savings.
213 if (VLOG_IS_ON(2)) {
214 // For debugging, print out how much space each channel is taking in the
215 // overall log.
216 LOG(INFO) << total_message_bytes_;
217 std::vector<std::pair<size_t, const Channel *>> channel_bytes;
218 for (const auto &pair : total_channel_bytes_) {
219 channel_bytes.push_back(std::make_pair(pair.second, pair.first));
220 }
221 std::sort(channel_bytes.begin(), channel_bytes.end());
222 for (const auto &pair : channel_bytes) {
223 LOG(INFO) << configuration::StrippedChannelToString(pair.second) << ": "
224 << static_cast<float>(pair.first) * 1e-6 << "MB "
225 << static_cast<float>(pair.first) / total_message_bytes_
226 << "\n";
227 }
228 }
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700229}
230
// Writes a Schema record and a Channel record for every channel readable on
// this node, plus the synthetic "configuration" channel. Returns the two
// SummaryOffset entries (schemas, channels) covering the bytes written.
//
// Called twice: from the constructor with kYes, which additionally registers a
// raw watcher and an OnRun fetch per channel, and from the destructor with kNo,
// which only re-emits the records into the summary section.
std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
    RegisterHandlers register_handlers) {
  // IDs follow each channel's position in the configuration (starting at 1),
  // so both passes assign identical IDs. Unreadable channels still consume an
  // ID. The same ID is used as both schema ID and channel ID.
  uint16_t id = 0;
  std::map<uint16_t, const Channel *> channels;
  for (const Channel *channel : *event_loop_->configuration()->channels()) {
    ++id;
    if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
      continue;
    }
    channels[id] = channel;

    if (register_handlers == RegisterHandlers::kYes) {
      message_counts_[id] = 0;
      event_loop_->MakeRawWatcher(
          channel, [this, id, channel](const Context &context, const void *) {
            ChunkStatus *chunk = &current_chunks_[id];
            WriteMessage(id, channel, context, chunk);
            // Flush this channel's chunk once it exceeds --mcap_chunk_size.
            if (static_cast<uint64_t>(chunk->data.tellp()) >
                FLAGS_mcap_chunk_size) {
              WriteChunk(chunk);
            }
          });
      fetchers_[id] = event_loop_->MakeRawFetcher(channel);
      event_loop_->OnRun([this, id, channel]() {
        // With --fetch, log the latest message sent before startup.
        // NOTE(review): unlike the watcher, this path does not check the chunk
        // size limit.
        if (FLAGS_fetch && fetchers_[id]->Fetch()) {
          WriteMessage(id, channel, fetchers_[id]->context(),
                       &current_chunks_[id]);
        }
      });
    }
  }

  // Manually add in a special /configuration channel. Its ID is allocated on
  // the first (kYes) pass and reused as-is on the kNo pass.
  if (register_handlers == RegisterHandlers::kYes) {
    configuration_id_ = ++id;
  }

  std::vector<SummaryOffset> offsets;

  const uint64_t schema_offset = output_.tellp();

  for (const auto &pair : channels) {
    WriteSchema(pair.first, pair.second);
  }

  WriteSchema(configuration_id_, &configuration_channel_.message());

  const uint64_t channel_offset = output_.tellp();

  offsets.push_back(
      {OpCode::kSchema, schema_offset, channel_offset - schema_offset});

  for (const auto &pair : channels) {
    // Write out the channel entry that uses the schema (we just re-use
    // the schema ID for the channel ID, since we aren't deduplicating
    // schemas for channels that are of the same type).
    WriteChannel(pair.first, pair.first, pair.second);
  }

  // Provide the configuration message on a special channel that is just named
  // "configuration", which is guaranteed not to conflict with existing names
  // under our current naming scheme (since our current scheme will, at a
  // minimum, put a space between the name/type of a channel).
  WriteChannel(configuration_id_, configuration_id_,
               &configuration_channel_.message(), "configuration");

  offsets.push_back({OpCode::kChannel, channel_offset,
                     static_cast<uint64_t>(output_.tellp()) - channel_offset});
  return offsets;
}
301
James Kuszmaulbed2af02023-01-28 15:57:24 -0800302void McapLogger::WriteConfigurationMessage() {
303 Context config_context;
304 config_context.monotonic_event_time = event_loop_->monotonic_now();
305 config_context.queue_index = 0;
306 config_context.size = configuration_.span().size();
307 config_context.data = configuration_.span().data();
308 // Avoid infinite recursion...
309 wrote_configuration_ = true;
310 WriteMessage(configuration_id_, &configuration_channel_.message(),
311 config_context, &current_chunks_[configuration_id_]);
312}
313
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700314void McapLogger::WriteMagic() { output_ << "\x89MCAP0\r\n"; }
315
316void McapLogger::WriteHeader() {
317 string_builder_.Reset();
318 // "profile"
319 AppendString(&string_builder_, "x-aos");
320 // "library"
321 AppendString(&string_builder_, "AOS MCAP converter");
322 WriteRecord(OpCode::kHeader, string_builder_.Result());
323}
324
James Kuszmaulb3fba252022-04-06 15:13:31 -0700325void McapLogger::WriteFooter(uint64_t summary_offset,
326 uint64_t summary_offset_offset) {
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700327 string_builder_.Reset();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700328 AppendInt64(&string_builder_, summary_offset);
329 AppendInt64(&string_builder_, summary_offset_offset);
330 // CRC32 for the Summary section, which we don't bother populating.
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700331 AppendInt32(&string_builder_, 0);
332 WriteRecord(OpCode::kFooter, string_builder_.Result());
333}
334
335void McapLogger::WriteDataEnd() {
336 string_builder_.Reset();
337 // CRC32 for the data, which we are too lazy to calculate.
338 AppendInt32(&string_builder_, 0);
339 WriteRecord(OpCode::kDataEnd, string_builder_.Result());
340}
341
342void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
343 CHECK(channel->has_schema());
James Kuszmaulc31d7362022-05-27 14:20:04 -0700344
345 const FlatbufferDetachedBuffer<reflection::Schema> schema =
James Kuszmaulf1dbaff2023-02-08 21:17:32 -0800346 RecursiveCopyFlatBuffer(channel->schema());
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700347
348 // Write out the schema (we don't bother deduplicating schema types):
349 string_builder_.Reset();
350 // Schema ID
351 AppendInt16(&string_builder_, id);
352 // Type name
353 AppendString(&string_builder_, channel->type()->string_view());
James Kuszmaulc31d7362022-05-27 14:20:04 -0700354 switch (serialization_) {
355 case Serialization::kJson:
356 // Encoding
357 AppendString(&string_builder_, "jsonschema");
358 // Actual schema itself
359 AppendString(&string_builder_,
360 JsonSchemaForFlatbuffer({channel->schema()}).dump());
361 break;
362 case Serialization::kFlatbuffer:
363 // Encoding
364 AppendString(&string_builder_, "flatbuffer");
365 // Actual schema itself
366 AppendString(&string_builder_,
367 {reinterpret_cast<const char *>(schema.span().data()),
368 schema.span().size()});
369 break;
370 }
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700371 WriteRecord(OpCode::kSchema, string_builder_.Result());
372}
373
374void McapLogger::WriteChannel(const uint16_t id, const uint16_t schema_id,
James Kuszmaule4aa01d2022-06-28 14:09:02 -0700375 const aos::Channel *channel,
376 std::string_view override_name) {
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700377 string_builder_.Reset();
378 // Channel ID
379 AppendInt16(&string_builder_, id);
380 // Schema ID
381 AppendInt16(&string_builder_, schema_id);
382 // Topic name
James Kuszmaul9f607c62022-10-27 17:01:55 -0700383 std::string topic_name(override_name);
384 if (topic_name.empty()) {
385 switch (canonical_channels_) {
386 case CanonicalChannelNames::kCanonical:
387 topic_name = absl::StrCat(channel->name()->string_view(), " ",
388 channel->type()->string_view());
389 break;
390 case CanonicalChannelNames::kShortened: {
James Kuszmaul1e418f62023-02-26 14:40:20 -0800391 const std::string shortest_name =
392 ShortenedChannelName(event_loop_->configuration(), channel,
393 event_loop_->name(), event_loop_->node());
James Kuszmaul9f607c62022-10-27 17:01:55 -0700394 if (shortest_name != channel->name()->string_view()) {
395 VLOG(1) << "Shortening " << channel->name()->string_view() << " "
396 << channel->type()->string_view() << " to " << shortest_name;
397 }
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800398 topic_name =
399 absl::StrCat(shortest_name, " ", channel->type()->string_view());
James Kuszmaul9f607c62022-10-27 17:01:55 -0700400 break;
401 }
402 }
403 }
404 AppendString(&string_builder_, topic_name);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700405 // Encoding
James Kuszmaulc31d7362022-05-27 14:20:04 -0700406 switch (serialization_) {
407 case Serialization::kJson:
408 AppendString(&string_builder_, "json");
409 break;
410 case Serialization::kFlatbuffer:
411 AppendString(&string_builder_, "flatbuffer");
412 break;
413 }
414
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700415 // Metadata (technically supposed to be a Map<string, string>)
416 AppendString(&string_builder_, "");
417 WriteRecord(OpCode::kChannel, string_builder_.Result());
418}
419
420void McapLogger::WriteMessage(uint16_t channel_id, const Channel *channel,
James Kuszmaul36a25f42022-10-28 10:18:00 -0700421 const Context &context, ChunkStatus *chunk) {
James Kuszmaulbed2af02023-01-28 15:57:24 -0800422 if (!wrote_configuration_) {
423 WriteConfigurationMessage();
424 }
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700425 CHECK_NOTNULL(context.data);
426
James Kuszmaulb3fba252022-04-06 15:13:31 -0700427 message_counts_[channel_id]++;
428
429 if (!earliest_message_.has_value()) {
430 earliest_message_ = context.monotonic_event_time;
James Kuszmaulc31d7362022-05-27 14:20:04 -0700431 } else {
432 earliest_message_ =
433 std::min(context.monotonic_event_time, earliest_message_.value());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700434 }
James Kuszmaul36a25f42022-10-28 10:18:00 -0700435 if (!chunk->earliest_message.has_value()) {
436 chunk->earliest_message = context.monotonic_event_time;
James Kuszmaulc31d7362022-05-27 14:20:04 -0700437 } else {
James Kuszmaul36a25f42022-10-28 10:18:00 -0700438 chunk->earliest_message =
439 std::min(context.monotonic_event_time, chunk->earliest_message.value());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700440 }
James Kuszmaul36a25f42022-10-28 10:18:00 -0700441 chunk->latest_message = context.monotonic_event_time;
James Kuszmaulb3fba252022-04-06 15:13:31 -0700442 latest_message_ = context.monotonic_event_time;
443
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700444 string_builder_.Reset();
445 // Channel ID
446 AppendInt16(&string_builder_, channel_id);
447 // Queue Index
448 AppendInt32(&string_builder_, context.queue_index);
449 // Log time, and publish time. Since we don't log a logged time, just use
450 // published time.
451 // TODO(james): If we use this for multi-node logfiles, use distributed clock.
452 AppendInt64(&string_builder_,
453 context.monotonic_event_time.time_since_epoch().count());
James Kuszmaulc31d7362022-05-27 14:20:04 -0700454 // Note: Foxglove Studio doesn't appear to actually support using publish time
455 // right now.
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700456 AppendInt64(&string_builder_,
457 context.monotonic_event_time.time_since_epoch().count());
458
459 CHECK(flatbuffers::Verify(*channel->schema(),
460 *channel->schema()->root_table(),
461 static_cast<const uint8_t *>(context.data),
462 static_cast<size_t>(context.size)))
463 << ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
464 << channel->type()->c_str();
465
James Kuszmaulc31d7362022-05-27 14:20:04 -0700466 switch (serialization_) {
467 case Serialization::kJson:
468 aos::FlatbufferToJson(&string_builder_, channel->schema(),
469 static_cast<const uint8_t *>(context.data));
470 break;
471 case Serialization::kFlatbuffer:
472 string_builder_.Append(
473 {static_cast<const char *>(context.data), context.size});
474 break;
475 }
476 total_message_bytes_ += context.size;
477 total_channel_bytes_[channel] += context.size;
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700478
James Kuszmaul36a25f42022-10-28 10:18:00 -0700479 chunk->message_indices[channel_id].push_back(
480 std::make_pair<uint64_t, uint64_t>(
481 context.monotonic_event_time.time_since_epoch().count(),
482 chunk->data.tellp()));
James Kuszmaulb3fba252022-04-06 15:13:31 -0700483
James Kuszmaul36a25f42022-10-28 10:18:00 -0700484 WriteRecord(OpCode::kMessage, string_builder_.Result(), &chunk->data);
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700485}
486
James Kuszmaulb3fba252022-04-06 15:13:31 -0700487void McapLogger::WriteRecord(OpCode op, std::string_view record,
488 std::ostream *ostream) {
489 ostream->put(static_cast<char>(op));
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700490 uint64_t record_length = record.size();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700491 ostream->write(reinterpret_cast<const char *>(&record_length),
492 sizeof(record_length));
493 *ostream << record;
494}
495
James Kuszmaul36a25f42022-10-28 10:18:00 -0700496void McapLogger::WriteChunk(ChunkStatus *chunk) {
James Kuszmaulb3fba252022-04-06 15:13:31 -0700497 string_builder_.Reset();
498
James Kuszmaul36a25f42022-10-28 10:18:00 -0700499 CHECK(chunk->earliest_message.has_value());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700500 const uint64_t chunk_offset = output_.tellp();
501 AppendInt64(&string_builder_,
James Kuszmaul36a25f42022-10-28 10:18:00 -0700502 chunk->earliest_message->time_since_epoch().count());
503 CHECK(chunk->latest_message.has_value());
504 AppendInt64(&string_builder_,
505 chunk->latest_message.value().time_since_epoch().count());
James Kuszmaulb3fba252022-04-06 15:13:31 -0700506
James Kuszmaul36a25f42022-10-28 10:18:00 -0700507 std::string chunk_records = chunk->data.str();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700508 // Reset the chunk buffer.
James Kuszmaul36a25f42022-10-28 10:18:00 -0700509 chunk->data.str("");
James Kuszmaulb3fba252022-04-06 15:13:31 -0700510
511 const uint64_t records_size = chunk_records.size();
512 // Uncompressed chunk size.
513 AppendInt64(&string_builder_, records_size);
514 // Uncompressed CRC (unpopulated).
515 AppendInt32(&string_builder_, 0);
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800516 // Compression
517 AppendString(&string_builder_, CompressionName(compression_));
518 uint64_t records_size_compressed = records_size;
519 switch (compression_) {
520 case Compression::kNone:
521 AppendBytes(&string_builder_, chunk_records);
522 break;
523 case Compression::kLz4: {
524 // Default preferences.
525 LZ4F_preferences_t *lz4_preferences = nullptr;
526 const uint64_t max_size =
527 LZ4F_compressFrameBound(records_size, lz4_preferences);
528 CHECK_NE(0u, max_size);
529 if (max_size > compression_buffer_.size()) {
530 compression_buffer_.resize(max_size);
531 }
532 records_size_compressed = LZ4F_compressFrame(
533 compression_buffer_.data(), compression_buffer_.size(),
534 reinterpret_cast<const char *>(chunk_records.data()),
535 chunk_records.size(), lz4_preferences);
536 CHECK(!LZ4F_isError(records_size_compressed));
537 AppendBytes(&string_builder_,
538 {reinterpret_cast<const char *>(compression_buffer_.data()),
539 static_cast<size_t>(records_size_compressed)});
540 break;
541 }
542 }
James Kuszmaulb3fba252022-04-06 15:13:31 -0700543 WriteRecord(OpCode::kChunk, string_builder_.Result());
544
545 std::map<uint16_t, uint64_t> index_offsets;
546 const uint64_t message_index_start = output_.tellp();
James Kuszmaul36a25f42022-10-28 10:18:00 -0700547 for (const auto &indices : chunk->message_indices) {
James Kuszmaulb3fba252022-04-06 15:13:31 -0700548 index_offsets[indices.first] = output_.tellp();
549 string_builder_.Reset();
550 AppendInt16(&string_builder_, indices.first);
551 AppendMessageIndices(&string_builder_, indices.second);
552 WriteRecord(OpCode::kMessageIndex, string_builder_.Result());
553 }
James Kuszmaul36a25f42022-10-28 10:18:00 -0700554 chunk->message_indices.clear();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700555 chunk_indices_.push_back(ChunkIndex{
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800556 .start_time = chunk->earliest_message.value(),
557 .end_time = chunk->latest_message.value(),
558 .offset = chunk_offset,
559 .chunk_size = message_index_start - chunk_offset,
560 .records_size = records_size,
561 .records_size_compressed = records_size_compressed,
562 .message_index_offsets = index_offsets,
563 .message_index_size =
564 static_cast<uint64_t>(output_.tellp()) - message_index_start,
565 .compression = compression_});
James Kuszmaul36a25f42022-10-28 10:18:00 -0700566 chunk->earliest_message.reset();
James Kuszmaulb3fba252022-04-06 15:13:31 -0700567}
568
569McapLogger::SummaryOffset McapLogger::WriteStatistics() {
570 const uint64_t stats_offset = output_.tellp();
571 const uint64_t message_count = std::accumulate(
572 message_counts_.begin(), message_counts_.end(), 0,
573 [](const uint64_t &count, const std::pair<uint16_t, uint64_t> &val) {
574 return count + val.second;
575 });
576 string_builder_.Reset();
577 AppendInt64(&string_builder_, message_count);
578 // Schema count.
579 AppendInt16(&string_builder_, message_counts_.size());
580 // Channel count.
581 AppendInt32(&string_builder_, message_counts_.size());
582 // Attachment count.
583 AppendInt32(&string_builder_, 0);
584 // Metadata count.
585 AppendInt32(&string_builder_, 0);
586 // Chunk count.
587 AppendInt32(&string_builder_, chunk_indices_.size());
588 // Earliest & latest message times.
589 AppendInt64(&string_builder_, earliest_message_->time_since_epoch().count());
590 AppendInt64(&string_builder_, latest_message_.time_since_epoch().count());
591 // Per-channel message counts.
592 AppendChannelMap(&string_builder_, message_counts_);
593 WriteRecord(OpCode::kStatistics, string_builder_.Result());
594 return {OpCode::kStatistics, stats_offset,
595 static_cast<uint64_t>(output_.tellp()) - stats_offset};
596}
597
598McapLogger::SummaryOffset McapLogger::WriteChunkIndices() {
599 const uint64_t index_offset = output_.tellp();
600 for (const ChunkIndex &index : chunk_indices_) {
601 string_builder_.Reset();
602 AppendInt64(&string_builder_, index.start_time.time_since_epoch().count());
603 AppendInt64(&string_builder_, index.end_time.time_since_epoch().count());
604 AppendInt64(&string_builder_, index.offset);
605 AppendInt64(&string_builder_, index.chunk_size);
606 AppendChannelMap(&string_builder_, index.message_index_offsets);
607 AppendInt64(&string_builder_, index.message_index_size);
608 // Compression used.
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800609 AppendString(&string_builder_, CompressionName(index.compression));
James Kuszmaulb3fba252022-04-06 15:13:31 -0700610 // Compressed and uncompressed records size.
James Kuszmaul5ab990d2022-11-07 16:35:49 -0800611 AppendInt64(&string_builder_, index.records_size_compressed);
James Kuszmaulb3fba252022-04-06 15:13:31 -0700612 AppendInt64(&string_builder_, index.records_size);
613 WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
614 }
615 return {OpCode::kChunkIndex, index_offset,
616 static_cast<uint64_t>(output_.tellp()) - index_offset};
617}
618
619void McapLogger::WriteSummaryOffset(const SummaryOffset &offset) {
620 string_builder_.Reset();
621 string_builder_.AppendChar(static_cast<char>(offset.op_code));
622 AppendInt64(&string_builder_, offset.offset);
623 AppendInt64(&string_builder_, offset.size);
624 WriteRecord(OpCode::kSummaryOffset, string_builder_.Result());
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700625}
626
627void McapLogger::AppendString(FastStringBuilder *builder,
628 std::string_view string) {
629 AppendInt32(builder, string.size());
630 builder->Append(string);
631}
632
James Kuszmaulb3fba252022-04-06 15:13:31 -0700633void McapLogger::AppendBytes(FastStringBuilder *builder,
634 std::string_view bytes) {
635 AppendInt64(builder, bytes.size());
636 builder->Append(bytes);
637}
638
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700639namespace {
640template <typename T>
641static void AppendInt(FastStringBuilder *builder, T val) {
642 builder->Append(
643 std::string_view(reinterpret_cast<const char *>(&val), sizeof(T)));
644}
James Kuszmaulb3fba252022-04-06 15:13:31 -0700645template <typename T>
646void AppendMap(FastStringBuilder *builder, const T &map) {
647 AppendInt<uint32_t>(
648 builder, map.size() * (sizeof(typename T::value_type::first_type) +
649 sizeof(typename T::value_type::second_type)));
650 for (const auto &pair : map) {
651 AppendInt(builder, pair.first);
652 AppendInt(builder, pair.second);
653 }
654}
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700655} // namespace
656
James Kuszmaulb3fba252022-04-06 15:13:31 -0700657void McapLogger::AppendChannelMap(FastStringBuilder *builder,
658 const std::map<uint16_t, uint64_t> &map) {
659 AppendMap(builder, map);
660}
661
662void McapLogger::AppendMessageIndices(
663 FastStringBuilder *builder,
664 const std::vector<std::pair<uint64_t, uint64_t>> &messages) {
665 AppendMap(builder, messages);
666}
667
James Kuszmaul4ed5fb12022-03-22 15:20:04 -0700668void McapLogger::AppendInt16(FastStringBuilder *builder, uint16_t val) {
669 AppendInt(builder, val);
670}
671
672void McapLogger::AppendInt32(FastStringBuilder *builder, uint32_t val) {
673 AppendInt(builder, val);
674}
675
676void McapLogger::AppendInt64(FastStringBuilder *builder, uint64_t val) {
677 AppendInt(builder, val);
678}
679} // namespace aos