Add code to xz-compress log files
Change-Id: I2e00c8aa316f5907f7f9bc1a4653b1f7c793a876
diff --git a/WORKSPACE b/WORKSPACE
index eb7ed28..9ddecff 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -70,6 +70,14 @@
"//debian:m4.bzl",
m4_debs = "files",
)
+load(
+ "//debian:lzma_amd64.bzl",
+ lzma_amd64_debs = "files",
+)
+load(
+ "//debian:lzma_arm64.bzl",
+ lzma_arm64_debs = "files",
+)
load("//debian:packages.bzl", "generate_repositories_for_debs")
generate_repositories_for_debs(python_debs)
@@ -106,6 +114,10 @@
generate_repositories_for_debs(m4_debs)
+generate_repositories_for_debs(lzma_amd64_debs)
+
+generate_repositories_for_debs(lzma_arm64_debs)
+
http_archive(
name = "python_repo",
build_file = "@//debian:python.BUILD",
@@ -753,3 +765,45 @@
sha256 = "ee8dfe664ac8c1d066bab64f71bd076a021875581b3cc47dac4a14a475f50b15",
url = "http://www.frc971.org/Build-Dependencies/m4.tar.gz",
)
+
+# //debian:lzma_amd64
+http_archive(
+ name = "lzma_amd64",
+ build_file_content = """
+cc_library(
+ name = "lib",
+ srcs = [
+ "usr/lib/x86_64-linux-gnu/liblzma.a",
+ ],
+ hdrs = glob([
+ "usr/include/lzma/*.h",
+ "usr/include/*.h",
+ ]),
+ strip_include_prefix = "usr/include",
+ visibility = ["//visibility:public"],
+)
+""",
+ sha256 = "e0ccaa7f793e44638e9f89570e00f146073a98a5928e0b547146c8184488bb19",
+ urls = ["http://www.frc971.org/Build-Dependencies/lzma_amd64.tar.gz"],
+)
+
+# //debian:lzma_arm64
+http_archive(
+ name = "lzma_arm64",
+ build_file_content = """
+cc_library(
+ name = "lib",
+ srcs = [
+ "usr/lib/aarch64-linux-gnu/liblzma.a",
+ ],
+ hdrs = glob([
+ "usr/include/lzma/*.h",
+ "usr/include/*.h",
+ ]),
+ strip_include_prefix = "usr/include",
+ visibility = ["//visibility:public"],
+)
+""",
+ sha256 = "18db35669ee49a5f8324a344071dd4ab553e716f385fb75747b909bd1de959f5",
+ urls = ["http://www.frc971.org/Build-Dependencies/lzma_arm64.tar.gz"],
+)
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 3f43e07..573b9aa 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -33,7 +33,11 @@
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/types:span",
- ],
+ ] + select({
+ "//tools:cpu_k8": [":lzma_encoder"],
+ "//tools:cpu_arm64": [":lzma_encoder"],
+ "//conditions:default": [],
+ }),
)
cc_library(
@@ -68,6 +72,60 @@
],
)
+# TODO(Brian): Properly restrict this to specific platforms and
+# un-conditionalize the srcs and hdrs once we have support for that which
+# interacts with select properly.
+cc_library(
+ name = "lzma_encoder",
+ srcs = select({
+ "//tools:cpu_k8": [
+ "lzma_encoder.cc",
+ ],
+ "//tools:cpu_arm64": [
+ "lzma_encoder.cc",
+ ],
+ "//conditions:default": [],
+ }),
+ hdrs = select({
+ "//tools:cpu_k8": [
+ "lzma_encoder.h",
+ ],
+ "//tools:cpu_arm64": [
+ "lzma_encoder.h",
+ ],
+ "//conditions:default": [],
+ }),
+ visibility = ["//visibility:public"],
+ deps = [
+ ":buffer_encoder",
+ ":logger_fbs",
+ "//aos:configuration_fbs",
+ "//aos/containers:resizeable_buffer",
+ "//third_party:lzma",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_github_google_glog//:glog",
+ "@com_google_absl//absl/types:span",
+ ],
+)
+
+cc_test(
+ name = "lzma_encoder_test",
+ srcs = [
+ "lzma_encoder_test.cc",
+ ],
+ restricted_to = [
+ "//tools:k8",
+ "//tools:arm64",
+ ],
+ shard_count = 4,
+ deps = [
+ ":buffer_encoder_param_test",
+ ":lzma_encoder",
+ "//aos/testing:googletest",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
cc_library(
name = "buffer_encoder_param_test",
testonly = True,
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 9d8145d..7ddc658 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -16,6 +16,18 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+#if defined(__x86_64__)
+#define ENABLE_LZMA 1
+#elif defined(__aarch64__)
+#define ENABLE_LZMA 1
+#else
+#define ENABLE_LZMA 0
+#endif
+
+#if ENABLE_LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
+
DEFINE_int32(flush_size, 128000,
"Number of outstanding bytes to allow before flushing to disk.");
@@ -206,9 +218,16 @@
}
SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
- // Support for other kinds of decoders based on the filename should be added
- // here.
- decoder_ = std::make_unique<DummyDecoder>(filename);
+ static const std::string_view kXz = ".xz";
+ if (filename.substr(filename.size() - kXz.size()) == kXz) {
+#if ENABLE_LZMA
+ decoder_ = std::make_unique<LzmaDecoder>(filename);
+#else
+ LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
+#endif
+ } else {
+ decoder_ = std::make_unique<DummyDecoder>(filename);
+ }
}
absl::Span<const uint8_t> SpanReader::ReadMessage() {
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
new file mode 100644
index 0000000..a8cafda
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.cc
@@ -0,0 +1,194 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+namespace {
+
+// Returns if `status` is not an error code, otherwise logs the appropriate
+// error message and crashes.
+void CheckLzmaCodeIsOk(lzma_ret status) {
+ switch (status) {
+ case LZMA_OK:
+ case LZMA_STREAM_END:
+ return;
+ case LZMA_MEM_ERROR:
+ LOG(FATAL) << "Memory allocation failed:" << status;
+ case LZMA_OPTIONS_ERROR:
+ LOG(FATAL) << "The given compression preset or decompression options are "
+ "not supported: "
+ << status;
+ case LZMA_UNSUPPORTED_CHECK:
+ LOG(FATAL) << "The given check type is not supported: " << status;
+ case LZMA_PROG_ERROR:
+ LOG(FATAL) << "One or more of the parameters have values that will never "
+ "be valid: "
+ << status;
+ case LZMA_MEMLIMIT_ERROR:
+ LOG(FATAL) << "Decoder needs more memory than allowed by the specified "
+ "memory usage limit: "
+ << status;
+ case LZMA_FORMAT_ERROR:
+ LOG(FATAL) << "File format not recognized: " << status;
+ case LZMA_DATA_ERROR:
+ LOG(FATAL) << "Compressed file is corrupt: " << status;
+ case LZMA_BUF_ERROR:
+ LOG(FATAL) << "Compressed file is truncated or corrupt: " << status;
+ default:
+ LOG(FATAL) << "Unexpected return value: " << status;
+ }
+}
+
+} // namespace
+
+LzmaEncoder::LzmaEncoder(const uint32_t compression_preset)
+ : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
+ CHECK_GE(compression_preset_, 0u)
+ << ": Compression preset must be in the range [0, 9].";
+ CHECK_LE(compression_preset_, 9u)
+ << ": Compression preset must be in the range [0, 9].";
+
+ lzma_ret status =
+ lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
+ CheckLzmaCodeIsOk(status);
+ stream_.avail_out = 0;
+ VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+}
+
+LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
+
+void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+ CHECK(in.data()) << ": Encode called with nullptr.";
+
+ stream_.next_in = in.data();
+ stream_.avail_in = in.size();
+
+ RunLzmaCode(LZMA_RUN);
+}
+
+void LzmaEncoder::Finish() { RunLzmaCode(LZMA_FINISH); }
+
+void LzmaEncoder::Clear(const int n) {
+ CHECK_GE(n, 0);
+ CHECK_LE(static_cast<size_t>(n), queue_size());
+ queue_.erase(queue_.begin(), queue_.begin() + n);
+ if (queue_.empty()) {
+ stream_.next_out = nullptr;
+ stream_.avail_out = 0;
+ }
+}
+
+std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
+ std::vector<absl::Span<const uint8_t>> queue;
+ if (queue_.empty()) {
+ return queue;
+ }
+ for (size_t i = 0; i < queue_.size() - 1; ++i) {
+ queue.emplace_back(
+ absl::MakeConstSpan(queue_.at(i).data(), queue_.at(i).size()));
+ }
+ // For the last buffer in the queue, we must account for the possibility that
+ // the buffer isn't full yet.
+ queue.emplace_back(absl::MakeConstSpan(
+ queue_.back().data(), queue_.back().size() - stream_.avail_out));
+ return queue;
+}
+
+size_t LzmaEncoder::queued_bytes() const {
+ size_t bytes = queue_size() * kEncodedBufferSizeBytes;
+ // Subtract the bytes that the encoder hasn't filled yet.
+ bytes -= stream_.avail_out;
+ return bytes;
+}
+
+void LzmaEncoder::RunLzmaCode(lzma_action action) {
+ CHECK(!finished_);
+
+ // This is to keep track of how many bytes resulted from encoding this input
+ // buffer.
+ size_t last_avail_out = stream_.avail_out;
+
+ while (stream_.avail_in > 0 || action == LZMA_FINISH) {
+ // If output buffer is full, create a new one, queue it up, and resume
+ // encoding. This could happen in the first call to Encode after
+ // construction or a Reset, or when an input buffer is large enough to fill
+ // more than one output buffer.
+ if (stream_.avail_out == 0) {
+ queue_.emplace_back();
+ queue_.back().resize(kEncodedBufferSizeBytes);
+ stream_.next_out = queue_.back().data();
+ stream_.avail_out = kEncodedBufferSizeBytes;
+ // Update the byte count.
+ total_bytes_ += last_avail_out;
+ last_avail_out = stream_.avail_out;
+ }
+
+ // Encode the data.
+ lzma_ret status = lzma_code(&stream_, action);
+ CheckLzmaCodeIsOk(status);
+ if (action == LZMA_FINISH) {
+ if (status == LZMA_STREAM_END) {
+ // This is returned when lzma_code is all done.
+ finished_ = true;
+ break;
+ }
+ } else {
+ CHECK(status != LZMA_STREAM_END);
+ }
+ VLOG(2) << "LzmaEncoder: Encoded chunk.";
+ }
+
+ // Update the number of resulting encoded bytes.
+ total_bytes_ += last_avail_out - stream_.avail_out;
+}
+
+LzmaDecoder::LzmaDecoder(std::string_view filename)
+ : dummy_decoder_(filename), stream_(LZMA_STREAM_INIT) {
+ compressed_data_.resize(kBufSize);
+
+ lzma_ret status =
+ lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED);
+ CheckLzmaCodeIsOk(status);
+ stream_.avail_out = 0;
+ VLOG(2) << "LzmaDecoder: Initialization succeeded.";
+}
+
+LzmaDecoder::~LzmaDecoder() { lzma_end(&stream_); }
+
+size_t LzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+ if (finished_) {
+ return 0;
+ }
+
+ // Write into the given range.
+ stream_.next_out = begin;
+ stream_.avail_out = end - begin;
+ // Keep decompressing until we run out of buffer space.
+ while (stream_.avail_out > 0) {
+ if (action_ == LZMA_RUN && stream_.avail_in == 0) {
+ // Read more bytes from the file if we're all out.
+ const size_t count =
+ dummy_decoder_.Read(compressed_data_.begin(), compressed_data_.end());
+ if (count == 0) {
+ // No more data to read in the file, begin the finishing operation.
+ action_ = LZMA_FINISH;
+ } else {
+ stream_.next_in = compressed_data_.data();
+ stream_.avail_in = count;
+ }
+ }
+ // Decompress the data.
+ const lzma_ret status = lzma_code(&stream_, action_);
+ // Return if we're done.
+ if (status == LZMA_STREAM_END) {
+ CHECK_EQ(action_, LZMA_FINISH)
+ << ": Got LZMA_STREAM_END when action wasn't LZMA_FINISH";
+ finished_ = true;
+ return (end - begin) - stream_.avail_out;
+ }
+ CheckLzmaCodeIsOk(status);
+ }
+ return end - begin;
+}
+
+} // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
new file mode 100644
index 0000000..d7080a9
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.h
@@ -0,0 +1,72 @@
+#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/logging/buffer_encoder.h"
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Encodes buffers using liblzma.
+class LzmaEncoder final : public DetachedBufferEncoder {
+ public:
+ // Initializes the LZMA stream and encoder.
+ explicit LzmaEncoder(uint32_t compression_preset);
+ // Gracefully shuts down the encoder.
+ ~LzmaEncoder() final;
+
+ void Encode(flatbuffers::DetachedBuffer &&in) final;
+ void Finish() final;
+ void Clear(int n) final;
+ std::vector<absl::Span<const uint8_t>> queue() const final;
+ size_t queued_bytes() const final;
+ size_t total_bytes() const final { return total_bytes_; }
+ size_t queue_size() const final { return queue_.size(); }
+
+ private:
+ static constexpr size_t kEncodedBufferSizeBytes{1024};
+
+ void RunLzmaCode(lzma_action action);
+
+ lzma_stream stream_;
+ uint32_t compression_preset_;
+ std::vector<ResizeableBuffer> queue_;
+ bool finished_ = false;
+ // Total bytes that resulted from encoding raw data since the last call to
+ // Reset.
+ size_t total_bytes_ = 0;
+};
+
+// Decompresses data with liblzma.
+class LzmaDecoder final : public DataDecoder {
+ public:
+ explicit LzmaDecoder(std::string_view filename);
+ ~LzmaDecoder();
+
+ size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+ // Size of temporary buffer to use.
+ static constexpr size_t kBufSize{256 * 1024};
+
+ // Temporary buffer for storing compressed data.
+ ResizeableBuffer compressed_data_;
+ // Used for reading data from the file.
+ DummyDecoder dummy_decoder_;
+ // Stream for decompression.
+ lzma_stream stream_;
+ // The current action. This is LZMA_RUN until we've run out of data to read
+ // from the file.
+ lzma_action action_ = LZMA_RUN;
+ // Flag that represents whether or not all the data from the file has been
+ // successfully decoded.
+ bool finished_ = false;
+};
+
+} // namespace aos::logger
+
+#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
new file mode 100644
index 0000000..2d619c4
--- /dev/null
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -0,0 +1,18 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+INSTANTIATE_TEST_CASE_P(
+ Lzma, BufferEncoderTest,
+ ::testing::Combine(::testing::Values([]() {
+ return std::make_unique<LzmaEncoder>(2);
+ }),
+ ::testing::Values([](std::string_view filename) {
+ return std::make_unique<LzmaDecoder>(filename);
+ }),
+ ::testing::Range(0, 100)));
+
+} // namespace aos::logger::testing
diff --git a/debian/BUILD b/debian/BUILD
index 5bebf04..6f955ae 100644
--- a/debian/BUILD
+++ b/debian/BUILD
@@ -65,10 +65,18 @@
gstreamer_armhf_debs = "files",
)
load(
- "//debian:m4.bzl",
+ ":m4.bzl",
m4_debs = "files",
)
-load("//debian:packages.bzl", "download_packages", "generate_deb_tarball")
+load(
+ ":lzma_amd64.bzl",
+ lzma_amd64_debs = "files",
+)
+load(
+ ":lzma_arm64.bzl",
+ lzma_arm64_debs = "files",
+)
+load(":packages.bzl", "download_packages", "generate_deb_tarball")
filegroup(
name = "matplotlib_patches",
@@ -340,6 +348,23 @@
files = m4_debs,
)
+download_packages(
+ name = "download_lzma",
+ packages = [
+ "liblzma-dev",
+ ],
+)
+
+generate_deb_tarball(
+ name = "lzma_amd64",
+ files = lzma_amd64_debs,
+)
+
+generate_deb_tarball(
+ name = "lzma_arm64",
+ files = lzma_arm64_debs,
+)
+
exports_files([
"ssh_wrapper.sh",
])
diff --git a/debian/lzma_amd64.bzl b/debian/lzma_amd64.bzl
new file mode 100644
index 0000000..1dbfa38
--- /dev/null
+++ b/debian/lzma_amd64.bzl
@@ -0,0 +1,3 @@
+files = {
+ "liblzma-dev_5.2.4-1_amd64.deb": "8373145ed28563aff33b116c434399347ab1fa77ba672d8d94151dd8f68c8ece",
+}
diff --git a/debian/lzma_arm64.bzl b/debian/lzma_arm64.bzl
new file mode 100644
index 0000000..a38d900
--- /dev/null
+++ b/debian/lzma_arm64.bzl
@@ -0,0 +1,3 @@
+files = {
+ "liblzma-dev_5.2.2-1.3_arm64.deb": "eb730c11f65c9bd10ae6c39b5a461e16294c21a423c9a8bc4011390798387e82",
+}
diff --git a/third_party/BUILD b/third_party/BUILD
index bcba978..2f22b0e 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -113,3 +113,15 @@
"roborio": ["@webrtc_rio//:webrtc"],
}),
)
+
+# TODO(Brian): Properly restrict this to specific platforms and remove the
+# default deps once we have support for that which interacts with select properly.
+cc_library(
+ name = "lzma",
+ visibility = ["//visibility:public"],
+ deps = select({
+ "//tools:cpu_k8": ["@lzma_amd64//:lib"],
+ "//tools:cpu_arm64": ["@lzma_arm64//:lib"],
+ "//conditions:default": [],
+ }),
+)
diff --git a/tools/BUILD b/tools/BUILD
index ac76ee9..d77abf8 100644
--- a/tools/BUILD
+++ b/tools/BUILD
@@ -49,6 +49,11 @@
)
config_setting(
+ name = "cpu_arm64",
+ values = {"cpu": "arm64"},
+)
+
+config_setting(
name = "has_asan",
values = {"define": "have_asan=true"},
)
@@ -75,11 +80,14 @@
environment(name = "cortex-m4f-k22")
+environment(name = "arm64")
+
environment_group(
name = "cpus",
defaults = [
":k8",
":roborio",
+ ":arm64",
":armhf-debian",
],
environments = [
@@ -89,5 +97,6 @@
":armhf-debian",
":cortex-m4f",
":cortex-m4f-k22",
+ ":arm64",
],
)