Move Actions To Common:

  - Splits Actions into an ActionQueue and Actions and Actors
  - unittested action preemption.

Final cleanup by Brian.

Change-Id: If444ae9902ef6c511898730e042f46f1781f4fb9
diff --git a/aos/common/actions/action_test.cc b/aos/common/actions/action_test.cc
new file mode 100644
index 0000000..df204bc
--- /dev/null
+++ b/aos/common/actions/action_test.cc
@@ -0,0 +1,286 @@
+#include <unistd.h>
+
+#include <memory>
+
+#include "gtest/gtest.h"
+#include "aos/common/queue.h"
+#include "aos/common/queue_testutils.h"
+#include "aos/common/actions/actor.h"
+#include "aos/common/actions/actions.h"
+#include "aos/common/actions/actions.q.h"
+#include "aos/common/actions/test_action.q.h"
+
+using ::aos::time::Time;
+
+namespace aos {
+namespace common {
+namespace actions {
+namespace testing {
+
+class TestActorNOP
+    : public aos::common::actions::ActorBase<actions::TestActionQueueGroup> {
+ public:
+  explicit TestActorNOP(actions::TestActionQueueGroup* s)
+      : actions::ActorBase<actions::TestActionQueueGroup>(s) {}
+
+  void RunAction() { return; }
+};
+
+::std::unique_ptr<
+    aos::common::actions::TypedAction<actions::TestActionQueueGroup>>
+MakeTestActionNOP() {
+  return ::std::unique_ptr<
+      aos::common::actions::TypedAction<actions::TestActionQueueGroup>>(
+      new aos::common::actions::TypedAction<actions::TestActionQueueGroup>(
+          &actions::test_action));
+}
+
+class TestActorShouldCancel
+    : public aos::common::actions::ActorBase<actions::TestActionQueueGroup> {
+ public:
+  explicit TestActorShouldCancel(actions::TestActionQueueGroup* s)
+      : aos::common::actions::ActorBase<actions::TestActionQueueGroup>(s) {}
+
+  void RunAction() {
+    while (!ShouldCancel()) {
+      LOG(FATAL, "NOT CANCELED!!\n");
+    }
+    return;
+  }
+};
+
+::std::unique_ptr<
+    aos::common::actions::TypedAction<actions::TestActionQueueGroup>>
+MakeTestActionShouldCancel() {
+  return ::std::unique_ptr<
+      aos::common::actions::TypedAction<actions::TestActionQueueGroup>>(
+      new aos::common::actions::TypedAction<actions::TestActionQueueGroup>(
+          &actions::test_action));
+}
+
+class ActionTest : public ::testing::Test {
+ protected:
+  ActionTest() {
+    // Flush the robot state queue so we can use clean shared memory for this.
+    // test.
+    actions::test_action.goal.Clear();
+    actions::test_action.status.Clear();
+  }
+
+  virtual ~ActionTest() {
+    actions::test_action.goal.Clear();
+    actions::test_action.status.Clear();
+  }
+
+  // Bring up and down Core.
+  ::aos::common::testing::GlobalCoreInstance my_core;
+  ::aos::common::actions::ActionQueue action_queue_;
+};
+
+// Tests that the the actions exist in a safe state at startup.
+TEST_F(ActionTest, DoesNothing) {
+  // Tick an empty queue and make sure it was not running.
+  EXPECT_FALSE(action_queue_.Running());
+  action_queue_.Tick();
+  EXPECT_FALSE(action_queue_.Running());
+}
+
+// Tests that the queues are properly configured for testing. Tests that queues
+// work exactly as used in the tests.
+TEST_F(ActionTest, QueueCheck) {
+  actions::TestActionQueueGroup* send_side = &actions::test_action;
+  actions::TestActionQueueGroup* recv_side = &actions::test_action;
+
+  send_side->goal.MakeMessage();
+  send_side->goal.MakeWithBuilder().run(1).Send();
+
+  EXPECT_TRUE(recv_side->goal.FetchLatest());
+  EXPECT_TRUE(recv_side->goal->run);
+
+  send_side->goal.MakeWithBuilder().run(0).Send();
+
+  EXPECT_TRUE(recv_side->goal.FetchLatest());
+  EXPECT_FALSE(recv_side->goal->run);
+
+  send_side->status.MakeMessage();
+  send_side->status.MakeWithBuilder().running(5).last_running(6).Send();
+
+  EXPECT_TRUE(recv_side->status.FetchLatest());
+  EXPECT_EQ(5, static_cast<int>(recv_side->status->running));
+  EXPECT_EQ(6, static_cast<int>(recv_side->status->last_running));
+}
+
+// Tests that an action starts and stops.
+TEST_F(ActionTest, ActionQueueWasRunning) {
+  TestActorNOP nop_act(&actions::test_action);
+
+  // Tick an empty queue and make sure it was not running.
+  action_queue_.Tick();
+  EXPECT_FALSE(action_queue_.Running());
+
+  action_queue_.EnqueueAction(MakeTestActionNOP());
+  nop_act.WaitForActionRequest();
+
+  // We started an action and it should be running.
+  EXPECT_TRUE(action_queue_.Running());
+
+  // Tick it and make sure it is still running.
+  action_queue_.Tick();
+  EXPECT_TRUE(action_queue_.Running());
+
+  // Run the action so it can signal completion.
+  nop_act.RunIteration();
+  action_queue_.Tick();
+
+  // Make sure it stopped.
+  EXPECT_FALSE(action_queue_.Running());
+}
+
+// Tests that we can cancel two actions and have them both stop.
+TEST_F(ActionTest, ActionQueueCancelAll) {
+  TestActorNOP nop_act(&actions::test_action);
+
+  // Tick an empty queue and make sure it was not running.
+  action_queue_.Tick();
+  EXPECT_FALSE(action_queue_.Running());
+
+  // Enqueue two actions to test both cancel. We can have an action and a next
+  // action so we want to test that.
+  action_queue_.EnqueueAction(MakeTestActionNOP());
+  action_queue_.EnqueueAction(MakeTestActionNOP());
+  nop_act.WaitForActionRequest();
+  action_queue_.Tick();
+
+  // Check that current and next exist.
+  EXPECT_TRUE(action_queue_.GetCurrentActionState(nullptr, nullptr, nullptr,
+                                                  nullptr, nullptr, nullptr));
+  EXPECT_TRUE(action_queue_.GetNextActionState(nullptr, nullptr, nullptr,
+                                               nullptr, nullptr, nullptr));
+
+  action_queue_.CancelAllActions();
+  action_queue_.Tick();
+
+  // It should still be running as the actor could not have signaled.
+  EXPECT_TRUE(action_queue_.Running());
+
+  bool sent_started, sent_cancel, interrupted;
+  EXPECT_TRUE(action_queue_.GetCurrentActionState(
+      nullptr, &sent_started, &sent_cancel, &interrupted, nullptr, nullptr));
+  EXPECT_TRUE(sent_started);
+  EXPECT_TRUE(sent_cancel);
+  EXPECT_FALSE(interrupted);
+
+  EXPECT_FALSE(action_queue_.GetNextActionState(nullptr, nullptr, nullptr,
+                                                nullptr, nullptr, nullptr));
+
+  // Run the action so it can signal completion.
+  nop_act.RunIteration();
+  action_queue_.Tick();
+
+  // Make sure it stopped.
+  EXPECT_FALSE(action_queue_.Running());
+}
+
+// Tests that an action that would block forever stops when canceled.
+TEST_F(ActionTest, ActionQueueCancelOne) {
+  TestActorShouldCancel cancel_act(&actions::test_action);
+
+  // Enqueue blocking action.
+  action_queue_.EnqueueAction(MakeTestActionShouldCancel());
+
+  cancel_act.WaitForActionRequest();
+  action_queue_.Tick();
+  EXPECT_TRUE(action_queue_.Running());
+
+  // Tell action to cancel.
+  action_queue_.CancelCurrentAction();
+  action_queue_.Tick();
+
+  // This will block forever on failure.
+  // TODO(ben): prolly a bad way to fail
+  cancel_act.RunIteration();
+  action_queue_.Tick();
+
+  // It should still be running as the actor could not have signalled.
+  EXPECT_FALSE(action_queue_.Running());
+}
+
+// Tests that an action starts and stops.
+TEST_F(ActionTest, ActionQueueTwoActions) {
+  TestActorNOP nop_act(&actions::test_action);
+
+  // Tick an empty queue and make sure it was not running.
+  action_queue_.Tick();
+  EXPECT_FALSE(action_queue_.Running());
+
+  // Enqueue action to be canceled.
+  action_queue_.EnqueueAction(MakeTestActionNOP());
+  nop_act.WaitForActionRequest();
+  action_queue_.Tick();
+
+  // Should still be running as the actor could not have signalled.
+  EXPECT_TRUE(action_queue_.Running());
+
+  // id for the first time run.
+  uint32_t nop_act_id = 0;
+  // Check the internal state and write down id for later use.
+  bool sent_started, sent_cancel, interrupted;
+  EXPECT_TRUE(action_queue_.GetCurrentActionState(nullptr, &sent_started,
+                                                  &sent_cancel, &interrupted,
+                                                  &nop_act_id, nullptr));
+  EXPECT_TRUE(sent_started);
+  EXPECT_FALSE(sent_cancel);
+  EXPECT_FALSE(interrupted);
+  ASSERT_NE(0u, nop_act_id);
+
+  // Add the next action which should ensure the first stopped.
+  action_queue_.EnqueueAction(MakeTestActionNOP());
+
+  // id for the second run.
+  uint32_t nop_act2_id = 0;
+  // Check the internal state and write down id for later use.
+  EXPECT_TRUE(action_queue_.GetNextActionState(nullptr, &sent_started,
+                                               &sent_cancel, &interrupted,
+                                               &nop_act2_id, nullptr));
+  EXPECT_NE(nop_act_id, nop_act2_id);
+  EXPECT_FALSE(sent_started);
+  EXPECT_FALSE(sent_cancel);
+  EXPECT_FALSE(interrupted);
+  ASSERT_NE(0u, nop_act2_id);
+
+  action_queue_.Tick();
+
+  // Run the action so it can signal completion.
+  nop_act.RunIteration();
+  action_queue_.Tick();
+  // Wait for the first id to finish, needed for the correct number of fetches.
+  nop_act.WaitForStop(nop_act_id);
+
+  // Start the next action on the actor side.
+  nop_act.WaitForActionRequest();
+
+  // Check the new action is the right one.
+  uint32_t test_id = 0;
+  EXPECT_TRUE(action_queue_.GetCurrentActionState(
+      nullptr, &sent_started, &sent_cancel, &interrupted, &test_id, nullptr));
+  EXPECT_TRUE(sent_started);
+  EXPECT_FALSE(sent_cancel);
+  EXPECT_FALSE(interrupted);
+  EXPECT_EQ(nop_act2_id, test_id);
+
+  // Make sure it is still going.
+  EXPECT_TRUE(action_queue_.Running());
+
+  // Run the next action so it can accomplish signal completion.
+  nop_act.RunIteration();
+  action_queue_.Tick();
+  nop_act.WaitForStop(nop_act_id);
+
+  // Make sure it stopped.
+  EXPECT_FALSE(action_queue_.Running());
+}
+
+}  // namespace testing.
+}  // namespace actions.
+}  // namespace common.
+}  // namespace aos.
diff --git a/aos/common/actions/actions.cc b/aos/common/actions/actions.cc
new file mode 100644
index 0000000..ff3f386
--- /dev/null
+++ b/aos/common/actions/actions.cc
@@ -0,0 +1,75 @@
+#include "aos/common/actions/actions.h"
+
+namespace aos {
+namespace common {
+namespace actions {
+
+void ActionQueue::EnqueueAction(::std::unique_ptr<Action> action) {
+  if (current_action_) {
+    LOG(INFO, "Queueing action, canceling prior\n");
+    current_action_->Cancel();
+    next_action_ = ::std::move(action);
+  } else {
+    LOG(INFO, "Queueing action\n");
+    current_action_ = ::std::move(action);
+    current_action_->Start();
+  }
+}
+
+void ActionQueue::CancelCurrentAction() {
+  LOG(INFO, "Canceling current action\n");
+  if (current_action_) {
+    current_action_->Cancel();
+  }
+}
+
+void ActionQueue::CancelAllActions() {
+  LOG(DEBUG, "Cancelling all actions\n");
+  if (current_action_) {
+    current_action_->Cancel();
+  }
+  next_action_.reset();
+}
+
+void ActionQueue::Tick() {
+  if (current_action_) {
+    if (!current_action_->Running()) {
+      LOG(INFO, "Action is done.\n");
+      current_action_ = ::std::move(next_action_);
+      if (current_action_) {
+        LOG(INFO, "Running next action\n");
+        current_action_->Start();
+      }
+    }
+  }
+}
+
+bool ActionQueue::Running() { return static_cast<bool>(current_action_); }
+
+bool ActionQueue::GetCurrentActionState(bool* has_started, bool* sent_started,
+                                        bool* sent_cancel, bool* interrupted,
+                                        uint32_t* run_value,
+                                        uint32_t* old_run_value) {
+  if (current_action_) {
+    current_action_->GetState(has_started, sent_started, sent_cancel,
+                              interrupted, run_value, old_run_value);
+    return true;
+  }
+  return false;
+}
+
+bool ActionQueue::GetNextActionState(bool* has_started, bool* sent_started,
+                                     bool* sent_cancel, bool* interrupted,
+                                     uint32_t* run_value,
+                                     uint32_t* old_run_value) {
+  if (next_action_) {
+    next_action_->GetState(has_started, sent_started, sent_cancel, interrupted,
+                           run_value, old_run_value);
+    return true;
+  }
+  return false;
+}
+
+}  // namespace actions
+}  // namespace common
+}  // namespace aos
diff --git a/aos/common/actions/actions.gyp b/aos/common/actions/actions.gyp
new file mode 100644
index 0000000..68c52b2
--- /dev/null
+++ b/aos/common/actions/actions.gyp
@@ -0,0 +1,66 @@
+{
+  'targets': [
+    {
+      'target_name': 'action_lib',
+      'type': 'static_library',
+      'sources': [
+        'actions.cc',
+        'actor.cc',
+      ],
+      'dependencies': [
+        '<(AOS)/build/aos.gyp:logging',
+        '<(AOS)/common/common.gyp:queues',
+        '<(AOS)/common/logging/logging.gyp:queue_logging',
+        '<(AOS)/common/common.gyp:time'
+      ],
+      'export_dependent_settings': [
+        '<(AOS)/build/aos.gyp:logging',
+        '<(AOS)/common/common.gyp:queues',
+        '<(AOS)/common/logging/logging.gyp:queue_logging',
+        '<(AOS)/common/common.gyp:time'
+      ],
+    },
+    {
+      'target_name': 'action_queue',
+      'type': 'static_library',
+      'sources': ['actions.q'],
+      'variables': {
+        'header_path': 'aos/common/actions',
+      },
+      'includes': ['../../build/queues.gypi'],
+    },
+    {
+      'target_name': 'test_action_queue',
+      'type': 'static_library',
+      'sources': ['test_action.q'],
+      'variables': {
+        'header_path': 'aos/common/actions',
+      },
+      'dependencies': [
+        'action_queue',
+      ],
+      'export_dependent_settings': [
+        'action_queue',
+      ],
+      'includes': ['../../build/queues.gypi'],
+    },
+    {
+      'target_name': 'action_test',
+      'type': 'executable',
+      'sources': [
+        'action_test.cc',
+      ],
+      'dependencies': [
+        '<(EXTERNALS):gtest',
+        'action_lib',
+        'test_action_queue',
+        '<(AOS)/common/common.gyp:queue_testutils',
+        '<(AOS)/build/aos.gyp:logging',
+        '<(AOS)/common/logging/logging.gyp:queue_logging',
+        '<(AOS)/common/common.gyp:queues',
+        '<(AOS)/common/common.gyp:time',
+        'action_queue'
+      ],
+    },
+  ],
+}
diff --git a/aos/common/actions/actions.h b/aos/common/actions/actions.h
new file mode 100644
index 0000000..08d94f7
--- /dev/null
+++ b/aos/common/actions/actions.h
@@ -0,0 +1,307 @@
+#ifndef AOS_COMMON_ACTIONS_ACTIONS_H_
+#define AOS_COMMON_ACTIONS_ACTIONS_H_
+
+#include <inttypes.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <type_traits>
+#include <atomic>
+#include <memory>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/queue.h"
+
+namespace aos {
+namespace common {
+namespace actions {
+
+class Action;
+
+// A queue which queues Actions, verifies a single action is running, and
+// cancels Actions.
+class ActionQueue {
+ public:
+  // Queues up an action for sending.
+  void EnqueueAction(::std::unique_ptr<Action> action);
+
+  // Cancels the current action, and runs the next one after the current one has
+  // finished and the action queue ticks again.
+  void CancelCurrentAction();
+
+  // Cancels all running actions.
+  void CancelAllActions();
+
+  // Runs the next action when the current one is finished running and handles
+  // periodically updating various other information.
+  void Tick();
+
+  // Returns true if any action is running or could be running.
+  // For a one cycle faster response, call Tick before running this.
+  // Daniel can think of at least one case (current_action_ is cancelled and
+  // there is no next_action_) in which calling this without running Tick()
+  // first would return a wrong answer.
+  bool Running();
+
+  // Retrieves the internal state of the current action for testing.
+  // See comments on the private members of TypedAction<T> for details.
+  bool GetCurrentActionState(bool* has_started, bool* sent_started,
+                             bool* sent_cancel, bool* interrupted,
+                             uint32_t* run_value, uint32_t* old_run_value);
+
+  // Retrieves the internal state of the next action for testing.
+  // See comments on the private members of TypedAction<T> for details.
+  bool GetNextActionState(bool* has_started, bool* sent_started,
+                          bool* sent_cancel, bool* interrupted,
+                          uint32_t* run_value, uint32_t* old_run_value);
+
+ private:
+  ::std::unique_ptr<Action> current_action_;
+  ::std::unique_ptr<Action> next_action_;
+};
+
+// The basic interface an ActionQueue can access.
+class Action {
+ public:
+  virtual ~Action() {}
+
+  // Cancels the action.
+  void Cancel() { DoCancel(); }
+  // Returns true if the action is currently running.
+  bool Running() { return DoRunning(); }
+  // Starts the action.
+  void Start() { DoStart(); }
+
+  // Waits until the action has finished.
+  void WaitUntilDone() { DoWaitUntilDone(); }
+
+  // Retrieves the internal state of the action for testing.
+  // See comments on the private members of TypedAction<T> for details.
+  void GetState(bool* has_started, bool* sent_started, bool* sent_cancel,
+                bool* interrupted, uint32_t* run_value,
+                uint32_t* old_run_value) {
+    DoGetState(has_started, sent_started, sent_cancel, interrupted, run_value,
+               old_run_value);
+  }
+
+ private:
+  // Cancels the action.
+  virtual void DoCancel() = 0;
+  // Returns true if the action is running or we don't have an initial response
+  // back from it to signal whether or not it is running.
+  virtual bool DoRunning() = 0;
+  // Starts the action if a goal has been created.
+  virtual void DoStart() = 0;
+  // Blocks until complete.
+  virtual void DoWaitUntilDone() = 0;
+  // For testing we will need to get the internal state.
+  // See comments on the private members of TypedAction<T> for details.
+  virtual void DoGetState(bool* has_started, bool* sent_started,
+                          bool* sent_cancel, bool* interrupted,
+                          uint32_t* run_value, uint32_t* old_run_value) = 0;
+};
+
+// Templated subclass to hold the type information.
+template <typename T>
+class TypedAction : public Action {
+ public:
+  // A convenient way to refer to the type of our goals.
+  typedef typename std::remove_reference<decltype(
+      *(static_cast<T*>(NULL)->goal.MakeMessage().get()))>::type GoalType;
+
+  TypedAction(T* queue_group)
+      : queue_group_(queue_group),
+        goal_(queue_group_->goal.MakeMessage()),
+        // This adds 1 to the counter (atomically because it's potentially
+        // shared across threads) and then bitwise-ORs the bottom of the PID to
+        // differentiate it from other processes's values (ie a unique id).
+        run_value_(run_counter_.fetch_add(1, ::std::memory_order_relaxed) |
+                   ((getpid() & 0xFFFF) << 16)) {
+    LOG(INFO, "Action %" PRIx32 " created on queue %s\n", run_value_,
+        queue_group_->goal.name());
+    // Clear out any old status messages from before now.
+    queue_group_->status.FetchLatest();
+  }
+
+  virtual ~TypedAction() {
+    LOG(DEBUG, "Calling destructor of %" PRIx32 "\n", run_value_);
+    DoCancel();
+  }
+
+  // Returns the current goal that will be sent when the action is sent.
+  GoalType* GetGoal() { return goal_.get(); }
+
+ private:
+  void DoCancel() override;
+
+  bool DoRunning() override;
+
+  void DoWaitUntilDone() override;
+
+  // Sets the started flag (also possibly the interrupted flag).
+  void CheckStarted();
+
+  // Checks for interrupt.
+  void CheckInterrupted();
+
+  void DoStart() override;
+
+  void DoGetState(bool* has_started, bool* sent_started, bool* sent_cancel,
+                  bool* interrupted, uint32_t* run_value,
+                  uint32_t* old_run_value) override {
+    if (has_started != nullptr) *has_started = has_started_;
+    if (sent_started != nullptr) *sent_started = sent_started_;
+    if (sent_cancel != nullptr) *sent_cancel = sent_cancel_;
+    if (interrupted != nullptr) *interrupted = interrupted_;
+    if (run_value != nullptr) *run_value = run_value_;
+    if (old_run_value != nullptr) *old_run_value = old_run_value_;
+  }
+
+  T* const queue_group_;
+  ::aos::ScopedMessagePtr<GoalType> goal_;
+
+  // Track if we have seen a response to the start message.
+  bool has_started_ = false;
+  // Track if we have sent an initial start message.
+  bool sent_started_ = false;
+
+  bool sent_cancel_ = false;
+
+  // Gets set to true if we ever see somebody else's value in running.
+  bool interrupted_ = false;
+
+  // The value we're going to use for goal.run etc.
+  const uint32_t run_value_;
+
+  // The old value for running that we may have seen. If we see any value other
+  // than this or run_value_, somebody else got in the way and we're done. 0 if
+  // there was nothing there to start with. Only valid after sent_started_
+  // changes to true.
+  uint32_t old_run_value_;
+
+  static ::std::atomic<uint16_t> run_counter_;
+};
+
+template <typename T>
+::std::atomic<uint16_t> TypedAction<T>::run_counter_{0};
+
+template <typename T>
+void TypedAction<T>::DoCancel() {
+  if (!sent_started_) {
+    LOG(INFO, "Action %" PRIx32 " was never started\n", run_value_);
+  } else {
+    if (interrupted_) {
+      LOG(INFO, "Action %" PRIx32 " was interrupted -> not cancelling\n",
+          run_value_);
+    } else {
+      if (sent_cancel_) {
+        LOG(INFO, "Action %" PRIx32 " already cancelled\n", run_value_);
+      } else {
+        LOG(INFO, "Canceling action %" PRIx32 " on queue %s\n", run_value_,
+            queue_group_->goal.name());
+        queue_group_->goal.MakeWithBuilder().run(0).Send();
+        sent_cancel_ = true;
+      }
+    }
+  }
+}
+
+template <typename T>
+bool TypedAction<T>::DoRunning() {
+  if (!sent_started_) return false;
+  if (has_started_) {
+    queue_group_->status.FetchNext();
+    CheckInterrupted();
+  } else if (queue_group_->status.FetchLatest()) {
+    CheckStarted();
+  }
+  if (interrupted_) return false;
+  // We've asked it to start but haven't gotten confirmation that it's started
+  // yet.
+  if (!has_started_) return true;
+  return queue_group_->status.get() &&
+         queue_group_->status->running == run_value_;
+}
+
+template <typename T>
+void TypedAction<T>::DoWaitUntilDone() {
+  CHECK(sent_started_);
+  queue_group_->status.FetchNext();
+  CheckInterrupted();
+  while (true) {
+    if (interrupted_) return;
+    CheckStarted();
+    queue_group_->status.FetchNextBlocking();
+    CheckStarted();
+    CheckInterrupted();
+    if (has_started_ && (queue_group_->status.get() &&
+                         queue_group_->status->running != run_value_)) {
+      return;
+    }
+  }
+}
+
+template <typename T>
+void TypedAction<T>::CheckStarted() {
+  if (has_started_) return;
+  if (queue_group_->status.get()) {
+    if (queue_group_->status->running == run_value_ ||
+        (queue_group_->status->running == 0 &&
+         queue_group_->status->last_running == run_value_)) {
+      // It's currently running our instance.
+      has_started_ = true;
+      LOG(DEBUG, "Action %" PRIx32 " has been started\n", run_value_);
+    } else if (queue_group_->status->running == old_run_value_) {
+      // It's still running an old instance (or still doing nothing).
+    } else {
+      LOG(WARNING,
+          "Action %" PRIx32 " interrupted by %" PRIx32 " before starting\n",
+          run_value_, queue_group_->status->running);
+      has_started_ = true;
+      interrupted_ = true;
+    }
+  } else {
+    LOG(WARNING, "No status message recieved.\n");
+  }
+}
+
+template <typename T>
+void TypedAction<T>::CheckInterrupted() {
+  if (!interrupted_ && has_started_ && queue_group_->status.get()) {
+    if (queue_group_->status->running != 0 &&
+        queue_group_->status->running != run_value_) {
+      LOG(WARNING,
+          "Action %" PRIx32 " interrupted by %" PRIx32 " after starting\n",
+          run_value_, queue_group_->status->running);
+    }
+  }
+}
+
+template <typename T>
+void TypedAction<T>::DoStart() {
+  if (goal_) {
+    LOG(INFO, "Starting action %" PRIx32 "\n", run_value_);
+    goal_->run = run_value_;
+    sent_started_ = true;
+    if (!goal_.Send()) {
+      LOG(ERROR, "sending goal for action %" PRIx32 " failed\n", run_value_);
+      // Don't wait to see a message with it.
+      has_started_ = true;
+    }
+    queue_group_->status.FetchLatest();
+    if (queue_group_->status.get() && queue_group_->status->running != 0) {
+      old_run_value_ = queue_group_->status->running;
+      LOG(INFO, "Action %" PRIx32 " already running\n", old_run_value_);
+    } else {
+      old_run_value_ = 0;
+    }
+  } else {
+    LOG(WARNING, "Action %" PRIx32 " already started\n", run_value_);
+  }
+}
+
+}  // namespace actions
+}  // namespace common
+}  // namespace aos
+
+#endif  // AOS_COMMON_ACTIONS_ACTIONS_H_
diff --git a/aos/common/actions/actions.q b/aos/common/actions/actions.q
new file mode 100644
index 0000000..fff6326
--- /dev/null
+++ b/aos/common/actions/actions.q
@@ -0,0 +1,29 @@
+package aos.common.actions;
+
+interface StatusInterface {
+  // 0 if the action isn't running or the value from goal.run.
+  uint32_t running;
+};
+
+interface GoalInterface {
+  // 0 to stop or an arbitrary value to put in status.running.
+  uint32_t run;
+};
+
+message Status {
+  // The run value of the instance we're currently running or 0.
+  uint32_t running;
+  // A run value we were previously running or 0.
+  uint32_t last_running;
+};
+
+message Goal {
+  // The unique value to put into status.running while running this instance or
+  // 0 to cancel.
+  uint32_t run;
+};
+
+interface ActionQueueGroup {
+  queue Status status;
+  queue Goal goal;
+};
diff --git a/aos/common/actions/actor.cc b/aos/common/actions/actor.cc
new file mode 100644
index 0000000..6c6bef7
--- /dev/null
+++ b/aos/common/actions/actor.cc
@@ -0,0 +1,9 @@
+#include "aos/common/actions/actor.h"
+
+namespace aos {
+namespace common {
+namespace actions {
+
+}  // namespace actions
+}  // namespace common
+}  // namespace aos
diff --git a/aos/common/actions/actor.h b/aos/common/actions/actor.h
new file mode 100644
index 0000000..958927b
--- /dev/null
+++ b/aos/common/actions/actor.h
@@ -0,0 +1,204 @@
+#ifndef AOS_COMMON_ACTIONS_ACTOR_H_
+#define AOS_COMMON_ACTIONS_ACTOR_H_
+
+#include <stdio.h>
+#include <inttypes.h>
+
+#include <functional>
+
+#include "aos/common/logging/logging.h"
+#include "aos/common/logging/queue_logging.h"
+#include "aos/common/time.h"
+
+namespace aos {
+namespace common {
+namespace actions {
+
+template <class T>
+class ActorBase {
+ public:
+  ActorBase(T* acq) : action_q_(acq) {}
+
+  virtual void RunAction() = 0;
+
+  // Runs action while enabled.
+  void Run();
+
+  // Checks if an action was initially running when the thread started.
+  bool CheckInitialRunning();
+
+  // Wait here until someone asks us to go.
+  void WaitForActionRequest();
+
+  // Do work son.
+  uint32_t RunIteration();
+
+  // Wait for stop is signalled.
+  void WaitForStop(uint32_t running_id);
+
+  // Will run until the done condition is met or times out.
+  // Will return false if successful and true if the action was canceled or
+  // failed or end_time was reached before it succeeded.
+  // Done condition are defined as functions that return true when done and have
+  // some sort of blocking statement (such as FetchNextBlocking) to throttle
+  // spin rate.
+  // end_time is when to stop and return true. Time(0, 0) (the default) means
+  // never time out.
+  bool WaitUntil(::std::function<bool(void)> done_condition,
+                 const ::aos::time::Time& end_time = ::aos::time::Time(0, 0));
+
+  // Returns true if the action should be canceled.
+  bool ShouldCancel();
+
+ protected:
+  // Set to true when we should stop ASAP.
+  bool abort_ = false;
+
+  // The queue for this action.
+  T* action_q_;
+};
+
+template <class T>
+bool ActorBase<T>::CheckInitialRunning() {
+  LOG(DEBUG, "Waiting for input to start\n");
+
+  if (action_q_->goal.FetchLatest()) {
+    LOG_STRUCT(DEBUG, "goal queue ", *action_q_->goal);
+    const uint32_t initially_running = action_q_->goal->run;
+    if (initially_running != 0) {
+      while (action_q_->goal->run == initially_running) {
+        LOG(INFO, "run is still %" PRIx32 "\n", initially_running);
+        action_q_->goal.FetchNextBlocking();
+        LOG_STRUCT(DEBUG, "goal queue ", *action_q_->goal);
+      }
+    }
+    return true;
+  }
+  return false;
+}
+
+template <class T>
+void ActorBase<T>::WaitForActionRequest() {
+  while (action_q_->goal.get() == nullptr || !action_q_->goal->run) {
+    LOG(INFO, "Waiting for an action request.\n");
+    action_q_->goal.FetchNextBlocking();
+    LOG_STRUCT(DEBUG, "goal queue ", *action_q_->goal);
+    if (!action_q_->goal->run) {
+      if (!action_q_->status.MakeWithBuilder()
+               .running(0)
+               .last_running(0)
+               .Send()) {
+        LOG(ERROR, "Failed to send the status.\n");
+      }
+    }
+  }
+}
+
+template <class T>
+uint32_t ActorBase<T>::RunIteration() {
+  CHECK(action_q_->goal.get() != nullptr);
+  const uint32_t running_id = action_q_->goal->run;
+  LOG(INFO, "Starting action %" PRIx32 "\n", running_id);
+  if (!action_q_->status.MakeWithBuilder()
+           .running(running_id)
+           .last_running(0)
+           .Send()) {
+    LOG(ERROR, "Failed to send the status.\n");
+  }
+  RunAction();
+  LOG(INFO, "Done with action %" PRIx32 "\n", running_id);
+
+  // If we have a new one to run, we shouldn't say we're stopped in between.
+  if (action_q_->goal->run == 0 || action_q_->goal->run == running_id) {
+    if (!action_q_->status.MakeWithBuilder()
+             .running(0)
+             .last_running(running_id)
+             .Send()) {
+      LOG(ERROR, "Failed to send the status.\n");
+    } else {
+      LOG(INFO, "Sending Done status %" PRIx32 "\n", running_id);
+    }
+  } else {
+    LOG(INFO, "skipping sending stopped status for %" PRIx32 "\n", running_id);
+  }
+
+  return running_id;
+}
+
+template <class T>
+void ActorBase<T>::WaitForStop(uint32_t running_id) {
+  assert(action_q_->goal.get() != nullptr);
+  while (action_q_->goal->run == running_id) {
+    LOG(INFO, "Waiting for the action (%" PRIx32 ") to be stopped.\n",
+        running_id);
+    action_q_->goal.FetchNextBlocking();
+    LOG_STRUCT(DEBUG, "goal queue ", *action_q_->goal);
+  }
+}
+
+template <class T>
+void ActorBase<T>::Run() {
+
+  // Make sure the last job is done and we have a signal.
+  CheckInitialRunning();
+
+  if (!action_q_->status.MakeWithBuilder().running(0).last_running(0).Send()) {
+    LOG(ERROR, "Failed to send the status.\n");
+  }
+
+  LOG(DEBUG, "action %" PRIx32 " was stopped\n", action_q_->goal->run);
+
+  while (true) {
+    // Wait for a request to come in before starting.
+    WaitForActionRequest();
+
+    // Perform the action once.
+    uint32_t running_id = RunIteration();
+
+    // Don't start again until asked.
+    WaitForStop(running_id);
+    LOG(DEBUG, "action %" PRIx32 " was stopped\n", running_id);
+  }
+}
+
+template <class T>
+bool ActorBase<T>::WaitUntil(::std::function<bool(void)> done_condition,
+                             const ::aos::time::Time& end_time) {
+  while (!done_condition()) {
+    if (ShouldCancel() || abort_) {
+      // Clear abort bit as we have just aborted.
+      abort_ = false;
+      return true;
+    }
+    if (end_time != ::aos::time::Time(0, 0) &&
+        ::aos::time::Time::Now() >= end_time) {
+      LOG(INFO, "WaitUntil timed out\n");
+      return true;
+    }
+  }
+  if (ShouldCancel() || abort_) {
+    // Clear abort bit as we have just aborted.
+    abort_ = false;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+template <class T>
+bool ActorBase<T>::ShouldCancel() {
+  if (action_q_->goal.FetchNext()) {
+    LOG_STRUCT(DEBUG, "goal queue ", *action_q_->goal);
+  }
+  bool ans = !action_q_->goal->run;
+  if (ans) {
+    LOG(INFO, "Time to stop action\n");
+  }
+  return ans;
+}
+
+}  // namespace actions
+}  // namespace common
+}  // namespace aos
+
+#endif  // AOS_COMMON_ACTIONS_ACTOR_H_
diff --git a/aos/common/actions/test_action.q b/aos/common/actions/test_action.q
new file mode 100644
index 0000000..c46e182
--- /dev/null
+++ b/aos/common/actions/test_action.q
@@ -0,0 +1,17 @@
+package aos.common.actions;
+
+import "aos/common/actions/actions.q";
+
+queue_group TestActionQueueGroup {
+  implements aos.common.actions.ActionQueueGroup;
+
+  message Goal {
+    uint32_t run;
+    double test_value;
+  };
+
+  queue Goal goal;
+  queue aos.common.actions.Status status;
+};
+
+queue_group TestActionQueueGroup test_action;