// blob: 3cc4d9eb59944c777e2d897a9107087d0ab4db1b [file] [log] [blame]
#ifndef AOS_ACTIONS_ACTOR_H_
#define AOS_ACTIONS_ACTOR_H_
#include <inttypes.h>
#include <stdio.h>

#include <chrono>
#include <functional>
#include <string>
#include <type_traits>
#include <utility>

#include "aos/actions/actions_generated.h"
#include "aos/controls/control_loop.h"
#include "aos/logging/logging.h"
#include "aos/time/time.h"
#include "aos/util/phased_loop.h"
namespace aos {
namespace common {
namespace actions {
// Base class for an action that is driven by goal messages.
//
// The constructor watches the channel `name` for GoalType messages and
// publishes Status messages on the same channel name. Subclasses implement
// RunAction(), which is invoked from the goal watcher when a goal with a
// nonzero run() value arrives while no action is in progress.
//
// T is the goal message type. It must provide run() (the id of the action to
// run, 0 meaning "stop") and params() (a pointer to the action's parameters).
template <class T>
class ActorBase {
 public:
  using GoalType = T;
  // The type that GoalType::params() points at (including its constness).
  using ParamType = std::remove_reference_t<decltype(
      *std::declval<GoalType *>()->params())>;

  // Sets up the status sender, goal fetcher and goal watcher on `event_loop`,
  // all using the channel name `name`. `event_loop` is stored and must outlive
  // this object.
  ActorBase(::aos::EventLoop *event_loop, const ::std::string &name)
      : event_loop_(event_loop),
        status_sender_(event_loop->MakeSender<Status>(name)),
        goal_fetcher_(event_loop->MakeFetcher<GoalType>(name)) {
    AOS_LOG(INFO, "Constructing action %s\n", name.c_str());
    event_loop->MakeWatcher(name,
                            [this](const GoalType &goal) { HandleGoal(goal); });

    // Send out an initial status saying we aren't running to wake up any users
    // who might be waiting forever for the previous action.
    event_loop->OnRun([this]() {
      auto builder = status_sender_.MakeBuilder();
      Status::Builder status_builder = builder.template MakeBuilder<Status>();
      status_builder.add_running(0);
      status_builder.add_last_running(0);
      status_builder.add_success(!abort_);
      if (!builder.Send(status_builder.Finish())) {
        AOS_LOG(ERROR, "Failed to send the status.\n");
      }
    });
  }

  // The callbacks registered with the event loop in the constructor capture
  // `this`, so an instance must never be copied or moved to another address.
  ActorBase(const ActorBase &) = delete;
  ActorBase &operator=(const ActorBase &) = delete;

  // Actors are used polymorphically (RunAction is pure virtual), so make
  // destruction through a base pointer well defined.
  virtual ~ActorBase() = default;

  // Waits for `duration`, measured from the event loop's monotonic time at the
  // moment this method is called.
  // Returns false if the action was canceled or failed, and true if the wait
  // succeeded.
  bool WaitOrCancel(monotonic_clock::duration duration) {
    return !WaitUntil([]() { return false; },
                      event_loop_->monotonic_now() + duration);
  }

  // Returns true if the action should be canceled.
  bool ShouldCancel();

  // State of the goal handling state machine.
  enum class State {
    // No action is running; waiting for a goal with a nonzero run() value.
    WAITING_FOR_ACTION,
    // RunAction() is being (or is about to be) executed.
    RUNNING_ACTION,
    // The action finished; waiting for a goal with run() == 0 before another
    // action is accepted.
    WAITING_FOR_STOPPED,
  };

  // Returns the number of times we have run.
  int running_count() const { return running_count_; }

  // Returns the current action id being run, or 0 if stopped.
  uint32_t current_id() const { return current_id_; }

 protected:
  // Polls done_condition until it returns true, the action is canceled, or
  // end_time is reached.
  // Returns false if done_condition was met or end_time was reached, and true
  // if the action was canceled or failed.
  // done_condition is a function that returns true when done.
  // end_time is when to give up waiting. ::aos::monotonic_clock::min_time (the
  // default) means never time out.
  // This will be polled at ::aos::controls::kLoopFrequency
  bool WaitUntil(::std::function<bool(void)> done_condition,
                 ::aos::monotonic_clock::time_point end_time =
                     ::aos::monotonic_clock::min_time);

  // Set to true when we should stop ASAP.
  bool abort_ = false;

  // The event loop this actor was constructed with.
  ::aos::EventLoop *event_loop() { return event_loop_; }

  // Current monotonic time as reported by the event loop.
  ::aos::monotonic_clock::time_point monotonic_now() {
    return event_loop_->monotonic_now();
  }

 private:
  // Checks if an action was initially running when the thread started.
  bool CheckInitialRunning();

  // Will return true if finished or asked to cancel.
  // Will return false if it failed to accomplish its goal
  // due to a problem with the system.
  virtual bool RunAction(const ParamType *params) = 0;

  // Goal watcher callback; advances the state machine and runs the action.
  void HandleGoal(const GoalType &goal);

  ::aos::EventLoop *event_loop_;

  // Number of times we've run.
  int running_count_ = 0;

  // Id of the action currently running, or 0 when none is running.
  uint32_t current_id_ = 0;

  ::aos::Sender<Status> status_sender_;
  ::aos::Fetcher<GoalType> goal_fetcher_;

