Add LZ4 compression to MCAP converter

AOS messages take up a lot of space uncompressed. This results in ~10x
reductions in disk space used on diagnostic logs and has relatively
minimal impact on Foxglove performance (it'll do better if you are
trying to copy over the network, slightly worse if accessing
locally).

Parameterize the MCAP tests to cover more flag combinations.

Change-Id: Id61bb2bc871837b0dd927187dd857964ea534a0b
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/WORKSPACE b/WORKSPACE
index 6ef1eb9..172e7bb 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -1266,11 +1266,22 @@
     url = "https://www.frc971.org/Build-Dependencies/aws_sdk-19.0.0-RC1.tar.gz",
 )
 
+# Source code of LZ4 (files under lib/) are under BSD 2-Clause.
+# The rest of the repository (build information, documentation, etc.) is under GPLv2.
+# We only care about the lib/ subfolder anyways, and strip out any other files.
+http_archive(
+    name = "com_github_lz4_lz4",
+    build_file = "//debian:BUILD.lz4.bazel",
+    sha256 = "0b0e3aa07c8c063ddf40b082bdf7e37a1562bda40a0ff5272957f3e987e0e54b",
+    strip_prefix = "lz4-1.9.4/lib",
+    url = "https://github.com/lz4/lz4/archive/refs/tags/v1.9.4.tar.gz",
+)
+
 http_file(
     name = "com_github_foxglove_mcap_mcap",
     executable = True,
-    sha256 = "cf4dfcf71e20a60406aaded03a165312c1ca535b509ead90eb1846fc598137d2",
-    urls = ["https://github.com/foxglove/mcap/releases/download/releases%2Fmcap-cli%2Fv0.0.5/mcap-linux-amd64"],
+    sha256 = "ae745dd09cf4c9570c1c038a72630c07b073f0ed4b05983d64108ff748a40d3f",
+    urls = ["https://github.com/foxglove/mcap/releases/download/releases%2Fmcap-cli%2Fv0.0.22/mcap-linux-amd64"],
 )
 
 http_archive(
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 19a9172..f8eb09f 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -86,6 +86,7 @@
         "//aos:fast_string_builder",
         "//aos:flatbuffer_utils",
         "//aos/events:event_loop",
+        "@com_github_lz4_lz4//:lz4",
         "@com_github_nlohmann_json//:json",
     ],
 )
diff --git a/aos/util/log_to_mcap.cc b/aos/util/log_to_mcap.cc
index 9a0af48..e669658 100644
--- a/aos/util/log_to_mcap.cc
+++ b/aos/util/log_to_mcap.cc
@@ -10,6 +10,7 @@
     canonical_channel_names, false,
     "If set, use full channel names; by default, will shorten names to be the "
     "shortest possible version of the name (e.g., /aos instead of /pi/aos).");
+DEFINE_bool(compress, true, "Whether to use LZ4 compression in MCAP file.");
 
 // Converts an AOS log to an MCAP log that can be fed into Foxglove. To try this
 // out, run:
@@ -58,6 +59,8 @@
                                  : aos::McapLogger::Serialization::kJson,
       FLAGS_canonical_channel_names
           ? aos::McapLogger::CanonicalChannelNames::kCanonical
-          : aos::McapLogger::CanonicalChannelNames::kShortened);
+          : aos::McapLogger::CanonicalChannelNames::kShortened,
+      FLAGS_compress ? aos::McapLogger::Compression::kLz4
+                     : aos::McapLogger::Compression::kNone);
   reader.event_loop_factory()->Run();
 }
diff --git a/aos/util/log_to_mcap_test.py b/aos/util/log_to_mcap_test.py
index 7bdffe4..36f8de0 100644
--- a/aos/util/log_to_mcap_test.py
+++ b/aos/util/log_to_mcap_test.py
@@ -10,6 +10,27 @@
 from typing import Sequence, Text
 
 
+def make_permutations(options):
+    if len(options) == 0:
+        return [[]]
+    permutations = []
+    for option in options[0]:
+        for sub_permutations in make_permutations(options[1:]):
+            permutations.append([option] + sub_permutations)
+    return permutations
+
+
+def generate_argument_permutations():
+    arg_sets = [["--compress", "--nocompress"],
+                ["--mode=flatbuffer", "--mode=json"],
+                ["--canonical_channel_names", "--nocanonical_channel_names"],
+                ["--mcap_chunk_size=1000", "--mcap_chunk_size=10000000"],
+                ["--fetch", "--nofetch"]]
+    permutations = make_permutations(arg_sets)
+    print(permutations)
+    return permutations
+
+
 def main(argv: Sequence[Text]):
     parser = argparse.ArgumentParser()
     parser.add_argument("--log_to_mcap",
@@ -20,35 +41,38 @@
                         required=True,
                         help="Path to logfile generator.")
     args = parser.parse_args(argv)
