Merge "Don't queue up the remote when it doesn't exist."
diff --git a/aos/events/logging/logfile_utils.cc b/aos/events/logging/logfile_utils.cc
index 4ca4453..7600c7e 100644
--- a/aos/events/logging/logfile_utils.cc
+++ b/aos/events/logging/logfile_utils.cc
@@ -320,7 +320,7 @@
   static const std::string_view kXz = ".xz";
   if (filename.substr(filename.size() - kXz.size()) == kXz) {
 #if ENABLE_LZMA
-    decoder_ = std::make_unique<LzmaDecoder>(filename);
+    decoder_ = std::make_unique<ThreadedLzmaDecoder>(filename);
 #else
     LOG(FATAL) << "Reading xz-compressed files not supported on this platform";
 #endif
diff --git a/aos/events/logging/lzma_encoder.cc b/aos/events/logging/lzma_encoder.cc
index 54dc6f7..60a203e 100644
--- a/aos/events/logging/lzma_encoder.cc
+++ b/aos/events/logging/lzma_encoder.cc
@@ -205,4 +205,102 @@
   return end - begin;
 }
 
+ThreadedLzmaDecoder::ThreadedLzmaDecoder(std::string_view filename)
+    : decoder_(filename), decode_thread_([this] {
+        std::unique_lock lock(decode_mutex_);
+        while (true) {
+          // Sleep until the queue has room or finished_ has been set.
+          continue_decoding_.wait(lock, [this] {
+            return decoded_queue_.size() < kQueueSize || finished_;
+          });
+
+          if (finished_) {
+            return;
+          }
+
+          while (true) {
+            CHECK(!finished_);
+            // Drop the lock while decompressing so Read() is not blocked on us.
+            lock.unlock();
+
+            ResizeableBuffer buffer;
+            buffer.resize(kBufSize);
+
+            const size_t bytes_read =
+                decoder_.Read(buffer.begin(), buffer.end());
+            buffer.resize(bytes_read);
+
+            // Relock before touching the queue or finished_. The lock must
+            // also be held when we loop back around to wait().
+            lock.lock();
+            if (bytes_read > 0) {
+              decoded_queue_.emplace_back(std::move(buffer));
+            } else {
+              finished_ = true;
+            }
+
+            // Stop decoding once the queue is full or finished_ has been set.
+            if (decoded_queue_.size() >= kQueueSize || finished_) {
+              break;
+            }
+          }
+
+          // Wake a blocked Read(). NOTE(review): only reached once the inner
+          // loop exits (queue full or finished_ set), not after every buffer.
+          queue_filled_.notify_one();
+        }
+      }) {}
+
+ThreadedLzmaDecoder::~ThreadedLzmaDecoder() {
+  // Set finished_ under the mutex, then wake the decode thread so it exits.
+  {
+    std::scoped_lock lock(decode_mutex_);
+    finished_ = true;
+  }
+  continue_decoding_.notify_one();
+  decode_thread_.join();
+}
+
+size_t ThreadedLzmaDecoder::Read(uint8_t *begin, uint8_t *end) {
+  std::unique_lock lock(decode_mutex_);
+
+  // Drop buffers that earlier calls have fully drained via erase_front().
+  for (auto iter = decoded_queue_.begin(); iter != decoded_queue_.end();) {
+    if (iter->size() == 0) {
+      iter = decoded_queue_.erase(iter);
+    } else {
+      ++iter;
+    }
+  }
+
+  // Nothing buffered: nudge the decode thread, then block until it queues data
+  // or sets finished_. Return 0 once finished_ is set and nothing is left.
+  if (decoded_queue_.empty()) {
+    continue_decoding_.notify_one();
+    queue_filled_.wait(lock,
+                       [this] { return finished_ || !decoded_queue_.empty(); });
+    if (finished_ && decoded_queue_.empty()) {
+      return 0;
+    }
+  }
+  // Past this point there must be at least one non-empty buffer to copy from.
+  CHECK(!decoded_queue_.empty()) << "Decoded queue unexpectedly empty";
+
+  ResizeableBuffer &front_buffer = decoded_queue_.front();
+
+  // Copy at most one buffer's worth; callers may get fewer bytes than asked.
+  const std::size_t bytes_requested = end - begin;
+  const std::size_t bytes_to_copy =
+      std::min(bytes_requested, front_buffer.size());
+  memcpy(begin, front_buffer.data(), bytes_to_copy);
+  front_buffer.erase_front(bytes_to_copy);
+
+  // Wake the decode thread if it may be idle and the queue still has room.
+  if (!finished_ && decoded_queue_.size() < kQueueSize) {
+    continue_decoding_.notify_one();
+  }
+
+  return bytes_to_copy;
+}
+
 }  // namespace aos::logger
