Add code to xz-compress log files

Change-Id: I2e00c8aa316f5907f7f9bc1a4653b1f7c793a876
diff --git a/WORKSPACE b/WORKSPACE
index eb7ed28..9ddecff 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -70,6 +70,14 @@
     "//debian:m4.bzl",
     m4_debs = "files",
 )
+load(
+    "//debian:lzma_amd64.bzl",
+    lzma_amd64_debs = "files",
+)
+load(
+    "//debian:lzma_arm64.bzl",
+    lzma_arm64_debs = "files",
+)
 load("//debian:packages.bzl", "generate_repositories_for_debs")
 
 generate_repositories_for_debs(python_debs)
@@ -106,6 +114,10 @@
 
 generate_repositories_for_debs(m4_debs)
 
+generate_repositories_for_debs(lzma_amd64_debs)
+
+generate_repositories_for_debs(lzma_arm64_debs)
+
 http_archive(
     name = "python_repo",
     build_file = "@//debian:python.BUILD",
@@ -753,3 +765,45 @@
     sha256 = "ee8dfe664ac8c1d066bab64f71bd076a021875581b3cc47dac4a14a475f50b15",
     url = "http://www.frc971.org/Build-Dependencies/m4.tar.gz",
 )
+
+# //debian:lzma_amd64
+http_archive(
+    name = "lzma_amd64",
+    build_file_content = """
+cc_library(
+    name = "lib",
+    srcs = [
+        "usr/lib/x86_64-linux-gnu/liblzma.a",
+    ],
+    hdrs = glob([
+        "usr/include/lzma/*.h",
+        "usr/include/*.h",
+    ]),
+    strip_include_prefix = "usr/include",
+    visibility = ["//visibility:public"],
+)
+""",
+    sha256 = "e0ccaa7f793e44638e9f89570e00f146073a98a5928e0b547146c8184488bb19",
+    urls = ["http://www.frc971.org/Build-Dependencies/lzma_amd64.tar.gz"],
+)
+
+# //debian:lzma_arm64
+http_archive(
+    name = "lzma_arm64",
+    build_file_content = """
+cc_library(
+    name = "lib",
+    srcs = [
+        "usr/lib/aarch64-linux-gnu/liblzma.a",
+    ],
+    hdrs = glob([
+        "usr/include/lzma/*.h",
+        "usr/include/*.h",
+    ]),
+    strip_include_prefix = "usr/include",
+    visibility = ["//visibility:public"],
+)
+""",
+    sha256 = "18db35669ee49a5f8324a344071dd4ab553e716f385fb75747b909bd1de959f5",
+    urls = ["http://www.frc971.org/Build-Dependencies/lzma_arm64.tar.gz"],
+)
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 3f43e07..573b9aa 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -33,7 +33,11 @@
         "@com_github_google_flatbuffers//:flatbuffers",
         "@com_github_google_glog//:glog",
         "@com_google_absl//absl/types:span",
-    ],
+    ] + select({
+        "//tools:cpu_k8": [":lzma_encoder"],
+        "//tools:cpu_arm64": [":lzma_encoder"],
+        "//conditions:default": [],
+    }),
 )
 
 cc_library(
@@ -68,6 +72,60 @@
     ],
 )
 