-    with tempfile.TemporaryDirectory() as tmpdir:
-        log_name = tmpdir + "/test_log/"
-        mcap_name = tmpdir + "/log.mcap"
-        subprocess.run([args.generate_log, "--output_folder",
-                        log_name]).check_returncode()
-        # Run with a really small chunk size, to force a multi-chunk file.
-        subprocess.run([
-            args.log_to_mcap, "--output_path", mcap_name, "--mcap_chunk_size",
-            "1000", "--mode", "json", log_name
-        ]).check_returncode()
-        # MCAP attempts to find $HOME/.mcap.yaml, and dies on $HOME not existing. So
-        # give it an arbitrary config location (it seems to be fine with a non-existent config).
-        doctor_result = subprocess.run([
-            args.mcap, "doctor", mcap_name, "--config", tmpdir + "/.mcap.yaml"
-        ],
-                                       stdout=subprocess.PIPE,
-                                       stderr=subprocess.PIPE,
-                                       encoding='utf-8')
-        print(doctor_result.stdout)
-        print(doctor_result.stderr)
-        # mcap doctor doesn't actually return a non-zero exit code on certain failures...
-        # See https://github.com/foxglove/mcap/issues/356
-        if len(doctor_result.stderr) != 0:
-            print("Didn't expect any stderr output.")
-            return 1
-        if doctor_result.stdout != f"Examining {mcap_name}\n":
-            print("Only expected one line of stdout.")
-            return 1
-        doctor_result.check_returncode()
+    log_to_mcap_argument_permutations = generate_argument_permutations()
+    for log_to_mcap_args in log_to_mcap_argument_permutations:
+        with tempfile.TemporaryDirectory() as tmpdir:
+            log_name = tmpdir + "/test_log/"
+            mcap_name = tmpdir + "/log.mcap"
+            subprocess.run([args.generate_log, "--output_folder",
+                            log_name]).check_returncode()
+            # Run with a really small chunk size, to force a multi-chunk file.
+            subprocess.run([
+                args.log_to_mcap, "--output_path", mcap_name,
+                "--mcap_chunk_size", "1000", "--mode", "json", log_name
+            ] + log_to_mcap_args).check_returncode()
+            # MCAP attempts to find $HOME/.mcap.yaml, and dies on $HOME not existing. So
+            # give it an arbitrary config location (it seems to be fine with a non-existent config).
+            doctor_result = subprocess.run([
+                args.mcap, "doctor", mcap_name, "--config",
+                tmpdir + "/.mcap.yaml"
+            ],
+                                           stdout=subprocess.PIPE,
+                                           stderr=subprocess.PIPE,
+                                           encoding='utf-8')
+            print(doctor_result.stdout)
+            print(doctor_result.stderr)
+            # mcap doctor doesn't actually return a non-zero exit code on certain failures...
+            # See https://github.com/foxglove/mcap/issues/356
+            if len(doctor_result.stderr) != 0:
+                print("Didn't expect any stderr output.")
+                return 1
+            if doctor_result.stdout != f"Examining {mcap_name}\nHeader.profile field \"x-aos\" is not a well-known profile.\n":
+                print("Only expected two lines of stdout.")
+                return 1
+            doctor_result.check_returncode()
     return 0
 
 
diff --git a/aos/util/mcap_logger.cc b/aos/util/mcap_logger.cc
index 818844b..742f284 100644
--- a/aos/util/mcap_logger.cc
+++ b/aos/util/mcap_logger.cc
@@ -3,6 +3,8 @@
 #include "absl/strings/str_replace.h"
 #include "aos/configuration_schema.h"
 #include "aos/flatbuffer_merge.h"
+#include "lz4/lz4.h"
+#include "lz4/lz4frame.h"
 #include "single_include/nlohmann/json.hpp"
 
 DEFINE_uint64(mcap_chunk_size, 10'000'000,
@@ -82,12 +84,27 @@
   return schema;
 }
 
