Implement flatbuffer mode in log_to_mcap
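This makes log_to_mcap output a flatbuffer-based MCAP format (if requested
with --mode=flatbuffer) instead of a JSON format. This significantly speeds up
conversion times and reduces disk space used, and should at least put us in the
ballpark for working with images in Foxglove (although we still need to do some
extra work to either use one of their pre-defined image formats or
support our existing image formats natively in Foxglove Studio).
This also adds a --fetch flag that writes out the most recent message on each
channel at the start of the MCAP file (useful for one-time messages sent
before the log started).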
Change-Id: Id1778c0b5b9ced90517ab34269511d78cd0c7e58
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/log_to_mcap.cc b/aos/util/log_to_mcap.cc
index 49aca69..5330c60 100644
--- a/aos/util/log_to_mcap.cc
+++ b/aos/util/log_to_mcap.cc
@@ -5,6 +5,7 @@
DEFINE_string(node, "", "Node to replay from the perspective of.");
DEFINE_string(output_path, "/tmp/log.mcap", "Log to output.");
+DEFINE_string(mode, "json", "json or flatbuffer serialization.");
// Converts an AOS log to an MCAP log that can be fed into Foxglove. To try this
// out, run:
@@ -30,6 +31,9 @@
std::unique_ptr<aos::EventLoop> mcap_event_loop =
reader.event_loop_factory()->MakeEventLoop("mcap", node);
CHECK(!FLAGS_output_path.empty());
- aos::McapLogger relogger(mcap_event_loop.get(), FLAGS_output_path);
+ aos::McapLogger relogger(mcap_event_loop.get(), FLAGS_output_path,
+ FLAGS_mode == "flatbuffer"
+ ? aos::McapLogger::Serialization::kFlatbuffer
+ : aos::McapLogger::Serialization::kJson);
reader.event_loop_factory()->Run();
}
diff --git a/aos/util/log_to_mcap_test.py b/aos/util/log_to_mcap_test.py
index 86c3d1d..72c9078 100644
--- a/aos/util/log_to_mcap_test.py
+++ b/aos/util/log_to_mcap_test.py
@@ -22,7 +22,7 @@
subprocess.run([args.generate_log, "--output_folder", log_name]).check_returncode()
# Run with a really small chunk size, to force a multi-chunk file.
subprocess.run(
- [args.log_to_mcap, "--output_path", mcap_name, "--mcap_chunk_size", "1000",
+ [args.log_to_mcap, "--output_path", mcap_name, "--mcap_chunk_size", "1000", "--mode", "json",
log_name]).check_returncode()
# MCAP attempts to find $HOME/.mcap.yaml, and dies on $HOME not existing. So
# give it an arbitrary config location (it seems to be fine with a non-existent config).
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index f4056ea..561a02b 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -1,10 +1,17 @@
#include "aos/util/mcap_logger.h"
#include "absl/strings/str_replace.h"
+#include "aos/flatbuffer_merge.h"
#include "single_include/nlohmann/json.hpp"
-DEFINE_uint64(mcap_chunk_size, 10000000,
+DEFINE_uint64(mcap_chunk_size, 10'000'000,
"Size, in bytes, of individual MCAP chunks");
+DEFINE_bool(fetch, false,
+ "Whether to fetch most recent messages at start of logfile. Turn "
+ "this on if there are, e.g., one-time messages sent before the "
+ "start of the logfile that you need access to. Turn it off if you "
+ "don't want to deal with having messages that have timestamps that "
+ "may be arbitrarily far before any other interesting messages.");
namespace aos {
@@ -74,8 +81,11 @@
return schema;
}
-McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path)
- : event_loop_(event_loop), output_(output_path) {
+McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
+ Serialization serialization)
+ : event_loop_(event_loop),
+ output_(output_path),
+ serialization_(serialization) {
event_loop->SkipTimingReport();
event_loop->SkipAosLog();
CHECK(output_);
@@ -119,13 +129,34 @@
// summary and summary offset sections.
WriteFooter(summary_offset, summary_offset_offset);
WriteMagic();
+
+ // TODO(james): Add compression. With flatbuffers messages that contain large
+ // numbers of zeros (e.g., large grids or thresholded images) this can result
+ // in massive savings.
+ if (VLOG_IS_ON(2)) {
+ // For debugging, print out how much space each channel is taking in the
+ // overall log.
+ LOG(INFO) << total_message_bytes_;
+ std::vector<std::pair<size_t, const Channel *>> channel_bytes;
+ for (const auto &pair : total_channel_bytes_) {
+ channel_bytes.push_back(std::make_pair(pair.second, pair.first));
+ }
+ std::sort(channel_bytes.begin(), channel_bytes.end());
+ for (const auto &pair : channel_bytes) {
+ LOG(INFO) << configuration::StrippedChannelToString(pair.second) << ": "
+ << static_cast<float>(pair.first) * 1e-6 << "MB "
+ << static_cast<float>(pair.first) / total_message_bytes_
+ << "\n";
+ }
+ }
}
std::vector<McapLogger::SummaryOffset> McapLogger::WriteSchemasAndChannels(
RegisterHandlers register_handlers) {
- uint16_t id = 1;
+ uint16_t id = 0;
std::map<uint16_t, const Channel *> channels;
for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ ++id;
if (!configuration::ChannelIsReadableOnNode(channel, event_loop_->node())) {
continue;
}
@@ -141,8 +172,13 @@
WriteChunk();
}
});
+ fetchers_[id] = event_loop_->MakeRawFetcher(channel);
+ // When --fetch is set, write out the most recent message on this channel
+ // (if Fetch() returns one) from an OnRun callback, so that messages sent
+ // before the start of the logfile are still present in the MCAP output.
+ event_loop_->OnRun([this, id, channel]() {
+ if (FLAGS_fetch && fetchers_[id]->Fetch()) {
+ WriteMessage(id, channel, fetchers_[id]->context(), &current_chunk_);
+ }
+ });
}
- ++id;
}
std::vector<SummaryOffset> offsets;
@@ -200,7 +236,9 @@
void McapLogger::WriteSchema(const uint16_t id, const aos::Channel *channel) {
CHECK(channel->has_schema());
- std::string schema = JsonSchemaForFlatbuffer({channel->schema()}).dump();
+
+ const FlatbufferDetachedBuffer<reflection::Schema> schema =
+ CopyFlatBuffer(channel->schema());
// Write out the schema (we don't bother deduplicating schema types):
string_builder_.Reset();
@@ -208,10 +246,23 @@
AppendInt16(&string_builder_, id);
// Type name
AppendString(&string_builder_, channel->type()->string_view());
- // Encoding
- AppendString(&string_builder_, "jsonschema");
- // Actual schema itself
- AppendString(&string_builder_, schema);
+ switch (serialization_) {
+ case Serialization::kJson:
+ // Encoding
+ AppendString(&string_builder_, "jsonschema");
+ // Actual schema itself
+ AppendString(&string_builder_,
+ JsonSchemaForFlatbuffer({channel->schema()}).dump());
+ break;
+ case Serialization::kFlatbuffer:
+ // Encoding
+ AppendString(&string_builder_, "flatbuffer");
+ // Actual schema itself
+ AppendString(&string_builder_,
+ {reinterpret_cast<const char *>(schema.span().data()),
+ schema.span().size()});
+ break;
+ }
WriteRecord(OpCode::kSchema, string_builder_.Result());
}
@@ -227,7 +278,15 @@
absl::StrCat(channel->name()->string_view(), " ",
channel->type()->string_view()));
// Encoding
- AppendString(&string_builder_, "json");
+ switch (serialization_) {
+ case Serialization::kJson:
+ AppendString(&string_builder_, "json");
+ break;
+ case Serialization::kFlatbuffer:
+ AppendString(&string_builder_, "flatbuffer");
+ break;
+ }
+
// Metadata (technically supposed to be a Map<string, string>)
AppendString(&string_builder_, "");
WriteRecord(OpCode::kChannel, string_builder_.Result());
@@ -241,9 +300,15 @@
if (!earliest_message_.has_value()) {
earliest_message_ = context.monotonic_event_time;
+ } else {
+ earliest_message_ =
+ std::min(context.monotonic_event_time, earliest_message_.value());
}
if (!earliest_chunk_message_.has_value()) {
earliest_chunk_message_ = context.monotonic_event_time;
+ } else {
+ earliest_chunk_message_ =
+ std::min(context.monotonic_event_time, earliest_chunk_message_.value());
}
latest_message_ = context.monotonic_event_time;
@@ -257,6 +322,8 @@
// TODO(james): If we use this for multi-node logfiles, use distributed clock.
AppendInt64(&string_builder_,
context.monotonic_event_time.time_since_epoch().count());
+ // Note: Foxglove Studio doesn't appear to actually support using publish time
+ // right now.
AppendInt64(&string_builder_,
context.monotonic_event_time.time_since_epoch().count());
@@ -267,8 +334,18 @@
<< ": Corrupted flatbuffer on " << channel->name()->c_str() << " "
<< channel->type()->c_str();
- aos::FlatbufferToJson(&string_builder_, channel->schema(),
- static_cast<const uint8_t *>(context.data));
+ switch (serialization_) {
+ case Serialization::kJson:
+ aos::FlatbufferToJson(&string_builder_, channel->schema(),
+ static_cast<const uint8_t *>(context.data));
+ break;
+ case Serialization::kFlatbuffer:
+ string_builder_.Append(
+ {static_cast<const char *>(context.data), context.size});
+ break;
+ }
+ total_message_bytes_ += context.size;
+ total_channel_bytes_[channel] += context.size;
message_indices_[channel_id].push_back(std::make_pair<uint64_t, uint64_t>(
context.monotonic_event_time.time_since_epoch().count(),
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index dcacb68..5ae6413 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -30,7 +30,14 @@
// available, to be able to support Foxglove fully.
class McapLogger {
public:
- McapLogger(EventLoop *event_loop, const std::string &output_path);
+ // Whether to serialize the messages into the MCAP file as JSON or
+ // flatbuffers.
+ enum class Serialization {
+ kJson,
+ kFlatbuffer,
+ };
+ McapLogger(EventLoop *event_loop, const std::string &output_path,
+ Serialization serialization);
~McapLogger();
private:
@@ -122,6 +129,9 @@
aos::EventLoop *event_loop_;
std::ofstream output_;
+ const Serialization serialization_;
+ size_t total_message_bytes_ = 0;
+ std::map<const Channel *, size_t> total_channel_bytes_;
// Buffer containing serialized message data for the currently-being-built
// chunk.
std::stringstream current_chunk_;
@@ -136,6 +146,7 @@
aos::monotonic_clock::min_time;
// Count of all messages on each channel, indexed by channel ID.
std::map<uint16_t, uint64_t> message_counts_;
+ std::map<uint16_t, std::unique_ptr<RawFetcher>> fetchers_;
// MessageIndex's for each message. The std::map is indexed by channel ID. The
// vector is then a series of pairs of (timestamp, offset from start of
// current_chunk_).