+# TODO(Brian): Properly restrict this to specific platforms and
+# un-conditionalize the srcs and hdrs once we have support for that which
+# interacts with select properly.
+cc_library(
+    name = "lzma_encoder",
+    srcs = select({
+        "//tools:cpu_k8": [
+            "lzma_encoder.cc",
+        ],
+        "//tools:cpu_arm64": [
+            "lzma_encoder.cc",
+        ],
+        "//conditions:default": [],
+    }),
+    hdrs = select({
+        "//tools:cpu_k8": [
+            "lzma_encoder.h",
+        ],
+        "//tools:cpu_arm64": [
+            "lzma_encoder.h",
+        ],
+        "//conditions:default": [],
+    }),
+    visibility = ["//visibility:public"],
+    deps = [
+        ":buffer_encoder",
+        ":logger_fbs",
+        "//aos:configuration_fbs",
+        "//aos/containers:resizeable_buffer",
+        "//third_party:lzma",
+        "@com_github_google_flatbuffers//:flatbuffers",
+        "@com_github_google_glog//:glog",
+        "@com_google_absl//absl/types:span",
+    ],
+)
+
+cc_test(
+    name = "lzma_encoder_test",
+    srcs = [
+        "lzma_encoder_test.cc",
+    ],
+    restricted_to = [
+        "//tools:k8",
+        "//tools:arm64",
+    ],
+    shard_count = 4,
+    deps = [
+        ":buffer_encoder_param_test",
+        ":lzma_encoder",
+        "//aos/testing:googletest",
+        "@com_github_google_glog//:glog",
+    ],
+)
+
 cc_library(
     name = "buffer_encoder_param_test",
     testonly = True,
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 9d8145d..7ddc658 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -16,6 +16,18 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+#if defined(__x86_64__)
+#define ENABLE_LZMA 1
+#elif defined(__aarch64__)
+#define ENABLE_LZMA 1
+#else
+#define ENABLE_LZMA 0
+#endif
+
+#if ENABLE_LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
+
 DEFINE_int32(flush_size, 128000,
              "Number of outstanding bytes to allow before flushing to disk.");
 
@@ -206,9 +218,16 @@
 }
 
 SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
