Run log decompression off the main thread.
Change-Id: Ief21b4cabb64b5375800712af6ef6024b7b8d4c2
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 5f0e372..ca92f0f 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -316,7 +316,7 @@
static const std::string_view kXz = ".xz";
if (filename.substr(filename.size() - kXz.size()) == kXz) {
#if ENABLE_LZMA
- decoder_ = std::make_unique<LzmaDecoder>(filename);
+ decoder_ = std::make_unique<ThreadedLzmaDecoder>(filename);
#else
LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
#endif
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 54dc6f7..60a203e 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -205,4 +205,102 @@
return end - begin;
}
+ThreadedLzmaDecoder::ThreadedLzmaDecoder(std::string_view filename)
+ : decoder_(filename), decode_thread_([this] {
+ std::unique_lock lock(decode_mutex_);
+ while (true) {
+ // Wake if the queue is too small or we are finished.
+ continue_decoding_.wait(lock, [this] {
+ return decoded_queue_.size() < kQueueSize || finished_;
+ });
+
+ if (finished_) {
+ return;
+ }
+
+ while (true) {
+ CHECK(!finished_);
+ // Release our lock on the queue before doing decompression work.
+ lock.unlock();
+
+ ResizeableBuffer buffer;
+ buffer.resize(kBufSize);
+
+ const size_t bytes_read =
+ decoder_.Read(buffer.begin(), buffer.end());
+ buffer.resize(bytes_read);
+
+ // Relock the queue and move the new buffer to the end. This should
+ // be fast. We also need to stay locked when we wait().
+ lock.lock();
+ if (bytes_read > 0) {
+ decoded_queue_.emplace_back(std::move(buffer));
+ } else {
+ finished_ = true;
+ }
+
+ // If we've filled the queue or are out of data, go back to sleep.
+ if (decoded_queue_.size() >= kQueueSize || finished_) {
+ break;
+ }
+ }
+
+ // Notify main thread in case it was waiting for us to queue more
+ // data.
+ queue_filled_.notify_one();
+ }
+ }) {}
+
+ThreadedLzmaDecoder::~ThreadedLzmaDecoder() {
+ // Wake up decode thread so it can return.
+ {
+ std::scoped_lock lock(decode_mutex_);
+ finished_ = true;
+ }
+ continue_decoding_.notify_one();
+ decode_thread_.join();
+}
+
+size_t ThreadedLzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+ std::unique_lock lock(decode_mutex_);
+
+ // Strip any empty buffers
+ for (auto iter = decoded_queue_.begin(); iter != decoded_queue_.end();) {
+ if (iter->size() == 0) {
+ iter = decoded_queue_.erase(iter);
+ } else {
+ ++iter;
+ }
+ }
+
+ // If the queue is empty, sleep until the decoder thread has produced another
+ // buffer.
+ if (decoded_queue_.empty()) {
+ continue_decoding_.notify_one();
+ queue_filled_.wait(lock,
+ [this] { return finished_ || !decoded_queue_.empty(); });
+ if (finished_ && decoded_queue_.empty()) {
+ return 0;
+ }
+ }
+ // Sanity check if the queue is empty and we're not finished.
+ CHECK(!decoded_queue_.empty()) << "Decoded queue unexpectedly empty";
+
+ ResizeableBuffer &front_buffer = decoded_queue_.front();
+
+ // Copy some data from our working buffer to the requested destination.
+ const std::size_t bytes_requested = end - begin;
+ const std::size_t bytes_to_copy =
+ std::min(bytes_requested, front_buffer.size());
+ memcpy(begin, front_buffer.data(), bytes_to_copy);
+ front_buffer.erase_front(bytes_to_copy);
+
+ // Ensure the decoding thread wakes up if the queue isn't full.
+ if (!finished_ && decoded_queue_.size() < kQueueSize) {
+ continue_decoding_.notify_one();
+ }
+
+ return bytes_to_copy;
+}
+
} // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 919f5fa..972ed6c 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -1,13 +1,16 @@
#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
-#include "absl/types/span.h"
-#include "flatbuffers/flatbuffers.h"
-#include "lzma.h"
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
namespace aos::logger {
@@ -78,6 +81,38 @@
std::string filename_;
};
+// Decompresses data with liblzma in a new thread, up to a maximum queue
+// size. Calls to Read() will return data from the queue if available,
+// or block until more data is queued or the stream finishes.
+class ThreadedLzmaDecoder : public DataDecoder {
+ public:
+ explicit ThreadedLzmaDecoder(std::string_view filename);
+ ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
+ ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
+
+ ~ThreadedLzmaDecoder();
+
+ size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+ static constexpr size_t kBufSize{256 * 1024};
+ static constexpr size_t kQueueSize{8};
+
+ LzmaDecoder decoder_;
+
+ // Queue of decompressed data to return on calls to Read
+ std::vector<ResizeableBuffer> decoded_queue_;
+
+ // Mutex to control access to decoded_queue_.
+ std::mutex decode_mutex_;
+ std::condition_variable continue_decoding_;
+ std::condition_variable queue_filled_;
+
+ bool finished_ = false;
+
+ std::thread decode_thread_;
+};
+
} // namespace aos::logger
#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index 63ed6c2..bbd0c60 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -17,6 +17,16 @@
}),
::testing::Range(0, 100)));
+INSTANTIATE_TEST_SUITE_P(
+ LzmaThreaded, BufferEncoderTest,
+ ::testing::Combine(::testing::Values([]() {
+ return std::make_unique<LzmaEncoder>(2);
+ }),
+ ::testing::Values([](std::string_view filename) {
+ return std::make_unique<ThreadedLzmaDecoder>(filename);
+ }),
+ ::testing::Range(0, 100)));
+
// Tests that we return as much of the file as we can read if the end is
// corrupted.
TEST_F(BufferEncoderBaseTest, CorruptedBuffer) {