  State state_ = State::WAITING_FOR_ACTION;
};
// Goal watcher callback. Drives the state machine:
//  * WAITING_FOR_ACTION: a goal with run() == 0 only republishes an idle
//    status; a nonzero run() starts the action.
//  * RUNNING_ACTION: publishes a "running" status, runs RunAction()
//    synchronously, publishes the result and moves to WAITING_FOR_STOPPED.
//  * WAITING_FOR_STOPPED: returns to WAITING_FOR_ACTION once a goal with
//    run() == 0 is seen.
template <class T>
void ActorBase<T>::HandleGoal(const GoalType &goal) {
  VLOG(1) << "action goal " << FlatbufferToJson(&goal);

  // Publishes one Status message with the given running/last_running values
  // and success = !abort_. Logs on failure and returns whether the send
  // succeeded.
  const auto publish_status = [this](uint32_t running_value,
                                     uint32_t last_running_value) -> bool {
    auto builder = status_sender_.MakeBuilder();
    Status::Builder status_builder = builder.template MakeBuilder<Status>();
    status_builder.add_running(running_value);
    status_builder.add_last_running(last_running_value);
    status_builder.add_success(!abort_);
    if (!builder.Send(status_builder.Finish())) {
      AOS_LOG(ERROR, "Failed to send the status.\n");
      return false;
    }
    return true;
  };

  switch (state_) {
    case State::WAITING_FOR_STOPPED:
      if (goal.run() == 0) {
        AOS_LOG(INFO, "Action stopped.\n");
        state_ = State::WAITING_FOR_ACTION;
      }
      return;
    case State::WAITING_FOR_ACTION:
      if (!goal.run()) {
        // Nothing was asked to run; just report that we are idle.
        publish_status(0, 0);
        return;
      }
      state_ = State::RUNNING_ACTION;
      break;
    case State::RUNNING_ACTION:
      break;
  }

  // From here on we are in RUNNING_ACTION.
  ++running_count_;
  const uint32_t action_id = goal.run();
  current_id_ = action_id;
  AOS_LOG(INFO, "Starting action %" PRIx32 "\n", action_id);
  publish_status(action_id, 0);

  VLOG(1) << "goal " << FlatbufferToJson(&goal);
  // RunAction() returning false means the action failed, which is recorded in
  // abort_ and reported as success = false below.
  abort_ = !RunAction(goal.params());
  AOS_LOG(INFO, "Done with action %" PRIx32 "\n", action_id);
  current_id_ = 0u;

  if (publish_status(0, action_id)) {
    AOS_LOG(INFO, "Sending Done status %" PRIx32 "\n", action_id);
  }

  state_ = State::WAITING_FOR_STOPPED;
  AOS_LOG(INFO, "Waiting for the action (%" PRIx32 ") to be stopped.\n",
          action_id);
}
// Polls done_condition at ::aos::controls::kLoopFrequency until it returns
// true, the action is canceled/aborted, or end_time is reached.
//
// Returns true if the action was canceled (ShouldCancel()) or abort_ was set;
// in that case abort_ is cleared. Returns false if done_condition was met or
// the wait timed out. An end_time of ::aos::monotonic_clock::min_time disables
// the timeout.
template <class T>
bool ActorBase<T>::WaitUntil(::std::function<bool(void)> done_condition,
                             ::aos::monotonic_clock::time_point end_time) {
  ::aos::time::PhasedLoop phased_loop(::aos::controls::kLoopFrequency,
                                      event_loop_->monotonic_now(),
                                      ::std::chrono::milliseconds(5) / 2);

  while (!done_condition()) {
    if (ShouldCancel() || abort_) {
      // Clear abort bit as we have just aborted.
      abort_ = false;
      return true;
    }
    // Compare against the event loop's clock: it is the clock the phased loop
    // above was started from and the one WaitOrCancel() uses to compute
    // end_time, so all three stay consistent.
    if (end_time != ::aos::monotonic_clock::min_time &&
        event_loop_->monotonic_now() >= end_time) {
      AOS_LOG(DEBUG, "WaitUntil timed out\n");
      return false;
    }
    phased_loop.SleepUntilNext();
  }

  // The condition was met, but a cancel that arrived at the same time still
  // takes priority.
  if (ShouldCancel() || abort_) {
    // Clear abort bit as we have just aborted.
    abort_ = false;
    return true;
  }
  return false;
}
// Fetches the latest goal and returns true if the action should be canceled:
// there is no goal at all, the goal's run() is 0, or the goal's run() no
// longer matches the id of the action currently running.
template <class T>
bool ActorBase<T>::ShouldCancel() {
  if (goal_fetcher_.Fetch()) {
    VLOG(1) << "goal queue " << FlatbufferToJson(goal_fetcher_.get());
  }
  // This method is public, so it can be called before any goal has ever been
  // received. Guard against dereferencing a fetcher that holds no message;
  // with no goal there is nothing asking us to keep running.
  const auto *const goal = goal_fetcher_.get();
  const bool ans =
      goal == nullptr || !goal->run() || goal->run() != current_id_;
  if (ans) {
    AOS_LOG(INFO, "Time to stop action\n");
  }
  return ans;
}
} // namespace actions
} // namespace common
} // namespace aos
#endif // AOS_ACTIONS_ACTOR_H_