-  // Support for other kinds of decoders based on the filename should be added
-  // here.
-  decoder_ = std::make_unique<DummyDecoder>(filename);
+  static const std::string_view kXz = ".xz";
+  if (filename.substr(filename.size() - kXz.size()) == kXz) {
+#if ENABLE_LZMA
+    decoder_ = std::make_unique<LzmaDecoder>(filename);
+#else
+    LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
+#endif
+  } else {
+    decoder_ = std::make_unique<DummyDecoder>(filename);
+  }
 }
 
 absl::Span<const uint8_t> SpanReader::ReadMessage() {
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
new file mode 100644
index 0000000..a8cafda
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.cc
@@ -0,0 +1,194 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+namespace {
+
+// Returns if `status` is not an error code, otherwise logs the appropriate
+// error message and crashes.
+void CheckLzmaCodeIsOk(lzma_ret status) {
+  switch (status) {
+    case LZMA_OK:
+    case LZMA_STREAM_END:
+      return;
+    case LZMA_MEM_ERROR:
+      LOG(FATAL) << "Memory allocation failed:" << status;
+    case LZMA_OPTIONS_ERROR:
+      LOG(FATAL) << "The given compression preset or decompression options are "
+                    "not supported: "
+                 << status;
+    case LZMA_UNSUPPORTED_CHECK:
+      LOG(FATAL) << "The given check type is not supported: " << status;
+    case LZMA_PROG_ERROR:
+      LOG(FATAL) << "One or more of the parameters have values that will never "
+                    "be valid: "
+                 << status;
+    case LZMA_MEMLIMIT_ERROR:
+      LOG(FATAL) << "Decoder needs more memory than allowed by the specified "
+                    "memory usage limit: "
+                 << status;
+    case LZMA_FORMAT_ERROR:
+      LOG(FATAL) << "File format not recognized: " << status;
+    case LZMA_DATA_ERROR:
+      LOG(FATAL) << "Compressed file is corrupt: " << status;
+    case LZMA_BUF_ERROR:
+      LOG(FATAL) << "Compressed file is truncated or corrupt: " << status;
+    default:
+      LOG(FATAL) << "Unexpected return value: " << status;
+  }
+}
+
+}  // namespace
+
+LzmaEncoder::LzmaEncoder(const uint32_t compression_preset)
+    : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
+  CHECK_GE(compression_preset_, 0u)
+      << ": Compression preset must be in the range [0, 9].";
+  CHECK_LE(compression_preset_, 9u)
+      << ": Compression preset must be in the range [0, 9].";
+
+  lzma_ret status =
+      lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
+  CheckLzmaCodeIsOk(status);
+  stream_.avail_out = 0;
+  VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+}
+
+LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
+
+void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+  CHECK(in.data()) << ": Encode called with nullptr.";
+
+  stream_.next_in = in.data();
+  stream_.avail_in = in.size();
+
+  RunLzmaCode(LZMA_RUN);
+}
+
+void LzmaEncoder::Finish() { RunLzmaCode(LZMA_FINISH); }
+
+void LzmaEncoder::Clear(const int n) {
+  CHECK_GE(n, 0);
+  CHECK_LE(static_cast<size_t>(n), queue_size());
+  queue_.erase(queue_.begin(), queue_.begin() + n);
+  if (queue_.empty()) {
+    stream_.next_out = nullptr;
+    stream_.avail_out = 0;
+  }
+}
+
+std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
+  std::vector<absl::Span<const uint8_t>> queue;
+  if (queue_.empty()) {
+    return queue;
+  }
+  for (size_t i = 0; i < queue_.size() - 1; ++i) {
+    queue.emplace_back(
+        absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
+  }
+  // For the last buffer in the queue, we must account for the possibility that
+  // the buffer isn't full yet.
+  queue.emplace_back(absl::MakeConstSpan(
+      queue_.back().data(), queue_.back().size() - stream_.avail_out));
+  return queue;
+}
+
+size_t LzmaEncoder::queued_bytes() const {
+  size_t bytes = queue_size() * kEncodedBufferSizeBytes;
+  // Subtract the bytes that the encoder hasn't filled yet.
+  bytes -= stream_.avail_out;
+  return bytes;
+}
+
+void LzmaEncoder::RunLzmaCode(lzma_action action) {
+  CHECK(!finished_);
+
+  // This is to keep track of how many bytes resulted from encoding this input
+  // buffer.
+  size_t last_avail_out = stream_.avail_out;
+
+  while (stream_.avail_in > 0 || action == LZMA_FINISH) {
+    // If output buffer is full, create a new one, queue it up, and resume
+    // encoding. This could happen in the first call to Encode after
+    // construction or a Reset, or when an input buffer is large enough to fill
+    // more than one output buffer.
+    if (stream_.avail_out == 0) {
+      queue_.emplace_back();
+      queue_.back().resize(kEncodedBufferSizeBytes);
+      stream_.next_out = queue_.back().data();
+      stream_.avail_out = kEncodedBufferSizeBytes;
+      // Update the byte count.
+      total_bytes_ += last_avail_out;
+      last_avail_out = stream_.avail_out;
+    }
+
+    // Encode the data.
+    lzma_ret status = lzma_code(&stream_, action);
+    CheckLzmaCodeIsOk(status);
+    if (action == LZMA_FINISH) {
+      if (status == LZMA_STREAM_END) {
+        // This is returned when lzma_code is all done.
+        finished_ = true;
+        break;
+      }
+    } else {
+      CHECK(status != LZMA_STREAM_END);
+    }
+    VLOG(2) << "LzmaEncoder: Encoded chunk.";
+  }
+
+  // Update the number of resulting encoded bytes.
+  total_bytes_ += last_avail_out - stream_.avail_out;
+}
+
+LzmaDecoder::LzmaDecoder(std::string_view filename)
+    : dummy_decoder_(filename), stream_(LZMA_STREAM_INIT) {
+  compressed_data_.resize(kBufSize);
+
+  lzma_ret status =
+      lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
+  CheckLzmaCodeIsOk(status);
+  stream_.avail_out = 0;
+  VLOG(2) << "LzmaDecoder: Initialization succeeded.";
+}
+
+LzmaDecoder::~LzmaDecoder() { lzma_end(&stream_); }
+
+size_t LzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+  if (finished_) {
+    return 0;
+  }
+
+  // Write into the given range.
+  stream_.next_out = begin;
+  stream_.avail_out = end - begin;
+  // Keep decompressing until we run out of buffer space.
+  while (stream_.avail_out > 0) {
+    if (action_ == LZMA_RUN && stream_.avail_in == 0) {
+      // Read more bytes from the file if we're all out.
+      const size_t count =
+          dummy_decoder_.Read(compressed_data_.begin(), compressed_data_.end());
+      if (count == 0) {
+        // No more data to read in the file, begin the finishing operation.
+        action_ = LZMA_FINISH;
+      } else {
+        stream_.next_in = compressed_data_.data();
+        stream_.avail_in = count;
+      }
+    }
+    // Decompress the data.
+    const lzma_ret status = lzma_code(&stream_, action_);
+    // Return if we're done.
+    if (status == LZMA_STREAM_END) {
+      CHECK_EQ(action_, LZMA_FINISH)
+          << ": Got LZMA_STREAM_END when action wasn't LZMA_FINISH";
+      finished_ = true;
+      return (end - begin) - stream_.avail_out;
+    }
+    CheckLzmaCodeIsOk(status);
+  }
+  return end - begin;
+}
+
+}  // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
new file mode 100644
index 0000000..d7080a9
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.h
@@ -0,0 +1,72 @@
+#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/logging/buffer_encoder.h"
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Encodes buffers using liblzma.
+class LzmaEncoder final : public DetachedBufferEncoder {
+ public:
+  // Initializes the LZMA stream and encoder.
+  explicit LzmaEncoder(uint32_t compression_preset);
+  // Gracefully shuts down the encoder.
+  ~LzmaEncoder() final;
+
+  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  void Finish() final;
+  void Clear(int n) final;
+  std::vector<absl::Span<const uint8_t>> queue() const final;
+  size_t queued_bytes() const final;
+  size_t total_bytes() const final { return total_bytes_; }
+  size_t queue_size() const final { return queue_.size(); }
+
+ private:
+  static constexpr size_t kEncodedBufferSizeBytes{1024};
+
+  void RunLzmaCode(lzma_action action);
+
+  lzma_stream stream_;
+  uint32_t compression_preset_;
+  std::vector<ResizeableBuffer> queue_;
+  bool finished_ = false;
+  // Total bytes that resulted from encoding raw data since the last call to
+  // Reset.
+  size_t total_bytes_ = 0;
+};
+
+// Decompresses data with liblzma.
+class LzmaDecoder final : public DataDecoder {
+ public:
+  explicit LzmaDecoder(std::string_view filename);
+  ~LzmaDecoder();
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  // Size of temporary buffer to use.
+  static constexpr size_t kBufSize{256 * 1024};
+
+  // Temporary buffer for storing compressed data.
+  ResizeableBuffer compressed_data_;
+  // Used for reading data from the file.
+  DummyDecoder dummy_decoder_;
+  // Stream for decompression.
+  lzma_stream stream_;
+  // The current action. This is LZMA_RUN until we've run out of data to read
+  // from the file.
+  lzma_action action_ = LZMA_RUN;
+  // Flag that represents whether or not all the data from the file has been
+  // successfully decoded.
+  bool finished_ = false;
+};
+
+}  // namespace aos::logger
+
+#endif  // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
new file mode 100644
index 0000000..2d619c4
--- /dev/null
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -0,0 +1,18 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+INSTANTIATE_TEST_CASE_P(
+    Lzma, BufferEncoderTest,
+    ::testing::Combine(::testing::Values([]() {
+                         return std::make_unique<LzmaEncoder>(2);
+                       }),
+                       ::testing::Values([](std::string_view filename) {
+                         return std::make_unique<LzmaDecoder>(filename);
+                       }),
+                       ::testing::Range(0, 100)));
+
+}  // namespace aos::logger::testing
diff --git a/debian/BUILD b/debian/BUILD
index 5bebf04..6f955ae 100644
--- a/debian/BUILD
+++ b/debian/BUILD
@@ -65,10 +65,18 @@
     gstreamer_armhf_debs = "files",
 )
 load(
-    "//debian:m4.bzl",
+    ":m4.bzl",
     m4_debs = "files",
 )
