Run log decompression off the main thread.

Change-Id: Ief21b4cabb64b5375800712af6ef6024b7b8d4c2
Signed-off-by: Tyler Chatow <tchatow@gmail.com>
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 54dc6f7..60a203e 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -205,4 +205,102 @@
   return end - begin;
 }
 
+ThreadedLzmaDecoder::ThreadedLzmaDecoder(std::string_view filename)
+    : decoder_(filename), decode_thread_([this] {
+        std::unique_lock lock(decode_mutex_);
+        while (true) {
+          // Sleep until the queue drops below kQueueSize or finished_ is set.
+          continue_decoding_.wait(lock, [this] {
+            return decoded_queue_.size() < kQueueSize || finished_;
+          });
+
+          if (finished_) {  // Set by the destructor or by a zero-byte read.
+            return;
+          }
+
+          while (true) {
+            CHECK(!finished_);
+            // Drop decode_mutex_ while decompressing so Read() can still run.
+            lock.unlock();
+
+            ResizeableBuffer buffer;
+            buffer.resize(kBufSize);
+
+            const size_t bytes_read =
+                decoder_.Read(buffer.begin(), buffer.end());
+            buffer.resize(bytes_read);
+
+            // Retake the lock to publish the buffer; this should be fast. The
+            // lock must also be held again before the next wait() call.
+            lock.lock();
+            if (bytes_read > 0) {
+              decoded_queue_.emplace_back(std::move(buffer));
+            } else {
+              finished_ = true;  // A zero-byte read is treated as end of data.
+            }
+
+            // Stop decoding once the queue is full or finished_ has been set.
+            if (decoded_queue_.size() >= kQueueSize || finished_) {
+              break;
+            }
+          }
+
+          // Wake a Read() call that may be blocked in queue_filled_.wait(). This
+          // is only reached after the inner loop exits, not after every buffer.
+          queue_filled_.notify_one();
+        }
+      }) {}
+
+ThreadedLzmaDecoder::~ThreadedLzmaDecoder() {
+  // Set finished_ under the mutex, then wake the decode thread so it returns.
+  {
+    std::scoped_lock lock(decode_mutex_);
+    finished_ = true;
+  }
+  continue_decoding_.notify_one();
+  decode_thread_.join();  // Block until the decode thread has exited.
+}
+
+size_t ThreadedLzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+  std::unique_lock lock(decode_mutex_);
+
+  // Copies decoded bytes into [begin, end). First, drop any drained buffers.
+  for (auto iter = decoded_queue_.begin(); iter != decoded_queue_.end();) {
+    if (iter->size() == 0) {
+      iter = decoded_queue_.erase(iter);
+    } else {
+      ++iter;
+    }
+  }
+
+  // If nothing is queued, nudge the decode thread and block until it either
+  // queues a buffer or sets finished_.
+  if (decoded_queue_.empty()) {
+    continue_decoding_.notify_one();
+    queue_filled_.wait(lock,
+                       [this] { return finished_ || !decoded_queue_.empty(); });
+    if (finished_ && decoded_queue_.empty()) {
+      return 0;  // Decoding has finished and no data is left to hand out.
+    }
+  }
+  // Past this point there must be at least one buffer to read from.
+  CHECK(!decoded_queue_.empty()) << "Decoded queue unexpectedly empty";
+
+  ResizeableBuffer &front_buffer = decoded_queue_.front();
+
+  // Copy from the front buffer only; this may be fewer bytes than requested.
+  const std::size_t bytes_requested = end - begin;
+  const std::size_t bytes_to_copy =
+      std::min(bytes_requested, front_buffer.size());
+  memcpy(begin, front_buffer.data(), bytes_to_copy);
+  front_buffer.erase_front(bytes_to_copy);
+
+  // Let the decode thread refill the queue if it has room and isn't finished.
+  if (!finished_ && decoded_queue_.size() < kQueueSize) {
+    continue_decoding_.notify_one();
+  }
+
+  return bytes_to_copy;
+}
+
 }  // namespace aos::logger