blob: 07e569e9630bce50ee97b8bbdd30d7a2fb4eb0b7 [file] [log] [blame]
#ifndef AOS_COMMON_ACTIONS_ACTIONS_H_
#define AOS_COMMON_ACTIONS_ACTIONS_H_
#include <inttypes.h>
#include <sys/types.h>
#include <unistd.h>
#include <type_traits>
#include <atomic>
#include <memory>
#include "aos/common/logging/logging.h"
#include "aos/common/queue.h"
#include "aos/common/logging/queue_logging.h"
namespace aos {
namespace common {
namespace actions {
class Action;
// A queue which queues Actions, verifies a single action is running, and
// cancels Actions.
class ActionQueue {
public:
// Queues up an action for sending.
void EnqueueAction(::std::unique_ptr<Action> action);
// Cancels the current action, and runs the next one after the current one has
// finished and the action queue ticks again.
void CancelCurrentAction();
// Cancels all running actions.
void CancelAllActions();
// Runs the next action when the current one is finished running and polls the
// running action state for use by Running.
void Tick();
// Returns true if any action is running or could be running.
bool Running();
// Retrieves the internal state of the current action for testing.
// See comments on the private members of TypedAction<T, S> for details.
bool GetCurrentActionState(bool* has_started, bool* sent_started,
bool* sent_cancel, bool* interrupted,
uint32_t* run_value, uint32_t* old_run_value);
// Retrieves the internal state of the next action for testing.
// See comments on the private members of TypedAction<T, S> for details.
bool GetNextActionState(bool* has_started, bool* sent_started,
bool* sent_cancel, bool* interrupted,
uint32_t* run_value, uint32_t* old_run_value);
private:
::std::unique_ptr<Action> current_action_;
::std::unique_ptr<Action> next_action_;
};
// The basic interface an ActionQueue can access.
class Action {
public:
virtual ~Action() {}
// Cancels the action.
void Cancel() { DoCancel(); }
// Returns true if the action is currently running.
bool Running() { return DoRunning(); }
// Starts the action.
void Start() { DoStart(); }
// Waits until the action has finished.
void WaitUntilDone() { DoWaitUntilDone(); }
// Run all the checks for one iteration of waiting. Will return true when the
// action has completed, successfully or not. This is non-blocking.
bool CheckIteration() { return DoCheckIteration(false); }
// Retrieves the internal state of the action for testing.
// See comments on the private members of TypedAction<T, S> for details.
void GetState(bool* has_started, bool* sent_started, bool* sent_cancel,
bool* interrupted, uint32_t* run_value,
uint32_t* old_run_value) {
DoGetState(has_started, sent_started, sent_cancel, interrupted, run_value,
old_run_value);
}
private:
// Cancels the action.
virtual void DoCancel() = 0;
// Returns true if the action is running or we don't have an initial response
// back from it to signal whether or not it is running.
virtual bool DoRunning() = 0;
// Starts the action if a goal has been created.
virtual void DoStart() = 0;
// Blocks until complete.
virtual void DoWaitUntilDone() = 0;
// Updates status for one cycle of waiting
virtual bool DoCheckIteration(bool blocking) = 0;
// For testing we will need to get the internal state.
// See comments on the private members of TypedAction<T, S> for details.
virtual void DoGetState(bool* has_started, bool* sent_started,
bool* sent_cancel, bool* interrupted,
uint32_t* run_value, uint32_t* old_run_value) = 0;
};
// Templated subclass to hold the type information.
template <typename T>
class TypedAction : public Action {
public:
// A convenient way to refer to the type of our goals.
typedef typename std::remove_reference<decltype(
*(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
typedef typename std::remove_reference<
decltype(static_cast<GoalType*>(nullptr)->params)>::type ParamType;
TypedAction(T* queue_group, const ParamType &params)
: queue_group_(queue_group),
goal_(queue_group_->goal.MakeMessage()),
// This adds 1 to the counter (atomically because it's potentially
// shared across threads) and then bitwise-ORs the bottom of the PID to
// differentiate it from other processes's values (ie a unique id).
run_value_(run_counter_.fetch_add(1, ::std::memory_order_relaxed) |
((getpid() & 0xFFFF) << 16)),
params_(params) {
LOG(DEBUG, "Action %" PRIx32 " created on queue %s\n", run_value_,
queue_group_->goal.name());
// Clear out any old status messages from before now.
queue_group_->status.FetchLatest();
if (queue_group_->status.get()) {
LOG_STRUCT(DEBUG, "have status", *queue_group_->status);
}
}
virtual ~TypedAction() {
LOG(DEBUG, "Calling destructor of %" PRIx32 "\n", run_value_);
DoCancel();
}
// Returns the current goal that will be sent when the action is sent.
GoalType* GetGoal() { return goal_.get(); }
private:
void DoCancel() override;
bool DoRunning() override;
void DoWaitUntilDone() override;
bool DoCheckIteration(bool blocking) override;
// Sets the started flag (also possibly the interrupted flag).
void CheckStarted();
// Checks for interrupt.
void CheckInterrupted();
void DoStart() override;
void DoGetState(bool* has_started, bool* sent_started, bool* sent_cancel,
bool* interrupted, uint32_t* run_value,
uint32_t* old_run_value) override {
if (has_started != nullptr) *has_started = has_started_;
if (sent_started != nullptr) *sent_started = sent_started_;
if (sent_cancel != nullptr) *sent_cancel = sent_cancel_;
if (interrupted != nullptr) *interrupted = interrupted_;
if (run_value != nullptr) *run_value = run_value_;
if (old_run_value != nullptr) *old_run_value = old_run_value_;
}
T* const queue_group_;
::aos::ScopedMessagePtr<GoalType> goal_;
// Track if we have seen a response to the start message.
bool has_started_ = false;
// Track if we have sent an initial start message.
bool sent_started_ = false;
bool sent_cancel_ = false;
// Gets set to true if we ever see somebody else's value in running.
bool interrupted_ = false;
// The value we're going to use for goal.run etc.
const uint32_t run_value_;
// flag passed to action in order to have differing types
const ParamType params_;
// The old value for running that we may have seen. If we see any value other
// than this or run_value_, somebody else got in the way and we're done. 0 if
// there was nothing there to start with. Only valid after sent_started_
// changes to true.
uint32_t old_run_value_;
static ::std::atomic<uint16_t> run_counter_;
};
template <typename T>
::std::atomic<uint16_t> TypedAction<T>::run_counter_{0};
template <typename T>
void TypedAction<T>::DoCancel() {
if (!sent_started_) {
LOG(INFO, "Action %" PRIx32 " on queue %s was never started\n", run_value_,
queue_group_->goal.name());
} else {
if (interrupted_) {
LOG(INFO,
"Action %" PRIx32 " on queue %s was interrupted -> not cancelling\n",
run_value_, queue_group_->goal.name());
} else {
if (sent_cancel_) {
LOG(DEBUG, "Action %" PRIx32 " on queue %s already cancelled\n",
run_value_, queue_group_->goal.name());
} else {
LOG(DEBUG, "Canceling action %" PRIx32 " on queue %s\n", run_value_,
queue_group_->goal.name());
queue_group_->goal.MakeWithBuilder().run(0).Send();
sent_cancel_ = true;
}
}
}
}
template <typename T>
bool TypedAction<T>::DoRunning() {
if (!sent_started_) {
LOG(DEBUG, "haven't sent start message yet\n");
return false;
}
if (has_started_) {
queue_group_->status.FetchNext();
CheckInterrupted();
} else {
while (queue_group_->status.FetchNext()) {
LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
CheckStarted();
if (has_started_) CheckInterrupted();
}
}
if (interrupted_) return false;
// We've asked it to start but haven't gotten confirmation that it's started
// yet.
if (!has_started_) return true;
return queue_group_->status.get() &&
queue_group_->status->running == run_value_;
}
template <typename T>
void TypedAction<T>::DoWaitUntilDone() {
CHECK(sent_started_);
queue_group_->status.FetchNext();
LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
CheckInterrupted();
while (true) {
if (DoCheckIteration(true)) {
return;
}
}
}
template <typename T>
bool TypedAction<T>::DoCheckIteration(bool blocking) {
CHECK(sent_started_);
if (interrupted_) return true;
CheckStarted();
if (blocking) {
queue_group_->status.FetchNextBlocking();
LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
} else {
if (!queue_group_->status.FetchNext()) {
return false;
}
LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
}
CheckStarted();
CheckInterrupted();
if (has_started_ && (queue_group_->status.get() &&
queue_group_->status->running != run_value_)) {
return true;
}
return false;
}
template <typename T>
void TypedAction<T>::CheckStarted() {
if (has_started_) return;
if (queue_group_->status.get()) {
if (queue_group_->status->running == run_value_ ||
(queue_group_->status->running == 0 &&
queue_group_->status->last_running == run_value_)) {
// It's currently running our instance.
has_started_ = true;
LOG(DEBUG, "Action %" PRIx32 " on queue %s has been started\n",
run_value_, queue_group_->goal.name());
} else if (old_run_value_ != 0 &&
queue_group_->status->running == old_run_value_) {
LOG(DEBUG, "still running old instance %" PRIx32 "\n", old_run_value_);
} else {
LOG(WARNING, "Action %" PRIx32 " on queue %s interrupted by %" PRIx32
" before starting\n",
run_value_, queue_group_->goal.name(), queue_group_->status->running);
has_started_ = true;
interrupted_ = true;
}
} else {
LOG(WARNING, "No status message recieved.\n");
}
}
template <typename T>
void TypedAction<T>::CheckInterrupted() {
if (!interrupted_ && has_started_ && queue_group_->status.get()) {
if (queue_group_->status->running != 0 &&
queue_group_->status->running != run_value_) {
LOG(WARNING, "Action %" PRIx32 " on queue %s interrupted by %" PRIx32
" after starting\n",
run_value_, queue_group_->goal.name(), queue_group_->status->running);
}
}
}
template <typename T>
void TypedAction<T>::DoStart() {
if (goal_) {
LOG(DEBUG, "Starting action %" PRIx32 "\n", run_value_);
goal_->run = run_value_;
goal_->params = params_;
sent_started_ = true;
if (!goal_.Send()) {
LOG(ERROR, "sending goal for action %" PRIx32 " on queue %s failed\n",
run_value_, queue_group_->goal.name());
// Don't wait to see a message with it.
has_started_ = true;
}
queue_group_->status.FetchNext();
if (queue_group_->status.get()) {
LOG_STRUCT(DEBUG, "got status", *queue_group_->status);
}
if (queue_group_->status.get() && queue_group_->status->running != 0) {
old_run_value_ = queue_group_->status->running;
LOG(INFO, "Action %" PRIx32 " on queue %s already running\n",
old_run_value_, queue_group_->goal.name());
} else {
old_run_value_ = 0;
}
} else {
LOG(WARNING, "Action %" PRIx32 " on queue %s already started\n", run_value_,
queue_group_->goal.name());
}
}
} // namespace actions
} // namespace common
} // namespace aos
#endif // AOS_COMMON_ACTIONS_ACTIONS_H_