Merge "Fix action_test to not require watchers to watch while not running"
diff --git a/aos/BUILD b/aos/BUILD
index 2ae0fec..f6c6182 100644
--- a/aos/BUILD
+++ b/aos/BUILD
@@ -159,7 +159,6 @@
         "//aos/testing:prevent_exit",
         "//aos/testing:test_shm",
         "//aos/time",
-        "//aos/util:thread",
     ],
 )
 
@@ -254,7 +253,6 @@
         ":complex_thread_local",
         "//aos/logging",
         "//aos/testing:googletest",
-        "//aos/util:thread",
     ],
 )
 
@@ -463,6 +461,7 @@
         "@com_github_google_glog//:glog",
         "@com_google_absl//absl/strings",
         "@com_google_absl//absl/types:span",
+        "//aos:macros",
     ],
 )
 
diff --git a/aos/complex_thread_local_test.cc b/aos/complex_thread_local_test.cc
index b8eaa2c..5323dea 100644
--- a/aos/complex_thread_local_test.cc
+++ b/aos/complex_thread_local_test.cc
@@ -1,11 +1,10 @@
 #include "aos/complex_thread_local.h"
 
 #include <atomic>
+#include <thread>
 
 #include "gtest/gtest.h"
 
-#include "aos/util/thread.h"
-
 namespace aos {
 namespace testing {
 
@@ -65,7 +64,7 @@
 
 TEST_F(ComplexThreadLocalTest, AnotherThread) {
   EXPECT_FALSE(local.created());
-  util::FunctionThread::RunInOtherThread([this]() {
+  std::thread t1([this]() {
     EXPECT_FALSE(local.created());
     local.Create(971);
     EXPECT_TRUE(local.created());
@@ -73,21 +72,21 @@
     EXPECT_EQ(1, TraceableObject::constructions);
     EXPECT_EQ(0, TraceableObject::destructions);
   });
+  t1.join();
   EXPECT_EQ(1, TraceableObject::constructions);
   EXPECT_EQ(1, TraceableObject::destructions);
   EXPECT_FALSE(local.created());
 }
 
 TEST_F(ComplexThreadLocalTest, TwoThreads) {
-  util::FunctionThread thread([this](util::FunctionThread *) {
+  std::thread thread([this]() {
     local.Create(971);
     EXPECT_EQ(971, local->data);
     EXPECT_EQ(0, TraceableObject::destructions);
   });
-  thread.Start();
   local.Create(973);
   EXPECT_EQ(973, local->data);
-  thread.Join();
+  thread.join();
   EXPECT_TRUE(local.created());
   EXPECT_EQ(2, TraceableObject::constructions);
   EXPECT_EQ(1, TraceableObject::destructions);
diff --git a/aos/condition_test.cc b/aos/condition_test.cc
index 1e8d06c..b49f27c 100644
--- a/aos/condition_test.cc
+++ b/aos/condition_test.cc
@@ -19,7 +19,6 @@
 #include "aos/macros.h"
 #include "aos/ipc_lib/aos_sync.h"
 #include "aos/die.h"
-#include "aos/util/thread.h"
 #include "aos/testing/prevent_exit.h"
 
 namespace aos {
@@ -58,20 +57,18 @@
   ::std::atomic_bool child_finished(false);
   Condition child_ready(&mutex_);
   ASSERT_FALSE(mutex_.Lock());
-  util::FunctionThread child([this, &child_finished, &child_ready](
-      util::FunctionThread *) {
+  std::thread child([this, &child_finished, &child_ready] {
     ASSERT_FALSE(mutex_.Lock());
     child_ready.Broadcast();
     ASSERT_FALSE(condition_.Wait());
     child_finished.store(true);
     mutex_.Unlock();
   });
-  child.Start();
   ASSERT_FALSE(child_ready.Wait());
   EXPECT_FALSE(child_finished.load());
   condition_.Signal();
   mutex_.Unlock();
-  child.Join();
+  child.join();
   EXPECT_TRUE(child_finished.load());
 }
 
diff --git a/aos/events/event_loop.h b/aos/events/event_loop.h
index 80cc025..4294226 100644
--- a/aos/events/event_loop.h
+++ b/aos/events/event_loop.h
@@ -438,7 +438,8 @@
                                   T::GetFullyQualifiedName(), name(), node());
     CHECK(channel != nullptr)
         << ": Channel { \"name\": \"" << channel_name << "\", \"type\": \""
-        << T::GetFullyQualifiedName() << "\" } not found in config.";
+        << T::GetFullyQualifiedName() << "\" } not found in config for "
+        << name() << ".";
 
     if (!configuration::ChannelIsSendableOnNode(channel, node())) {
       LOG(FATAL) << "Channel { \"name\": \"" << channel_name
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index e556c0f..15740a1 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -5,6 +5,7 @@
 #include <string_view>
 
 #include "absl/types/span.h"
+#include "aos/macros.h"
 #include "flatbuffers/flatbuffers.h"
 #include "glog/logging.h"
 
@@ -30,12 +31,14 @@
   void Reset() { is_allocated_ = false; }
   bool is_allocated() const { return is_allocated_; }
 
+  bool allocated() { return is_allocated_; }
+
  private:
   bool is_allocated_ = false;
 };
 
 // This class is a fixed memory allocator which holds the data for a flatbuffer
-// in an array.
+// in a vector.
 class FixedAllocator : public FixedAllocatorBase {
  public:
   FixedAllocator(size_t size) : buffer_(size, 0) {}
@@ -55,7 +58,7 @@
 class PreallocatedAllocator : public FixedAllocatorBase {
  public:
   PreallocatedAllocator(void *data, size_t size) : data_(data), size_(size) {}
-  PreallocatedAllocator(const PreallocatedAllocator&) = delete;
+  PreallocatedAllocator(const PreallocatedAllocator &) = delete;
   PreallocatedAllocator(PreallocatedAllocator &&other)
       : data_(other.data_), size_(other.size_) {
     CHECK(!is_allocated());
@@ -229,6 +232,50 @@
   flatbuffers::DetachedBuffer buffer_;
 };
 
+// Array backed flatbuffer which manages building of the flatbuffer.
+template <typename T, size_t Size>
+class FlatbufferFixedAllocatorArray final : public Flatbuffer<T> {
+ public:
+  FlatbufferFixedAllocatorArray() : buffer_(), allocator_(&buffer_[0], Size) {
+    builder_ = flatbuffers::FlatBufferBuilder(Size, &allocator_);
+  }
+
+  flatbuffers::FlatBufferBuilder *Builder() {
+    if (allocator_.allocated()) {
+      LOG(FATAL) << "Array backed flatbuffer can only be built once";
+    }
+    return &builder_;
+  }
+
+  void Finish(flatbuffers::Offset<T> root) {
+    if (!allocator_.allocated()) {
+      LOG(FATAL) << "Cannot finish if never building";
+    }
+    builder_.Finish(root);
+    data_ = builder_.GetBufferPointer();
+    size_ = builder_.GetSize();
+  }
+
+  const uint8_t *data() const override {
+    CHECK_NOTNULL(data_);
+    return data_;
+  }
+  uint8_t *data() override {
+    CHECK_NOTNULL(data_);
+    return data_;
+  }
+  size_t size() const override { return size_; }
+
+ private:
+  std::array<uint8_t, Size> buffer_;
+  PreallocatedAllocator allocator_;
+  flatbuffers::FlatBufferBuilder builder_;
+  uint8_t *data_ = nullptr;
+  size_t size_ = 0;
+
+  DISALLOW_COPY_AND_ASSIGN(FlatbufferFixedAllocatorArray);
+};
+
 // This object associates the message type with the memory storing the
 // flatbuffer.  This only stores root tables.
 //
diff --git a/aos/ipc_lib/BUILD b/aos/ipc_lib/BUILD
index 91b050c..1989f06 100644
--- a/aos/ipc_lib/BUILD
+++ b/aos/ipc_lib/BUILD
@@ -101,7 +101,6 @@
         "//aos/testing:test_shm",
         "//aos/time",
         "//aos/util:death_test_log_implementation",
-        "//aos/util:thread",
     ],
 )
 
diff --git a/aos/ipc_lib/raw_queue_test.cc b/aos/ipc_lib/raw_queue_test.cc
index 1c02c53..3684029 100644
--- a/aos/ipc_lib/raw_queue_test.cc
+++ b/aos/ipc_lib/raw_queue_test.cc
@@ -8,6 +8,7 @@
 #include <ostream>
 #include <memory>
 #include <map>
+#include <thread>
 
 #include "gtest/gtest.h"
 
@@ -17,7 +18,6 @@
 #include "aos/time/time.h"
 #include "aos/logging/logging.h"
 #include "aos/die.h"
-#include "aos/util/thread.h"
 #include "aos/util/options.h"
 #include "aos/util/death_test_log_implementation.h"
 #include "aos/testing/prevent_exit.h"
@@ -976,16 +976,14 @@
   ASSERT_NE(nullptr, message1);
   ASSERT_NE(nullptr, message2);
   EXPECT_EQ(free_before, queue->FreeMessages());
-  util::FunctionThread t1([message1, queue](util::Thread *) {
+  std::thread t1([message1, queue] {
     queue->FreeMessage(message1);
   });
-  util::FunctionThread t2([message2, queue](util::Thread *) {
+  std::thread t2([message2, queue] {
     queue->FreeMessage(message2);
   });
-  t1.Start();
-  t2.Start();
-  t1.WaitUntilDone();
-  t2.WaitUntilDone();
+  t1.join();
+  t2.join();
   EXPECT_EQ(free_before, queue->FreeMessages());
 }
 
diff --git a/aos/mutex/BUILD b/aos/mutex/BUILD
index e698f62..bdfb8fc 100644
--- a/aos/mutex/BUILD
+++ b/aos/mutex/BUILD
@@ -27,7 +27,6 @@
         "//aos/testing:test_shm",
         "//aos/time",
         "//aos/util:death_test_log_implementation",
-        "//aos/util:thread",
         "@com_github_google_glog//:glog",
     ],
 )
diff --git a/aos/mutex/mutex_test.cc b/aos/mutex/mutex_test.cc
index e7bd617..bd47ae0 100644
--- a/aos/mutex/mutex_test.cc
+++ b/aos/mutex/mutex_test.cc
@@ -16,7 +16,6 @@
 #include "aos/testing/test_shm.h"
 #include "aos/time/time.h"
 #include "aos/util/death_test_log_implementation.h"
-#include "aos/util/thread.h"
 
 namespace aos {
 namespace testing {
@@ -108,9 +107,10 @@
       static_cast<Mutex *>(shm_malloc_aligned(sizeof(Mutex), alignof(Mutex)));
   new (mutex) Mutex();
 
-  util::FunctionThread::RunInOtherThread([&]() {
+  std::thread thread([&]() {
     ASSERT_FALSE(mutex->Lock());
   });
+  thread.join();
   EXPECT_TRUE(mutex->Lock());
 
   mutex->Unlock();
@@ -124,9 +124,10 @@
       static_cast<Mutex *>(shm_malloc_aligned(sizeof(Mutex), alignof(Mutex)));
   new (mutex) Mutex();
 
-  util::FunctionThread::RunInOtherThread([&]() {
+  std::thread thread([&]() {
     ASSERT_FALSE(mutex->Lock());
   });
+  thread.join();
   EXPECT_EQ(Mutex::State::kOwnerDied, mutex->TryLock());
 
   mutex->Unlock();
@@ -145,11 +146,12 @@
   // in the original failure.
   Mutex mutex2;
 
-  util::FunctionThread::RunInOtherThread([&]() {
+  std::thread thread([&]() {
     ASSERT_FALSE(mutex1.Lock());
     ASSERT_FALSE(mutex2.Lock());
     mutex1.Unlock();
   });
+  thread.join();
 
   EXPECT_EQ(Mutex::State::kLocked, mutex1.TryLock());
   EXPECT_EQ(Mutex::State::kOwnerDied, mutex2.TryLock());
@@ -158,49 +160,21 @@
   mutex2.Unlock();
 }
 
-namespace {
-
-class AdderThread : public ::aos::util::Thread {
- public:
-  AdderThread(int *counter, Mutex *mutex,
-              monotonic_clock::duration sleep_before_time,
-              monotonic_clock::duration sleep_after_time)
-      : counter_(counter),
-        mutex_(mutex),
-        sleep_before_time_(sleep_before_time),
-        sleep_after_time_(sleep_after_time) {}
-
- private:
-  virtual void Run() override {
-    this_thread::sleep_for(sleep_before_time_);
-    MutexLocker locker(mutex_);
-    ++(*counter_);
-    this_thread::sleep_for(sleep_after_time_);
-  }
-
-  int *const counter_;
-  Mutex *const mutex_;
-  const monotonic_clock::duration sleep_before_time_, sleep_after_time_;
-};
-
-}  // namespace
-
 // Verifies that ThreadSanitizer understands that a contended mutex establishes
 // a happens-before relationship.
 TEST_F(MutexTest, ThreadSanitizerContended) {
   int counter = 0;
-  AdderThread threads[2]{
-      {&counter, &test_mutex_, chrono::milliseconds(200),
-       chrono::milliseconds(0)},
-      {&counter, &test_mutex_, chrono::milliseconds(0),
-       chrono::milliseconds(0)},
-  };
-  for (auto &c : threads) {
-    c.Start();
-  }
-  for (auto &c : threads) {
-    c.WaitUntilDone();
-  }
+  std::thread thread1([this, &counter]() {
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+    MutexLocker locker(&test_mutex_);
+    ++counter;
+  });
+  std::thread thread2([this, &counter]() {
+    MutexLocker locker(&test_mutex_);
+    ++counter;
+  });
+  thread1.join();
+  thread2.join();
   EXPECT_EQ(2, counter);
 }
 
@@ -229,50 +203,29 @@
 // establishes a happens-before relationship.
 TEST_F(MutexTest, ThreadSanitizerUncontended) {
   int counter = 0;
-  AdderThread threads[2]{
-      {&counter, &test_mutex_, chrono::milliseconds(0),
-       chrono::milliseconds(0)},
-      {&counter, &test_mutex_, chrono::milliseconds(200),
-       chrono::milliseconds(0)}, };
-  for (auto &c : threads) {
-    c.Start();
-  }
-  for (auto &c : threads) {
-    c.WaitUntilDone();
-  }
+  std::thread thread1([this, &counter]() {
+    MutexLocker locker(&test_mutex_);
+    ++counter;
+  });
+  std::thread thread2([this, &counter]() {
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+    MutexLocker locker(&test_mutex_);
+    ++counter;
+  });
+  thread1.join();
+  thread2.join();
   EXPECT_EQ(2, counter);
 }
 
-namespace {
-
-class LockerThread : public util::Thread {
- public:
-  LockerThread(Mutex *mutex, bool lock, bool unlock)
-      : mutex_(mutex), lock_(lock), unlock_(unlock) {}
-
- private:
-  virtual void Run() override {
-    if (lock_) {
-      ASSERT_FALSE(mutex_->Lock());
-    }
-    if (unlock_) {
-      mutex_->Unlock();
-    }
-  }
-
-  Mutex *const mutex_;
-  const bool lock_, unlock_;
-};
-
-}  // namespace
-
 // Makes sure that we don't SIGSEGV or something with multiple threads.
 TEST_F(MutexTest, MultiThreadedLock) {
-  LockerThread t(&test_mutex_, true, true);
-  t.Start();
+  std::thread thread([this] {
+    ASSERT_FALSE(test_mutex_.Lock());
+    test_mutex_.Unlock();
+  });
   ASSERT_FALSE(test_mutex_.Lock());
   test_mutex_.Unlock();
-  t.Join();
+  thread.join();
 }
 
 TEST_F(MutexLockerTest, Basic) {
@@ -292,9 +245,10 @@
       static_cast<Mutex *>(shm_malloc_aligned(sizeof(Mutex), alignof(Mutex)));
   new (mutex) Mutex();
 
-  util::FunctionThread::RunInOtherThread([&]() {
+  std::thread thread([&]() {
     ASSERT_FALSE(mutex->Lock());
   });
+  thread.join();
   EXPECT_DEATH(
       {
         logging::SetImplementation(new util::DeathTestLogImplementation());
@@ -359,9 +313,10 @@
       static_cast<Mutex *>(shm_malloc_aligned(sizeof(Mutex), alignof(Mutex)));
   new (mutex) Mutex();
 
-  util::FunctionThread::RunInOtherThread([&]() {
+  std::thread thread([&]() {
     ASSERT_FALSE(mutex->Lock());
   });
+  thread.join();
   {
     aos::IPCMutexLocker locker(mutex);
     EXPECT_EQ(Mutex::State::kLockFailed, mutex->TryLock());
diff --git a/aos/stl_mutex/BUILD b/aos/stl_mutex/BUILD
index f187542..06599e3 100644
--- a/aos/stl_mutex/BUILD
+++ b/aos/stl_mutex/BUILD
@@ -21,6 +21,5 @@
         "//aos:die",
         "//aos/testing:googletest",
         "//aos/testing:test_logging",
-        "//aos/util:thread",
     ],
 )
diff --git a/aos/stl_mutex/stl_mutex_test.cc b/aos/stl_mutex/stl_mutex_test.cc
index 36d7067..0fd052a 100644
--- a/aos/stl_mutex/stl_mutex_test.cc
+++ b/aos/stl_mutex/stl_mutex_test.cc
@@ -3,7 +3,6 @@
 #include "gtest/gtest.h"
 
 #include "aos/testing/test_logging.h"
-#include "aos/util/thread.h"
 #include "aos/die.h"
 
 namespace aos {
diff --git a/aos/util/BUILD b/aos/util/BUILD
index 4d7a3d1..f487892 100644
--- a/aos/util/BUILD
+++ b/aos/util/BUILD
@@ -55,7 +55,6 @@
     ],
     deps = [
         ":run_command",
-        ":thread",
         "//aos/logging",
         "//aos/testing:googletest",
     ],
@@ -146,20 +145,6 @@
 )
 
 cc_library(
-    name = "thread",
-    srcs = [
-        "thread.cc",
-    ],
-    hdrs = [
-        "thread.h",
-    ],
-    deps = [
-        "//aos:macros",
-        "@com_github_google_glog//:glog",
-    ],
-)
-
-cc_library(
     name = "trapezoid_profile",
     srcs = [
         "trapezoid_profile.cc",
diff --git a/aos/util/run_command_test.cc b/aos/util/run_command_test.cc
index 775b716..400d2f9 100644
--- a/aos/util/run_command_test.cc
+++ b/aos/util/run_command_test.cc
@@ -1,8 +1,6 @@
 #include "aos/util/run_command.h"
-
 #include "gtest/gtest.h"
-
-#include "aos/util/thread.h"
+#include <thread>
 
 namespace aos {
 namespace util {
@@ -38,16 +36,14 @@
 
 TEST(RunCommandTest, MultipleThreads) {
   int result1, result2;
-  util::FunctionThread t1([&result1](util::Thread *) {
+  std::thread t1([&result1]() {
     result1 = RunCommand("true");
   });
-  util::FunctionThread t2([&result2](util::Thread *) {
+  std::thread t2([&result2]() {
     result2 = RunCommand("true");
   });
-  t1.Start();
-  t2.Start();
-  t1.WaitUntilDone();
-  t2.WaitUntilDone();
+  t1.join();
+  t2.join();
   ASSERT_NE(-1, result1);
   ASSERT_NE(-1, result2);
   ASSERT_TRUE(WIFEXITED(result1));
diff --git a/aos/util/thread.cc b/aos/util/thread.cc
deleted file mode 100644
index bb5999f..0000000
--- a/aos/util/thread.cc
+++ /dev/null
@@ -1,79 +0,0 @@
-#include "aos/util/thread.h"
-
-#include <pthread.h>
-#include <signal.h>
-
-#include "glog/logging.h"
-
-namespace aos {
-namespace util {
-
-Thread::Thread() : started_(false), joined_(false), should_terminate_(false) {}
-
-Thread::~Thread() {
-  CHECK(!(started_ && !joined_));
-}
-
-void Thread::Start() {
-  CHECK(!started_);
-  started_ = true;
-  CHECK(pthread_create(&thread_, NULL, &Thread::StaticRun, this) == 0);
-}
-
-void Thread::Join() {
-  CHECK(!joined_ && started_);
-  joined_ = true;
-  should_terminate_.store(true);
-  CHECK(pthread_join(thread_, NULL) == 0);
-}
-
-bool Thread::TryJoin() {
-  CHECK(!joined_ && started_);
-#ifdef AOS_SANITIZER_thread
-  // ThreadSanitizer misses the tryjoin and then complains about leaking the
-  // thread. Instead, we'll just check if the thread is still around and then
-  // do a regular Join() iff it isn't.
-  // TODO(brians): Remove this once tsan learns about pthread_tryjoin_np.
-  const int kill_ret = pthread_kill(thread_, 0);
-  // If it's still around.
-  if (kill_ret == 0) return false;
-  // If it died, we'll get ESRCH. Otherwise, something went wrong.
-  if (kill_ret != ESRCH) {
-    errno = kill_ret;
-    PLOG(FATAL) << "pthread_kill(thread_, 0) failed";
-  }
-  Join();
-  return true;
-#else
-  const int ret = pthread_tryjoin_np(thread_, nullptr);
-  if (ret == 0) {
-    joined_ = true;
-    return true;
-  } else if (ret == EBUSY) {
-    return false;
-  } else {
-    errno = ret;
-    PLOG(FATAL) << "pthread_tryjoin_np(thread_, nullptr) failed";
-    return false;
-  }
-#endif
-}
-
-void Thread::RequestStop() {
-  CHECK(!joined_ && started_);
-  should_terminate_.store(true);
-}
-
-void Thread::WaitUntilDone() {
-  CHECK(!joined_ && started_);
-  joined_ = true;
-  CHECK(pthread_join(thread_, NULL) == 0);
-}
-
-void *Thread::StaticRun(void *self) {
-  static_cast<Thread *>(self)->Run();
-  return NULL;
-}
-
-}  // namespace util
-}  // namespace aos
diff --git a/aos/util/thread.h b/aos/util/thread.h
deleted file mode 100644
index 337ea48..0000000
--- a/aos/util/thread.h
+++ /dev/null
@@ -1,90 +0,0 @@
-#ifndef AOS_UTIL_THREAD_H_
-#define AOS_UTIL_THREAD_H_
-
-#include <functional>
-#include <atomic>
-
-#include <pthread.h>
-
-#include "aos/macros.h"
-
-namespace aos {
-namespace util {
-
-// A nice wrapper around a pthreads thread.
-//
-// TODO(aschuh): replace this with std::thread
-class Thread {
- public:
-  Thread();
-  virtual ~Thread();
-
-  // Actually creates the thread.
-  void Start();
-
-  // Asks the code to stop and then waits until it has done so.
-  // This or TryJoin() (returning true) must be called exactly once for every
-  // instance.
-  void Join();
-
-  // If the code has already finished, returns true. Does not block waiting if
-  // it isn't.
-  // Join() must not be called on this instance if this returns true.
-  // This must return true or Join() must be called exactly once for every
-  // instance.
-  bool TryJoin();
-
-  // Asks the code to stop (in preparation for a Join()).
-  void RequestStop();
-
-  // Waits until the code has stopped. Does not ask it to do so.
-  void WaitUntilDone();
-
- protected:
-  // Subclasses need to call this periodically if they are going to loop to
-  // check whether they have been asked to stop.
-  bool should_continue() {
-    return !should_terminate_.load();
-  }
-
- private:
-  // Where subclasses actually do something.
-  //
-  // They should not block for long periods of time without checking
-  // should_continue().
-  virtual void Run() = 0;
-
-  static void *StaticRun(void *self);
-
-  pthread_t thread_;
-  bool started_;
-  bool joined_;
-  ::std::atomic_bool should_terminate_;
-
-  DISALLOW_COPY_AND_ASSIGN(Thread);
-};
-
-class FunctionThread : public Thread {
- public:
-  FunctionThread(::std::function<void(FunctionThread *)> function)
-      : function_(function) {}
-
-  // Runs function in a new thread and waits for it to return.
-  static void RunInOtherThread(::std::function<void()> function) {
-    FunctionThread t([&function](FunctionThread *) { function(); });
-    t.Start();
-    t.Join();
-  }
-
- private:
-  virtual void Run() override {
-    function_(this);
-  }
-
-  const ::std::function<void(FunctionThread *)> function_;
-};
-
-}  // namespace util
-}  // namespace aos
-
-#endif  // AOS_UTIL_THREAD_H_
diff --git a/y2020/control_loops/python/BUILD b/y2020/control_loops/python/BUILD
index b7f393b..492326b 100644
--- a/y2020/control_loops/python/BUILD
+++ b/y2020/control_loops/python/BUILD
@@ -65,6 +65,23 @@
     ],
 )
 
+py_binary(
+    name = "intake",
+    srcs = [
+        "intake.py",
+    ],
+    legacy_create_init = False,
+    restricted_to = ["//tools:k8"],
+    deps = [
+        ":python_init",
+        "//external:python-gflags",
+        "//external:python-glog",
+        "//frc971/control_loops/python:angular_system",
+        "//frc971/control_loops/python:controls",
+    ],
+)
+
+
 py_library(
     name = "python_init",
     srcs = ["__init__.py"],
diff --git a/y2020/control_loops/python/intake.py b/y2020/control_loops/python/intake.py
new file mode 100644
index 0000000..f65679a
--- /dev/null
+++ b/y2020/control_loops/python/intake.py
@@ -0,0 +1,54 @@
+#!/usr/bin/python
+
+from aos.util.trapezoid_profile import TrapezoidProfile
+from frc971.control_loops.python import control_loop
+from frc971.control_loops.python import angular_system
+from frc971.control_loops.python import controls
+import numpy
+import sys
+from matplotlib import pylab
+import gflags
+import glog
+
+FLAGS = gflags.FLAGS
+
+try:
+    gflags.DEFINE_bool('plot', False, 'If true, plot the loop response.')
+except gflags.DuplicateFlagError:
+    pass
+
+# TODO: update constants
+kIntake = angular_system.AngularSystemParams(
+    name='Intake',
+    motor=control_loop.BAG(),
+    G=(1.0 / 1.0),
+    J=0.3,
+    q_pos=0.20,
+    q_vel=5.0,
+    kalman_q_pos=0.12,
+    kalman_q_vel=2.0,
+    kalman_q_voltage=4.0,
+    kalman_r_position=0.05)
+
+
+def main(argv):
+    if FLAGS.plot:
+        R = numpy.matrix([[numpy.pi / 2.0], [0.0]])
+        angular_system.PlotKick(kIntake, R)
+        angular_system.PlotMotion(kIntake, R)
+
+    # Write the generated constants out to a file.
+    if len(argv) != 5:
+        glog.fatal(
+            'Expected .h file name and .cc file name for the intake and integral intake.'
+        )
+    else:
+        namespaces = ['y2020', 'control_loops', 'superstructure', 'intake']
+        angular_system.WriteAngularSystem(kIntake, argv[1:3], argv[3:5],
+                                          namespaces)
+
+
+if __name__ == '__main__':
+    argv = FLAGS(sys.argv)
+    glog.init()
+    sys.exit(main(argv))
diff --git a/y2020/control_loops/superstructure/intake/BUILD b/y2020/control_loops/superstructure/intake/BUILD
new file mode 100644
index 0000000..2133204
--- /dev/null
+++ b/y2020/control_loops/superstructure/intake/BUILD
@@ -0,0 +1,32 @@
+package(default_visibility = ["//y2020:__subpackages__"])
+
+genrule(
+    name = "genrule_intake",
+    outs = [
+        "intake_plant.h",
+        "intake_plant.cc",
+        "integral_intake_plant.h",
+        "integral_intake_plant.cc",
+    ],
+    cmd = "$(location //y2020/control_loops/python:intake) $(OUTS)",
+    tools = [
+        "//y2020/control_loops/python:intake",
+    ],
+)
+
+cc_library(
+    name = "intake_plants",
+    srcs = [
+        "intake_plant.cc",
+        "integral_intake_plant.cc",
+    ],
+    hdrs = [
+        "intake_plant.h",
+        "integral_intake_plant.h",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        "//frc971/control_loops:hybrid_state_feedback_loop",
+        "//frc971/control_loops:state_feedback_loop",
+    ],
+)
diff --git a/y2020/control_loops/superstructure/superstructure_goal.fbs b/y2020/control_loops/superstructure/superstructure_goal.fbs
index 5953590..c1b3b5a 100644
--- a/y2020/control_loops/superstructure/superstructure_goal.fbs
+++ b/y2020/control_loops/superstructure/superstructure_goal.fbs
@@ -5,6 +5,12 @@
 table Goal {
   // Zero is at the horizontal, positive towards the front (meters on the lead screw).
   hood:frc971.control_loops.StaticZeroingSingleDOFProfiledSubsystemGoal;
+
+  // 0 = linkage on sprocket is pointing straight up.
+  // Positive = forward.
+  intake:frc971.control_loops.StaticZeroingSingleDOFProfiledSubsystemGoal;
+  // Positive is rollers intaking to Washing Machine.
+  roller_voltage:float;
 }
 
 root_type Goal;
diff --git a/y2020/control_loops/superstructure/superstructure_output.fbs b/y2020/control_loops/superstructure/superstructure_output.fbs
index e245712..e887cf5 100644
--- a/y2020/control_loops/superstructure/superstructure_output.fbs
+++ b/y2020/control_loops/superstructure/superstructure_output.fbs
@@ -3,6 +3,12 @@
 table Output {
   // Votage sent to the hood. Positive moves up.
   hood_voltage:double;
+
+  // Voltage sent to motors on intake joint. Positive extends rollers.
+  intake_joint_voltage:double;
+
+  // Voltage sent to rollers on intake. Positive rolls inward.
+  intake_roller_voltage:double;
 }
 
 root_type Output;
diff --git a/y2020/control_loops/superstructure/superstructure_position.fbs b/y2020/control_loops/superstructure/superstructure_position.fbs
index 50b9c36..62754df 100644
--- a/y2020/control_loops/superstructure/superstructure_position.fbs
+++ b/y2020/control_loops/superstructure/superstructure_position.fbs
@@ -3,9 +3,11 @@
 namespace y2020.control_loops.superstructure;
 
 table Position {
-
   // Zero is at the horizontal, positive towards the front (meters on the lead screw).
   hood:frc971.AbsolutePosition;
+
+  // Position of the intake. 0 when four-bar is vertical, positive extended.
+  intake_joint:frc971.AbsolutePosition;
 }
 
 root_type Position;
diff --git a/y2020/control_loops/superstructure/superstructure_status.fbs b/y2020/control_loops/superstructure/superstructure_status.fbs
index 8c29e23..a230528 100644
--- a/y2020/control_loops/superstructure/superstructure_status.fbs
+++ b/y2020/control_loops/superstructure/superstructure_status.fbs
@@ -12,6 +12,7 @@
 
   //Subsystem status.
   hood:frc971.control_loops.AbsoluteEncoderProfiledJointStatus;
+  intake:frc971.control_loops.AbsoluteEncoderProfiledJointStatus;
 }
 
 root_type Status;
diff --git a/y2020/vision/v4l2_reader.cc b/y2020/vision/v4l2_reader.cc
index 727f8ba..f43a2ac 100644
--- a/y2020/vision/v4l2_reader.cc
+++ b/y2020/vision/v4l2_reader.cc
@@ -15,10 +15,7 @@
   PCHECK(fd_.get() != -1);
 
   // First, clean up after anybody else who left the device streaming.
-  {
-    int type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
-    PCHECK(Ioctl(VIDIOC_STREAMOFF, &type) == 0);
-  }
+  StreamOff();
 
   {
     struct v4l2_format format;
@@ -130,5 +127,19 @@
   PCHECK(Ioctl(VIDIOC_QBUF, &buffer) == 0);
 }
 
+void V4L2Reader::StreamOff() {
+  int type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+  const int result = Ioctl(VIDIOC_STREAMOFF, &type);
+  if (result == 0) {
+    return;
+  }
+  // Some devices (like Alex's webcam) return this if streaming isn't currently
+  // on, unlike what the documentation says should happen.
+  if (errno == EBUSY) {
+    return;
+  }
+  PLOG(FATAL) << "VIDIOC_STREAMOFF failed";
+}
+
 }  // namespace vision
 }  // namespace frc971
diff --git a/y2020/vision/v4l2_reader.h b/y2020/vision/v4l2_reader.h
index 969f4a8..bdf4a8e 100644
--- a/y2020/vision/v4l2_reader.h
+++ b/y2020/vision/v4l2_reader.h
@@ -88,6 +88,8 @@
 
   int Ioctl(unsigned long number, void *arg);
 
+  void StreamOff();
+
   // The mmaped V4L2 buffers.
   std::array<Buffer, kNumberBuffers> buffers_;