Convert aos over to flatbuffers
Everything builds and all the tests pass. I suspect that some entries
are still missing from the config files, but any missing ones should
surface quickly at startup.
Known limitation: with this change there is no logging or live introspection of queue messages.
Change-Id: I496ee01ed68f202c7851bed7e8786cee30df29f5
diff --git a/aos/actions/BUILD b/aos/actions/BUILD
index 25cc6e3..fe79a4a 100644
--- a/aos/actions/BUILD
+++ b/aos/actions/BUILD
@@ -1,6 +1,6 @@
package(default_visibility = ["//visibility:public"])
-load("//aos/build:queues.bzl", "queue_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
cc_library(
name = "action_lib",
@@ -13,30 +13,22 @@
"actor.h",
],
deps = [
- "//aos:queues",
+ ":actions_fbs",
"//aos/controls:control_loop",
"//aos/logging",
- "//aos/logging:queue_logging",
"//aos/time",
"//aos/util:phased_loop",
],
)
-queue_library(
- name = "action_queue",
- srcs = [
- "actions.q",
- ],
+flatbuffer_cc_library(
+ name = "actions_fbs",
+ srcs = ["actions.fbs"],
)
-queue_library(
- name = "test_action_queue",
- srcs = [
- "test_action.q",
- ],
- deps = [
- ":action_queue",
- ],
+flatbuffer_cc_library(
+ name = "test_action_fbs",
+ srcs = ["test_action.fbs"],
)
cc_test(
@@ -44,15 +36,14 @@
srcs = [
"action_test.cc",
],
+ data = ["action_test_config.json"],
deps = [
":action_lib",
- ":action_queue",
- ":test_action_queue",
+ ":actions_fbs",
+ ":test_action_fbs",
"//aos:event",
- "//aos:queues",
"//aos/events:simulated_event_loop",
"//aos/logging",
- "//aos/logging:queue_logging",
"//aos/testing:googletest",
"//aos/testing:test_shm",
"//aos/time",
diff --git a/aos/actions/action_test.cc b/aos/actions/action_test.cc
index 178b2b0..aa0e6d2 100644
--- a/aos/actions/action_test.cc
+++ b/aos/actions/action_test.cc
@@ -7,11 +7,10 @@
#include "gtest/gtest.h"
#include "aos/actions/actions.h"
-#include "aos/actions/actions.q.h"
+#include "aos/actions/actions_generated.h"
#include "aos/actions/actor.h"
-#include "aos/actions/test_action.q.h"
-#include "aos/events/simulated-event-loop.h"
-#include "aos/queue.h"
+#include "aos/actions/test_action_generated.h"
+#include "aos/events/simulated_event_loop.h"
#include "aos/testing/test_logging.h"
#include "aos/testing/test_shm.h"
@@ -20,24 +19,24 @@
namespace actions {
namespace testing {
-
namespace chrono = ::std::chrono;
class TestActorIndex
- : public aos::common::actions::ActorBase<actions::TestActionQueueGroup> {
+ : public aos::common::actions::ActorBase<actions::TestActionGoal> {
public:
- typedef TypedActionFactory<actions::TestActionQueueGroup> Factory;
+ typedef TypedActionFactory<actions::TestActionGoal> Factory;
explicit TestActorIndex(::aos::EventLoop *event_loop)
- : aos::common::actions::ActorBase<actions::TestActionQueueGroup>(
- event_loop, ".aos.common.actions.test_action") {}
+ : aos::common::actions::ActorBase<actions::TestActionGoal>(
+ event_loop, "/test_action") {}
static Factory MakeFactory(::aos::EventLoop *event_loop) {
- return Factory(event_loop, ".aos.common.actions.test_action");
+ return Factory(event_loop, "/test_action");
}
- bool RunAction(const uint32_t &new_index) override {
- index = new_index;
+ bool RunAction(const UInt *new_index) override {
+ VLOG(1) << "New index " << FlatbufferToJson(new_index);
+ index = new_index->val();
return true;
}
@@ -45,35 +44,35 @@
};
class TestActorNOP
- : public aos::common::actions::ActorBase<actions::TestActionQueueGroup> {
+ : public aos::common::actions::ActorBase<actions::TestActionGoal> {
public:
- typedef TypedActionFactory<actions::TestActionQueueGroup> Factory;
+ typedef TypedActionFactory<actions::TestActionGoal> Factory;
explicit TestActorNOP(::aos::EventLoop *event_loop)
- : actions::ActorBase<actions::TestActionQueueGroup>(
- event_loop, ".aos.common.actions.test_action") {}
+ : actions::ActorBase<actions::TestActionGoal>(
+ event_loop, "/test_action") {}
static Factory MakeFactory(::aos::EventLoop *event_loop) {
- return Factory(event_loop, ".aos.common.actions.test_action");
+ return Factory(event_loop, "/test_action");
}
- bool RunAction(const uint32_t &) override { return true; }
+ bool RunAction(const UInt *) override { return true; }
};
class TestActorShouldCancel
- : public aos::common::actions::ActorBase<actions::TestActionQueueGroup> {
+ : public aos::common::actions::ActorBase<actions::TestActionGoal> {
public:
- typedef TypedActionFactory<actions::TestActionQueueGroup> Factory;
+ typedef TypedActionFactory<actions::TestActionGoal> Factory;
explicit TestActorShouldCancel(::aos::EventLoop *event_loop)
- : aos::common::actions::ActorBase<actions::TestActionQueueGroup>(
- event_loop, ".aos.common.actions.test_action") {}
+ : aos::common::actions::ActorBase<actions::TestActionGoal>(
+ event_loop, "/test_action") {}
static Factory MakeFactory(::aos::EventLoop *event_loop) {
- return Factory(event_loop, ".aos.common.actions.test_action");
+ return Factory(event_loop, "/test_action");
}
- bool RunAction(const uint32_t &) override {
+ bool RunAction(const UInt *) override {
while (!ShouldCancel()) {
AOS_LOG(FATAL, "NOT CANCELED!!\n");
}
@@ -82,37 +81,41 @@
};
class TestActor2Nop
- : public aos::common::actions::ActorBase<actions::TestAction2QueueGroup> {
+ : public aos::common::actions::ActorBase<actions::TestAction2Goal> {
public:
- typedef TypedActionFactory<actions::TestAction2QueueGroup> Factory;
+ typedef TypedActionFactory<actions::TestAction2Goal> Factory;
explicit TestActor2Nop(::aos::EventLoop *event_loop)
- : actions::ActorBase<actions::TestAction2QueueGroup>(
- event_loop, ".aos.common.actions.test_action2") {}
+ : actions::ActorBase<actions::TestAction2Goal>(
+ event_loop, "/test_action2") {}
static Factory MakeFactory(::aos::EventLoop *event_loop) {
- return Factory(event_loop, ".aos.common.actions.test_action2");
+ return Factory(event_loop, "/test_action2");
}
- bool RunAction(const actions::MyParams &) { return true; }
+ bool RunAction(const actions::MyParams *) { return true; }
};
class ActionTest : public ::testing::Test {
protected:
ActionTest()
- : actor1_event_loop_(event_loop_factory_.MakeEventLoop()),
+ : configuration_(
+ configuration::ReadConfig("aos/actions/action_test_config.json")),
+ event_loop_factory_(&configuration_.message()),
+ actor1_event_loop_(event_loop_factory_.MakeEventLoop()),
actor2_event_loop_(event_loop_factory_.MakeEventLoop()),
test_event_loop_(event_loop_factory_.MakeEventLoop()) {
::aos::testing::EnableTestLogging();
}
+ FlatbufferDetachedBuffer<Configuration> configuration_;
+
// Bring up and down Core.
::aos::SimulatedEventLoopFactory event_loop_factory_;
::std::unique_ptr<::aos::EventLoop> actor1_event_loop_;
::std::unique_ptr<::aos::EventLoop> actor2_event_loop_;
::std::unique_ptr<::aos::EventLoop> test_event_loop_;
-
};
// Tests that the the actions exist in a safe state at startup.
@@ -130,11 +133,10 @@
TEST_F(ActionTest, StartWithOldGoal) {
::std::unique_ptr<::aos::EventLoop> test2_event_loop =
event_loop_factory_.MakeEventLoop();
- ::aos::Sender<TestActionQueueGroup::Goal> goal_sender =
- test2_event_loop->MakeSender<TestActionQueueGroup::Goal>(
- ".aos.common.actions.test_action.goal");
- ::aos::Fetcher<Status> status_fetcher = test2_event_loop->MakeFetcher<Status>(
- ".aos.common.actions.test_action.status");
+ ::aos::Sender<TestActionGoal> goal_sender =
+ test2_event_loop->MakeSender<TestActionGoal>("/test_action");
+ ::aos::Fetcher<Status> status_fetcher =
+ test2_event_loop->MakeFetcher<Status>("/test_action");
TestActorIndex::Factory nop_actor_factory =
TestActorNOP::MakeFactory(test_event_loop_.get());
@@ -142,9 +144,14 @@
ActionQueue action_queue;
{
- auto goal_message = goal_sender.MakeMessage();
- goal_message->run = 971;
- ASSERT_TRUE(goal_message.Send());
+ ::aos::Sender<TestActionGoal>::Builder builder =
+ goal_sender.MakeBuilder();
+
+ TestActionGoal::Builder goal_builder =
+ builder.MakeBuilder<TestActionGoal>();
+
+ goal_builder.add_run(971);
+ ASSERT_TRUE(builder.Send(goal_builder.Finish()));
}
TestActorNOP nop_act(actor1_event_loop_.get());
@@ -154,10 +161,14 @@
event_loop_factory_.RunFor(chrono::seconds(1));
ASSERT_TRUE(status_fetcher.Fetch());
- EXPECT_EQ(0u, status_fetcher->running);
- EXPECT_EQ(0u, status_fetcher->last_running);
+ EXPECT_EQ(0u, status_fetcher->running());
+ EXPECT_EQ(0u, status_fetcher->last_running());
- action_queue.EnqueueAction(nop_actor_factory.Make(0));
+ {
+ UIntT uint;
+ uint.val = 0;
+ action_queue.EnqueueAction(nop_actor_factory.Make(uint));
+ }
// We started an action and it should be running.
EXPECT_TRUE(action_queue.Running());
@@ -189,7 +200,11 @@
action_queue.Tick();
EXPECT_FALSE(action_queue.Running());
- action_queue.EnqueueAction(nop_actor_factory.Make(0));
+ {
+ UIntT uint;
+ uint.val = 0;
+ action_queue.EnqueueAction(nop_actor_factory.Make(uint));
+ }
// We started an action and it should be running.
EXPECT_TRUE(action_queue.Running());
@@ -223,8 +238,12 @@
// Enqueue two actions to test both cancel. We can have an action and a next
// action so we want to test that.
- action_queue.EnqueueAction(nop_actor_factory.Make(0));
- action_queue.EnqueueAction(nop_actor_factory.Make(0));
+ {
+ UIntT uint;
+ uint.val = 0;
+ action_queue.EnqueueAction(nop_actor_factory.Make(uint));
+ action_queue.EnqueueAction(nop_actor_factory.Make(uint));
+ }
action_queue.Tick();
@@ -274,7 +293,11 @@
action_queue.Tick();
// Enqueue blocking action.
- action_queue.EnqueueAction(cancel_action_factory.Make(0));
+ {
+ UIntT uint;
+ uint.val = 0;
+ action_queue.EnqueueAction(cancel_action_factory.Make(uint));
+ }
action_queue.Tick();
EXPECT_TRUE(action_queue.Running());
@@ -306,7 +329,11 @@
EXPECT_FALSE(action_queue.Running());
// Enqueue action to be canceled.
- action_queue.EnqueueAction(nop_actor_factory.Make(0));
+ {
+ UIntT uint;
+ uint.val = 0;
+ action_queue.EnqueueAction(nop_actor_factory.Make(uint));
+ }
action_queue.Tick();
// Should still be running as the actor could not have signalled.
@@ -325,7 +352,11 @@
ASSERT_NE(0u, nop_actor_id);
// Add the next action which should ensure the first stopped.
- action_queue.EnqueueAction(nop_actor_factory.Make(0));
+ {
+ UIntT uint;
+ uint.val = 0;
+ action_queue.EnqueueAction(nop_actor_factory.Make(uint));
+ }
// id for the second run.
uint32_t nop_actor2_id = 0;
@@ -382,13 +413,17 @@
EXPECT_FALSE(action_queue.Running());
// Enqueue action to post index.
- action_queue.EnqueueAction(test_actor_index_factory.Make(5));
- ::aos::Fetcher<actions::TestActionQueueGroup::Goal> goal_fetcher_ =
- test_event_loop_->MakeFetcher<actions::TestActionQueueGroup::Goal>(
- ".aos.common.actions.test_action.goal");
+ {
+ UIntT uint;
+ uint.val = 5;
+ action_queue.EnqueueAction(test_actor_index_factory.Make(uint));
+ }
+ ::aos::Fetcher<actions::TestActionGoal> goal_fetcher_ =
+ test_event_loop_->MakeFetcher<actions::TestActionGoal>(
+ "/test_action");
ASSERT_TRUE(goal_fetcher_.Fetch());
- EXPECT_EQ(5u, goal_fetcher_->params);
+ EXPECT_EQ(5u, goal_fetcher_->params()->val());
EXPECT_EQ(0u, idx_actor.index);
action_queue.Tick();
@@ -400,9 +435,13 @@
EXPECT_EQ(5u, idx_actor.index);
// Enqueue action to post index.
- action_queue.EnqueueAction(test_actor_index_factory.Make(3));
+ {
+ UIntT uint;
+ uint.val = 3;
+ action_queue.EnqueueAction(test_actor_index_factory.Make(uint));
+ }
ASSERT_TRUE(goal_fetcher_.Fetch());
- EXPECT_EQ(3u, goal_fetcher_->params);
+ EXPECT_EQ(3u, goal_fetcher_->params()->val());
// Run the next action so it can accomplish signal completion.
event_loop_factory_.RunFor(chrono::seconds(1));
@@ -423,7 +462,7 @@
action_queue.Tick();
EXPECT_FALSE(action_queue.Running());
- actions::MyParams p;
+ actions::MyParamsT p;
p.param1 = 5.0;
p.param2 = 7;
diff --git a/aos/actions/action_test_config.json b/aos/actions/action_test_config.json
new file mode 100644
index 0000000..e9b6f16
--- /dev/null
+++ b/aos/actions/action_test_config.json
@@ -0,0 +1,20 @@
+{
+ "channels": [
+ {
+ "name": "/test_action",
+ "type": "aos.common.actions.TestActionGoal"
+ },
+ {
+ "name": "/test_action",
+ "type": "aos.common.actions.Status"
+ },
+ {
+ "name": "/test_action2",
+ "type": "aos.common.actions.TestAction2Goal"
+ },
+ {
+ "name": "/test_action2",
+ "type": "aos.common.actions.Status"
+ }
+ ]
+}
diff --git a/aos/actions/actions.fbs b/aos/actions/actions.fbs
new file mode 100644
index 0000000..0ce41fc
--- /dev/null
+++ b/aos/actions/actions.fbs
@@ -0,0 +1,24 @@
+namespace aos.common.actions;
+
+table Status {
+ // The run value of the instance we're currently running or 0.
+ running:uint;
+ // A run value we were previously running or 0.
+ last_running:uint;
+ // If false the action failed to complete and may be in a bad state,
+ // this is a critical problem not a cancellation.
+ success:bool;
+}
+
+table DoubleParam {
+ val:double;
+}
+
+table Goal {
+ // The unique value to put into status.running while running this instance or
+ // 0 to cancel.
+ run:uint;
+ // Default parameter. The more useful thing to do would be to define your own
+ // goal type to change param to a useful structure.
+ params:DoubleParam;
+}
diff --git a/aos/actions/actions.h b/aos/actions/actions.h
index e2b9ca8..d01b3f5 100644
--- a/aos/actions/actions.h
+++ b/aos/actions/actions.h
@@ -9,10 +9,10 @@
#include <atomic>
#include <memory>
-#include "aos/events/event-loop.h"
+#include "aos/actions/actions_generated.h"
+#include "aos/events/event_loop.h"
+#include "aos/json_to_flatbuffer.h"
#include "aos/logging/logging.h"
-#include "aos/logging/queue_logging.h"
-#include "aos/queue.h"
namespace aos {
namespace common {
@@ -105,31 +105,28 @@
class TypedAction : public Action {
public:
// A convenient way to refer to the type of our goals.
+ typedef T GoalType;
typedef typename std::remove_reference<decltype(
- *(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
- typedef typename std::remove_reference<decltype(
- *(static_cast<T*>(nullptr)->status.MakeMessage().get()))>::type StatusType;
- typedef typename std::remove_reference<
- decltype(static_cast<GoalType*>(nullptr)->params)>::type ParamType;
+ *static_cast<GoalType *>(nullptr)->params())>::type ParamType;
- TypedAction(typename ::aos::Fetcher<StatusType> *status_fetcher,
+ TypedAction(typename ::aos::Fetcher<Status> *status_fetcher,
typename ::aos::Sender<GoalType> *goal_sender,
- const ParamType ¶ms)
+ const typename ParamType::NativeTableType ¶ms)
: status_fetcher_(status_fetcher),
goal_sender_(goal_sender),
- goal_(goal_sender_->MakeMessage()),
// This adds 1 to the counter (atomically because it's potentially
// shared across threads) and then bitwise-ORs the bottom of the PID to
// differentiate it from other processes's values (ie a unique id).
run_value_(run_counter_.fetch_add(1, ::std::memory_order_relaxed) |
((getpid() & 0xFFFF) << 16)),
params_(params) {
- AOS_LOG(DEBUG, "Action %" PRIx32 " created on queue %s\n", run_value_,
- goal_sender_->name());
+ AOS_LOG(DEBUG, "Action %" PRIx32 " created on queue %.*s\n", run_value_,
+ static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
// Clear out any old status messages from before now.
status_fetcher_->Fetch();
if (status_fetcher_->get()) {
- AOS_LOG_STRUCT(DEBUG, "have status", *status_fetcher_->get());
+ VLOG(1) << "have status" << FlatbufferToJson(status_fetcher_->get());
}
}
@@ -164,9 +161,8 @@
if (old_run_value != nullptr) *old_run_value = old_run_value_;
}
- typename ::aos::Fetcher<StatusType> *status_fetcher_;
+ typename ::aos::Fetcher<Status> *status_fetcher_;
typename ::aos::Sender<GoalType> *goal_sender_;
- typename ::aos::Sender<GoalType>::Message goal_;
// Track if we have seen a response to the start message.
bool has_started_ = false;
@@ -182,7 +178,7 @@
const uint32_t run_value_;
// flag passed to action in order to have differing types
- const ParamType params_;
+ const typename ParamType::NativeTableType params_;
// The old value for running that we may have seen. If we see any value other
// than this or run_value_, somebody else got in the way and we're done. 0 if
@@ -196,20 +192,18 @@
template <typename T>
class TypedActionFactory {
public:
+ typedef T GoalType;
typedef typename std::remove_reference<decltype(
- *(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
- typedef typename std::remove_reference<decltype(
- *(static_cast<T*>(nullptr)->status.MakeMessage().get()))>::type StatusType;
- typedef typename std::remove_reference<decltype(
- static_cast<GoalType *>(nullptr)->params)>::type ParamType;
+ *static_cast<GoalType *>(nullptr)->params())>::type ParamType;
explicit TypedActionFactory(::aos::EventLoop *event_loop,
const ::std::string &name)
: name_(name),
- status_fetcher_(event_loop->MakeFetcher<StatusType>(name + ".status")),
- goal_sender_(event_loop->MakeSender<GoalType>(name + ".goal")) {}
+ status_fetcher_(event_loop->MakeFetcher<Status>(name)),
+ goal_sender_(event_loop->MakeSender<GoalType>(name)) {}
- ::std::unique_ptr<TypedAction<T>> Make(const ParamType ¶m) {
+ ::std::unique_ptr<TypedAction<T>> Make(
+ const typename ParamType::NativeTableType ¶m) {
return ::std::unique_ptr<TypedAction<T>>(
new TypedAction<T>(&status_fetcher_, &goal_sender_, param));
}
@@ -221,7 +215,7 @@
private:
const ::std::string name_;
- typename ::aos::Fetcher<StatusType> status_fetcher_;
+ typename ::aos::Fetcher<Status> status_fetcher_;
typename ::aos::Sender<GoalType> goal_sender_;
};
@@ -231,25 +225,31 @@
template <typename T>
void TypedAction<T>::DoCancel() {
if (!sent_started_) {
- AOS_LOG(INFO, "Action %" PRIx32 " on queue %s was never started\n",
- run_value_, goal_sender_->name());
+ AOS_LOG(INFO, "Action %" PRIx32 " on queue %.*s was never started\n",
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
} else {
if (interrupted_) {
AOS_LOG(INFO,
"Action %" PRIx32
- " on queue %s was interrupted -> not cancelling\n",
- run_value_, goal_sender_->name());
+ " on queue %.*s was interrupted -> not cancelling\n",
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
} else {
if (sent_cancel_) {
- AOS_LOG(DEBUG, "Action %" PRIx32 " on queue %s already cancelled\n",
- run_value_, goal_sender_->name());
+ AOS_LOG(DEBUG, "Action %" PRIx32 " on queue %.*s already cancelled\n",
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
} else {
- AOS_LOG(DEBUG, "Canceling action %" PRIx32 " on queue %s\n", run_value_,
- goal_sender_->name());
+ AOS_LOG(DEBUG, "Canceling action %" PRIx32 " on queue %.*s\n",
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
{
- auto goal_message = goal_sender_->MakeMessage();
- goal_message->run = 0;
- goal_message.Send();
+ auto builder = goal_sender_->MakeBuilder();
+ typename GoalType::Builder goal_builder =
+ builder.template MakeBuilder<GoalType>();
+ goal_builder.add_run(0);
+ builder.Send(goal_builder.Finish());
}
sent_cancel_ = true;
}
@@ -268,7 +268,7 @@
CheckInterrupted();
} else {
while (status_fetcher_->FetchNext()) {
- AOS_LOG_STRUCT(DEBUG, "got status", *status_fetcher_->get());
+ VLOG(1) << "got status" << FlatbufferToJson(status_fetcher_->get());
CheckStarted();
if (has_started_) CheckInterrupted();
}
@@ -278,7 +278,7 @@
// yet.
if (!has_started_) return true;
return status_fetcher_->get() &&
- status_fetcher_->get()->running == run_value_;
+ status_fetcher_->get()->running() == run_value_;
}
template <typename T>
@@ -289,11 +289,11 @@
if (!status_fetcher_->FetchNext()) {
return false;
}
- AOS_LOG_STRUCT(DEBUG, "got status", *status_fetcher_->get());
+ VLOG(1) << "got status" << FlatbufferToJson(status_fetcher_->get());
CheckStarted();
CheckInterrupted();
if (has_started_ && (status_fetcher_->get() &&
- status_fetcher_->get()->running != run_value_)) {
+ status_fetcher_->get()->running() != run_value_)) {
return true;
}
return false;
@@ -303,23 +303,24 @@
void TypedAction<T>::CheckStarted() {
if (has_started_) return;
if (status_fetcher_->get()) {
- if (status_fetcher_->get()->running == run_value_ ||
- (status_fetcher_->get()->running == 0 &&
- status_fetcher_->get()->last_running == run_value_)) {
+ if (status_fetcher_->get()->running() == run_value_ ||
+ (status_fetcher_->get()->running() == 0 &&
+ status_fetcher_->get()->last_running() == run_value_)) {
// It's currently running our instance.
has_started_ = true;
- AOS_LOG(DEBUG, "Action %" PRIx32 " on queue %s has been started\n",
- run_value_, goal_sender_->name());
+ AOS_LOG(DEBUG, "Action %" PRIx32 " on queue %.*s has been started\n",
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
} else if (old_run_value_ != 0 &&
- status_fetcher_->get()->running == old_run_value_) {
+ status_fetcher_->get()->running() == old_run_value_) {
AOS_LOG(DEBUG, "still running old instance %" PRIx32 "\n",
old_run_value_);
} else {
AOS_LOG(WARNING,
- "Action %" PRIx32 " on queue %s interrupted by %" PRIx32
+ "Action %" PRIx32 " on queue %.*s interrupted by %" PRIx32
" before starting\n",
- run_value_, goal_sender_->name(),
- status_fetcher_->get()->running);
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data(), status_fetcher_->get()->running());
has_started_ = true;
interrupted_ = true;
}
@@ -331,13 +332,13 @@
template <typename T>
void TypedAction<T>::CheckInterrupted() {
if (!interrupted_ && has_started_ && status_fetcher_->get()) {
- if (status_fetcher_->get()->running != 0 &&
- status_fetcher_->get()->running != run_value_) {
+ if (status_fetcher_->get()->running() != 0 &&
+ status_fetcher_->get()->running() != run_value_) {
AOS_LOG(WARNING,
- "Action %" PRIx32 " on queue %s interrupted by %" PRIx32
+ "Action %" PRIx32 " on queue %.*s interrupted by %" PRIx32
" after starting\n",
- run_value_, goal_sender_->name(),
- status_fetcher_->get()->running);
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data(), status_fetcher_->get()->running());
}
}
}
@@ -346,29 +347,38 @@
void TypedAction<T>::DoStart() {
if (!sent_started_) {
AOS_LOG(DEBUG, "Starting action %" PRIx32 "\n", run_value_);
- goal_->run = run_value_;
- goal_->params = params_;
+ auto builder = goal_sender_->MakeBuilder();
+ auto params_offset = ParamType::Pack(*builder.fbb(), ¶ms_);
+
+ auto goal_builder = builder.template MakeBuilder<GoalType>();
+ goal_builder.add_params(params_offset);
+ goal_builder.add_run(run_value_);
+
sent_started_ = true;
- if (!goal_.Send()) {
- AOS_LOG(ERROR, "sending goal for action %" PRIx32 " on queue %s failed\n",
- run_value_, goal_sender_->name());
+ if (!builder.Send(goal_builder.Finish())) {
+ AOS_LOG(ERROR,
+ "sending goal for action %" PRIx32 " on queue %.*s failed\n",
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
// Don't wait to see a message with it.
has_started_ = true;
}
status_fetcher_->FetchNext();
if (status_fetcher_->get()) {
- AOS_LOG_STRUCT(DEBUG, "got status", *status_fetcher_->get());
+ VLOG(1) << "got status" << FlatbufferToJson(status_fetcher_->get());
}
- if (status_fetcher_->get() && status_fetcher_->get()->running != 0) {
- old_run_value_ = status_fetcher_->get()->running;
- AOS_LOG(INFO, "Action %" PRIx32 " on queue %s already running\n",
- old_run_value_, goal_sender_->name());
+ if (status_fetcher_->get() && status_fetcher_->get()->running() != 0) {
+ old_run_value_ = status_fetcher_->get()->running();
+ AOS_LOG(INFO, "Action %" PRIx32 " on queue %.*s already running\n",
+ old_run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
} else {
old_run_value_ = 0;
}
} else {
- AOS_LOG(WARNING, "Action %" PRIx32 " on queue %s already started\n",
- run_value_, goal_sender_->name());
+ AOS_LOG(WARNING, "Action %" PRIx32 " on queue %.*s already started\n",
+ run_value_, static_cast<int>(goal_sender_->name().size()),
+ goal_sender_->name().data());
}
}
diff --git a/aos/actions/actions.q b/aos/actions/actions.q
deleted file mode 100644
index e3046c9..0000000
--- a/aos/actions/actions.q
+++ /dev/null
@@ -1,35 +0,0 @@
-package aos.common.actions;
-
-interface StatusInterface {
- // 0 if the action isn't running or the value from goal.run.
- uint32_t running;
-};
-
-interface GoalInterface {
- // 0 to stop or an arbitrary value to put in status.running.
- uint32_t run;
-};
-
-message Status {
- // The run value of the instance we're currently running or 0.
- uint32_t running;
- // A run value we were previously running or 0.
- uint32_t last_running;
- // If false the action failed to complete and may be in a bad state,
- // this is a critical problem not a cancellation.
- bool success;
-};
-
-message Goal {
- // The unique value to put into status.running while running this instance or
- // 0 to cancel.
- uint32_t run;
- // Default parameter. The more useful thing to do would be to define your own
- // goal type to change param to a useful structure.
- double params;
-};
-
-interface ActionQueueGroup {
- queue Status status;
- queue Goal goal;
-};
diff --git a/aos/actions/actor.h b/aos/actions/actor.h
index a9b75e6..f5629a6 100644
--- a/aos/actions/actor.h
+++ b/aos/actions/actor.h
@@ -7,9 +7,9 @@
#include <chrono>
#include <functional>
+#include "aos/actions/actions_generated.h"
#include "aos/controls/control_loop.h"
#include "aos/logging/logging.h"
-#include "aos/logging/queue_logging.h"
#include "aos/time/time.h"
#include "aos/util/phased_loop.h"
@@ -20,29 +20,27 @@
template <class T>
class ActorBase {
public:
+ typedef T GoalType;
typedef typename std::remove_reference<decltype(
- *(static_cast<T *>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
- typedef typename std::remove_reference<decltype(*(
- static_cast<T *>(nullptr)->status.MakeMessage().get()))>::type StatusType;
- typedef typename std::remove_reference<decltype(
- static_cast<GoalType *>(nullptr)->params)>::type ParamType;
+ *static_cast<GoalType *>(nullptr)->params())>::type ParamType;
ActorBase(::aos::EventLoop *event_loop, const ::std::string &name)
: event_loop_(event_loop),
- status_sender_(event_loop->MakeSender<StatusType>(name + ".status")),
- goal_fetcher_(event_loop->MakeFetcher<GoalType>(name + ".goal")) {
+ status_sender_(event_loop->MakeSender<Status>(name)),
+ goal_fetcher_(event_loop->MakeFetcher<GoalType>(name)) {
AOS_LOG(INFO, "Constructing action %s\n", name.c_str());
- event_loop->MakeWatcher(name + ".goal",
+ event_loop->MakeWatcher(name,
[this](const GoalType &goal) { HandleGoal(goal); });
// Send out an inital status saying we aren't running to wake up any users
// who might be waiting forever for the previous action.
event_loop->OnRun([this]() {
- auto status_message = status_sender_.MakeMessage();
- status_message->running = 0;
- status_message->last_running = 0;
- status_message->success = !abort_;
- if (!status_message.Send()) {
+ auto builder = status_sender_.MakeBuilder();
+ Status::Builder status_builder = builder.template MakeBuilder<Status>();
+ status_builder.add_running(0);
+ status_builder.add_last_running(0);
+ status_builder.add_success(!abort_);
+ if (!builder.Send(status_builder.Finish())) {
AOS_LOG(ERROR, "Failed to send the status.\n");
}
});
@@ -99,7 +97,7 @@
// Will return true if finished or asked to cancel.
// Will return false if it failed accomplish its goal
// due to a problem with the system.
- virtual bool RunAction(const ParamType& params) = 0;
+ virtual bool RunAction(const ParamType *params) = 0;
void HandleGoal(const GoalType &goal);
@@ -110,7 +108,7 @@
uint32_t current_id_ = 0;
- ::aos::Sender<StatusType> status_sender_;
+ ::aos::Sender<Status> status_sender_;
::aos::Fetcher<GoalType> goal_fetcher_;
State state_ = State::WAITING_FOR_ACTION;
@@ -118,48 +116,51 @@
template <class T>
void ActorBase<T>::HandleGoal(const GoalType &goal) {
- AOS_LOG_STRUCT(DEBUG, "action goal", goal);
+ VLOG(1) << "action goal " << FlatbufferToJson(&goal);
switch (state_) {
case State::WAITING_FOR_ACTION:
- if (goal.run) {
+ if (goal.run()) {
state_ = State::RUNNING_ACTION;
} else {
- auto status_message = status_sender_.MakeMessage();
- status_message->running = 0;
- status_message->last_running = 0;
- status_message->success = !abort_;
- if (!status_message.Send()) {
+ auto builder = status_sender_.MakeBuilder();
+ Status::Builder status_builder = builder.template MakeBuilder<Status>();
+ status_builder.add_running(0);
+ status_builder.add_last_running(0);
+ status_builder.add_success(!abort_);
+ if (!builder.Send(status_builder.Finish())) {
AOS_LOG(ERROR, "Failed to send the status.\n");
}
break;
}
case State::RUNNING_ACTION: {
++running_count_;
- const uint32_t running_id = goal.run;
+ const uint32_t running_id = goal.run();
current_id_ = running_id;
AOS_LOG(INFO, "Starting action %" PRIx32 "\n", running_id);
{
- auto status_message = status_sender_.MakeMessage();
- status_message->running = running_id;
- status_message->last_running = 0;
- status_message->success = !abort_;
- if (!status_message.Send()) {
+ auto builder = status_sender_.MakeBuilder();
+ Status::Builder status_builder = builder.template MakeBuilder<Status>();
+ status_builder.add_running(running_id);
+ status_builder.add_last_running(0);
+ status_builder.add_success(!abort_);
+ if (!builder.Send(status_builder.Finish())) {
AOS_LOG(ERROR, "Failed to send the status.\n");
}
}
- AOS_LOG_STRUCT(INFO, "goal", goal);
- abort_ = !RunAction(goal.params);
+ VLOG(1) << "goal " << FlatbufferToJson(&goal);
+ abort_ = !RunAction(goal.params());
AOS_LOG(INFO, "Done with action %" PRIx32 "\n", running_id);
current_id_ = 0u;
{
- auto status_message = status_sender_.MakeMessage();
- status_message->running = 0;
- status_message->last_running = running_id;
- status_message->success = !abort_;
+ auto builder = status_sender_.MakeBuilder();
+ Status::Builder status_builder = builder.template MakeBuilder<Status>();
+ status_builder.add_running(0);
+ status_builder.add_last_running(running_id);
+ status_builder.add_success(!abort_);
- if (!status_message.Send()) {
+ if (!builder.Send(status_builder.Finish())) {
AOS_LOG(ERROR, "Failed to send the status.\n");
} else {
AOS_LOG(INFO, "Sending Done status %" PRIx32 "\n", running_id);
@@ -171,7 +172,7 @@
running_id);
} break;
case State::WAITING_FOR_STOPPED:
- if (goal.run == 0) {
+ if (goal.run() == 0) {
AOS_LOG(INFO, "Action stopped.\n");
state_ = State::WAITING_FOR_ACTION;
}
@@ -211,9 +212,9 @@
template <class T>
bool ActorBase<T>::ShouldCancel() {
if (goal_fetcher_.Fetch()) {
- AOS_LOG_STRUCT(DEBUG, "goal queue", *goal_fetcher_);
+ VLOG(1) << "goal queue " << FlatbufferToJson(goal_fetcher_.get());
}
- bool ans = !goal_fetcher_->run || goal_fetcher_->run != current_id_;
+ bool ans = !goal_fetcher_->run() || goal_fetcher_->run() != current_id_;
if (ans) {
AOS_LOG(INFO, "Time to stop action\n");
}
diff --git a/aos/actions/test_action.fbs b/aos/actions/test_action.fbs
new file mode 100644
index 0000000..f7f8052
--- /dev/null
+++ b/aos/actions/test_action.fbs
@@ -0,0 +1,20 @@
+namespace aos.common.actions;
+
+table UInt {
+ val:uint;
+}
+
+table TestActionGoal {
+ run:uint;
+ params:UInt;
+}
+
+table MyParams {
+ param1:double;
+ param2:int;
+}
+
+table TestAction2Goal {
+ run:uint;
+ params:MyParams;
+}
diff --git a/aos/actions/test_action.q b/aos/actions/test_action.q
deleted file mode 100644
index f2d268d..0000000
--- a/aos/actions/test_action.q
+++ /dev/null
@@ -1,32 +0,0 @@
-package aos.common.actions;
-
-import "aos/actions/actions.q";
-
-queue_group TestActionQueueGroup {
- implements aos.common.actions.ActionQueueGroup;
-
- message Goal {
- uint32_t run;
- uint32_t params;
- };
-
- queue Goal goal;
- queue aos.common.actions.Status status;
-};
-
-struct MyParams {
- double param1;
- int32_t param2;
-};
-
-queue_group TestAction2QueueGroup {
- implements aos.common.actions.ActionQueueGroup;
-
- message Goal {
- uint32_t run;
- MyParams params;
- };
-
- queue Goal goal;
- queue aos.common.actions.Status status;
-};