Add xz (LZMA) compression and decompression for log files
Change-Id: I2e00c8aa316f5907f7f9bc1a4653b1f7c793a876
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 3f43e07..573b9aa 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -33,7 +33,11 @@
"@com_github_google_flatbuffers//:flatbuffers",
"@com_github_google_glog//:glog",
"@com_google_absl//absl/types:span",
- ],
+ ] + select({
+ # Only link the xz encoder/decoder on CPUs where :lzma_encoder has sources.
+ # Keep this list in sync with the srcs/hdrs select() of :lzma_encoder and
+ # with the ENABLE_LZMA check in logfile_utils.cc.
+ "//tools:cpu_k8": [":lzma_encoder"],
+ "//tools:cpu_arm64": [":lzma_encoder"],
+ "//conditions:default": [],
+ }),
)
cc_library(
@@ -68,6 +72,60 @@
],
)
+# TODO(Brian): Properly restrict this to specific platforms and
+# un-conditionalize the srcs and hdrs once we have support for that which
+# interacts with select properly.
+#
+# xz (LZMA) encoder and decoder for log files. On CPUs other than k8 and
+# arm64 the srcs and hdrs below are empty, so this target provides nothing
+# there; dependents must guard their own use of it the same way.
+# NOTE(review): deps (including //third_party:lzma) are not conditionalized,
+# so building this target for other CPUs still builds them — confirm that is
+# intended.
+cc_library(
+ name = "lzma_encoder",
+ srcs = select({
+ "//tools:cpu_k8": [
+ "lzma_encoder.cc",
+ ],
+ "//tools:cpu_arm64": [
+ "lzma_encoder.cc",
+ ],
+ "//conditions:default": [],
+ }),
+ hdrs = select({
+ "//tools:cpu_k8": [
+ "lzma_encoder.h",
+ ],
+ "//tools:cpu_arm64": [
+ "lzma_encoder.h",
+ ],
+ "//conditions:default": [],
+ }),
+ visibility = ["//visibility:public"],
+ deps = [
+ ":buffer_encoder",
+ ":logger_fbs",
+ "//aos:configuration_fbs",
+ "//aos/containers:resizeable_buffer",
+ "//third_party:lzma",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ "@com_github_google_glog//:glog",
+ "@com_google_absl//absl/types:span",
+ ],
+)
+
+# Runs the shared BufferEncoderTest parameterized suite against
+# LzmaEncoder/LzmaDecoder. restricted_to limits it to k8 and arm64,
+# presumably the same platforms as the cpu_k8/cpu_arm64 select() in
+# :lzma_encoder — keep the two in sync.
+cc_test(
+ name = "lzma_encoder_test",
+ srcs = [
+ "lzma_encoder_test.cc",
+ ],
+ restricted_to = [
+ "//tools:k8",
+ "//tools:arm64",
+ ],
+ shard_count = 4,
+ deps = [
+ ":buffer_encoder_param_test",
+ ":lzma_encoder",
+ "//aos/testing:googletest",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
cc_library(
name = "buffer_encoder_param_test",
testonly = True,
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 9d8145d..7ddc658 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -16,6 +16,18 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
+// liblzma support is only compiled in on x86_64 and aarch64, matching the
+// cpu_k8/cpu_arm64 select() on :lzma_encoder in BUILD. On every other
+// architecture ENABLE_LZMA is 0 and .xz log files cannot be read.
+#if defined(__x86_64__) || defined(__aarch64__)
+#define ENABLE_LZMA 1
+#else
+#define ENABLE_LZMA 0
+#endif
+
+#if ENABLE_LZMA
+#include "aos/events/logging/lzma_encoder.h"
+#endif
+
DEFINE_int32(flush_size, 128000,
"Number of outstanding bytes to allow before flushing to disk.");
@@ -206,9 +218,16 @@
}
SpanReader::SpanReader(std::string_view filename) : filename_(filename) {
- // Support for other kinds of decoders based on the filename should be added
- // here.
- decoder_ = std::make_unique<DummyDecoder>(filename);
+  // Choose a decoder from the file name: names ending in ".xz" go through
+  // LzmaDecoder (only when ENABLE_LZMA), everything else through DummyDecoder.
+  static constexpr std::string_view kXz = ".xz";
+  // Check the length first. For names shorter than the suffix,
+  // `filename.size() - kXz.size()` would wrap around and substr() would throw
+  // std::out_of_range.
+  const bool is_xz = filename.size() >= kXz.size() &&
+                     filename.substr(filename.size() - kXz.size()) == kXz;
+  if (is_xz) {
+#if ENABLE_LZMA
+    decoder_ = std::make_unique<LzmaDecoder>(filename);
+#else
+    LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
+#endif
+  } else {
+    decoder_ = std::make_unique<DummyDecoder>(filename);
+  }
}
absl::Span<const uint8_t> SpanReader::ReadMessage() {
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
new file mode 100644
index 0000000..a8cafda
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.cc
@@ -0,0 +1,194 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "glog/logging.h"
+
+namespace aos::logger {
+namespace {
+
+// Returns normally if `status` indicates success (LZMA_OK or LZMA_STREAM_END).
+// Otherwise logs a description of the liblzma error, followed by the raw
+// status value, and crashes via LOG(FATAL).
+void CheckLzmaCodeIsOk(lzma_ret status) {
+  switch (status) {
+    case LZMA_OK:
+    case LZMA_STREAM_END:
+      return;
+    case LZMA_MEM_ERROR:
+      LOG(FATAL) << "Memory allocation failed: " << status;
+    case LZMA_OPTIONS_ERROR:
+      LOG(FATAL) << "The given compression preset or decompression options are "
+                    "not supported: "
+                 << status;
+    case LZMA_UNSUPPORTED_CHECK:
+      LOG(FATAL) << "The given check type is not supported: " << status;
+    case LZMA_PROG_ERROR:
+      LOG(FATAL) << "One or more of the parameters have values that will never "
+                    "be valid: "
+                 << status;
+    case LZMA_MEMLIMIT_ERROR:
+      LOG(FATAL) << "Decoder needs more memory than allowed by the specified "
+                    "memory usage limit: "
+                 << status;
+    case LZMA_FORMAT_ERROR:
+      LOG(FATAL) << "File format not recognized: " << status;
+    case LZMA_DATA_ERROR:
+      LOG(FATAL) << "Compressed file is corrupt: " << status;
+    case LZMA_BUF_ERROR:
+      LOG(FATAL) << "Compressed file is truncated or corrupt: " << status;
+    default:
+      LOG(FATAL) << "Unexpected return value: " << status;
+  }
+}
+
+} // namespace
+
+// Sets up an xz encoder at `compression_preset`, which must be in [0, 9].
+LzmaEncoder::LzmaEncoder(const uint32_t compression_preset)
+    : stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
+  // compression_preset_ is unsigned, so only the upper bound can be violated.
+  CHECK_LE(compression_preset_, 9u)
+      << ": Compression preset must be in the range [0, 9].";
+
+  // Preset-based encoder with a CRC64 integrity check.
+  const lzma_ret status =
+      lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
+  CheckLzmaCodeIsOk(status);
+  // No output buffer exists yet; RunLzmaCode allocates one when avail_out is 0.
+  stream_.avail_out = 0;
+  VLOG(2) << "LzmaEncoder: Initialization succeeded.";
+}
+
+// Frees liblzma's internal encoder state.
+LzmaEncoder::~LzmaEncoder() { lzma_end(&stream_); }
+
+// Compresses all of `in`, appending the output to the queue. RunLzmaCode only
+// returns once avail_in reaches 0, so `in` need not outlive this call.
+void LzmaEncoder::Encode(flatbuffers::DetachedBuffer &&in) {
+  CHECK(in.data()) << ": Encode called with nullptr.";
+  stream_.avail_in = in.size();
+  stream_.next_in = in.data();
+  RunLzmaCode(LZMA_RUN);
+}
+
+// Flushes everything liblzma has buffered and terminates the xz stream. No
+// further Encode/Finish calls are allowed afterwards (RunLzmaCode CHECKs this).
+void LzmaEncoder::Finish() {
+  RunLzmaCode(LZMA_FINISH);
+}
+
+// Drops the first `n` buffers from the queue (0 <= n <= queue_size()).
+void LzmaEncoder::Clear(const int n) {
+  CHECK_GE(n, 0);
+  CHECK_LE(static_cast<size_t>(n), queue_size());
+  const auto first_kept = queue_.begin() + n;
+  queue_.erase(queue_.begin(), first_kept);
+  if (!queue_.empty()) {
+    return;
+  }
+  // The buffer liblzma was writing into is gone too, so detach the stream's
+  // output; RunLzmaCode queues a fresh buffer when avail_out is 0.
+  stream_.next_out = nullptr;
+  stream_.avail_out = 0;
+}
+
+// Returns one span per queued buffer, covering only the bytes the encoder has
+// written. Only the last buffer can be partially filled; its unwritten tail
+// is stream_.avail_out bytes long.
+std::vector<absl::Span<const uint8_t>> LzmaEncoder::queue() const {
+  std::vector<absl::Span<const uint8_t>> spans;
+  const size_t count = queue_.size();
+  for (size_t index = 0; index < count; ++index) {
+    const auto &buffer = queue_[index];
+    const bool is_last = (index + 1 == count);
+    const size_t unwritten = is_last ? stream_.avail_out : 0;
+    spans.emplace_back(
+        absl::MakeConstSpan(buffer.data(), buffer.size() - unwritten));
+  }
+  return spans;
+}
+
+// Every queued buffer is allocated at kEncodedBufferSizeBytes; the encoder
+// has not yet filled the last stream_.avail_out bytes of the current one.
+size_t LzmaEncoder::queued_bytes() const {
+  return queue_size() * kEncodedBufferSizeBytes - stream_.avail_out;
+}
+
+// Drives lzma_code with `action`, queueing output buffers as they fill up and
+// adding every newly encoded byte to total_bytes_.
+void LzmaEncoder::RunLzmaCode(lzma_action action) {
+  CHECK(!finished_);
+
+  const bool finishing = (action == LZMA_FINISH);
+  // With LZMA_RUN, stop once every pending input byte has been consumed.
+  // With LZMA_FINISH, keep going until liblzma reports LZMA_STREAM_END.
+  while (finishing || stream_.avail_in > 0) {
+    // Start a new output buffer whenever the current one has no room left.
+    // That covers the first call after construction, the first call after
+    // Clear() has dropped every buffer, and input that encodes to more than
+    // one buffer's worth of output.
+    if (stream_.avail_out == 0) {
+      queue_.emplace_back();
+      queue_.back().resize(kEncodedBufferSizeBytes);
+      stream_.next_out = queue_.back().data();
+      stream_.avail_out = kEncodedBufferSizeBytes;
+    }
+
+    const size_t room_before = stream_.avail_out;
+    const lzma_ret status = lzma_code(&stream_, action);
+    CheckLzmaCodeIsOk(status);
+    // Output space consumed by this call is newly encoded data.
+    total_bytes_ += room_before - stream_.avail_out;
+
+    if (!finishing) {
+      CHECK(status != LZMA_STREAM_END);
+    } else if (status == LZMA_STREAM_END) {
+      // liblzma has flushed everything; the xz stream is complete.
+      finished_ = true;
+      break;
+    }
+    VLOG(2) << "LzmaEncoder: Encoded chunk.";
+  }
+}
+
+// Reads compressed bytes from `filename` through DummyDecoder and prepares a
+// liblzma stream decoder for them.
+LzmaDecoder::LzmaDecoder(std::string_view filename)
+    : dummy_decoder_(filename), stream_(LZMA_STREAM_INIT) {
+  // Staging area for compressed bytes pulled from the file.
+  compressed_data_.resize(kBufSize);
+
+  // UINT64_MAX disables the memory usage limit; LZMA_CONCATENATED keeps
+  // decoding across back-to-back .xz streams.
+  CheckLzmaCodeIsOk(
+      lzma_stream_decoder(&stream_, UINT64_MAX, LZMA_CONCATENATED));
+  // No destination yet; Read() points next_out/avail_out at the caller's range.
+  stream_.avail_out = 0;
+  VLOG(2) << "LzmaDecoder: Initialization succeeded.";
+}
+
+// Frees liblzma's internal decoder state.
+LzmaDecoder::~LzmaDecoder() {
+  lzma_end(&stream_);
+}
+
+// Decompresses into [begin, end). Returns end - begin when the range was
+// filled, fewer bytes when the end of the xz data was reached, and 0 on every
+// call after that. Corrupt or truncated input is fatal (CheckLzmaCodeIsOk).
+size_t LzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+  if (finished_) {
+    return 0;
+  }
+
+  const size_t requested = end - begin;
+  stream_.next_out = begin;
+  stream_.avail_out = requested;
+
+  while (stream_.avail_out > 0) {
+    // Refill the compressed input once liblzma has consumed all of it.
+    const bool need_input = action_ == LZMA_RUN && stream_.avail_in == 0;
+    if (need_input) {
+      const size_t count =
+          dummy_decoder_.Read(compressed_data_.begin(), compressed_data_.end());
+      if (count > 0) {
+        stream_.next_in = compressed_data_.data();
+        stream_.avail_in = count;
+      } else {
+        // The file is exhausted; switch to finishing the stream.
+        action_ = LZMA_FINISH;
+      }
+    }
+
+    const lzma_ret status = lzma_code(&stream_, action_);
+    if (status == LZMA_STREAM_END) {
+      CHECK_EQ(action_, LZMA_FINISH)
+          << ": Got LZMA_STREAM_END when action wasn't LZMA_FINISH";
+      finished_ = true;
+      return requested - stream_.avail_out;
+    }
+    CheckLzmaCodeIsOk(status);
+  }
+  return requested;
+}
+
+} // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
new file mode 100644
index 0000000..d7080a9
--- /dev/null
+++ b/aos/events/logging/lzma_encoder.h
@@ -0,0 +1,72 @@
+#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
+
+#include "absl/types/span.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
+
+#include "aos/containers/resizeable_buffer.h"
+#include "aos/events/logging/buffer_encoder.h"
+#include "aos/events/logging/logger_generated.h"
+
+namespace aos::logger {
+
+// Encodes buffers into an xz stream using liblzma.
+//
+// Output accumulates in a queue of fixed-size buffers, and only the last one
+// may be partially filled. The encoder owns liblzma state, so it is neither
+// copyable nor movable.
+class LzmaEncoder final : public DetachedBufferEncoder {
+ public:
+  // Initializes the LZMA stream and encoder. `compression_preset` must be in
+  // [0, 9].
+  explicit LzmaEncoder(uint32_t compression_preset);
+  // A copy would share the lzma_stream's internal state and free it twice in
+  // the destructor.
+  LzmaEncoder(const LzmaEncoder &) = delete;
+  LzmaEncoder &operator=(const LzmaEncoder &) = delete;
+  // Releases the liblzma encoder state.
+  ~LzmaEncoder() final;
+
+  void Encode(flatbuffers::DetachedBuffer &&in) final;
+  void Finish() final;
+  void Clear(int n) final;
+  std::vector<absl::Span<const uint8_t>> queue() const final;
+  size_t queued_bytes() const final;
+  size_t total_bytes() const final { return total_bytes_; }
+  size_t queue_size() const final { return queue_.size(); }
+
+ private:
+  // Size of each output buffer in queue_.
+  static constexpr size_t kEncodedBufferSizeBytes{1024};
+
+  // Runs lzma_code with `action`, queueing output buffers as needed.
+  void RunLzmaCode(lzma_action action);
+
+  lzma_stream stream_;
+  uint32_t compression_preset_;
+  std::vector<ResizeableBuffer> queue_;
+  // Set once Finish() has produced the end of the xz stream.
+  bool finished_ = false;
+  // Total bytes that resulted from encoding raw data since construction.
+  size_t total_bytes_ = 0;
+};
+
+// Decompresses xz data read from a file with liblzma.
+//
+// Owns liblzma decoder state, so it is neither copyable nor movable.
+class LzmaDecoder final : public DataDecoder {
+ public:
+  // Reads compressed bytes from `filename` (through a DummyDecoder) and
+  // initializes the xz stream decoder.
+  explicit LzmaDecoder(std::string_view filename);
+  // A copy would share the lzma_stream's internal state and free it twice in
+  // the destructor.
+  LzmaDecoder(const LzmaDecoder &) = delete;
+  LzmaDecoder &operator=(const LzmaDecoder &) = delete;
+  // Releases the liblzma decoder state.
+  ~LzmaDecoder();
+
+  // Decompresses into [begin, end). Returns end - begin when the range was
+  // filled, fewer bytes once the end of the xz data is reached, and 0 on all
+  // later calls.
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  // Size of temporary buffer to use.
+  static constexpr size_t kBufSize{256 * 1024};
+
+  // Temporary buffer for storing compressed data.
+  ResizeableBuffer compressed_data_;
+  // Used for reading data from the file.
+  DummyDecoder dummy_decoder_;
+  // Stream for decompression.
+  lzma_stream stream_;
+  // The current action. This is LZMA_RUN until we've run out of data to read
+  // from the file.
+  lzma_action action_ = LZMA_RUN;
+  // Flag that represents whether or not all the data from the file has been
+  // successfully decoded.
+  bool finished_ = false;
+};
+
+} // namespace aos::logger
+
+#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
new file mode 100644
index 0000000..2d619c4
--- /dev/null
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -0,0 +1,18 @@
+#include "aos/events/logging/lzma_encoder.h"
+
+#include "aos/events/logging/buffer_encoder_param_test.h"
+#include "gtest/gtest.h"
+
+namespace aos::logger::testing {
+
+// Runs the generic BufferEncoderTest suite for xz. Each case pairs an
+// LzmaEncoder at compression preset 2 with an LzmaDecoder over parameter values
+// 0..99 (presumably a per-case seed in BufferEncoderTest — confirm in
+// buffer_encoder_param_test.h).
+// NOTE(review): newer googletest deprecates INSTANTIATE_TEST_CASE_P in favor of
+// INSTANTIATE_TEST_SUITE_P; switch if the vendored version supports it.
+INSTANTIATE_TEST_CASE_P(
+ Lzma, BufferEncoderTest,
+ ::testing::Combine(::testing::Values([]() {
+ return std::make_unique<LzmaEncoder>(2);
+ }),
+ ::testing::Values([](std::string_view filename) {
+ return std::make_unique<LzmaDecoder>(filename);
+ }),
+ ::testing::Range(0, 100)));
+
+} // namespace aos::logger::testing