Convert actions to event loops
The logic gets significantly simpler due to Watchers. But we also get
to port all the users over as well.
Change-Id: Ib4e75951e65f7431acc6c1548b7f1d20da3da295
diff --git a/aos/actions/BUILD b/aos/actions/BUILD
index c4082ab..25cc6e3 100644
--- a/aos/actions/BUILD
+++ b/aos/actions/BUILD
@@ -1,59 +1,60 @@
-package(default_visibility = ['//visibility:public'])
+package(default_visibility = ["//visibility:public"])
-load('//aos/build:queues.bzl', 'queue_library')
+load("//aos/build:queues.bzl", "queue_library")
cc_library(
- name = 'action_lib',
- srcs = [
- 'actions.cc',
- 'actor.cc',
- ],
- hdrs = [
- 'actions.h',
- 'actor.h',
- ],
- deps = [
- '//aos/logging',
- '//aos:queues',
- '//aos/logging:queue_logging',
- '//aos/time:time',
- '//aos/controls:control_loop',
- '//aos/util:phased_loop',
- ],
+ name = "action_lib",
+ srcs = [
+ "actions.cc",
+ "actor.cc",
+ ],
+ hdrs = [
+ "actions.h",
+ "actor.h",
+ ],
+ deps = [
+ "//aos:queues",
+ "//aos/controls:control_loop",
+ "//aos/logging",
+ "//aos/logging:queue_logging",
+ "//aos/time",
+ "//aos/util:phased_loop",
+ ],
)
queue_library(
- name = 'action_queue',
- srcs = [
- 'actions.q',
- ],
+ name = "action_queue",
+ srcs = [
+ "actions.q",
+ ],
)
queue_library(
- name = 'test_action_queue',
- srcs = [
- 'test_action.q',
- ],
- deps = [
- ':action_queue',
- ],
+ name = "test_action_queue",
+ srcs = [
+ "test_action.q",
+ ],
+ deps = [
+ ":action_queue",
+ ],
)
cc_test(
- name = 'action_test',
- srcs = [
- 'action_test.cc',
- ],
- deps = [
- '//aos/testing:googletest',
- ':action_lib',
- ':test_action_queue',
- '//aos/testing:test_shm',
- '//aos/logging',
- '//aos/logging:queue_logging',
- '//aos:queues',
- '//aos/time:time',
- ':action_queue',
- '//aos:event',
- ],
+ name = "action_test",
+ srcs = [
+ "action_test.cc",
+ ],
+ deps = [
+ ":action_lib",
+ ":action_queue",
+ ":test_action_queue",
+ "//aos:event",
+ "//aos:queues",
+ "//aos/events:simulated_event_loop",
+ "//aos/logging",
+ "//aos/logging:queue_logging",
+ "//aos/testing:googletest",
+ "//aos/testing:test_shm",
+ "//aos/time",
+ ],
)
diff --git a/aos/actions/action_test.cc b/aos/actions/action_test.cc
index 09e279c..45cfd62 100644
--- a/aos/actions/action_test.cc
+++ b/aos/actions/action_test.cc
@@ -6,12 +6,13 @@
#include "gtest/gtest.h"
-#include "aos/queue.h"
-#include "aos/actions/actor.h"
#include "aos/actions/actions.h"
#include "aos/actions/actions.q.h"
+#include "aos/actions/actor.h"
#include "aos/actions/test_action.q.h"
-#include "aos/event.h"
+#include "aos/events/simulated-event-loop.h"
+#include "aos/queue.h"
+#include "aos/testing/test_logging.h"
#include "aos/testing/test_shm.h"
namespace aos {
@@ -25,8 +26,15 @@
class TestActorIndex
: public aos::common::actions::ActorBase<actions::TestActionQueueGroup> {
public:
- explicit TestActorIndex(actions::TestActionQueueGroup *s)
- : aos::common::actions::ActorBase<actions::TestActionQueueGroup>(s) {}
+ typedef TypedActionFactory<actions::TestActionQueueGroup> Factory;
+
+ explicit TestActorIndex(::aos::EventLoop *event_loop)
+ : aos::common::actions::ActorBase<actions::TestActionQueueGroup>(
+ event_loop, ".aos.common.actions.test_action") {}
+
+ static Factory MakeFactory(::aos::EventLoop *event_loop) {
+ return Factory(event_loop, ".aos.common.actions.test_action");
+ }
bool RunAction(const uint32_t &new_index) override {
index = new_index;
@@ -36,35 +44,34 @@
uint32_t index = 0;
};
-::std::unique_ptr<
- aos::common::actions::TypedAction<actions::TestActionQueueGroup>>
-MakeTestActionIndex(uint32_t index) {
- return ::std::unique_ptr<
- aos::common::actions::TypedAction<actions::TestActionQueueGroup>>(
- new aos::common::actions::TypedAction<actions::TestActionQueueGroup>(
- &actions::test_action, index));
-}
-
class TestActorNOP
: public aos::common::actions::ActorBase<actions::TestActionQueueGroup> {
public:
- explicit TestActorNOP(actions::TestActionQueueGroup *s)
- : actions::ActorBase<actions::TestActionQueueGroup>(s) {}
+ typedef TypedActionFactory<actions::TestActionQueueGroup> Factory;
+
+ explicit TestActorNOP(::aos::EventLoop *event_loop)
+ : actions::ActorBase<actions::TestActionQueueGroup>(
+ event_loop, ".aos.common.actions.test_action") {}
+
+ static Factory MakeFactory(::aos::EventLoop *event_loop) {
+ return Factory(event_loop, ".aos.common.actions.test_action");
+ }
bool RunAction(const uint32_t &) override { return true; }
};
-::std::unique_ptr<
- aos::common::actions::TypedAction<actions::TestActionQueueGroup>>
-MakeTestActionNOP() {
- return MakeTestActionIndex(0);
-}
-
class TestActorShouldCancel
: public aos::common::actions::ActorBase<actions::TestActionQueueGroup> {
public:
- explicit TestActorShouldCancel(actions::TestActionQueueGroup *s)
- : aos::common::actions::ActorBase<actions::TestActionQueueGroup>(s) {}
+ typedef TypedActionFactory<actions::TestActionQueueGroup> Factory;
+
+ explicit TestActorShouldCancel(::aos::EventLoop *event_loop)
+ : aos::common::actions::ActorBase<actions::TestActionQueueGroup>(
+ event_loop, ".aos.common.actions.test_action") {}
+
+ static Factory MakeFactory(::aos::EventLoop *event_loop) {
+ return Factory(event_loop, ".aos.common.actions.test_action");
+ }
bool RunAction(const uint32_t &) override {
while (!ShouldCancel()) {
@@ -74,407 +81,370 @@
}
};
-::std::unique_ptr<
- aos::common::actions::TypedAction<actions::TestActionQueueGroup>>
-MakeTestActionShouldCancel() {
- return MakeTestActionIndex(0);
-}
-
class TestActor2Nop
: public aos::common::actions::ActorBase<actions::TestAction2QueueGroup> {
public:
- explicit TestActor2Nop(actions::TestAction2QueueGroup *s)
- : actions::ActorBase<actions::TestAction2QueueGroup>(s) {}
+ typedef TypedActionFactory<actions::TestAction2QueueGroup> Factory;
+
+ explicit TestActor2Nop(::aos::EventLoop *event_loop)
+ : actions::ActorBase<actions::TestAction2QueueGroup>(
+ event_loop, ".aos.common.actions.test_action2") {}
+
+ static Factory MakeFactory(::aos::EventLoop *event_loop) {
+ return Factory(event_loop, ".aos.common.actions.test_action2");
+ }
bool RunAction(const actions::MyParams &) { return true; }
};
-::std::unique_ptr<
- aos::common::actions::TypedAction<actions::TestAction2QueueGroup>>
-MakeTestAction2NOP(const actions::MyParams ¶ms) {
- return ::std::unique_ptr<
- aos::common::actions::TypedAction<actions::TestAction2QueueGroup>>(
- new aos::common::actions::TypedAction<actions::TestAction2QueueGroup>(
- &actions::test_action2, params));
-}
-
class ActionTest : public ::testing::Test {
protected:
- ActionTest() {
- actions::test_action.goal.Clear();
- actions::test_action.status.Clear();
- actions::test_action2.goal.Clear();
- actions::test_action2.status.Clear();
- }
-
- virtual ~ActionTest() {
- actions::test_action.goal.Clear();
- actions::test_action.status.Clear();
- actions::test_action2.goal.Clear();
- actions::test_action2.status.Clear();
+ ActionTest()
+ : actor1_event_loop_(event_loop_factory_.MakeEventLoop()),
+ actor2_event_loop_(event_loop_factory_.MakeEventLoop()),
+ test_event_loop_(event_loop_factory_.MakeEventLoop()) {
+ ::aos::testing::EnableTestLogging();
}
// Bring up and down Core.
- ::aos::testing::TestSharedMemory my_shm_;
- ::aos::common::actions::ActionQueue action_queue_;
+ ::aos::SimulatedEventLoopFactory event_loop_factory_;
+
+ ::std::unique_ptr<::aos::EventLoop> actor1_event_loop_;
+ ::std::unique_ptr<::aos::EventLoop> actor2_event_loop_;
+ ::std::unique_ptr<::aos::EventLoop> test_event_loop_;
+
};
// Tests that the the actions exist in a safe state at startup.
TEST_F(ActionTest, DoesNothing) {
+ ActionQueue action_queue;
// Tick an empty queue and make sure it was not running.
- EXPECT_FALSE(action_queue_.Running());
- action_queue_.Tick();
- EXPECT_FALSE(action_queue_.Running());
+ EXPECT_FALSE(action_queue.Running());
+ action_queue.Tick();
+ EXPECT_FALSE(action_queue.Running());
}
// Tests that starting with an old run message in the goal queue actually works.
// This used to result in the client hanging, waiting for a response to its
// cancel message.
TEST_F(ActionTest, StartWithOldGoal) {
- ASSERT_TRUE(actions::test_action.goal.MakeWithBuilder().run(971).Send());
+ ::std::unique_ptr<::aos::EventLoop> test2_event_loop =
+ event_loop_factory_.MakeEventLoop();
+ ::aos::Sender<TestActionQueueGroup::Goal> goal_sender =
+ test2_event_loop->MakeSender<TestActionQueueGroup::Goal>(
+ ".aos.common.actions.test_action.goal");
+ ::aos::Fetcher<Status> status_fetcher = test2_event_loop->MakeFetcher<Status>(
+ ".aos.common.actions.test_action.status");
- TestActorNOP nop_act(&actions::test_action);
+ TestActorIndex::Factory nop_actor_factory =
+ TestActorNOP::MakeFactory(test_event_loop_.get());
- ASSERT_FALSE(actions::test_action.status.FetchLatest());
- ::std::thread init_thread([&nop_act]() { nop_act.Initialize(); });
- ::std::this_thread::sleep_for(chrono::milliseconds(100));
- ASSERT_TRUE(actions::test_action.goal.MakeWithBuilder().run(1).Send());
- init_thread.join();
- ASSERT_TRUE(actions::test_action.status.FetchLatest());
- EXPECT_EQ(0u, actions::test_action.status->running);
- EXPECT_EQ(0u, actions::test_action.status->last_running);
+ ActionQueue action_queue;
- action_queue_.EnqueueAction(MakeTestActionNOP());
- nop_act.WaitForActionRequest();
+ {
+ auto goal_message = goal_sender.MakeMessage();
+ goal_message->run = 971;
+ ASSERT_TRUE(goal_message.Send());
+ }
+
+ TestActorNOP nop_act(actor1_event_loop_.get());
+
+ ASSERT_FALSE(status_fetcher.Fetch());
+
+ event_loop_factory_.RunFor(chrono::seconds(1));
+
+ ASSERT_TRUE(status_fetcher.Fetch());
+ EXPECT_EQ(0u, status_fetcher->running);
+ EXPECT_EQ(0u, status_fetcher->last_running);
+
+ action_queue.EnqueueAction(nop_actor_factory.Make(0));
// We started an action and it should be running.
- EXPECT_TRUE(action_queue_.Running());
+ EXPECT_TRUE(action_queue.Running());
- action_queue_.CancelAllActions();
- action_queue_.Tick();
+ action_queue.CancelAllActions();
+ action_queue.Tick();
- EXPECT_TRUE(action_queue_.Running());
+ EXPECT_TRUE(action_queue.Running());
// Run the action so it can signal completion.
- nop_act.RunIteration();
- action_queue_.Tick();
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
// Make sure it stopped.
- EXPECT_FALSE(action_queue_.Running());
-}
-
-// Tests that the queues are properly configured for testing. Tests that queues
-// work exactly as used in the tests.
-TEST_F(ActionTest, QueueCheck) {
- actions::TestActionQueueGroup *send_side = &actions::test_action;
- actions::TestActionQueueGroup *recv_side = &actions::test_action;
-
- send_side->goal.MakeWithBuilder().run(1).Send();
-
- EXPECT_TRUE(recv_side->goal.FetchLatest());
- EXPECT_TRUE(recv_side->goal->run);
-
- send_side->goal.MakeWithBuilder().run(0).Send();
-
- EXPECT_TRUE(recv_side->goal.FetchLatest());
- EXPECT_FALSE(recv_side->goal->run);
-
- send_side->status.MakeWithBuilder().running(5).last_running(6).Send();
-
- EXPECT_TRUE(recv_side->status.FetchLatest());
- EXPECT_EQ(5, static_cast<int>(recv_side->status->running));
- EXPECT_EQ(6, static_cast<int>(recv_side->status->last_running));
+ EXPECT_FALSE(action_queue.Running());
}
// Tests that an action starts and stops.
TEST_F(ActionTest, ActionQueueWasRunning) {
- TestActorNOP nop_act(&actions::test_action);
+ TestActorNOP nop_act(actor1_event_loop_.get());
+
+ TestActorIndex::Factory nop_actor_factory =
+ TestActorNOP::MakeFactory(test_event_loop_.get());
+
+ ActionQueue action_queue;
// Tick an empty queue and make sure it was not running.
- action_queue_.Tick();
- EXPECT_FALSE(action_queue_.Running());
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
+ EXPECT_FALSE(action_queue.Running());
- action_queue_.EnqueueAction(MakeTestActionNOP());
- nop_act.WaitForActionRequest();
+ action_queue.EnqueueAction(nop_actor_factory.Make(0));
// We started an action and it should be running.
- EXPECT_TRUE(action_queue_.Running());
+ EXPECT_TRUE(action_queue.Running());
// Tick it and make sure it is still running.
- action_queue_.Tick();
- EXPECT_TRUE(action_queue_.Running());
+ action_queue.Tick();
+ EXPECT_TRUE(action_queue.Running());
// Run the action so it can signal completion.
- nop_act.RunIteration();
- action_queue_.Tick();
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
// Make sure it stopped.
- EXPECT_FALSE(action_queue_.Running());
+ EXPECT_FALSE(action_queue.Running());
}
// Tests that we can cancel two actions and have them both stop.
TEST_F(ActionTest, ActionQueueCancelAll) {
- TestActorNOP nop_act(&actions::test_action);
+ TestActorNOP nop_act(actor1_event_loop_.get());
- // Tick an empty queue and make sure it was not running.
- action_queue_.Tick();
- EXPECT_FALSE(action_queue_.Running());
+ TestActorIndex::Factory nop_actor_factory =
+ TestActorNOP::MakeFactory(test_event_loop_.get());
+
+ ActionQueue action_queue;
+
+ // Let the actor and action queue start up and confirm nothing is running.
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
+
+ EXPECT_FALSE(action_queue.Running());
// Enqueue two actions to test both cancel. We can have an action and a next
// action so we want to test that.
- action_queue_.EnqueueAction(MakeTestActionNOP());
- action_queue_.EnqueueAction(MakeTestActionNOP());
- nop_act.WaitForActionRequest();
- action_queue_.Tick();
+ action_queue.EnqueueAction(nop_actor_factory.Make(0));
+ action_queue.EnqueueAction(nop_actor_factory.Make(0));
+
+ action_queue.Tick();
// Check that current and next exist.
- EXPECT_TRUE(action_queue_.GetCurrentActionState(nullptr, nullptr, nullptr,
- nullptr, nullptr, nullptr));
- EXPECT_TRUE(action_queue_.GetNextActionState(nullptr, nullptr, nullptr,
- nullptr, nullptr, nullptr));
+ EXPECT_TRUE(action_queue.GetCurrentActionState(nullptr, nullptr, nullptr,
+ nullptr, nullptr, nullptr));
+ EXPECT_TRUE(action_queue.GetNextActionState(nullptr, nullptr, nullptr,
+ nullptr, nullptr, nullptr));
- action_queue_.CancelAllActions();
- action_queue_.Tick();
+ action_queue.CancelAllActions();
+ action_queue.Tick();
// It should still be running as the actor could not have signaled.
- EXPECT_TRUE(action_queue_.Running());
+ EXPECT_TRUE(action_queue.Running());
bool sent_started, sent_cancel, interrupted;
- EXPECT_TRUE(action_queue_.GetCurrentActionState(
+ EXPECT_TRUE(action_queue.GetCurrentActionState(
nullptr, &sent_started, &sent_cancel, &interrupted, nullptr, nullptr));
EXPECT_TRUE(sent_started);
EXPECT_TRUE(sent_cancel);
EXPECT_FALSE(interrupted);
- EXPECT_FALSE(action_queue_.GetNextActionState(nullptr, nullptr, nullptr,
- nullptr, nullptr, nullptr));
+ EXPECT_FALSE(action_queue.GetNextActionState(nullptr, nullptr, nullptr,
+ nullptr, nullptr, nullptr));
// Run the action so it can signal completion.
- nop_act.RunIteration();
- action_queue_.Tick();
+ event_loop_factory_.RunFor(chrono::seconds(1));
+
+ action_queue.Tick();
// Make sure it stopped.
- EXPECT_FALSE(action_queue_.Running());
+ EXPECT_FALSE(action_queue.Running());
+ EXPECT_EQ(1, nop_act.running_count());
}
// Tests that an action that would block forever stops when canceled.
TEST_F(ActionTest, ActionQueueCancelOne) {
- TestActorShouldCancel cancel_act(&actions::test_action);
+ TestActorShouldCancel cancel_act(actor1_event_loop_.get());
+
+ TestActorShouldCancel::Factory cancel_action_factory =
+ TestActorShouldCancel::MakeFactory(test_event_loop_.get());
+
+ ActionQueue action_queue;
+
+ // Let the actor and action queue start up.
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
// Enqueue blocking action.
- action_queue_.EnqueueAction(MakeTestActionShouldCancel());
+ action_queue.EnqueueAction(cancel_action_factory.Make(0));
- cancel_act.WaitForActionRequest();
- action_queue_.Tick();
- EXPECT_TRUE(action_queue_.Running());
+ action_queue.Tick();
+ EXPECT_TRUE(action_queue.Running());
// Tell action to cancel.
- action_queue_.CancelCurrentAction();
- action_queue_.Tick();
+ action_queue.CancelCurrentAction();
+ action_queue.Tick();
// This will block forever on failure.
// TODO(ben): prolly a bad way to fail
- cancel_act.RunIteration();
- action_queue_.Tick();
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
// It should still be running as the actor could not have signalled.
- EXPECT_FALSE(action_queue_.Running());
+ EXPECT_FALSE(action_queue.Running());
}
-// Tests that an action starts and stops.
+// Tests that 2 actions in a row causes the second one to cancel the first one.
TEST_F(ActionTest, ActionQueueTwoActions) {
- TestActorNOP nop_act(&actions::test_action);
+ TestActorNOP nop_actor(actor1_event_loop_.get());
+ TestActorIndex::Factory nop_actor_factory =
+ TestActorNOP::MakeFactory(test_event_loop_.get());
+
+ ActionQueue action_queue;
// Tick an empty queue and make sure it was not running.
- action_queue_.Tick();
- EXPECT_FALSE(action_queue_.Running());
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
+ EXPECT_FALSE(action_queue.Running());
// Enqueue action to be canceled.
- action_queue_.EnqueueAction(MakeTestActionNOP());
- nop_act.WaitForActionRequest();
- action_queue_.Tick();
+ action_queue.EnqueueAction(nop_actor_factory.Make(0));
+ action_queue.Tick();
// Should still be running as the actor could not have signalled.
- EXPECT_TRUE(action_queue_.Running());
+ EXPECT_TRUE(action_queue.Running());
// id for the first time run.
- uint32_t nop_act_id = 0;
+ uint32_t nop_actor_id = 0;
// Check the internal state and write down id for later use.
bool sent_started, sent_cancel, interrupted;
- EXPECT_TRUE(action_queue_.GetCurrentActionState(nullptr, &sent_started,
- &sent_cancel, &interrupted,
- &nop_act_id, nullptr));
+ EXPECT_TRUE(action_queue.GetCurrentActionState(nullptr, &sent_started,
+ &sent_cancel, &interrupted,
+ &nop_actor_id, nullptr));
EXPECT_TRUE(sent_started);
EXPECT_FALSE(sent_cancel);
EXPECT_FALSE(interrupted);
- ASSERT_NE(0u, nop_act_id);
+ ASSERT_NE(0u, nop_actor_id);
// Add the next action which should ensure the first stopped.
- action_queue_.EnqueueAction(MakeTestActionNOP());
+ action_queue.EnqueueAction(nop_actor_factory.Make(0));
// id for the second run.
- uint32_t nop_act2_id = 0;
+ uint32_t nop_actor2_id = 0;
// Check the internal state and write down id for later use.
- EXPECT_TRUE(action_queue_.GetNextActionState(nullptr, &sent_started,
- &sent_cancel, &interrupted,
- &nop_act2_id, nullptr));
- EXPECT_NE(nop_act_id, nop_act2_id);
+ EXPECT_TRUE(action_queue.GetNextActionState(nullptr, &sent_started,
+ &sent_cancel, &interrupted,
+ &nop_actor2_id, nullptr));
+ EXPECT_NE(nop_actor_id, nop_actor2_id);
EXPECT_FALSE(sent_started);
EXPECT_FALSE(sent_cancel);
EXPECT_FALSE(interrupted);
- ASSERT_NE(0u, nop_act2_id);
+ ASSERT_NE(0u, nop_actor2_id);
- action_queue_.Tick();
+ action_queue.Tick();
// Run the action so it can signal completion.
- nop_act.RunIteration();
- action_queue_.Tick();
- // Wait for the first id to finish, needed for the correct number of fetches.
- nop_act.WaitForStop(nop_act_id);
+ event_loop_factory_.RunFor(chrono::seconds(1));
- // Start the next action on the actor side.
- nop_act.WaitForActionRequest();
+ action_queue.Tick();
// Check the new action is the right one.
uint32_t test_id = 0;
- EXPECT_TRUE(action_queue_.GetCurrentActionState(
+ EXPECT_TRUE(action_queue.GetCurrentActionState(
nullptr, &sent_started, &sent_cancel, &interrupted, &test_id, nullptr));
EXPECT_TRUE(sent_started);
EXPECT_FALSE(sent_cancel);
EXPECT_FALSE(interrupted);
- EXPECT_EQ(nop_act2_id, test_id);
+ EXPECT_EQ(nop_actor2_id, test_id);
// Make sure it is still going.
- EXPECT_TRUE(action_queue_.Running());
+ EXPECT_TRUE(action_queue.Running());
- // Run the next action so it can accomplish signal completion.
- nop_act.RunIteration();
- action_queue_.Tick();
- nop_act.WaitForStop(nop_act_id);
+ // Now let everything finish.
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
// Make sure it stopped.
- EXPECT_FALSE(action_queue_.Running());
+ EXPECT_FALSE(action_queue.Running());
}
// Tests that we do get an index with our goal
TEST_F(ActionTest, ActionIndex) {
- TestActorIndex idx_act(&actions::test_action);
+ TestActorIndex idx_actor(actor1_event_loop_.get());
- // Tick an empty queue and make sure it was not running.
- action_queue_.Tick();
- EXPECT_FALSE(action_queue_.Running());
+ TestActorIndex::Factory test_actor_index_factory =
+ TestActorIndex::MakeFactory(test_event_loop_.get());
+
+ ActionQueue action_queue;
+ // Tick an empty queue and make sure it was not running. Also tick the
+ // factory to allow it to send out the initial cancel message.
+ event_loop_factory_.RunFor(chrono::seconds(1));
+ action_queue.Tick();
+
+ EXPECT_FALSE(action_queue.Running());
// Enqueue action to post index.
- action_queue_.EnqueueAction(MakeTestActionIndex(5));
- EXPECT_TRUE(actions::test_action.goal.FetchLatest());
- EXPECT_EQ(5u, actions::test_action.goal->params);
- EXPECT_EQ(0u, idx_act.index);
+ action_queue.EnqueueAction(test_actor_index_factory.Make(5));
+ ::aos::Fetcher<actions::TestActionQueueGroup::Goal> goal_fetcher_ =
+ test_event_loop_->MakeFetcher<actions::TestActionQueueGroup::Goal>(
+ ".aos.common.actions.test_action.goal");
- idx_act.WaitForActionRequest();
- action_queue_.Tick();
+ ASSERT_TRUE(goal_fetcher_.Fetch());
+ EXPECT_EQ(5u, goal_fetcher_->params);
+ EXPECT_EQ(0u, idx_actor.index);
- // Check the new action is the right one.
- uint32_t test_id = 0;
- EXPECT_TRUE(action_queue_.GetCurrentActionState(nullptr, nullptr, nullptr,
- nullptr, &test_id, nullptr));
+ action_queue.Tick();
// Run the next action so it can accomplish signal completion.
- idx_act.RunIteration();
- action_queue_.Tick();
- idx_act.WaitForStop(test_id);
- EXPECT_EQ(5u, idx_act.index);
+ event_loop_factory_.RunFor(chrono::seconds(1));
+
+ action_queue.Tick();
+ EXPECT_EQ(5u, idx_actor.index);
// Enqueue action to post index.
- action_queue_.EnqueueAction(MakeTestActionIndex(3));
- EXPECT_TRUE(actions::test_action.goal.FetchLatest());
- EXPECT_EQ(3u, actions::test_action.goal->params);
+ action_queue.EnqueueAction(test_actor_index_factory.Make(3));
+ ASSERT_TRUE(goal_fetcher_.Fetch());
+ EXPECT_EQ(3u, goal_fetcher_->params);
// Run the next action so it can accomplish signal completion.
- idx_act.RunIteration();
- action_queue_.Tick();
- idx_act.WaitForStop(test_id);
- EXPECT_EQ(3u, idx_act.index);
+ event_loop_factory_.RunFor(chrono::seconds(1));
+
+ action_queue.Tick();
+ EXPECT_EQ(3u, idx_actor.index);
}
// Tests that an action with a structure params works.
TEST_F(ActionTest, StructParamType) {
- TestActor2Nop nop_act(&actions::test_action2);
+ TestActor2Nop nop_actor(actor2_event_loop_.get());
+ TestActor2Nop::Factory test_action_2_nop_factory =
+ TestActor2Nop::MakeFactory(test_event_loop_.get());
+
+ ActionQueue action_queue;
// Tick an empty queue and make sure it was not running.
- action_queue_.Tick();
- EXPECT_FALSE(action_queue_.Running());
+ action_queue.Tick();
+ EXPECT_FALSE(action_queue.Running());
actions::MyParams p;
p.param1 = 5.0;
p.param2 = 7;
- action_queue_.EnqueueAction(MakeTestAction2NOP(p));
- nop_act.WaitForActionRequest();
+ action_queue.EnqueueAction(test_action_2_nop_factory.Make(p));
// We started an action and it should be running.
- EXPECT_TRUE(action_queue_.Running());
+ EXPECT_TRUE(action_queue.Running());
// Tick it and make sure it is still running.
- action_queue_.Tick();
- EXPECT_TRUE(action_queue_.Running());
+ action_queue.Tick();
+ EXPECT_TRUE(action_queue.Running());
// Run the action so it can signal completion.
- nop_act.RunIteration();
- action_queue_.Tick();
+ // The actor takes no time, but running for a second is the best way to get it
+ // to go.
+ event_loop_factory_.RunFor(chrono::seconds(1));
+
+ action_queue.Tick();
// Make sure it stopped.
- EXPECT_FALSE(action_queue_.Running());
-}
-
-// Tests that cancelling an action before the message confirming it started is
-// received works.
-// Situations like this used to lock the action queue up waiting for an action
-// to report that it successfully cancelled.
-// This situation is kind of a race condition, but it happens very consistently
-// when hitting buttons while the robot is in teleop-disabled. To hit the race
-// condition consistently in the test, there are a couple of Events inserted in
-// between various things running.
-TEST_F(ActionTest, CancelBeforeStart) {
- Event thread_ready, ready_to_start, ready_to_stop;
- ::std::thread action_thread(
- [this, &thread_ready, &ready_to_start, &ready_to_stop]() {
- TestActorNOP nop_act(&actions::test_action);
- nop_act.Initialize();
- thread_ready.Set();
- ready_to_start.Wait();
- nop_act.WaitForActionRequest();
- LOG(DEBUG, "got a request to run\n");
- const uint32_t running_id = nop_act.RunIteration();
- LOG(DEBUG, "waiting for %" PRIx32 " to be stopped\n", running_id);
- ready_to_stop.Set();
- nop_act.WaitForStop(running_id);
- });
-
- action_queue_.CancelAllActions();
- EXPECT_FALSE(action_queue_.Running());
- thread_ready.Wait();
- LOG(DEBUG, "starting action\n");
- action_queue_.EnqueueAction(MakeTestActionNOP());
- action_queue_.Tick();
- action_queue_.CancelAllActions();
- ready_to_start.Set();
- LOG(DEBUG, "started action\n");
- EXPECT_TRUE(action_queue_.Running());
- ready_to_stop.Wait();
- EXPECT_TRUE(action_queue_.Running());
- LOG(DEBUG, "action is ready to stop\n");
-
- action_queue_.Tick();
- action_queue_.CancelAllActions();
- EXPECT_FALSE(action_queue_.Running());
- action_queue_.Tick();
- action_queue_.CancelAllActions();
- ASSERT_FALSE(action_queue_.Running());
- action_thread.join();
-
- action_queue_.Tick();
- action_queue_.CancelAllActions();
- ASSERT_FALSE(action_queue_.Running());
+ EXPECT_FALSE(action_queue.Running());
}
} // namespace testing
diff --git a/aos/actions/actions.h b/aos/actions/actions.h
index 55dcd26..430a978 100644
--- a/aos/actions/actions.h
+++ b/aos/actions/actions.h
@@ -9,9 +9,10 @@
#include <atomic>
#include <memory>
+#include "aos/events/event-loop.h"
#include "aos/logging/logging.h"
-#include "aos/queue.h"
#include "aos/logging/queue_logging.h"
+#include "aos/queue.h"
namespace aos {
namespace common {
@@ -69,12 +70,9 @@
// Starts the action.
void Start() { DoStart(); }
- // Waits until the action has finished.
- void WaitUntilDone() { DoWaitUntilDone(); }
-
// Run all the checks for one iteration of waiting. Will return true when the
// action has completed, successfully or not. This is non-blocking.
- bool CheckIteration() { return DoCheckIteration(false); }
+ bool CheckIteration() { return DoCheckIteration(); }
// Retrieves the internal state of the action for testing.
// See comments on the private members of TypedAction<T, S> for details.
@@ -93,10 +91,8 @@
virtual bool DoRunning() = 0;
// Starts the action if a goal has been created.
virtual void DoStart() = 0;
- // Blocks until complete.
- virtual void DoWaitUntilDone() = 0;
// Updates status for one cycle of waiting
- virtual bool DoCheckIteration(bool blocking) = 0;
+ virtual bool DoCheckIteration() = 0;
// For testing we will need to get the internal state.
// See comments on the private members of TypedAction<T, S> for details.
virtual void DoGetState(bool* has_started, bool* sent_started,
@@ -111,12 +107,17 @@
// A convenient way to refer to the type of our goals.
typedef typename std::remove_reference<decltype(
*(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
+ typedef typename std::remove_reference<decltype(
+ *(static_cast<T*>(nullptr)->status.MakeMessage().get()))>::type StatusType;
typedef typename std::remove_reference<
decltype(static_cast<GoalType*>(nullptr)->params)>::type ParamType;
- TypedAction(T* queue_group, const ParamType ¶ms)
- : queue_group_(queue_group),
- goal_(queue_group_->goal.MakeMessage()),
+ TypedAction(typename ::aos::Fetcher<StatusType> *status_fetcher,
+ typename ::aos::Sender<GoalType> *goal_sender,
+ const ParamType ¶ms)
+ : status_fetcher_(status_fetcher),
+ goal_sender_(goal_sender),
+ goal_(goal_sender_->MakeMessage()),
// This adds 1 to the counter (atomically because it's potentially
// shared across threads) and then bitwise-ORs the bottom of the PID to
// differentiate it from other processes's values (ie a unique id).
@@ -124,11 +125,11 @@
((getpid() & 0xFFFF) << 16)),
params_(params) {
LOG(DEBUG, "Action %" PRIx32 " created on queue %s\n", run_value_,
- queue_group_->goal.name());
+ goal_sender_->name());
// Clear out any old status messages from before now.
- queue_group_->status.FetchLatest();
- if (queue_group_->status.get()) {
- LOG_STRUCT(DEBUG, "have status", *queue_group_->status);
+ status_fetcher_->Fetch();
+ if (status_fetcher_->get()) {
+ LOG_STRUCT(DEBUG, "have status", *status_fetcher_->get());
}
}
@@ -137,17 +138,12 @@
DoCancel();
}
- // Returns the current goal that will be sent when the action is sent.
- GoalType* GetGoal() { return goal_.get(); }
-
private:
void DoCancel() override;
bool DoRunning() override;
- void DoWaitUntilDone() override;
-
- bool DoCheckIteration(bool blocking) override;
+ bool DoCheckIteration() override;
// Sets the started flag (also possibly the interrupted flag).
void CheckStarted();
@@ -168,8 +164,9 @@
if (old_run_value != nullptr) *old_run_value = old_run_value_;
}
- T* const queue_group_;
- ::aos::ScopedMessagePtr<GoalType> goal_;
+ typename ::aos::Fetcher<StatusType> *status_fetcher_;
+ typename ::aos::Sender<GoalType> *goal_sender_;
+ typename ::aos::Sender<GoalType>::Message goal_;
// Track if we have seen a response to the start message.
bool has_started_ = false;
@@ -197,26 +194,62 @@
};
template <typename T>
+class TypedActionFactory {
+ public:
+ typedef typename std::remove_reference<decltype(
+ *(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
+ typedef typename std::remove_reference<decltype(
+ *(static_cast<T*>(nullptr)->status.MakeMessage().get()))>::type StatusType;
+ typedef typename std::remove_reference<decltype(
+ static_cast<GoalType *>(nullptr)->params)>::type ParamType;
+
+ explicit TypedActionFactory(::aos::EventLoop *event_loop,
+ const ::std::string &name)
+ : name_(name),
+ status_fetcher_(event_loop->MakeFetcher<StatusType>(name + ".status")),
+ goal_sender_(event_loop->MakeSender<GoalType>(name + ".goal")) {}
+
+ ::std::unique_ptr<TypedAction<T>> Make(const ParamType ¶m) {
+ return ::std::unique_ptr<TypedAction<T>>(
+ new TypedAction<T>(&status_fetcher_, &goal_sender_, param));
+ }
+
+ TypedActionFactory(TypedActionFactory &&other)
+ : name_(::std::move(other.name_)),
+ status_fetcher_(::std::move(other.status_fetcher_)),
+ goal_sender_(::std::move(other.goal_sender_)) {}
+
+ private:
+ const ::std::string name_;
+ typename ::aos::Fetcher<StatusType> status_fetcher_;
+ typename ::aos::Sender<GoalType> goal_sender_;
+};
+
+template <typename T>
::std::atomic<uint16_t> TypedAction<T>::run_counter_{0};
template <typename T>
void TypedAction<T>::DoCancel() {
if (!sent_started_) {
LOG(INFO, "Action %" PRIx32 " on queue %s was never started\n", run_value_,
- queue_group_->goal.name());
+ goal_sender_->name());
} else {
if (interrupted_) {
LOG(INFO,
"Action %" PRIx32 " on queue %s was interrupted -> not cancelling\n",
- run_value_, queue_group_->goal.name());
+ run_value_, goal_sender_->name());
} else {
if (sent_cancel_) {
LOG(DEBUG, "Action %" PRIx32 " on queue %s already cancelled\n",
- run_value_, queue_group_->goal.name());
+ run_value_, goal_sender_->name());
} else {
LOG(DEBUG, "Canceling action %" PRIx32 " on queue %s\n", run_value_,
- queue_group_->goal.name());
- queue_group_->goal.MakeWithBuilder().run(0).Send();
+ goal_sender_->name());
+ {
+ auto goal_message = goal_sender_->MakeMessage();
+ goal_message->run = 0;
+ goal_message.Send();
+ }
sent_cancel_ = true;
}
}
@@ -230,11 +263,11 @@
return false;
}
if (has_started_) {
- queue_group_->status.FetchNext();
+ status_fetcher_->FetchNext();
CheckInterrupted();
} else {
- while (queue_group_->status.FetchNext()) {
- LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
+ while (status_fetcher_->FetchNext()) {
+ LOG_STRUCT(DEBUG, "got status", *status_fetcher_->get());
CheckStarted();
if (has_started_) CheckInterrupted();
}
@@ -243,41 +276,23 @@
// We've asked it to start but haven't gotten confirmation that it's started
// yet.
if (!has_started_) return true;
- return queue_group_->status.get() &&
- queue_group_->status->running == run_value_;
+ return status_fetcher_->get() &&
+ status_fetcher_->get()->running == run_value_;
}
template <typename T>
-void TypedAction<T>::DoWaitUntilDone() {
- CHECK(sent_started_);
- queue_group_->status.FetchNext();
- LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
- CheckInterrupted();
- while (true) {
- if (DoCheckIteration(true)) {
- return;
- }
- }
-}
-
-template <typename T>
-bool TypedAction<T>::DoCheckIteration(bool blocking) {
+bool TypedAction<T>::DoCheckIteration() {
CHECK(sent_started_);
if (interrupted_) return true;
CheckStarted();
- if (blocking) {
- queue_group_->status.FetchNextBlocking();
- LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
- } else {
- if (!queue_group_->status.FetchNext()) {
- return false;
- }
- LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
+ if (!status_fetcher_->FetchNext()) {
+ return false;
}
+ LOG_STRUCT(DEBUG, "got status", *status_fetcher_->get());
CheckStarted();
CheckInterrupted();
- if (has_started_ && (queue_group_->status.get() &&
- queue_group_->status->running != run_value_)) {
+ if (has_started_ && (status_fetcher_->get() &&
+ status_fetcher_->get()->running != run_value_)) {
return true;
}
return false;
@@ -286,21 +301,21 @@
template <typename T>
void TypedAction<T>::CheckStarted() {
if (has_started_) return;
- if (queue_group_->status.get()) {
- if (queue_group_->status->running == run_value_ ||
- (queue_group_->status->running == 0 &&
- queue_group_->status->last_running == run_value_)) {
+ if (status_fetcher_->get()) {
+ if (status_fetcher_->get()->running == run_value_ ||
+ (status_fetcher_->get()->running == 0 &&
+ status_fetcher_->get()->last_running == run_value_)) {
// It's currently running our instance.
has_started_ = true;
LOG(DEBUG, "Action %" PRIx32 " on queue %s has been started\n",
- run_value_, queue_group_->goal.name());
+ run_value_, goal_sender_->name());
} else if (old_run_value_ != 0 &&
- queue_group_->status->running == old_run_value_) {
+ status_fetcher_->get()->running == old_run_value_) {
LOG(DEBUG, "still running old instance %" PRIx32 "\n", old_run_value_);
} else {
LOG(WARNING, "Action %" PRIx32 " on queue %s interrupted by %" PRIx32
" before starting\n",
- run_value_, queue_group_->goal.name(), queue_group_->status->running);
+ run_value_, goal_sender_->name(), status_fetcher_->get()->running);
has_started_ = true;
interrupted_ = true;
}
@@ -311,43 +326,43 @@
template <typename T>
void TypedAction<T>::CheckInterrupted() {
- if (!interrupted_ && has_started_ && queue_group_->status.get()) {
- if (queue_group_->status->running != 0 &&
- queue_group_->status->running != run_value_) {
+ if (!interrupted_ && has_started_ && status_fetcher_->get()) {
+ if (status_fetcher_->get()->running != 0 &&
+ status_fetcher_->get()->running != run_value_) {
LOG(WARNING, "Action %" PRIx32 " on queue %s interrupted by %" PRIx32
" after starting\n",
- run_value_, queue_group_->goal.name(), queue_group_->status->running);
+ run_value_, goal_sender_->name(), status_fetcher_->get()->running);
}
}
}
template <typename T>
void TypedAction<T>::DoStart() {
- if (goal_) {
+ if (!sent_started_) {
LOG(DEBUG, "Starting action %" PRIx32 "\n", run_value_);
goal_->run = run_value_;
goal_->params = params_;
sent_started_ = true;
if (!goal_.Send()) {
LOG(ERROR, "sending goal for action %" PRIx32 " on queue %s failed\n",
- run_value_, queue_group_->goal.name());
+ run_value_, goal_sender_->name());
// Don't wait to see a message with it.
has_started_ = true;
}
- queue_group_->status.FetchNext();
- if (queue_group_->status.get()) {
- LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
+ status_fetcher_->FetchNext();
+ if (status_fetcher_->get()) {
+ LOG_STRUCT(DEBUG, "got status", *status_fetcher_->get());
}
- if (queue_group_->status.get() && queue_group_->status->running != 0) {
- old_run_value_ = queue_group_->status->running;
+ if (status_fetcher_->get() && status_fetcher_->get()->running != 0) {
+ old_run_value_ = status_fetcher_->get()->running;
LOG(INFO, "Action %" PRIx32 " on queue %s already running\n",
- old_run_value_, queue_group_->goal.name());
+ old_run_value_, goal_sender_->name());
} else {
old_run_value_ = 0;
}
} else {
LOG(WARNING, "Action %" PRIx32 " on queue %s already started\n", run_value_,
- queue_group_->goal.name());
+ goal_sender_->name());
}
}
diff --git a/aos/actions/actor.h b/aos/actions/actor.h
index 5c4ebf6..57a2ecb 100644
--- a/aos/actions/actor.h
+++ b/aos/actions/actor.h
@@ -21,46 +21,32 @@
class ActorBase {
public:
typedef typename std::remove_reference<decltype(
- *(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
- typedef typename std::remove_reference<
- decltype(static_cast<GoalType*>(nullptr)->params)>::type ParamType;
+ *(static_cast<T *>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
+ typedef typename std::remove_reference<decltype(*(
+ static_cast<T *>(nullptr)->status.MakeMessage().get()))>::type StatusType;
+ typedef typename std::remove_reference<decltype(
+ static_cast<GoalType *>(nullptr)->params)>::type ParamType;
- ActorBase(T* acq) : action_q_(acq) {}
+ ActorBase(::aos::EventLoop *event_loop, const ::std::string &name)
+ : event_loop_(event_loop),
+ status_sender_(event_loop->MakeSender<StatusType>(name + ".status")),
+ goal_fetcher_(event_loop->MakeFetcher<GoalType>(name + ".goal")) {
+ LOG(INFO, "Constructing action %s\n", name.c_str());
+ event_loop->MakeWatcher(name + ".goal",
+ [this](const GoalType &goal) { HandleGoal(goal); });
- // Will return true if finished or asked to cancel.
- // Will return false if it failed accomplish its goal
- // due to a problem with the system.
- virtual bool RunAction(const ParamType& params) = 0;
-
- // Runs action while enabled.
- void Run();
-
- // Gets ready to run actions.
- void Initialize();
-
- // Checks if an action was initially running when the thread started.
- bool CheckInitialRunning();
-
- // Wait here until someone asks us to go.
- void WaitForActionRequest();
-
- // Do work son.
- uint32_t RunIteration();
-
- // Wait for stop is signalled.
- void WaitForStop(uint32_t running_id);
-
- // Will run until the done condition is met or times out.
- // Will return false if successful or end_time is reached and true if the
- // action was canceled or failed.
- // Done condition are defined as functions that return true when done and have
- // some sort of blocking statement (such as FetchNextBlocking) to throttle
- // spin rate.
- // end_time is when to stop and return true. Time(0, 0) (the default) means
- // never time out.
- bool WaitUntil(::std::function<bool(void)> done_condition,
- ::aos::monotonic_clock::time_point end_time =
- ::aos::monotonic_clock::min_time);
+ // Send out an inital status saying we aren't running to wake up any users
+ // who might be waiting forever for the previous action.
+ event_loop->OnRun([this]() {
+ auto status_message = status_sender_.MakeMessage();
+ status_message->running = 0;
+ status_message->last_running = 0;
+ status_message->success = !abort_;
+ if (!status_message.Send()) {
+ LOG(ERROR, "Failed to send the status.\n");
+ }
+ });
+ }
// Waits for a certain amount of time from when this method is called.
// Returns false if the action was canceled or failed, and true if the wait
@@ -73,138 +59,123 @@
phased_loop.SleepUntilNext();
return false;
},
- ::aos::monotonic_clock::now() + duration);
+ event_loop_->monotonic_now() + duration);
}
// Returns true if the action should be canceled.
bool ShouldCancel();
+ enum class State {
+ WAITING_FOR_ACTION,
+ RUNNING_ACTION,
+ WAITING_FOR_STOPPED,
+ };
+
+ // Returns the number of times we have run.
+ int running_count() const { return running_count_; }
+
+ // Returns the current action id being run, or 0 if stopped.
+ uint32_t current_id() const { return current_id_; }
+
protected:
+ // Will run until the done condition is met or times out.
+ // Will return false if successful or end_time is reached and true if the
+ // action was canceled or failed.
+ // Done condition are defined as functions that return true when done and have
+ // some sort of blocking statement to throttle spin rate.
+ // end_time is when to stop and return true. Time(0, 0) (the default) means
+ // never time out.
+ bool WaitUntil(::std::function<bool(void)> done_condition,
+ ::aos::monotonic_clock::time_point end_time =
+ ::aos::monotonic_clock::min_time);
+
// Set to true when we should stop ASAP.
bool abort_ = false;
- // The queue for this action.
- T* action_q_;
+ private:
+ // Checks if an action was initially running when the thread started.
+ bool CheckInitialRunning();
+
+ // Will return true if finished or asked to cancel.
+ // Will return false if it failed accomplish its goal
+ // due to a problem with the system.
+ virtual bool RunAction(const ParamType& params) = 0;
+
+ void HandleGoal(const GoalType &goal);
+
+ ::aos::EventLoop *event_loop_;
+
+ // Number of times we've run.
+ int running_count_ = 0;
+
+ uint32_t current_id_ = 0;
+
+ ::aos::Sender<StatusType> status_sender_;
+ ::aos::Fetcher<GoalType> goal_fetcher_;
+
+ State state_ = State::WAITING_FOR_ACTION;
};
template <class T>
-bool ActorBase<T>::CheckInitialRunning() {
- LOG(DEBUG, "Waiting for input to start\n");
-
- if (action_q_->goal.FetchLatest()) {
- LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
- const uint32_t initially_running = action_q_->goal->run;
- if (initially_running != 0) {
- while (action_q_->goal->run == initially_running) {
- LOG(INFO, "run is still %" PRIx32 "\n", initially_running);
- action_q_->goal.FetchNextBlocking();
- LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
+void ActorBase<T>::HandleGoal(const GoalType &goal) {
+ LOG_STRUCT(DEBUG, "action goal", goal);
+ switch (state_) {
+ case State::WAITING_FOR_ACTION:
+ if (goal.run) {
+ state_ = State::RUNNING_ACTION;
+ } else {
+ auto status_message = status_sender_.MakeMessage();
+ status_message->running = 0;
+ status_message->last_running = 0;
+ status_message->success = !abort_;
+ if (!status_message.Send()) {
+ LOG(ERROR, "Failed to send the status.\n");
+ }
+ break;
}
- }
- LOG(DEBUG, "Done waiting, goal\n");
- return true;
- }
- LOG(DEBUG, "Done waiting, no goal\n");
- return false;
-}
-
-template <class T>
-void ActorBase<T>::WaitForActionRequest() {
- while (action_q_->goal.get() == nullptr || !action_q_->goal->run) {
- LOG(INFO, "Waiting for an action request.\n");
- action_q_->goal.FetchNextBlocking();
- LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
- if (!action_q_->goal->run) {
- if (!action_q_->status.MakeWithBuilder()
- .running(0)
- .last_running(0)
- .success(!abort_)
- .Send()) {
- LOG(ERROR, "Failed to send the status.\n");
+ case State::RUNNING_ACTION: {
+ ++running_count_;
+ const uint32_t running_id = goal.run;
+ current_id_ = running_id;
+ LOG(INFO, "Starting action %" PRIx32 "\n", running_id);
+ {
+ auto status_message = status_sender_.MakeMessage();
+ status_message->running = running_id;
+ status_message->last_running = 0;
+ status_message->success = !abort_;
+ if (!status_message.Send()) {
+ LOG(ERROR, "Failed to send the status.\n");
+ }
}
- }
- }
-}
-template <class T>
-uint32_t ActorBase<T>::RunIteration() {
- CHECK(action_q_->goal.get() != nullptr);
- const uint32_t running_id = action_q_->goal->run;
- LOG(INFO, "Starting action %" PRIx32 "\n", running_id);
- if (!action_q_->status.MakeWithBuilder()
- .running(running_id)
- .last_running(0)
- .success(!abort_)
- .Send()) {
- LOG(ERROR, "Failed to send the status.\n");
- }
- LOG_STRUCT(INFO, "goal", *action_q_->goal);
- abort_ = !RunAction(action_q_->goal->params);
- LOG(INFO, "Done with action %" PRIx32 "\n", running_id);
+ LOG_STRUCT(INFO, "goal", goal);
+ abort_ = !RunAction(goal.params);
+ LOG(INFO, "Done with action %" PRIx32 "\n", running_id);
+ current_id_ = 0u;
- // If we have a new one to run, we shouldn't say we're stopped in between.
- if (action_q_->goal->run == 0 || action_q_->goal->run == running_id) {
- if (!action_q_->status.MakeWithBuilder()
- .running(0)
- .last_running(running_id)
- .success(!abort_)
- .Send()) {
- LOG(ERROR, "Failed to send the status.\n");
- } else {
- LOG(INFO, "Sending Done status %" PRIx32 "\n", running_id);
- }
- } else {
- LOG(INFO, "skipping sending stopped status for %" PRIx32 "\n", running_id);
- }
+ {
+ auto status_message = status_sender_.MakeMessage();
+ status_message->running = 0;
+ status_message->last_running = running_id;
+ status_message->success = !abort_;
- return running_id;
-}
+ if (!status_message.Send()) {
+ LOG(ERROR, "Failed to send the status.\n");
+ } else {
+ LOG(INFO, "Sending Done status %" PRIx32 "\n", running_id);
+ }
+ }
-template <class T>
-void ActorBase<T>::WaitForStop(uint32_t running_id) {
- assert(action_q_->goal.get() != nullptr);
- while (action_q_->goal->run == running_id) {
- LOG(INFO, "Waiting for the action (%" PRIx32 ") to be stopped.\n",
- running_id);
- action_q_->goal.FetchNextBlocking();
- LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
- }
-}
-
-template <class T>
-void ActorBase<T>::Run() {
- Initialize();
-
- while (true) {
- // Wait for a request to come in before starting.
- WaitForActionRequest();
-
- LOG_STRUCT(INFO, "running with goal", *action_q_->goal);
-
- // Perform the action once.
- uint32_t running_id = RunIteration();
-
- LOG(INFO, "done running\n");
-
- // Don't start again until asked.
- WaitForStop(running_id);
- LOG(DEBUG, "action %" PRIx32 " was stopped\n", running_id);
- }
-}
-
-template <class T>
-void ActorBase<T>::Initialize() {
- // Make sure the last job is done and we have a signal.
- if (CheckInitialRunning()) {
- LOG(DEBUG, "action %" PRIx32 " was stopped\n", action_q_->goal->run);
- }
-
- if (!action_q_->status.MakeWithBuilder()
- .running(0)
- .last_running(0)
- .success(!abort_)
- .Send()) {
- LOG(ERROR, "Failed to send the status.\n");
+ state_ = State::WAITING_FOR_STOPPED;
+ LOG(INFO, "Waiting for the action (%" PRIx32 ") to be stopped.\n",
+ running_id);
+ } break;
+ case State::WAITING_FOR_STOPPED:
+ if (goal.run == 0) {
+ LOG(INFO, "Action stopped.\n");
+ state_ = State::WAITING_FOR_ACTION;
+ }
+ break;
}
}
@@ -234,10 +205,10 @@
template <class T>
bool ActorBase<T>::ShouldCancel() {
- if (action_q_->goal.FetchNext()) {
- LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
+ if (goal_fetcher_.Fetch()) {
+ LOG_STRUCT(DEBUG, "goal queue", *goal_fetcher_);
}
- bool ans = !action_q_->goal->run;
+ bool ans = !goal_fetcher_->run || goal_fetcher_->run != current_id_;
if (ans) {
LOG(INFO, "Time to stop action\n");
}
diff --git a/aos/actions/test_action.q b/aos/actions/test_action.q
index 2ed3b4a..f2d268d 100644
--- a/aos/actions/test_action.q
+++ b/aos/actions/test_action.q
@@ -30,6 +30,3 @@
queue Goal goal;
queue aos.common.actions.Status status;
};
-
-queue_group TestActionQueueGroup test_action;
-queue_group TestAction2QueueGroup test_action2;
diff --git a/aos/input/action_joystick_input.cc b/aos/input/action_joystick_input.cc
index 1679a7d..4c3fef1 100644
--- a/aos/input/action_joystick_input.cc
+++ b/aos/input/action_joystick_input.cc
@@ -50,7 +50,7 @@
void ActionJoystickInput::StartAuto() {
LOG(INFO, "Starting auto mode\n");
action_queue_.EnqueueAction(
- ::frc971::autonomous::MakeAutonomousAction(GetAutonomousMode()));
+ autonomous_action_factory_.Make(GetAutonomousMode()));
auto_action_running_ = true;
}
diff --git a/aos/input/action_joystick_input.h b/aos/input/action_joystick_input.h
index ec7b02c..0160468 100644
--- a/aos/input/action_joystick_input.h
+++ b/aos/input/action_joystick_input.h
@@ -33,8 +33,11 @@
const InputConfig &input_config)
: ::aos::input::JoystickInput(event_loop),
input_config_(input_config),
- drivetrain_input_reader_(DrivetrainInputReader::Make(
- input_type, dt_config)) {}
+ drivetrain_input_reader_(
+ DrivetrainInputReader::Make(input_type, dt_config)),
+ autonomous_action_factory_(
+ ::frc971::autonomous::BaseAutonomousActor::MakeFactory(
+ event_loop)) {}
virtual ~ActionJoystickInput() {}
@@ -78,6 +81,8 @@
bool auto_was_running_ = false;
::std::unique_ptr<DrivetrainInputReader> drivetrain_input_reader_;
::aos::common::actions::ActionQueue action_queue_;
+
+ ::frc971::autonomous::BaseAutonomousActor::Factory autonomous_action_factory_;
};
} // namespace input
diff --git a/aos/logging/implementations.cc b/aos/logging/implementations.cc
index 78b6724..f37f0ca 100644
--- a/aos/logging/implementations.cc
+++ b/aos/logging/implementations.cc
@@ -101,12 +101,14 @@
} // namespace
-void FillInMessageStructure(log_level level,
+void FillInMessageStructure(bool add_to_type_cache, log_level level,
const ::std::string &message_string, size_t size,
const MessageType *type,
const ::std::function<size_t(char *)> &serialize,
LogMessage *message) {
- type_cache::AddShm(type->id);
+ if (add_to_type_cache) {
+ type_cache::AddShm(type->id);
+ }
message->structure.type_id = type->id;
FillInMessageBase(level, message);
@@ -280,8 +282,8 @@
log_level level, const ::std::string &message_string, size_t size,
const MessageType *type, const ::std::function<size_t(char *)> &serialize) {
LogMessage message;
- internal::FillInMessageStructure(level, message_string, size, type, serialize,
- &message);
+ internal::FillInMessageStructure(fill_type_cache(), level, message_string,
+ size, type, serialize, &message);
HandleMessage(message);
}
@@ -403,8 +405,8 @@
size_t size, const MessageType *type,
const ::std::function<size_t(char *)> &serialize) override {
LogMessage *message = GetMessageOrDie();
- internal::FillInMessageStructure(level, message_string, size, type,
- serialize, message);
+ internal::FillInMessageStructure(fill_type_cache(), level, message_string,
+ size, type, serialize, message);
Write(message);
}
diff --git a/aos/logging/interface.h b/aos/logging/interface.h
index fafc9e2..50237fc 100644
--- a/aos/logging/interface.h
+++ b/aos/logging/interface.h
@@ -60,6 +60,8 @@
// logger other than this one available while this is called.
virtual void set_next(LogImplementation *next) { next_ = next; }
+ virtual bool fill_type_cache() { return true; }
+
protected:
// Actually logs the given message. Implementations should somehow create a
// LogMessage and then call internal::FillInMessage.
diff --git a/aos/testing/test_logging.cc b/aos/testing/test_logging.cc
index 34af54d..fa70d47 100644
--- a/aos/testing/test_logging.cc
+++ b/aos/testing/test_logging.cc
@@ -51,6 +51,8 @@
}
}
+ bool fill_type_cache() override { return false; }
+
void PrintMessagesAsTheyComeIn() { print_as_messages_come_in_ = true; }
private: