Add support for multithreaded lzma encoder
liblzma already has support for a multithreaded encoder. The plumbing
needed to enable it is pretty small, and can be done at runtime. This
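sets us up to potentially compress more than one core's worth of data.
The thread count is selected with --lzma_threads (default 1, which keeps
the single-threaded encoder).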
I did some basic profiling, and the block size should be left at the
default for optimal performance. I made it configurable because the
default is so big that the tests don't pass otherwise. Decreasing the
block size increases overhead and decreases performance significantly.
The timeout parameter seems to have little effect for reasonable
workloads.
Change-Id: Ia886bc8576db19f1b59a76b41dc7cdc2e85d52bd
Signed-off-by: Austin Schuh <austin.schuh@bluerivertech.com>
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index a4de942..b00d2ad 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -187,7 +187,7 @@
srcs = [
"lzma_encoder_test.cc",
],
- shard_count = 4,
+ shard_count = 8,
target_compatible_with = ["@platforms//os:linux"],
deps = [
":buffer_encoder_param_test",
diff --git a/aos/events/logging/buffer_encoder_param_test.cc b/aos/events/logging/buffer_encoder_param_test.cc
index e464a52..7c9bb4b 100644
--- a/aos/events/logging/buffer_encoder_param_test.cc
+++ b/aos/events/logging/buffer_encoder_param_test.cc
@@ -69,7 +69,7 @@
const size_t read_result =
decoder->Read(chunk.data(), chunk.data() + chunk_size);
if (read_result + total_decoded_size != total_encoded_size) {
- // We didn't read anything, so we should've read the complete chunk.
+ // We didn't read everything, so we should've read the complete chunk.
ASSERT_EQ(read_result, chunk_size)
<< "Read " << read_result + total_decoded_size << " of "
<< total_encoded_size << " expected bytes.";
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index f32d7e2..5cb9897 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -2,6 +2,8 @@
#include "glog/logging.h"
+// Number of threads used by LzmaEncoder. Values <= 1 select liblzma's
+// single-threaded encoder; larger values select the multithreaded stream
+// encoder.
+DEFINE_int32(lzma_threads, 1, "Number of threads to use for encoding");
+
namespace aos::logger {
namespace {
@@ -48,16 +50,32 @@
} // namespace
-LzmaEncoder::LzmaEncoder(const uint32_t compression_preset)
+LzmaEncoder::LzmaEncoder(const uint32_t compression_preset, size_t block_size)
: stream_(LZMA_STREAM_INIT), compression_preset_(compression_preset) {
CHECK_GE(compression_preset_, 0u)
<< ": Compression preset must be in the range [0, 9].";
CHECK_LE(compression_preset_, 9u)
<< ": Compression preset must be in the range [0, 9].";
- lzma_ret status =
- lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
- CHECK(LzmaCodeIsOk(status));
+ if (FLAGS_lzma_threads <= 1) {
+ // Single-threaded encoder. block_size only applies to the multithreaded
+ // encoder and is ignored here.
+ lzma_ret status =
+ lzma_easy_encoder(&stream_, compression_preset_, LZMA_CHECK_CRC64);
+ CHECK(LzmaCodeIsOk(status));
+ } else {
+ // Value-initialize so every field not assigned below (e.g. flags) is zero.
+ // This replaces a memset, which would need <cstring>.
+ lzma_mt mt_options{};
+ mt_options.threads = static_cast<uint32_t>(FLAGS_lzma_threads);
+ // A block size of 0 lets liblzma pick its default block size.
+ mt_options.block_size = block_size;
+ // Let lzma_code() return after roughly 100 ms even if no output is ready
+ // yet, so the calling thread regains control.
+ mt_options.timeout = 100;
+ // With filters == nullptr, liblzma builds the filter chain from the preset.
+ mt_options.preset = compression_preset_;
+ mt_options.filters = nullptr;
+ mt_options.check = LZMA_CHECK_CRC64;
+ lzma_ret status = lzma_stream_encoder_mt(&stream_, &mt_options);
+ CHECK(LzmaCodeIsOk(status));
+ }
+
stream_.avail_out = 0;
VLOG(2) << "LzmaEncoder: Initialization succeeded.";
}
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 4edd0e8..14c00eb 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -18,8 +18,10 @@
// Encodes buffers using liblzma.
class LzmaEncoder final : public DetachedBufferEncoder {
public:
- // Initializes the LZMA stream and encoder.
- explicit LzmaEncoder(uint32_t compression_preset);
+ // Initializes the LZMA stream and encoder. compression_preset must be in
+ // [0, 9]. The block size is the block size used by the multithreaded encoder
+ // (selected with --lzma_threads > 1) for batching; the single-threaded
+ // encoder ignores it. A block size of 0 tells lzma to pick its default
+ // block size.
+ explicit LzmaEncoder(uint32_t compression_preset, size_t block_size = 0);
LzmaEncoder(const LzmaEncoder &) = delete;
LzmaEncoder(LzmaEncoder &&other) = delete;
LzmaEncoder &operator=(const LzmaEncoder &) = delete;
@@ -36,7 +38,7 @@
size_t queue_size() const final { return queue_.size(); }
private:
- static constexpr size_t kEncodedBufferSizeBytes{4096};
+ // NOTE(review): raised from 4096 to 40 KiB in this change without a stated
+ // reason; presumably to give the multithreaded encoder more output space per
+ // lzma_code() call. Confirm and document the rationale.
+ static constexpr size_t kEncodedBufferSizeBytes{4096 * 10};
void RunLzmaCode(lzma_action action);
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index bbd0c60..1b8c895 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -5,12 +5,37 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
+// Defined in lzma_encoder.cc; LzmaEncoder reads it in its constructor.
+DECLARE_int32(lzma_threads);
+
namespace aos::logger::testing {
+// LzmaEncoder reads --lzma_threads when it is constructed, so every encoder
+// factory in this file sets the flag immediately before constructing its
+// encoder. The results therefore don't depend on which instantiation ran last.
+// NOTE(review): the flag is never restored, so any other LzmaEncoder built in
+// this binary without setting it inherits the last value.
+// The small 4096-byte block size presumably keeps the test data spread over
+// several multithreaded blocks; confirm.
INSTANTIATE_TEST_SUITE_P(
+ MtLzma, BufferEncoderTest,
+ ::testing::Combine(::testing::Values([]() {
+ FLAGS_lzma_threads = 3;
+ return std::make_unique<LzmaEncoder>(2, 4096);
+ }),
+ ::testing::Values([](std::string_view filename) {
+ return std::make_unique<LzmaDecoder>(filename);
+ }),
+ ::testing::Range(0, 100)));
+
+INSTANTIATE_TEST_SUITE_P(
+ MtLzmaThreaded, BufferEncoderTest,
+ ::testing::Combine(::testing::Values([]() {
+ FLAGS_lzma_threads = 3;
+ return std::make_unique<LzmaEncoder>(5, 4096);
+ }),
+ ::testing::Values([](std::string_view filename) {
+ return std::make_unique<ThreadedLzmaDecoder>(filename);
+ }),
+ ::testing::Range(0, 100)));
+
+INSTANTIATE_TEST_SUITE_P(
Lzma, BufferEncoderTest,
::testing::Combine(::testing::Values([]() {
- return std::make_unique<LzmaEncoder>(2);
+ FLAGS_lzma_threads = 1;
+ return std::make_unique<LzmaEncoder>(2, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<LzmaDecoder>(filename);
@@ -20,7 +45,8 @@
INSTANTIATE_TEST_SUITE_P(
LzmaThreaded, BufferEncoderTest,
::testing::Combine(::testing::Values([]() {
- return std::make_unique<LzmaEncoder>(2);
+ FLAGS_lzma_threads = 1;
+ return std::make_unique<LzmaEncoder>(5, 4096);
}),
::testing::Values([](std::string_view filename) {
return std::make_unique<ThreadedLzmaDecoder>(filename);