diff --git a/aos/events/logging/lzma_encoder.h b/aos/events/logging/lzma_encoder.h
index 919f5fa..972ed6c 100644
--- a/aos/events/logging/lzma_encoder.h
+++ b/aos/events/logging/lzma_encoder.h
@@ -1,13 +1,16 @@
 #ifndef AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
 #define AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
 
-#include "absl/types/span.h"
-#include "flatbuffers/flatbuffers.h"
-#include "lzma.h"
+#include <condition_variable>
+#include <mutex>
+#include <thread>
 
+#include "absl/types/span.h"
 #include "aos/containers/resizeable_buffer.h"
 #include "aos/events/logging/buffer_encoder.h"
 #include "aos/events/logging/logger_generated.h"
+#include "flatbuffers/flatbuffers.h"
+#include "lzma.h"
 
 namespace aos::logger {
 
@@ -78,6 +81,38 @@
   std::string filename_;
 };
 
+// Decompresses data with liblzma on a background thread, buffering up to
+// kQueueSize chunks. Read() returns queued data if available, otherwise it
+// blocks until more data is queued or the stream finishes.
+class ThreadedLzmaDecoder : public DataDecoder {
+ public:
+  explicit ThreadedLzmaDecoder(std::string_view filename);
+  ThreadedLzmaDecoder(const ThreadedLzmaDecoder &) = delete;
+  ThreadedLzmaDecoder &operator=(const ThreadedLzmaDecoder &) = delete;
+
+  ~ThreadedLzmaDecoder();
+
+  size_t Read(uint8_t *begin, uint8_t *end) final;
+
+ private:
+  static constexpr size_t kBufSize{256 * 1024};
+  static constexpr size_t kQueueSize{8};
+
+  LzmaDecoder decoder_;
+
+  // Decompressed buffers, oldest first, waiting to be returned by Read().
+  std::vector<ResizeableBuffer> decoded_queue_;
+
+  // Guards decoded_queue_ and finished_; paired with both condition variables.
+  std::mutex decode_mutex_;
+  std::condition_variable continue_decoding_;
+  std::condition_variable queue_filled_;
+
+  bool finished_ = false;
+
+  std::thread decode_thread_;
+};
+
 }  // namespace aos::logger
 
 #endif  // AOS_EVENTS_LOGGING_LZMA_ENCODER_H_
diff --git a/aos/events/logging/lzma_encoder_test.cc b/aos/events/logging/lzma_encoder_test.cc
index 63ed6c2..bbd0c60 100644
--- a/aos/events/logging/lzma_encoder_test.cc
+++ b/aos/events/logging/lzma_encoder_test.cc
@@ -17,6 +17,16 @@
                        }),
                        ::testing::Range(0, 100)));
 