+namespace {
+std::string_view CompressionName(McapLogger::Compression compression) {
+  switch (compression) {
+    case McapLogger::Compression::kNone:
+      return "";
+    case McapLogger::Compression::kLz4:
+      return "lz4";
+  }
+  LOG(FATAL) << "Unreachable.";
+}
+}  // namespace
+
 McapLogger::McapLogger(EventLoop *event_loop, const std::string &output_path,
-                       Serialization serialization, CanonicalChannelNames canonical_channels)
+                       Serialization serialization,
+                       CanonicalChannelNames canonical_channels,
+                       Compression compression)
     : event_loop_(event_loop),
       output_(output_path),
       serialization_(serialization),
       canonical_channels_(canonical_channels),
+      compression_(compression),
       configuration_channel_([]() {
         // Setup a fake Channel for providing the configuration in the MCAP
         // file. This is included for convenience so that consumers of the MCAP
@@ -202,7 +219,8 @@
       fetchers_[id] = event_loop_->MakeRawFetcher(channel);
       event_loop_->OnRun([this, id, channel]() {
         if (FLAGS_fetch && fetchers_[id]->Fetch()) {
-          WriteMessage(id, channel, fetchers_[id]->context(), &current_chunks_[id]);
+          WriteMessage(id, channel, fetchers_[id]->context(),
+                       &current_chunks_[id]);
         }
       });
     }
@@ -346,8 +364,8 @@
           VLOG(1) << "Shortening " << channel->name()->string_view() << " "
                   << channel->type()->string_view() << " to " << shortest_name;
         }
-        topic_name = absl::StrCat(shortest_name, " ",
-                                  channel->type()->string_view());
+        topic_name =
+            absl::StrCat(shortest_name, " ", channel->type()->string_view());
         break;
       }
     }
@@ -461,8 +479,33 @@
   AppendInt64(&string_builder_, records_size);
   // Uncompressed CRC (unpopulated).
   AppendInt32(&string_builder_, 0);
-  AppendString(&string_builder_, "");
-  AppendBytes(&string_builder_, chunk_records);
+  // Compression
+  AppendString(&string_builder_, CompressionName(compression_));
+  uint64_t records_size_compressed = records_size;
+  switch (compression_) {
+    case Compression::kNone:
+      AppendBytes(&string_builder_, chunk_records);
+      break;
+    case Compression::kLz4: {
+      // Default preferences.
+      LZ4F_preferences_t *lz4_preferences = nullptr;
+      const uint64_t max_size =
+          LZ4F_compressFrameBound(records_size, lz4_preferences);
+      CHECK_NE(0u, max_size);
+      if (max_size > compression_buffer_.size()) {
+        compression_buffer_.resize(max_size);
+      }
+      records_size_compressed = LZ4F_compressFrame(
+          compression_buffer_.data(), compression_buffer_.size(),
+          reinterpret_cast<const char *>(chunk_records.data()),
+          chunk_records.size(), lz4_preferences);
+      CHECK(!LZ4F_isError(records_size_compressed));
+      AppendBytes(&string_builder_,
+                  {reinterpret_cast<const char *>(compression_buffer_.data()),
+                   static_cast<size_t>(records_size_compressed)});
+      break;
+    }
+  }
   WriteRecord(OpCode::kChunk, string_builder_.Result());
 
   std::map<uint16_t, uint64_t> index_offsets;
@@ -476,9 +519,16 @@
   }
   chunk->message_indices.clear();
   chunk_indices_.push_back(ChunkIndex{
-      chunk->earliest_message.value(), chunk->latest_message.value(), chunk_offset,
-      message_index_start - chunk_offset, records_size, index_offsets,
-      static_cast<uint64_t>(output_.tellp()) - message_index_start});
+      .start_time = chunk->earliest_message.value(),
+      .end_time = chunk->latest_message.value(),
+      .offset = chunk_offset,
+      .chunk_size = message_index_start - chunk_offset,
+      .records_size = records_size,
+      .records_size_compressed = records_size_compressed,
+      .message_index_offsets = index_offsets,
+      .message_index_size =
+          static_cast<uint64_t>(output_.tellp()) - message_index_start,
+      .compression = compression_});
   chunk->earliest_message.reset();
 }
 
@@ -522,9 +572,9 @@
     AppendChannelMap(&string_builder_, index.message_index_offsets);
     AppendInt64(&string_builder_, index.message_index_size);
     // Compression used.
-    AppendString(&string_builder_, "");
+    AppendString(&string_builder_, CompressionName(index.compression));
     // Compressed and uncompressed records size.