-load("//debian:packages.bzl", "download_packages", "generate_deb_tarball")
+load(
+    ":lzma_amd64.bzl",
+    lzma_amd64_debs = "files",
+)
+load(
+    ":lzma_arm64.bzl",
+    lzma_arm64_debs = "files",
+)
+load(":packages.bzl", "download_packages", "generate_deb_tarball")
 
 filegroup(
     name = "matplotlib_patches",
@@ -340,6 +348,23 @@
     files = m4_debs,
 )
 
+download_packages(
+    name = "download_lzma",
+    packages = [
+        "liblzma-dev",
+    ],
+)
+
+generate_deb_tarball(
+    name = "lzma_amd64",
+    files = lzma_amd64_debs,
+)
+
+generate_deb_tarball(
+    name = "lzma_arm64",
+    files = lzma_arm64_debs,
+)
+
 exports_files([
     "ssh_wrapper.sh",
 ])
diff --git a/debian/lzma_amd64.bzl b/debian/lzma_amd64.bzl
new file mode 100644
index 0000000..1dbfa38
--- /dev/null
+++ b/debian/lzma_amd64.bzl
@@ -0,0 +1,3 @@
+files = {
+    "liblzma-dev_5.2.4-1_amd64.deb": "8373145ed28563aff33b116c434399347ab1fa77ba672d8d94151dd8f68c8ece",
+}
diff --git a/debian/lzma_arm64.bzl b/debian/lzma_arm64.bzl
new file mode 100644
index 0000000..a38d900
--- /dev/null
+++ b/debian/lzma_arm64.bzl
@@ -0,0 +1,3 @@
+files = {
+    "liblzma-dev_5.2.2-1.3_arm64.deb": "eb730c11f65c9bd10ae6c39b5a461e16294c21a423c9a8bc4011390798387e82",
+}
diff --git a/third_party/BUILD b/third_party/BUILD
index bcba978..2f22b0e 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -113,3 +113,15 @@
         "roborio": ["@webrtc_rio//:webrtc"],
     }),
 )