+INSTANTIATE_TEST_SUITE_P(
+    LzmaThreaded, BufferEncoderTest,
+    ::testing::Combine(::testing::Values([]() {
+                         return std::make_unique<LzmaEncoder>(2);
+                       }),
+                       ::testing::Values([](std::string_view filename) {
+                         return std::make_unique<ThreadedLzmaDecoder>(filename);
+                       }),
+                       ::testing::Range(0, 100)));
+
 // Tests that we return as much of the file as we can read if the end is
 // corrupted.
 TEST_F(BufferEncoderBaseTest, CorruptedBuffer) {
diff --git a/frc971/control_loops/drivetrain/drivetrain_test_lib.cc b/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
index fb685f3..7680688 100644
--- a/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
+++ b/frc971/control_loops/drivetrain/drivetrain_test_lib.cc
@@ -122,7 +122,6 @@
                   dt_config_.make_hybrid_drivetrain_velocity_loop()))) {
   Reinitialize();
   last_U_.setZero();
-
   event_loop_->AddPhasedLoop(
       [this](int) {
         // Skip this the first time.
@@ -144,10 +143,11 @@
         first_ = false;
         SendPositionMessage();
         SendTruthMessage();
+        SendImuMessage();
       },
       dt_config_.dt);
-
-  event_loop_->AddPhasedLoop([this](int) { SendImuMessage(); },
+  // TODO(milind): We should be able to get IMU readings at 1 kHz instead of 2.
+  event_loop_->AddPhasedLoop([this](int) { ReadImu(); },
                              std::chrono::microseconds(500));
 }
 
@@ -188,54 +188,78 @@
   }
 }
 
