blob: 08d94f7e882b839902b956a40386a635ece92284 [file] [log] [blame]
Ben Fredricksond69f38b2015-01-28 20:06:15 -08001#ifndef AOS_COMMON_ACTIONS_ACTIONS_H_
2#define AOS_COMMON_ACTIONS_ACTIONS_H_
3
4#include <inttypes.h>
5#include <sys/types.h>
6#include <unistd.h>
7
8#include <type_traits>
9#include <atomic>
10#include <memory>
11
12#include "aos/common/logging/logging.h"
13#include "aos/common/queue.h"
14
15namespace aos {
16namespace common {
17namespace actions {
18
19class Action;
20
21// A queue which queues Actions, verifies a single action is running, and
22// cancels Actions.
23class ActionQueue {
24 public:
25 // Queues up an action for sending.
26 void EnqueueAction(::std::unique_ptr<Action> action);
27
28 // Cancels the current action, and runs the next one after the current one has
29 // finished and the action queue ticks again.
30 void CancelCurrentAction();
31
32 // Cancels all running actions.
33 void CancelAllActions();
34
35 // Runs the next action when the current one is finished running and handles
36 // periodically updating various other information.
37 void Tick();
38
39 // Returns true if any action is running or could be running.
40 // For a one cycle faster response, call Tick before running this.
41 // Daniel can think of at least one case (current_action_ is cancelled and
42 // there is no next_action_) in which calling this without running Tick()
43 // first would return a wrong answer.
44 bool Running();
45
46 // Retrieves the internal state of the current action for testing.
47 // See comments on the private members of TypedAction<T> for details.
48 bool GetCurrentActionState(bool* has_started, bool* sent_started,
49 bool* sent_cancel, bool* interrupted,
50 uint32_t* run_value, uint32_t* old_run_value);
51
52 // Retrieves the internal state of the next action for testing.
53 // See comments on the private members of TypedAction<T> for details.
54 bool GetNextActionState(bool* has_started, bool* sent_started,
55 bool* sent_cancel, bool* interrupted,
56 uint32_t* run_value, uint32_t* old_run_value);
57
58 private:
59 ::std::unique_ptr<Action> current_action_;
60 ::std::unique_ptr<Action> next_action_;
61};
62
63// The basic interface an ActionQueue can access.
64class Action {
65 public:
66 virtual ~Action() {}
67
68 // Cancels the action.
69 void Cancel() { DoCancel(); }
70 // Returns true if the action is currently running.
71 bool Running() { return DoRunning(); }
72 // Starts the action.
73 void Start() { DoStart(); }
74
75 // Waits until the action has finished.
76 void WaitUntilDone() { DoWaitUntilDone(); }
77
78 // Retrieves the internal state of the action for testing.
79 // See comments on the private members of TypedAction<T> for details.
80 void GetState(bool* has_started, bool* sent_started, bool* sent_cancel,
81 bool* interrupted, uint32_t* run_value,
82 uint32_t* old_run_value) {
83 DoGetState(has_started, sent_started, sent_cancel, interrupted, run_value,
84 old_run_value);
85 }
86
87 private:
88 // Cancels the action.
89 virtual void DoCancel() = 0;
90 // Returns true if the action is running or we don't have an initial response
91 // back from it to signal whether or not it is running.
92 virtual bool DoRunning() = 0;
93 // Starts the action if a goal has been created.
94 virtual void DoStart() = 0;
95 // Blocks until complete.
96 virtual void DoWaitUntilDone() = 0;
97 // For testing we will need to get the internal state.
98 // See comments on the private members of TypedAction<T> for details.
99 virtual void DoGetState(bool* has_started, bool* sent_started,
100 bool* sent_cancel, bool* interrupted,
101 uint32_t* run_value, uint32_t* old_run_value) = 0;
102};
103
104// Templated subclass to hold the type information.
105template <typename T>
106class TypedAction : public Action {
107 public:
108 // A convenient way to refer to the type of our goals.
109 typedef typename std::remove_reference<decltype(
110 *(static_cast<T*>(NULL)->goal.MakeMessage().get()))>::type GoalType;
111
112 TypedAction(T* queue_group)
113 : queue_group_(queue_group),
114 goal_(queue_group_->goal.MakeMessage()),
115 // This adds 1 to the counter (atomically because it's potentially
116 // shared across threads) and then bitwise-ORs the bottom of the PID to
117 // differentiate it from other processes's values (ie a unique id).
118 run_value_(run_counter_.fetch_add(1, ::std::memory_order_relaxed) |
119 ((getpid() & 0xFFFF) << 16)) {
120 LOG(INFO, "Action %" PRIx32 " created on queue %s\n", run_value_,
121 queue_group_->goal.name());
122 // Clear out any old status messages from before now.
123 queue_group_->status.FetchLatest();
124 }
125
126 virtual ~TypedAction() {
127 LOG(DEBUG, "Calling destructor of %" PRIx32 "\n", run_value_);
128 DoCancel();
129 }
130
131 // Returns the current goal that will be sent when the action is sent.
132 GoalType* GetGoal() { return goal_.get(); }
133
134 private:
135 void DoCancel() override;
136
137 bool DoRunning() override;
138
139 void DoWaitUntilDone() override;
140
141 // Sets the started flag (also possibly the interrupted flag).
142 void CheckStarted();
143
144 // Checks for interrupt.
145 void CheckInterrupted();
146
147 void DoStart() override;
148
149 void DoGetState(bool* has_started, bool* sent_started, bool* sent_cancel,
150 bool* interrupted, uint32_t* run_value,
151 uint32_t* old_run_value) override {
152 if (has_started != nullptr) *has_started = has_started_;
153 if (sent_started != nullptr) *sent_started = sent_started_;
154 if (sent_cancel != nullptr) *sent_cancel = sent_cancel_;
155 if (interrupted != nullptr) *interrupted = interrupted_;
156 if (run_value != nullptr) *run_value = run_value_;
157 if (old_run_value != nullptr) *old_run_value = old_run_value_;
158 }
159
160 T* const queue_group_;
161 ::aos::ScopedMessagePtr<GoalType> goal_;
162
163 // Track if we have seen a response to the start message.
164 bool has_started_ = false;
165 // Track if we have sent an initial start message.
166 bool sent_started_ = false;
167
168 bool sent_cancel_ = false;
169
170 // Gets set to true if we ever see somebody else's value in running.
171 bool interrupted_ = false;
172
173 // The value we're going to use for goal.run etc.
174 const uint32_t run_value_;
175
176 // The old value for running that we may have seen. If we see any value other
177 // than this or run_value_, somebody else got in the way and we're done. 0 if
178 // there was nothing there to start with. Only valid after sent_started_
179 // changes to true.
180 uint32_t old_run_value_;
181
182 static ::std::atomic<uint16_t> run_counter_;
183};
184
185template <typename T>
186::std::atomic<uint16_t> TypedAction<T>::run_counter_{0};
187
188template <typename T>
189void TypedAction<T>::DoCancel() {
190 if (!sent_started_) {
191 LOG(INFO, "Action %" PRIx32 " was never started\n", run_value_);
192 } else {
193 if (interrupted_) {
194 LOG(INFO, "Action %" PRIx32 " was interrupted -> not cancelling\n",
195 run_value_);
196 } else {
197 if (sent_cancel_) {
198 LOG(INFO, "Action %" PRIx32 " already cancelled\n", run_value_);
199 } else {
200 LOG(INFO, "Canceling action %" PRIx32 " on queue %s\n", run_value_,
201 queue_group_->goal.name());
202 queue_group_->goal.MakeWithBuilder().run(0).Send();
203 sent_cancel_ = true;
204 }
205 }
206 }
207}
208
209template <typename T>
210bool TypedAction<T>::DoRunning() {
211 if (!sent_started_) return false;
212 if (has_started_) {
213 queue_group_->status.FetchNext();
214 CheckInterrupted();
215 } else if (queue_group_->status.FetchLatest()) {
216 CheckStarted();
217 }
218 if (interrupted_) return false;
219 // We've asked it to start but haven't gotten confirmation that it's started
220 // yet.
221 if (!has_started_) return true;
222 return queue_group_->status.get() &&
223 queue_group_->status->running == run_value_;
224}
225
226template <typename T>
227void TypedAction<T>::DoWaitUntilDone() {
228 CHECK(sent_started_);
229 queue_group_->status.FetchNext();
230 CheckInterrupted();
231 while (true) {
232 if (interrupted_) return;
233 CheckStarted();
234 queue_group_->status.FetchNextBlocking();
235 CheckStarted();
236 CheckInterrupted();
237 if (has_started_ && (queue_group_->status.get() &&
238 queue_group_->status->running != run_value_)) {
239 return;
240 }
241 }
242}
243
244template <typename T>
245void TypedAction<T>::CheckStarted() {
246 if (has_started_) return;
247 if (queue_group_->status.get()) {
248 if (queue_group_->status->running == run_value_ ||
249 (queue_group_->status->running == 0 &&
250 queue_group_->status->last_running == run_value_)) {
251 // It's currently running our instance.
252 has_started_ = true;
253 LOG(DEBUG, "Action %" PRIx32 " has been started\n", run_value_);
254 } else if (queue_group_->status->running == old_run_value_) {
255 // It's still running an old instance (or still doing nothing).
256 } else {
257 LOG(WARNING,
258 "Action %" PRIx32 " interrupted by %" PRIx32 " before starting\n",
259 run_value_, queue_group_->status->running);
260 has_started_ = true;
261 interrupted_ = true;
262 }
263 } else {
264 LOG(WARNING, "No status message recieved.\n");
265 }
266}
267
268template <typename T>
269void TypedAction<T>::CheckInterrupted() {
270 if (!interrupted_ && has_started_ && queue_group_->status.get()) {
271 if (queue_group_->status->running != 0 &&
272 queue_group_->status->running != run_value_) {
273 LOG(WARNING,
274 "Action %" PRIx32 " interrupted by %" PRIx32 " after starting\n",
275 run_value_, queue_group_->status->running);
276 }
277 }
278}
279
280template <typename T>
281void TypedAction<T>::DoStart() {
282 if (goal_) {
283 LOG(INFO, "Starting action %" PRIx32 "\n", run_value_);
284 goal_->run = run_value_;
285 sent_started_ = true;
286 if (!goal_.Send()) {
287 LOG(ERROR, "sending goal for action %" PRIx32 " failed\n", run_value_);
288 // Don't wait to see a message with it.
289 has_started_ = true;
290 }
291 queue_group_->status.FetchLatest();
292 if (queue_group_->status.get() && queue_group_->status->running != 0) {
293 old_run_value_ = queue_group_->status->running;
294 LOG(INFO, "Action %" PRIx32 " already running\n", old_run_value_);
295 } else {
296 old_run_value_ = 0;
297 }
298 } else {
299 LOG(WARNING, "Action %" PRIx32 " already started\n", run_value_);
300 }
301}
302
303} // namespace actions
304} // namespace common
305} // namespace aos
306
307#endif // AOS_COMMON_ACTIONS_ACTIONS_H_