blob: d2d05fcaf977489932e1f518911b431561e0936d [file] [log] [blame]
#ifndef AOS_COMMON_ACTIONS_ACTOR_H_
#define AOS_COMMON_ACTIONS_ACTOR_H_
#include <stdio.h>
#include <inttypes.h>
#include <chrono>
#include <functional>
#include "aos/common/controls/control_loop.h"
#include "aos/common/logging/logging.h"
#include "aos/common/logging/queue_logging.h"
#include "aos/common/time.h"
#include "aos/common/util/phased_loop.h"
namespace aos {
namespace common {
namespace actions {
template <class T>
class ActorBase {
public:
typedef typename std::remove_reference<decltype(
*(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
typedef typename std::remove_reference<
decltype(static_cast<GoalType*>(nullptr)->params)>::type ParamType;
ActorBase(T* acq) : action_q_(acq) {}
// Will return true if finished or asked to cancel.
// Will return false if it failed accomplish its goal
// due to a problem with the system.
virtual bool RunAction(const ParamType& params) = 0;
// Runs action while enabled.
void Run();
// Gets ready to run actions.
void Initialize();
// Checks if an action was initially running when the thread started.
bool CheckInitialRunning();
// Wait here until someone asks us to go.
void WaitForActionRequest();
// Do work son.
uint32_t RunIteration();
// Wait for stop is signalled.
void WaitForStop(uint32_t running_id);
// Will run until the done condition is met or times out.
// Will return false if successful or end_time is reached and true if the
// action was canceled or failed.
// Done condition are defined as functions that return true when done and have
// some sort of blocking statement (such as FetchNextBlocking) to throttle
// spin rate.
// end_time is when to stop and return true. Time(0, 0) (the default) means
// never time out.
bool WaitUntil(::std::function<bool(void)> done_condition,
::aos::monotonic_clock::time_point end_time =
::aos::monotonic_clock::min_time);
// Waits for a certain amount of time from when this method is called.
// Returns false if the action was canceled or failed, and true if the wait
// succeeded.
bool WaitOrCancel(monotonic_clock::duration duration) {
::aos::time::PhasedLoop phased_loop(::aos::controls::kLoopFrequency,
::std::chrono::milliseconds(5) / 2);
return !WaitUntil(
[&phased_loop]() {
phased_loop.SleepUntilNext();
return false;
},
::aos::monotonic_clock::now() + duration);
}
// Returns true if the action should be canceled.
bool ShouldCancel();
protected:
// Set to true when we should stop ASAP.
bool abort_ = false;
// The queue for this action.
T* action_q_;
};
template <class T>
bool ActorBase<T>::CheckInitialRunning() {
LOG(DEBUG, "Waiting for input to start\n");
if (action_q_->goal.FetchLatest()) {
LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
const uint32_t initially_running = action_q_->goal->run;
if (initially_running != 0) {
while (action_q_->goal->run == initially_running) {
LOG(INFO, "run is still %" PRIx32 "\n", initially_running);
action_q_->goal.FetchNextBlocking();
LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
}
}
LOG(DEBUG, "Done waiting, goal\n");
return true;
}
LOG(DEBUG, "Done waiting, no goal\n");
return false;
}
template <class T>
void ActorBase<T>::WaitForActionRequest() {
while (action_q_->goal.get() == nullptr || !action_q_->goal->run) {
LOG(INFO, "Waiting for an action request.\n");
action_q_->goal.FetchNextBlocking();
LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
if (!action_q_->goal->run) {
if (!action_q_->status.MakeWithBuilder()
.running(0)
.last_running(0)
.success(!abort_)
.Send()) {
LOG(ERROR, "Failed to send the status.\n");
}
}
}
}
template <class T>
uint32_t ActorBase<T>::RunIteration() {
CHECK(action_q_->goal.get() != nullptr);
const uint32_t running_id = action_q_->goal->run;
LOG(INFO, "Starting action %" PRIx32 "\n", running_id);
if (!action_q_->status.MakeWithBuilder()
.running(running_id)
.last_running(0)
.success(!abort_)
.Send()) {
LOG(ERROR, "Failed to send the status.\n");
}
LOG_STRUCT(INFO, "goal", *action_q_->goal);
abort_ = !RunAction(action_q_->goal->params);
LOG(INFO, "Done with action %" PRIx32 "\n", running_id);
// If we have a new one to run, we shouldn't say we're stopped in between.
if (action_q_->goal->run == 0 || action_q_->goal->run == running_id) {
if (!action_q_->status.MakeWithBuilder()
.running(0)
.last_running(running_id)
.success(!abort_)
.Send()) {
LOG(ERROR, "Failed to send the status.\n");
} else {
LOG(INFO, "Sending Done status %" PRIx32 "\n", running_id);
}
} else {
LOG(INFO, "skipping sending stopped status for %" PRIx32 "\n", running_id);
}
return running_id;
}
template <class T>
void ActorBase<T>::WaitForStop(uint32_t running_id) {
assert(action_q_->goal.get() != nullptr);
while (action_q_->goal->run == running_id) {
LOG(INFO, "Waiting for the action (%" PRIx32 ") to be stopped.\n",
running_id);
action_q_->goal.FetchNextBlocking();
LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
}
}
template <class T>
void ActorBase<T>::Run() {
Initialize();
while (true) {
// Wait for a request to come in before starting.
WaitForActionRequest();
LOG_STRUCT(INFO, "running with goal", *action_q_->goal);
// Perform the action once.
uint32_t running_id = RunIteration();
LOG(INFO, "done running\n");
// Don't start again until asked.
WaitForStop(running_id);
LOG(DEBUG, "action %" PRIx32 " was stopped\n", running_id);
}
}
template <class T>
void ActorBase<T>::Initialize() {
// Make sure the last job is done and we have a signal.
if (CheckInitialRunning()) {
LOG(DEBUG, "action %" PRIx32 " was stopped\n", action_q_->goal->run);
}
if (!action_q_->status.MakeWithBuilder()
.running(0)
.last_running(0)
.success(!abort_)
.Send()) {
LOG(ERROR, "Failed to send the status.\n");
}
}
template <class T>
bool ActorBase<T>::WaitUntil(::std::function<bool(void)> done_condition,
::aos::monotonic_clock::time_point end_time) {
while (!done_condition()) {
if (ShouldCancel() || abort_) {
// Clear abort bit as we have just aborted.
abort_ = false;
return true;
}
if (end_time != ::aos::monotonic_clock::min_time &&
::aos::monotonic_clock::now() >= end_time) {
LOG(DEBUG, "WaitUntil timed out\n");
return false;
}
}
if (ShouldCancel() || abort_) {
// Clear abort bit as we have just aborted.
abort_ = false;
return true;
} else {
return false;
}
}
template <class T>
bool ActorBase<T>::ShouldCancel() {
if (action_q_->goal.FetchNext()) {
LOG_STRUCT(DEBUG, "goal queue", *action_q_->goal);
}
bool ans = !action_q_->goal->run;
if (ans) {
LOG(INFO, "Time to stop action\n");
}
return ans;
}
} // namespace actions
} // namespace common
} // namespace aos
#endif // AOS_COMMON_ACTIONS_ACTOR_H_