Implement flatbuffer mode in log_to_mcap
This makes log_to_mcap output a flatbuffer-based MCAP format (if requested)
instead of a JSON-based one. This significantly speeds up conversion and
reduces disk usage, and should at least put us in the ballpark for working
with images in Foxglove Studio (although we still need to do some extra work
to either use one of its predefined image formats or support our existing
image formats natively in Studio). This also adds a --fetch flag, which writes
out the most recent message on each channel from before the start of the log.
Change-Id: Id1778c0b5b9ced90517ab34269511d78cd0c7e58
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index f4056ea..561a02b 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -1,10 +1,17 @@
#include "aos/util/mcap_logger.h"
#include "absl/strings/str_replace.h"
+#include "aos/flatbuffer_merge.h"
#include "single_include/nlohmann/json.hpp"
-DEFINE_uint64(mcap_chunk_size, 10000000,
+DEFINE_uint64(mcap_chunk_size, 10'000'000,
"Size, in bytes, of individual MCAP chunks");
+DEFINE_bool(fetch, false,
+ "Whether to fetch most recent messages at start of logfile. Turn "
+ "this on if there are, e.g., one-time messages sent before the "
+ "start of the logfile that you need access to. Turn it off if you "
+ "don't want to deal with having messages that have timestamps that "
+ "may be arbitrarily far before any other interesting messages.");
namespace aos {
@@ -74,8 +81,11 @@
return schema;
}
-McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
- : event_loop_(event_loop), output_(output_path) {
+McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
+ Serialization serialization)
+ : event_loop_(event_loop),
+ output_(output_path),
+ serialization_(serialization) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
CHECK(output_);
@@ -119,13 +129,34 @@
// summary and summary offset sections.
WriteFooter(summary_offset, summary_offset_offset);
WriteMagic();
+
+ // TODO(james): Add compression. With flatbuffers messages that contain large
+ // numbers of zeros (e.g., large grids or thresholded images) this can result
+ // in massive savings.
+ if (VLOG_IS_ON(2)) {
+ // For debugging, print out how much space each channel is taking in the
+ // overall log.
+ LOG(INFO) << total_message_bytes_;
+ std::vector<std::pair<size_t, const Channel *>> channel_bytes;
+ for (const auto &pair : total_channel_bytes_) {
+ channel_bytes.push_back(std::make_pair(pair.second, pair.first));
+ }
+ std::sort(channel_bytes.begin(), channel_bytes.end());
+ for (const auto &pair : channel_bytes) {
+ LOG(INFO) << configuration::StrippedChannelToString(pair.second) << ": "
+ << static_cast<float>(pair.first) * 1e-6 << "MB "
+ << static_cast<float>(pair.first) / total_message_bytes_
+ << "\n";
+ }
+ }
}
std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
RegisterHandlers register_handlers) {
- uint16_t id = 1;
+ uint16_t id = 0;
std::map<uint16_t, const Channel *> channels;
for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ ++id;
if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
continue;
}
@@ -141,8 +172,13 @@
WriteChunk();
}
});
+ fetchers_[id] = event_loop_->MakeRawFetcher(channel);
+ event_loop_->OnRun([this, id, channel]() {
+ if (FLAGS_fetch && fetchers_[id]->Fetch()) {
+ WriteMessage(id, channel, fetchers_[id]->context(), &current_chunk_);
+ }
+ });
}
- ++id;
}
std::vector<SummaryOffset> offsets;
@@ -200,7 +236,9 @@
void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
CHECK(channel->has_schema());
- std::string schema = JsonSchemaForFlatbuffer({channel->schema()}).dump();
+
+ const FlatbufferDetachedBuffer<reflection::Schema> schema =
+ CopyFlatBuffer(channel->schema());
// Write out the schema (we don't bother deduplicating schema types):
string_builder_.Reset();
@@ -208,10 +246,23 @@
AppendInt16(&string_builder_, id);
// Type name
AppendString(&string_builder_, channel->type()->string_view());
- // Encoding
- AppendString(&string_builder_, "jsonschema");
- // Actual schema itself
- AppendString(&string_builder_, schema);
+ switch (serialization_) {
+ case Serialization::kJson:
+ // Encoding
+ AppendString(&string_builder_, "jsonschema");
+ // Actual schema itself
+ AppendString(&string_builder_,
+ JsonSchemaForFlatbuffer({channel->schema()}).dump());
+ break;
+ case Serialization::kFlatbuffer:
+ // Encoding
+ AppendString(&string_builder_, "flatbuffer");
+ // Actual schema itself
+ AppendString(&string_builder_,
+ {reinterpret_cast<const char *>(schema.span().data()),
+ schema.span().size()});
+ break;
+ }
WriteRecord(OpCode::kSchema, string_builder_.Result());
}
@@ -227,7 +278,15 @@
absl::StrCat(channel->name()->string_view(), " ",
channel->type()->string_view()));
// Encoding
- AppendString(&string_builder_, "json");
+ switch (serialization_) {
+ case Serialization::kJson:
+ AppendString(&string_builder_, "json");
+ break;
+ case Serialization::kFlatbuffer:
+ AppendString(&string_builder_, "flatbuffer");
+ break;
+ }
+
// Metadata (technically supposed to be a Map<string, string>)
AppendString(&string_builder_, "");
WriteRecord(OpCode::kChannel, string_builder_.Result());
@@ -241,9 +300,15 @@
if (!earliest_message_.has_value()) {
earliest_message_ = context.monotonic_event_time;
+ } else {
+ earliest_message_ =
+ std::min(context.monotonic_event_time, earliest_message_.value());
}
if (!earliest_chunk_message_.has_value()) {
earliest_chunk_message_ = context.monotonic_event_time;
+ } else {
+ earliest_chunk_message_ =
+ std::min(context.monotonic_event_time, earliest_chunk_message_.value());
}
latest_message_ = context.monotonic_event_time;
@@ -257,6 +322,8 @@
// TODO(james): If we use this for multi-node logfiles, use distributed clock.
AppendInt64(&string_builder_,
context.monotonic_event_time.time_since_epoch().count());
+ // Note: Foxglove Studio doesn't appear to actually support using publish time
+ // right now.
AppendInt64(&string_builder_,
context.monotonic_event_time.time_since_epoch().count());
@@ -267,8 +334,18 @@
<< ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
<< channel->type()->c_str();
- aos::FlatbufferToJson(&string_builder_, channel->schema(),
- static_cast<const uint8_t *>(context.data));
+ switch (serialization_) {
+ case Serialization::kJson:
+ aos::FlatbufferToJson(&string_builder_, channel->schema(),
+ static_cast<const uint8_t *>(context.data));
+ break;
+ case Serialization::kFlatbuffer:
+ string_builder_.Append(
+ {static_cast<const char *>(context.data), context.size});
+ break;
+ }
+ total_message_bytes_ += context.size;
+ total_channel_bytes_[channel] += context.size;
message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
context.monotonic_event_time.time_since_epoch().count(),