Add LZ4 compression to MCAP converter
AOS messages take up a lot of space uncompressed. Compressing the MCAP
chunks with LZ4 results in ~10x reductions in disk space used on
diagnostic logs and has relatively minimal impact on Foxglove
performance (it will do better if you are copying the file over the
network, slightly worse if you are accessing it locally).
Parameterize the MCAP tests to cover more flag combinations.
Change-Id: Id61bb2bc871837b0dd927187dd857964ea534a0b
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 818844b..742f284 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -3,6 +3,8 @@
#include "absl/strings/str_replace.h"
#include "aos/configuration_schema.h"
#include "aos/flatbuffer_merge.h"
+#include "lz4/lz4.h"
+#include "lz4/lz4frame.h"
#include "single_include/nlohmann/json.hpp"
DEFINE_uint64(mcap_chunk_size, 10'000'000,
@@ -82,12 +84,27 @@
return schema;
}
+namespace {
+std::string_view CompressionName(McapLogger::Compression compression) {  // Maps a Compression value to the name string appended to Chunk and ChunkIndex records.
+ switch (compression) {
+ case McapLogger::Compression::kNone:
+ return "";  // kNone is recorded as an empty compression name.
+ case McapLogger::Compression::kLz4:
+ return "lz4";
+ }
+ LOG(FATAL) << "Unreachable.";  // Only reached if `compression` matches none of the cases above.
+}
+} // namespace
+
McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
- Serialization serialization, CanonicalChannelNames canonical_channels)
+ Serialization serialization,
+ CanonicalChannelNames canonical_channels,
+ Compression compression)
: event_loop_(event_loop),
output_(output_path),
serialization_(serialization),
canonical_channels_(canonical_channels),
+ compression_(compression),
configuration_channel_([]() {
// Setup a fake Channel for providing the configuration in the MCAP
// file. This is included for convenience so that consumers of the MCAP
@@ -202,7 +219,8 @@
fetchers_[id] = event_loop_->MakeRawFetcher(channel);
event_loop_->OnRun([this, id, channel]() {
if (FLAGS_fetch && fetchers_[id]->Fetch()) {
- WriteMessage(id, channel, fetchers_[id]->context(), ¤t_chunks_[id]);
+ WriteMessage(id, channel, fetchers_[id]->context(),
+ ¤t_chunks_[id]);
}
});
}
@@ -346,8 +364,8 @@
VLOG(1) << "Shortening " << channel->name()->string_view() << " "
<< channel->type()->string_view() << " to " << shortest_name;
}
- topic_name = absl::StrCat(shortest_name, " ",
- channel->type()->string_view());
+ topic_name =
+ absl::StrCat(shortest_name, " ", channel->type()->string_view());
break;
}
}
@@ -461,8 +479,33 @@
AppendInt64(&string_builder_, records_size);
// Uncompressed CRC (unpopulated).
AppendInt32(&string_builder_, 0);
- AppendString(&string_builder_, "");
- AppendBytes(&string_builder_, chunk_records);
+ // Compression
+ AppendString(&string_builder_, CompressionName(compression_));
+ uint64_t records_size_compressed = records_size;
+ switch (compression_) {
+ case Compression::kNone:
+ AppendBytes(&string_builder_, chunk_records);
+ break;
+ case Compression::kLz4: {
+ // Default preferences.
+ LZ4F_preferences_t *lz4_preferences = nullptr;
+ const uint64_t max_size =
+ LZ4F_compressFrameBound(records_size, lz4_preferences);
+ CHECK_NE(0u, max_size);
+ if (max_size > compression_buffer_.size()) {
+ compression_buffer_.resize(max_size);
+ }
+ records_size_compressed = LZ4F_compressFrame(
+ compression_buffer_.data(), compression_buffer_.size(),
+ reinterpret_cast<const char *>(chunk_records.data()),
+ chunk_records.size(), lz4_preferences);
+ CHECK(!LZ4F_isError(records_size_compressed));
+ AppendBytes(&string_builder_,
+ {reinterpret_cast<const char *>(compression_buffer_.data()),
+ static_cast<size_t>(records_size_compressed)});
+ break;
+ }
+ }
WriteRecord(OpCode::kChunk, string_builder_.Result());
std::map<uint16_t, uint64_t> index_offsets;
@@ -476,9 +519,16 @@
}
chunk->message_indices.clear();
chunk_indices_.push_back(ChunkIndex{
- chunk->earliest_message.value(), chunk->latest_message.value(), chunk_offset,
- message_index_start - chunk_offset, records_size, index_offsets,
- static_cast<uint64_t>(output_.tellp()) - message_index_start});
+ .start_time = chunk->earliest_message.value(),
+ .end_time = chunk->latest_message.value(),
+ .offset = chunk_offset,
+ .chunk_size = message_index_start - chunk_offset,
+ .records_size = records_size,
+ .records_size_compressed = records_size_compressed,
+ .message_index_offsets = index_offsets,
+ .message_index_size =
+ static_cast<uint64_t>(output_.tellp()) - message_index_start,
+ .compression = compression_});
chunk->earliest_message.reset();
}
@@ -522,9 +572,9 @@
AppendChannelMap(&string_builder_, index.message_index_offsets);
AppendInt64(&string_builder_, index.message_index_size);
// Compression used.
- AppendString(&string_builder_, "");
+ AppendString(&string_builder_, CompressionName(index.compression));
// Compressed and uncompressed records size.
- AppendInt64(&string_builder_, index.records_size);
+ AppendInt64(&string_builder_, index.records_size_compressed);
AppendInt64(&string_builder_, index.records_size);
WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
}