-void DrivetrainSimulation::SendImuMessage() {
+void DrivetrainSimulation::ReadImu() {
+  // Don't accumulate readings when we aren't sending them
   if (!send_messages_) {
     return;
   }
-  auto builder = imu_sender_.MakeBuilder();
 
-  frc971::ADIS16470DiagStat::Builder diag_stat_builder =
-      builder.MakeBuilder<frc971::ADIS16470DiagStat>();
-  diag_stat_builder.add_clock_error(false);
-  diag_stat_builder.add_memory_failure(imu_faulted_);
-  diag_stat_builder.add_sensor_failure(false);
-  diag_stat_builder.add_standby_mode(false);
-  diag_stat_builder.add_spi_communication_error(false);
-  diag_stat_builder.add_flash_memory_update_error(false);
-  diag_stat_builder.add_data_path_overrun(false);
-
-  const auto diag_stat_offset = diag_stat_builder.Finish();
-
-  frc971::IMUValues::Builder imu_builder =
-      builder.MakeBuilder<frc971::IMUValues>();
-  imu_builder.add_self_test_diag_stat(diag_stat_offset);
   const Eigen::Vector3d gyro =
       dt_config_.imu_transform.inverse() *
       Eigen::Vector3d(0.0, 0.0,
                       (drivetrain_plant_.X(3, 0) - drivetrain_plant_.X(1, 0)) /
                           (dt_config_.robot_radius * 2.0));
-  imu_builder.add_gyro_x(gyro.x());
-  imu_builder.add_gyro_y(gyro.y());
-  imu_builder.add_gyro_z(gyro.z());
+
   // Acceleration due to gravity, in m/s/s.
   constexpr double kG = 9.807;
   const Eigen::Vector3d accel =
       dt_config_.imu_transform.inverse() *
       Eigen::Vector3d(last_acceleration_.x() / kG, last_acceleration_.y() / kG,
                       1.0);
-  imu_builder.add_accelerometer_x(accel.x());
-  imu_builder.add_accelerometer_y(accel.y());
-  imu_builder.add_accelerometer_z(accel.z());
-  imu_builder.add_monotonic_timestamp_ns(
+  const int64_t timestamp =
       std::chrono::duration_cast<std::chrono::nanoseconds>(
           event_loop_->monotonic_now().time_since_epoch())
-          .count());
-  flatbuffers::Offset<frc971::IMUValues> imu_values_offsets =
-      imu_builder.Finish();
+          .count();
+  imu_readings_.push({.gyro = gyro,
+                      .accel = accel,
+                      .timestamp = timestamp,
+                      .faulted = imu_faulted_});
+}
+
+void DrivetrainSimulation::SendImuMessage() {
+  if (!send_messages_) {
+    return;
+  }
+
+  std::vector<flatbuffers::Offset<IMUValues>> imu_values;
+  auto builder = imu_sender_.MakeBuilder();
+
+  // Send all the IMU readings and pop the ones we have sent
+  while (!imu_readings_.empty()) {
+    const auto imu_reading = imu_readings_.front();
+    imu_readings_.pop();
+
+    frc971::ADIS16470DiagStat::Builder diag_stat_builder =
+        builder.MakeBuilder<frc971::ADIS16470DiagStat>();
+    diag_stat_builder.add_clock_error(false);
+    diag_stat_builder.add_memory_failure(imu_reading.faulted);
+    diag_stat_builder.add_sensor_failure(false);
+    diag_stat_builder.add_standby_mode(false);
+    diag_stat_builder.add_spi_communication_error(false);
+    diag_stat_builder.add_flash_memory_update_error(false);
+    diag_stat_builder.add_data_path_overrun(false);
+
+    const auto diag_stat_offset = diag_stat_builder.Finish();
+
+    frc971::IMUValues::Builder imu_builder =
+        builder.MakeBuilder<frc971::IMUValues>();
+    imu_builder.add_self_test_diag_stat(diag_stat_offset);
+
+    imu_builder.add_gyro_x(imu_reading.gyro.x());
+    imu_builder.add_gyro_y(imu_reading.gyro.y());
+    imu_builder.add_gyro_z(imu_reading.gyro.z());
+
+    imu_builder.add_accelerometer_x(imu_reading.accel.x());
+    imu_builder.add_accelerometer_y(imu_reading.accel.y());
+    imu_builder.add_accelerometer_z(imu_reading.accel.z());
+    imu_builder.add_monotonic_timestamp_ns(imu_reading.timestamp);
+
+    imu_values.push_back(imu_builder.Finish());
+  }
+
   flatbuffers::Offset<
       flatbuffers::Vector<flatbuffers::Offset<frc971::IMUValues>>>
-      imu_values_offset = builder.fbb()->CreateVector(&imu_values_offsets, 1);
-
+      imu_values_offset = builder.fbb()->CreateVector(imu_values);
   frc971::IMUValuesBatch::Builder imu_values_batch_builder =
       builder.MakeBuilder<frc971::IMUValuesBatch>();
   imu_values_batch_builder.add_readings(imu_values_offset);
diff --git a/frc971/control_loops/drivetrain/drivetrain_test_lib.h b/frc971/control_loops/drivetrain/drivetrain_test_lib.h
index 2075e66..b98711b 100644
--- a/frc971/control_loops/drivetrain/drivetrain_test_lib.h
+++ b/frc971/control_loops/drivetrain/drivetrain_test_lib.h
@@ -1,6 +1,9 @@
 #ifndef FRC971_CONTROL_LOOPS_DRIVETRAIN_DRIVETRAIN_TEST_LIB_H_
 #define FRC971_CONTROL_LOOPS_DRIVETRAIN_DRIVETRAIN_TEST_LIB_H_
 
+#include <queue>
+#include <vector>
+
 #include "aos/events/event_loop.h"
 #include "frc971/control_loops/control_loops_generated.h"
 #include "frc971/control_loops/drivetrain/drivetrain_config.h"
@@ -76,16 +79,27 @@
   // Set whether we should send out the drivetrain Position and IMU messages
   // (this will keep sending the "truth" message).
   void set_send_messages(const bool send_messages) {
+    if (!send_messages && !imu_readings_.empty()) {
+      // Flush current IMU readings
+      SendImuMessage();
+    }
     send_messages_ = send_messages;
   }
 
-  void set_imu_faulted(const bool fault_imu) {
-    imu_faulted_ = fault_imu;
-  }
+  void set_imu_faulted(const bool fault_imu) { imu_faulted_ = fault_imu; }
 
  private:
+  struct ImuReading {
+    Eigen::Vector3d gyro;
+    Eigen::Vector3d accel;
+    int64_t timestamp;
+    bool faulted;
+  };
+
   // Sends out the position queue messages.
   void SendPositionMessage();
+  // Reads and stores the IMU state
+  void ReadImu();
   // Sends out the IMU messages.
   void SendImuMessage();
   // Sends out the "truth" status message.
@@ -109,6 +123,8 @@
 
   bool imu_faulted_ = false;
 
+  std::queue<ImuReading> imu_readings_;
+
   DrivetrainConfig<double> dt_config_;
 
   DrivetrainPlant drivetrain_plant_;