blob: 15054d2900d91139f6943d52785929ab80003b06 [file] [log] [blame]
Ben Fredricksond69f38b2015-01-28 20:06:15 -08001#ifndef AOS_COMMON_ACTIONS_ACTIONS_H_
2#define AOS_COMMON_ACTIONS_ACTIONS_H_
3
4#include <inttypes.h>
5#include <sys/types.h>
6#include <unistd.h>
7
8#include <type_traits>
9#include <atomic>
10#include <memory>
11
12#include "aos/common/logging/logging.h"
13#include "aos/common/queue.h"
14
15namespace aos {
16namespace common {
17namespace actions {
18
19class Action;
20
21// A queue which queues Actions, verifies a single action is running, and
22// cancels Actions.
23class ActionQueue {
24 public:
25 // Queues up an action for sending.
26 void EnqueueAction(::std::unique_ptr<Action> action);
27
28 // Cancels the current action, and runs the next one after the current one has
29 // finished and the action queue ticks again.
30 void CancelCurrentAction();
31
32 // Cancels all running actions.
33 void CancelAllActions();
34
35 // Runs the next action when the current one is finished running and handles
36 // periodically updating various other information.
37 void Tick();
38
39 // Returns true if any action is running or could be running.
40 // For a one cycle faster response, call Tick before running this.
41 // Daniel can think of at least one case (current_action_ is cancelled and
42 // there is no next_action_) in which calling this without running Tick()
43 // first would return a wrong answer.
44 bool Running();
45
46 // Retrieves the internal state of the current action for testing.
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -080047 // See comments on the private members of TypedAction<T, S> for details.
Ben Fredricksond69f38b2015-01-28 20:06:15 -080048 bool GetCurrentActionState(bool* has_started, bool* sent_started,
49 bool* sent_cancel, bool* interrupted,
50 uint32_t* run_value, uint32_t* old_run_value);
51
52 // Retrieves the internal state of the next action for testing.
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -080053 // See comments on the private members of TypedAction<T, S> for details.
Ben Fredricksond69f38b2015-01-28 20:06:15 -080054 bool GetNextActionState(bool* has_started, bool* sent_started,
55 bool* sent_cancel, bool* interrupted,
56 uint32_t* run_value, uint32_t* old_run_value);
57
58 private:
59 ::std::unique_ptr<Action> current_action_;
60 ::std::unique_ptr<Action> next_action_;
61};
62
// The type-erased interface through which an ActionQueue drives an action.
//
// Every public entry point is a non-virtual wrapper that forwards to a private
// pure-virtual Do*() hook which subclasses must implement.
class Action {
 public:
  virtual ~Action() = default;

  // Requests that the action stop.
  void Cancel() { DoCancel(); }

  // Reports whether the action is currently running.
  bool Running() { return DoRunning(); }

  // Kicks off the action.
  void Start() { DoStart(); }

  // Blocks the caller until the action has finished.
  void WaitUntilDone() { DoWaitUntilDone(); }

  // Test hook: copies the internal state of the action into the given outputs.
  // See the comments on the private members of TypedAction<T> for details.
  void GetState(bool* has_started, bool* sent_started, bool* sent_cancel,
                bool* interrupted, uint32_t* run_value,
                uint32_t* old_run_value) {
    DoGetState(has_started, sent_started, sent_cancel, interrupted, run_value,
               old_run_value);
  }

 private:
  // Implementation of Cancel().
  virtual void DoCancel() = 0;

  // Implementation of Running(). Should return true if the action is running
  // or if no initial response has come back yet to say whether it is running.
  virtual bool DoRunning() = 0;

  // Implementation of Start(). Starts the action if a goal has been created.
  virtual void DoStart() = 0;

  // Implementation of WaitUntilDone(). Blocks until complete.
  virtual void DoWaitUntilDone() = 0;

  // Implementation of GetState(); exists so tests can see internal state.
  // See the comments on the private members of TypedAction<T> for details.
  virtual void DoGetState(bool* has_started, bool* sent_started,
                          bool* sent_cancel, bool* interrupted,
                          uint32_t* run_value, uint32_t* old_run_value) = 0;
};
103
104// Templated subclass to hold the type information.
105template <typename T>
106class TypedAction : public Action {
107 public:
108 // A convenient way to refer to the type of our goals.
109 typedef typename std::remove_reference<decltype(
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800110 *(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
111 typedef typename std::remove_reference<
112 decltype(static_cast<GoalType*>(nullptr)->params)>::type ParamType;
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800113
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800114 TypedAction(T* queue_group, const ParamType &params)
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800115 : queue_group_(queue_group),
116 goal_(queue_group_->goal.MakeMessage()),
117 // This adds 1 to the counter (atomically because it's potentially
118 // shared across threads) and then bitwise-ORs the bottom of the PID to
119 // differentiate it from other processes's values (ie a unique id).
120 run_value_(run_counter_.fetch_add(1, ::std::memory_order_relaxed) |
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800121 ((getpid() & 0xFFFF) << 16)),
122 params_(params) {
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800123 LOG(INFO, "Action %" PRIx32 " created on queue %s\n", run_value_,
124 queue_group_->goal.name());
125 // Clear out any old status messages from before now.
126 queue_group_->status.FetchLatest();
127 }
128
129 virtual ~TypedAction() {
130 LOG(DEBUG, "Calling destructor of %" PRIx32 "\n", run_value_);
131 DoCancel();
132 }
133
134 // Returns the current goal that will be sent when the action is sent.
135 GoalType* GetGoal() { return goal_.get(); }
136
137 private:
138 void DoCancel() override;
139
140 bool DoRunning() override;
141
142 void DoWaitUntilDone() override;
143
144 // Sets the started flag (also possibly the interrupted flag).
145 void CheckStarted();
146
147 // Checks for interrupt.
148 void CheckInterrupted();
149
150 void DoStart() override;
151
152 void DoGetState(bool* has_started, bool* sent_started, bool* sent_cancel,
153 bool* interrupted, uint32_t* run_value,
154 uint32_t* old_run_value) override {
155 if (has_started != nullptr) *has_started = has_started_;
156 if (sent_started != nullptr) *sent_started = sent_started_;
157 if (sent_cancel != nullptr) *sent_cancel = sent_cancel_;
158 if (interrupted != nullptr) *interrupted = interrupted_;
159 if (run_value != nullptr) *run_value = run_value_;
160 if (old_run_value != nullptr) *old_run_value = old_run_value_;
161 }
162
163 T* const queue_group_;
164 ::aos::ScopedMessagePtr<GoalType> goal_;
165
166 // Track if we have seen a response to the start message.
167 bool has_started_ = false;
168 // Track if we have sent an initial start message.
169 bool sent_started_ = false;
170
171 bool sent_cancel_ = false;
172
173 // Gets set to true if we ever see somebody else's value in running.
174 bool interrupted_ = false;
175
176 // The value we're going to use for goal.run etc.
177 const uint32_t run_value_;
178
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800179 // flag passed to action in order to have differing types
180 const ParamType params_;
181
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800182 // The old value for running that we may have seen. If we see any value other
183 // than this or run_value_, somebody else got in the way and we're done. 0 if
184 // there was nothing there to start with. Only valid after sent_started_
185 // changes to true.
186 uint32_t old_run_value_;
187
188 static ::std::atomic<uint16_t> run_counter_;
189};
190
191template <typename T>
192::std::atomic<uint16_t> TypedAction<T>::run_counter_{0};
193
194template <typename T>
195void TypedAction<T>::DoCancel() {
196 if (!sent_started_) {
197 LOG(INFO, "Action %" PRIx32 " was never started\n", run_value_);
198 } else {
199 if (interrupted_) {
200 LOG(INFO, "Action %" PRIx32 " was interrupted -> not cancelling\n",
201 run_value_);
202 } else {
203 if (sent_cancel_) {
204 LOG(INFO, "Action %" PRIx32 " already cancelled\n", run_value_);
205 } else {
206 LOG(INFO, "Canceling action %" PRIx32 " on queue %s\n", run_value_,
207 queue_group_->goal.name());
208 queue_group_->goal.MakeWithBuilder().run(0).Send();
209 sent_cancel_ = true;
210 }
211 }
212 }
213}
214
215template <typename T>
216bool TypedAction<T>::DoRunning() {
217 if (!sent_started_) return false;
218 if (has_started_) {
219 queue_group_->status.FetchNext();
220 CheckInterrupted();
221 } else if (queue_group_->status.FetchLatest()) {
222 CheckStarted();
223 }
224 if (interrupted_) return false;
225 // We've asked it to start but haven't gotten confirmation that it's started
226 // yet.
227 if (!has_started_) return true;
228 return queue_group_->status.get() &&
229 queue_group_->status->running == run_value_;
230}
231
232template <typename T>
233void TypedAction<T>::DoWaitUntilDone() {
234 CHECK(sent_started_);
235 queue_group_->status.FetchNext();
236 CheckInterrupted();
237 while (true) {
238 if (interrupted_) return;
239 CheckStarted();
240 queue_group_->status.FetchNextBlocking();
241 CheckStarted();
242 CheckInterrupted();
243 if (has_started_ && (queue_group_->status.get() &&
244 queue_group_->status->running != run_value_)) {
245 return;
246 }
247 }
248}
249
250template <typename T>
251void TypedAction<T>::CheckStarted() {
252 if (has_started_) return;
253 if (queue_group_->status.get()) {
254 if (queue_group_->status->running == run_value_ ||
255 (queue_group_->status->running == 0 &&
256 queue_group_->status->last_running == run_value_)) {
257 // It's currently running our instance.
258 has_started_ = true;
259 LOG(DEBUG, "Action %" PRIx32 " has been started\n", run_value_);
260 } else if (queue_group_->status->running == old_run_value_) {
261 // It's still running an old instance (or still doing nothing).
262 } else {
263 LOG(WARNING,
264 "Action %" PRIx32 " interrupted by %" PRIx32 " before starting\n",
265 run_value_, queue_group_->status->running);
266 has_started_ = true;
267 interrupted_ = true;
268 }
269 } else {
270 LOG(WARNING, "No status message recieved.\n");
271 }
272}
273
274template <typename T>
275void TypedAction<T>::CheckInterrupted() {
276 if (!interrupted_ && has_started_ && queue_group_->status.get()) {
277 if (queue_group_->status->running != 0 &&
278 queue_group_->status->running != run_value_) {
279 LOG(WARNING,
280 "Action %" PRIx32 " interrupted by %" PRIx32 " after starting\n",
281 run_value_, queue_group_->status->running);
282 }
283 }
284}
285
286template <typename T>
287void TypedAction<T>::DoStart() {
288 if (goal_) {
289 LOG(INFO, "Starting action %" PRIx32 "\n", run_value_);
290 goal_->run = run_value_;
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800291 goal_->params = params_;
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800292 sent_started_ = true;
293 if (!goal_.Send()) {
294 LOG(ERROR, "sending goal for action %" PRIx32 " failed\n", run_value_);
295 // Don't wait to see a message with it.
296 has_started_ = true;
297 }
298 queue_group_->status.FetchLatest();
299 if (queue_group_->status.get() && queue_group_->status->running != 0) {
300 old_run_value_ = queue_group_->status->running;
301 LOG(INFO, "Action %" PRIx32 " already running\n", old_run_value_);
302 } else {
303 old_run_value_ = 0;
304 }
305 } else {
306 LOG(WARNING, "Action %" PRIx32 " already started\n", run_value_);
307 }
308}
309
310} // namespace actions
311} // namespace common
312} // namespace aos
313
314#endif // AOS_COMMON_ACTIONS_ACTIONS_H_