Run log decompression off the main thread.

Change-Id: Ief21b4cabb64b5375800712af6ef6024b7b8d4c2
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 919f5fa..972ed6c 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -1,13 +1,16 @@
 #ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
 #define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
 
-#include "absl/types/span.h"
-#include "flatbuffers/flatbuffers.h"
-#include "lzma.h"
+#include <condition_variable>
+#include <mutex>
+#include <thread>
 
+#include "absl/types/span.h"
 #include "aos/containers/resizeable_buffer.h"
 #include "aos/events/logging/buffer_encoder.h"
 #include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
 
 namespace aos::logger {
 
@@ -78,6 +81,38 @@
   std::string filename_;
 };
 
+// Decompresses data with liblzma in a new thread, up to a maximum queue
+// size. Calls to Read() will return data from the queue if available,
+// or block until more data is queued or the stream finishes.
+class ThreadedLzmaDecoder : public DataDecoder {
+ public:
+  explicit ThreadedLzmaDecoder(std::string_view filename);
+  ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
+  ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
+
+  ~ThreadedLzmaDecoder();
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  static constexpr size_t kBufSize{256 * 1024};
+  static constexpr size_t kQueueSize{8};
+
+  LzmaDecoder decoder_;
+
+  // Queue of decompressed data to return on calls to Read
+  std::vector<ResizeableBuffer> decoded_queue_;
+
+  // Mutex to control access to decoded_queue_.
+  std::mutex decode_mutex_;
+  std::condition_variable continue_decoding_;
+  std::condition_variable queue_filled_;
+
+  bool finished_ = false;
+
+  std::thread decode_thread_;
+};
+
 }  // namespace aos::logger
 
 #endif  // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_