Run log decompression off the main thread.
Change-Id: Ief21b4cabb64b5375800712af6ef6024b7b8d4c2
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 919f5fa..972ed6c 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -1,13 +1,16 @@
#ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
#define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
-#include "absl/types/span.h"
-#include "flatbuffers/flatbuffers.h"
-#include "lzma.h"
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include "absl/types/span.h"
#include "aos/containers/resizeable_buffer.h"
#include "aos/events/logging/buffer_encoder.h"
#include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
namespace aos::logger {
@@ -78,6 +81,38 @@
std::string filename_;
};
+// Decompresses data with liblzma in a new thread, up to a maximum queue
+// size. Calls to Read() will return data from the queue if available,
+// or block until more data is queued or the stream finishes.
+class ThreadedLzmaDecoder : public DataDecoder {
+ public:
+ explicit ThreadedLzmaDecoder(std::string_view filename);
+ ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
+ ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
+
+ ~ThreadedLzmaDecoder();
+
+ size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+ static constexpr size_t kBufSize{256 * 1024};
+ static constexpr size_t kQueueSize{8};
+
+ LzmaDecoder decoder_;
+
+ // Queue of decompressed data to return on calls to Read
+ std::vector<ResizeableBuffer> decoded_queue_;
+
+ // Mutex to control access to decoded_queue_.
+ std::mutex decode_mutex_;
+ std::condition_variable continue_decoding_;
+ std::condition_variable queue_filled_;
+
+ bool finished_ = false;
+
+ std::thread decode_thread_;
+};
+
} // namespace aos::logger
#endif // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_