blob: 776c945aa62ddf287e69e2340d825aa5672c987e [file] [log] [blame]
Ben Fredricksond69f38b2015-01-28 20:06:15 -08001#ifndef AOS_COMMON_ACTIONS_ACTIONS_H_
2#define AOS_COMMON_ACTIONS_ACTIONS_H_
3
4#include <inttypes.h>
5#include <sys/types.h>
6#include <unistd.h>
7
8#include <type_traits>
9#include <atomic>
10#include <memory>
11
12#include "aos/common/logging/logging.h"
13#include "aos/common/queue.h"
14
15namespace aos {
16namespace common {
17namespace actions {
18
19class Action;
20
21// A queue which queues Actions, verifies a single action is running, and
22// cancels Actions.
23class ActionQueue {
24 public:
25 // Queues up an action for sending.
26 void EnqueueAction(::std::unique_ptr<Action> action);
27
28 // Cancels the current action, and runs the next one after the current one has
29 // finished and the action queue ticks again.
30 void CancelCurrentAction();
31
32 // Cancels all running actions.
33 void CancelAllActions();
34
35 // Runs the next action when the current one is finished running and handles
36 // periodically updating various other information.
37 void Tick();
38
39 // Returns true if any action is running or could be running.
40 // For a one cycle faster response, call Tick before running this.
41 // Daniel can think of at least one case (current_action_ is cancelled and
42 // there is no next_action_) in which calling this without running Tick()
43 // first would return a wrong answer.
44 bool Running();
45
46 // Retrieves the internal state of the current action for testing.
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -080047 // See comments on the private members of TypedAction<T, S> for details.
Ben Fredricksond69f38b2015-01-28 20:06:15 -080048 bool GetCurrentActionState(bool* has_started, bool* sent_started,
49 bool* sent_cancel, bool* interrupted,
50 uint32_t* run_value, uint32_t* old_run_value);
51
52 // Retrieves the internal state of the next action for testing.
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -080053 // See comments on the private members of TypedAction<T, S> for details.
Ben Fredricksond69f38b2015-01-28 20:06:15 -080054 bool GetNextActionState(bool* has_started, bool* sent_started,
55 bool* sent_cancel, bool* interrupted,
56 uint32_t* run_value, uint32_t* old_run_value);
57
58 private:
59 ::std::unique_ptr<Action> current_action_;
60 ::std::unique_ptr<Action> next_action_;
61};
62
63// The basic interface an ActionQueue can access.
64class Action {
65 public:
66 virtual ~Action() {}
67
68 // Cancels the action.
69 void Cancel() { DoCancel(); }
70 // Returns true if the action is currently running.
71 bool Running() { return DoRunning(); }
72 // Starts the action.
73 void Start() { DoStart(); }
74
75 // Waits until the action has finished.
76 void WaitUntilDone() { DoWaitUntilDone(); }
77
Ben Fredrickson32e6c252015-02-21 23:53:38 -080078 // Run all the checks for one iteration of waiting. Will return true when the
79 // action has completed, successfully or not. This is non-blocking.
80 bool CheckIteration() { return DoCheckIteration(false); }
81
Ben Fredricksond69f38b2015-01-28 20:06:15 -080082 // Retrieves the internal state of the action for testing.
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -080083 // See comments on the private members of TypedAction<T, S> for details.
Ben Fredricksond69f38b2015-01-28 20:06:15 -080084 void GetState(bool* has_started, bool* sent_started, bool* sent_cancel,
85 bool* interrupted, uint32_t* run_value,
86 uint32_t* old_run_value) {
87 DoGetState(has_started, sent_started, sent_cancel, interrupted, run_value,
88 old_run_value);
89 }
90
91 private:
92 // Cancels the action.
93 virtual void DoCancel() = 0;
94 // Returns true if the action is running or we don't have an initial response
95 // back from it to signal whether or not it is running.
96 virtual bool DoRunning() = 0;
97 // Starts the action if a goal has been created.
98 virtual void DoStart() = 0;
99 // Blocks until complete.
100 virtual void DoWaitUntilDone() = 0;
Ben Fredrickson32e6c252015-02-21 23:53:38 -0800101 // Updates status for one cycle of waiting
102 virtual bool DoCheckIteration(bool blocking) = 0;
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800103 // For testing we will need to get the internal state.
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800104 // See comments on the private members of TypedAction<T, S> for details.
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800105 virtual void DoGetState(bool* has_started, bool* sent_started,
106 bool* sent_cancel, bool* interrupted,
107 uint32_t* run_value, uint32_t* old_run_value) = 0;
108};
109
110// Templated subclass to hold the type information.
111template <typename T>
112class TypedAction : public Action {
113 public:
114 // A convenient way to refer to the type of our goals.
115 typedef typename std::remove_reference<decltype(
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800116 *(static_cast<T*>(nullptr)->goal.MakeMessage().get()))>::type GoalType;
117 typedef typename std::remove_reference<
118 decltype(static_cast<GoalType*>(nullptr)->params)>::type ParamType;
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800119
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800120 TypedAction(T* queue_group, const ParamType &params)
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800121 : queue_group_(queue_group),
122 goal_(queue_group_->goal.MakeMessage()),
123 // This adds 1 to the counter (atomically because it's potentially
124 // shared across threads) and then bitwise-ORs the bottom of the PID to
125 // differentiate it from other processes's values (ie a unique id).
126 run_value_(run_counter_.fetch_add(1, ::std::memory_order_relaxed) |
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800127 ((getpid() & 0xFFFF) << 16)),
128 params_(params) {
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800129 LOG(INFO, "Action %" PRIx32 " created on queue %s\n", run_value_,
130 queue_group_->goal.name());
131 // Clear out any old status messages from before now.
132 queue_group_->status.FetchLatest();
133 }
134
135 virtual ~TypedAction() {
136 LOG(DEBUG, "Calling destructor of %" PRIx32 "\n", run_value_);
137 DoCancel();
138 }
139
140 // Returns the current goal that will be sent when the action is sent.
141 GoalType* GetGoal() { return goal_.get(); }
142
143 private:
144 void DoCancel() override;
145
146 bool DoRunning() override;
147
148 void DoWaitUntilDone() override;
149
Ben Fredrickson32e6c252015-02-21 23:53:38 -0800150 bool DoCheckIteration(bool blocking);
151
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800152 // Sets the started flag (also possibly the interrupted flag).
153 void CheckStarted();
154
155 // Checks for interrupt.
156 void CheckInterrupted();
157
158 void DoStart() override;
159
160 void DoGetState(bool* has_started, bool* sent_started, bool* sent_cancel,
161 bool* interrupted, uint32_t* run_value,
162 uint32_t* old_run_value) override {
163 if (has_started != nullptr) *has_started = has_started_;
164 if (sent_started != nullptr) *sent_started = sent_started_;
165 if (sent_cancel != nullptr) *sent_cancel = sent_cancel_;
166 if (interrupted != nullptr) *interrupted = interrupted_;
167 if (run_value != nullptr) *run_value = run_value_;
168 if (old_run_value != nullptr) *old_run_value = old_run_value_;
169 }
170
171 T* const queue_group_;
172 ::aos::ScopedMessagePtr<GoalType> goal_;
173
174 // Track if we have seen a response to the start message.
175 bool has_started_ = false;
176 // Track if we have sent an initial start message.
177 bool sent_started_ = false;
178
179 bool sent_cancel_ = false;
180
181 // Gets set to true if we ever see somebody else's value in running.
182 bool interrupted_ = false;
183
184 // The value we're going to use for goal.run etc.
185 const uint32_t run_value_;
186
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800187 // flag passed to action in order to have differing types
188 const ParamType params_;
189
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800190 // The old value for running that we may have seen. If we see any value other
191 // than this or run_value_, somebody else got in the way and we're done. 0 if
192 // there was nothing there to start with. Only valid after sent_started_
193 // changes to true.
194 uint32_t old_run_value_;
195
196 static ::std::atomic<uint16_t> run_counter_;
197};
198
199template <typename T>
200::std::atomic<uint16_t> TypedAction<T>::run_counter_{0};
201
202template <typename T>
203void TypedAction<T>::DoCancel() {
204 if (!sent_started_) {
205 LOG(INFO, "Action %" PRIx32 " was never started\n", run_value_);
206 } else {
207 if (interrupted_) {
208 LOG(INFO, "Action %" PRIx32 " was interrupted -> not cancelling\n",
209 run_value_);
210 } else {
211 if (sent_cancel_) {
212 LOG(INFO, "Action %" PRIx32 " already cancelled\n", run_value_);
213 } else {
214 LOG(INFO, "Canceling action %" PRIx32 " on queue %s\n", run_value_,
215 queue_group_->goal.name());
216 queue_group_->goal.MakeWithBuilder().run(0).Send();
217 sent_cancel_ = true;
218 }
219 }
220 }
221}
222
223template <typename T>
224bool TypedAction<T>::DoRunning() {
225 if (!sent_started_) return false;
226 if (has_started_) {
227 queue_group_->status.FetchNext();
228 CheckInterrupted();
229 } else if (queue_group_->status.FetchLatest()) {
230 CheckStarted();
231 }
232 if (interrupted_) return false;
233 // We've asked it to start but haven't gotten confirmation that it's started
234 // yet.
235 if (!has_started_) return true;
236 return queue_group_->status.get() &&
237 queue_group_->status->running == run_value_;
238}
239
240template <typename T>
241void TypedAction<T>::DoWaitUntilDone() {
242 CHECK(sent_started_);
243 queue_group_->status.FetchNext();
244 CheckInterrupted();
245 while (true) {
Ben Fredrickson32e6c252015-02-21 23:53:38 -0800246 if (DoCheckIteration(true)) {
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800247 return;
248 }
249 }
250}
251
252template <typename T>
Ben Fredrickson32e6c252015-02-21 23:53:38 -0800253bool TypedAction<T>::DoCheckIteration(bool blocking) {
254 CHECK(sent_started_);
255 if (interrupted_) return true;
256 CheckStarted();
257 if (blocking) {
258 queue_group_->status.FetchAnother();
259 } else {
260 if (!queue_group_->status.FetchNext()) {
261 return false;
262 }
263 }
264 CheckStarted();
265 CheckInterrupted();
266 if (has_started_ && (queue_group_->status.get() &&
267 queue_group_->status->running != run_value_)) {
268 return true;
269 }
270 return false;
271}
272
273template <typename T>
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800274void TypedAction<T>::CheckStarted() {
275 if (has_started_) return;
276 if (queue_group_->status.get()) {
277 if (queue_group_->status->running == run_value_ ||
278 (queue_group_->status->running == 0 &&
279 queue_group_->status->last_running == run_value_)) {
280 // It's currently running our instance.
281 has_started_ = true;
282 LOG(DEBUG, "Action %" PRIx32 " has been started\n", run_value_);
283 } else if (queue_group_->status->running == old_run_value_) {
284 // It's still running an old instance (or still doing nothing).
285 } else {
286 LOG(WARNING,
287 "Action %" PRIx32 " interrupted by %" PRIx32 " before starting\n",
288 run_value_, queue_group_->status->running);
289 has_started_ = true;
290 interrupted_ = true;
291 }
292 } else {
293 LOG(WARNING, "No status message recieved.\n");
294 }
295}
296
297template <typename T>
298void TypedAction<T>::CheckInterrupted() {
299 if (!interrupted_ && has_started_ && queue_group_->status.get()) {
300 if (queue_group_->status->running != 0 &&
301 queue_group_->status->running != run_value_) {
302 LOG(WARNING,
303 "Action %" PRIx32 " interrupted by %" PRIx32 " after starting\n",
304 run_value_, queue_group_->status->running);
305 }
306 }
307}
308
309template <typename T>
310void TypedAction<T>::DoStart() {
311 if (goal_) {
312 LOG(INFO, "Starting action %" PRIx32 "\n", run_value_);
313 goal_->run = run_value_;
Ben Fredrickson9fb2ab12015-02-16 16:42:08 -0800314 goal_->params = params_;
Ben Fredricksond69f38b2015-01-28 20:06:15 -0800315 sent_started_ = true;
316 if (!goal_.Send()) {
317 LOG(ERROR, "sending goal for action %" PRIx32 " failed\n", run_value_);
318 // Don't wait to see a message with it.
319 has_started_ = true;
320 }
321 queue_group_->status.FetchLatest();
322 if (queue_group_->status.get() && queue_group_->status->running != 0) {
323 old_run_value_ = queue_group_->status->running;
324 LOG(INFO, "Action %" PRIx32 " already running\n", old_run_value_);
325 } else {
326 old_run_value_ = 0;
327 }
328 } else {
329 LOG(WARNING, "Action %" PRIx32 " already started\n", run_value_);
330 }
331}
332
333} // namespace actions
334} // namespace common
335} // namespace aos
336
337#endif // AOS_COMMON_ACTIONS_ACTIONS_H_