-    AppendInt64(&string_builder_, index.records_size);
+    AppendInt64(&string_builder_, index.records_size_compressed);
     AppendInt64(&string_builder_, index.records_size);
     WriteRecord(OpCode::kChunkIndex, string_builder_.Result());
   }
diff --git a/aos/util/mcap_logger.h b/aos/util/mcap_logger.h
index 267364a..70a1328 100644
--- a/aos/util/mcap_logger.h
+++ b/aos/util/mcap_logger.h
@@ -46,8 +46,14 @@
     // the channel names that are used in "real" applications.
     kShortened,
   };
+  // Chunk compression to use in the MCAP file.
+  enum class Compression {
+    kNone,
+    kLz4,
+  };
   McapLogger(EventLoop *event_loop, const std::string &output_path,
-             Serialization serialization, CanonicalChannelNames canonical_channels);
+             Serialization serialization,
+             CanonicalChannelNames canonical_channels, Compression compression);
   ~McapLogger();
 
  private:
@@ -87,8 +93,10 @@
     uint64_t offset;
     // Total size of the Chunk, in bytes.
     uint64_t chunk_size;
-    // Total size of the records portion of the Chunk, in bytes.
+    // Total uncompressed size of the records portion of the Chunk, in bytes.
     uint64_t records_size;
+    // Total size of the records portion of the Chunk, when compressed
+    uint64_t records_size_compressed;
     // Mapping of channel IDs to the MessageIndex entry for that channel within
     // the referenced Chunk. The MessageIndex is referenced by an offset from
     // the start of the file.
@@ -96,10 +104,13 @@
     // Total size, in bytes, of all the MessageIndex entries for this Chunk
     // together (note that they are required to be contiguous).
     uint64_t message_index_size;
+    // Compression used in this Chunk.
+    Compression compression;
   };
-  // Maintains the state of a single Chunk. In order to maximize read performance,
-  // we currently maintain separate chunks for each channel so that, in order to
-  // read a given channel, only data associated with that channel nead be read.
+  // Maintains the state of a single Chunk. In order to maximize read
+  // performance, we currently maintain separate chunks for each channel so
+  // that, in order to read a given channel, only data associated with that
+  // channel nead be read.
   struct ChunkStatus {
     // Buffer containing serialized message data for the currently-being-built
     // chunk.
@@ -163,6 +174,7 @@
   std::ofstream output_;
   const Serialization serialization_;
   const CanonicalChannelNames canonical_channels_;
+  const Compression compression_;
   size_t total_message_bytes_ = 0;
   std::map<const Channel *, size_t> total_channel_bytes_;
   FastStringBuilder string_builder_;
@@ -170,7 +182,8 @@
   // Earliest message observed in this logfile.
   std::optional<aos::monotonic_clock::time_point> earliest_message_;
   // Latest message observed in this logfile.
-  aos::monotonic_clock::time_point latest_message_ = aos::monotonic_clock::min_time;
+  aos::monotonic_clock::time_point latest_message_ =
+      aos::monotonic_clock::min_time;
   // Count of all messages on each channel, indexed by channel ID.
   std::map<uint16_t, uint64_t> message_counts_;
   std::map<uint16_t, std::unique_ptr<RawFetcher>> fetchers_;
@@ -186,6 +199,9 @@
   uint16_t configuration_id_ = 0;
   FlatbufferDetachedBuffer<Channel> configuration_channel_;
   FlatbufferDetachedBuffer<Configuration> configuration_;
+
+  // Memory buffer to use for compressing data.
+  std::vector<uint8_t> compression_buffer_;
 };
 }  // namespace aos
 #endif  // AOS_UTIL_MCAP_LOGGER_H_
diff --git a/debian/BUILD.lz4.bazel b/debian/BUILD.lz4.bazel
new file mode 100644
index 0000000..7aa8def
--- /dev/null
+++ b/debian/BUILD.lz4.bazel
@@ -0,0 +1,22 @@
+licenses(["notice"])
+
+cc_library(
+    name = "lz4",
+    srcs = [
+        "lz4.c",
+        "lz4frame.c",
+        "lz4hc.c",
+        "xxhash.c",
+    ],
+    hdrs = [
+        # lz4hc.c tries to #include lz4.c....
+        "lz4.c",
+        "lz4.h",
+        "lz4frame.h",
+        "lz4hc.h",
+        "xxhash.h",
+    ],
+    include_prefix = "lz4",
+    includes = ["."],
+    visibility = ["//visibility:public"],
+)