+
+# TODO(Brian): Properly restrict this to specific platforms and remove the
+# default deps once we have support for that which interacts with select properly.
+cc_library(
+    name = "lzma",
+    visibility = ["//visibility:public"],
+    deps = select({
+        "//tools:cpu_k8": ["@lzma_amd64//:lib"],
+        "//tools:cpu_arm64": ["@lzma_arm64//:lib"],
+        "//conditions:default": [],
+    }),
+)
diff --git a/tools/BUILD b/tools/BUILD
index ac76ee9..d77abf8 100644
--- a/tools/BUILD
+++ b/tools/BUILD
@@ -49,6 +49,11 @@
 )
 
 config_setting(
+    name = "cpu_arm64",
+    values = {"cpu": "arm64"},
+)
+
+config_setting(
     name = "has_asan",
     values = {"define": "have_asan=true"},
 )
@@ -75,11 +80,14 @@
 
 environment(name = "cortex-m4f-k22")
 
+environment(name = "arm64")
+
 environment_group(
     name = "cpus",
     defaults = [
         ":k8",
         ":roborio",
+        ":arm64",
         ":armhf-debian",
     ],
     environments = [
@@ -89,5 +97,6 @@
         ":armhf-debian",
         ":cortex-m4f",
         ":cortex-m4f-k22",
+        ":arm64",
     ],
 )