blob: 911bee82093931312e3a8bc83c32ebfa66c83176 [file] [log] [blame]
brians343bc112013-02-10 01:53:46 +00001#include <sys/syscall.h>
2#include <errno.h>
3#include <math.h>
4
5namespace aos {
6
7template<class T, class S> AsyncAction<T, S>::AsyncAction(const std::string name) : local_count(0), local_done_status(0), local_pid(0),
8 done_index(0), status_index(0), next_status_count(0){
9 aos_type_sig async_action_status_sig = {sizeof(async_action_status<S>), 1234, 1};
10 status_queue = aos_fetch_queue(name.c_str(), &async_action_status_sig);
11 aos_type_sig async_action_start_sig = {sizeof(async_action_start<T>), 4321, 1};
12 start_queue = aos_fetch_queue(name.c_str(), &async_action_start_sig);
13}
14
15template<class T, class S> uint16_t AsyncAction<T, S>::Start(T &args){
16 CheckStop();
17 // write something to the start queue and increment the count in the status queue
18 async_action_start<S> *start = (async_action_start<S> *)aos_queue_get_msg(start_queue);
19 if(start == NULL)
20 return 0;
21 memcpy(&start->args, &args, sizeof(T));
22 async_action_status<T> *status = (async_action_status<T> *)aos_queue_read_msg(status_queue, 0); // wait until one starts it if didn't already
23 uint16_t r = 0;
24 aos_resource_entity *entity;
25 if(status == NULL)
26 goto err;
27 r = status->count + 1;
28 local_pid = status->pid;
29 start->count = r;
30 entity = status->resource_entity;
31 aos_queue_free_msg(status_queue, status);
32 status = (async_action_status<T> *)aos_queue_get_msg(status_queue);
33 status->count = r;
34 status->done_status = 0;
35 status->pid = local_pid;
36 status->resource_entity = entity; // the resource_entity of the action, not the caller!
37 start->parent = resource_entity; // if it's NULL, it'll get fixed when starting
38 if(aos_queue_write_msg(status_queue, status, OVERRIDE) < 0){
39 goto err;
40 } else {
41 status = NULL;
42 }
43 if(aos_queue_write_msg(start_queue, start, OVERRIDE) < 0){
44 goto err;
45 } else {
46 start = NULL;
47 }
48err:
49 if(start != NULL)
50 aos_queue_free_msg(start_queue, start);
51 if(status != NULL)
52 aos_queue_free_msg(status_queue, status);
53 local_count = r;
54 return r;
55}
56template<class T, class S> void AsyncAction<T, S>::Stop(int32_t count){
57 CheckStop();
58 async_action_status<S> *status = (async_action_status<S> *)aos_queue_read_msg(status_queue, PEEK | NON_BLOCK);
59 if(status == NULL)
60 return;
61 local_pid = status->pid;
62 local_count = status->count;
63 aos_queue_free_msg(status_queue, status);
64 if(local_count != GetCount(count)) // if it's the wrong one
65 return;
66 async_action_start<S> *start = (async_action_start<S> *)aos_queue_read_msg(start_queue, PEEK | NON_BLOCK);
67 if(start == NULL) // if there isn't something in the start queue (aka one running)
68 return;
69 aos_queue_free_msg(start_queue, start);
70 union sigval sival;
71 sival.sival_int = 0;
72 if(sigqueue(local_pid, STOP_SIGNAL, sival) < 0){ // if sending the signal failed
73 fprintf(stderr, "sigqueue STOP_SIGNAL (which is %d) with pid %d failed with errno %d: ", STOP_SIGNAL, local_pid, errno);
74 perror(NULL);
75 }
76}
77
78template<class T, class S> bool AsyncAction<T, S>::IsDone(int32_t count){
79 CheckStop();
80 async_action_status<S> *status = (async_action_status<S> *)aos_queue_read_msg(status_queue, PEEK | NON_BLOCK);
81 // below = there is one running && it's the right one && it's done
82 bool r = status != NULL && status->count == GetCount(count) && ((status->done_status & 0x02) != 0);
83 aos_queue_free_msg(status_queue, status);
84 return r;
85}
86template<class T, class S> uint16_t AsyncAction<T, S>::Join(int32_t count_in){
87 const uint16_t count = GetCount(count_in);
88 if(count == 0){
89 fprintf(stderr, "not joining non-existent run 0\n");
90 return 0;
91 }
92 async_action_status<S> *status = NULL;
93 done_index = 0; // the queue's message numbering might have been reset
94 do {
95 aos_queue_free_msg(status_queue, status);
96 CheckStop();
97 status = (async_action_status<S> *)aos_queue_read_msg_index(status_queue, FROM_END, &done_index);
98 } while(status != NULL && (status->done_status & 0x02) == 0 && (status->count == count));
99 if(status == NULL){
100 fprintf(stderr, "bad news at %s: %d\n", __FILE__, __LINE__);
101 return 0;
102 }
103 aos_queue_free_msg(status_queue, status);
104 return count;
105}
106template<class T, class S> bool AsyncAction<T, S>::GetNextStatus(S &status_out, int32_t count_in){
107 async_action_status<S> *status = NULL;
108start:
109 CheckStop();
110 const uint16_t count = GetCount(count_in);
111 if(count != next_status_count){ // reset the index if another one gets started in case the queue indexes get reset
112 next_status_count = count;
113 status_index = 0;
114 }
115 status = (async_action_status<S> *)aos_queue_read_msg_index(status_queue, FROM_END, &status_index);
116 if(status == NULL)
117 goto start;
118 if(status->count != count){
119 aos_queue_free_msg(status_queue, status);
120 return false;
121 }
122 bool r = (status->done_status & 0x01) != 0;
123 memcpy(&status_out, &status->status, sizeof(S));
124 aos_queue_free_msg(status_queue, status);
125 return r;
126}
127template<class T, class S> bool AsyncAction<T, S>::GetStatus(S &status_out, int32_t count){
128 CheckStop();
129 async_action_status<S> *status = (async_action_status<S> *)aos_queue_read_msg(status_queue, PEEK | NON_BLOCK);
130 if(status == NULL)
131 return false;
132 if(status->count != GetCount(count)){
133 aos_queue_free_msg(status_queue, status);
134 return false;
135 }
136 bool r = (status->done_status & 0x01) != 0;
137 memcpy(&status_out, &status->status, sizeof(S));
138 aos_queue_free_msg(status_queue, status);
139 return r;
140}
141
142template<class T, class S> void AsyncAction<T, S>::PostStatus(S &status_in){
143 CheckStop();
144 async_action_status<S> *status = (async_action_status<S> *)aos_queue_get_msg(status_queue);
145 if(status == NULL)
146 return;
147 memcpy(&local_status, &status_in, sizeof(S));
148 memcpy(&status->status, &status_in, sizeof(S));
149 status->done_status = 0x01;
150 local_done_status = 0x01;
151 status->pid = local_pid;
152 status->resource_entity = resource_entity;
153 if(aos_queue_write_msg(status_queue, status, OVERRIDE) < 0){
154 local_done_status = 0;
155 memset(&local_status, 0x00, sizeof(S));
156 aos_queue_free_msg(status_queue, status);
157 }
158}
159template<class T, class S> template<int (*O)(aos_resource_entity *, aos_resource *)> inline bool AsyncAction<T, S>::ResourceOp(uint16_t resource){
160 CheckStop();
161 if(resource_entity == NULL){
162 fprintf(stderr, "no started AsyncAction detected in this process\n");
163 return false;
164 }
165 switch(O(resource_entity, aos_resource_get(resource))){
166 case 0:
167 break;
168 case 1:
169 return false;
170 case -1:
171 throw resourceexception();
172 }
173 return true;
174}
175template<class T, class S> void AsyncAction<T, S>::RequestResource(uint16_t resource){
176 if(ResourceOp<aos_resource_request>(resource)){
177 resources[resource] = 1;
178 }else{
179 throw resourceexception(); // if we can't get a resource, we just want to stop
180 }
181}
182template<class T, class S> bool AsyncAction<T, S>::TryRequestResource(uint16_t resource){
183 if(ResourceOp<aos_resource_request>(resource)){
184 resources[resource] = 1;
185 return true;
186 }else{
187 return false;
188 }
189}
190template<class T, class S> void AsyncAction<T, S>::ReleaseResource(uint16_t resource){
191 if(ResourceOp<aos_resource_release>(resource)){
192 if(resources.erase(resource) != 1)
193 fprintf(stderr, "warning: value for resource %d wasn't 1\n", resource);
194 }
195}
196
197#if 0
198// from gcc's (used 4.6.2) libjava/include/i386-signal.h
199/* We use kernel_sigaction here because we're calling the kernel
200 directly rather than via glibc. The sigaction structure that the
201 syscall uses is a different shape from the one in userland and not
202 visible to us in a header file so we define it here. */
203extern "C"
204{
205 struct kernel_sigaction
206 {
207 void (*k_sa_sigaction)(int,siginfo_t *,void *);
208 unsigned long k_sa_flags;
209 void (*k_sa_restorer) (void);
210 sigset_t k_sa_mask;
211 };
212}
213#define RESTORE(name, syscall) RESTORE2 (name, syscall)
214#define RESTORE2(name, syscall) \
215asm \
216 ( \
217 ".text\n" \
218 ".byte 0 # Yes, this really is necessary\n" \
219 " .align 16\n" \
220 "__" #name ":\n" \
221 " movl $" #syscall ", %eax\n" \
222 " int $0x80" \
223 );
224/* The return code for realtime-signals. */
225RESTORE (restore_rt, __NR_rt_sigreturn)
226void restore_rt (void) asm ("__restore_rt")
227 __attribute__ ((visibility ("hidden")));
228#define INIT_SIGNAL(signal, sigaction) \
229do \
230 { \
231 struct kernel_sigaction act; \
232 act.k_sa_sigaction = sigaction; \
233 sigemptyset (&act.k_sa_mask); \
234 act.k_sa_flags = SA_SIGINFO|0x4000000; \
235 act.k_sa_restorer = restore_rt; \
236 syscall (SYS_rt_sigaction, signal, &act, NULL, _NSIG / 8); \
237 } \
238while (0)
239/* Unblock a signal. Unless we do this, the signal may only be sent
240 once. */
241static void
242unblock_signal (int signum __attribute__ ((__unused__)))
243{
244 sigset_t sigs;
245
246 sigemptyset (&sigs);
247 sigaddset (&sigs, signum);
248 sigprocmask (SIG_UNBLOCK, &sigs, NULL);
249}
250#endif
251
252template<class T, class S> void AsyncAction<T, S>::sig_action(int signum, siginfo_t *, void *){
253 // TODO really shouldn't be using stdio in here (check before changing)
254 /*unblock_signal(signum);
255 // MAKE_THROW_FRAME(exception)
256 std::exception *exception = new std::exception;
257 throw exception;*/
258 fprintf(stderr, "received signal %d\n", signum);
259 if(signum == STOP_SIGNAL)
260 interrupt |= 0x01;
261 else if(signum == RESOURCE_KILL_SIGNAL)
262 interrupt |= 0x02;
263 else if(signum == SIGINT || signum == SIGTERM)
264 interrupt |= 0x04;
265 else
266 fprintf(stderr, "unknown signal %d\n", signum);
267}
268template<class T, class S> int AsyncAction<T, S>::Run(uint8_t priority) {
269 interrupt = 0;
270 struct sigaction sigact;
271 sigact.sa_sigaction = sig_action;
272 sigact.sa_flags = 0;
273 sigaction(STOP_SIGNAL, &sigact, NULL);
274 sigaction(RESOURCE_KILL_SIGNAL, &sigact, NULL);
275 sigaction(SIGINT, &sigact, NULL);
276 sigaction(SIGTERM, &sigact, NULL); // kill from the command line default
277
278 if(resource_entity != NULL){
279 fprintf(stderr, "resource_entity isn't already null, which means that this is the second Run being called in this process or something (which isn't allowed...)\n");
280 return -1;
281 }
282 resource_entity = aos_resource_entity_create(priority);
283
284 async_action_status<S> *status = (async_action_status<S> *)aos_queue_get_msg(status_queue);
285 if(status == NULL)
286 return -1;
287 //memset(status + offsetof(aync_action_status<S>, status->status), 0x00, sizeof(S));
288 status->count = 1;
289 status->done_status = 0;
290 local_done_status = 0;
291 local_pid = getpid();
292 fprintf(stderr, "local_pid=%d STOP_SIGNAL is currently %d\n", local_pid, STOP_SIGNAL);
293 status->pid = local_pid;
294 status->resource_entity = resource_entity;
295 // put the initial status in
296 if(aos_queue_write_msg(status_queue, status, OVERRIDE) < 0){
297 aos_queue_free_msg(status_queue, status);
298 return -1;
299 }
300
301 // clear out any pending start messages
302 async_action_start<T> *start = (async_action_start<T> *)aos_queue_read_msg(start_queue, NON_BLOCK);
303 aos_queue_free_msg(start_queue, start);
304
305 OnStart();
306
307 T args;
308 uint16_t current_count;
309 while(true){
310 interrupt = 0;
311 // wait for a new start message
312 start = (async_action_start<T> *)aos_queue_read_msg(start_queue, PEEK);
313 if(start == NULL){
314 if(interrupt & 0x04)
315 break;
316 continue;
317 }
318 memcpy(&args, &start->args, sizeof(T));
319 current_count = start->count;
320 if(start->parent == NULL){ // Start isn't getting called from a process with an AsyncAction implementation running in it
321 start->parent = aos_resource_entity_root_get();
322 }
323 aos_resource_entity_set_parent(resource_entity, start->parent);
324 aos_queue_free_msg(start_queue, start);
325 status = (async_action_status<S> *)aos_queue_get_msg(status_queue);
326 if(status == NULL){
327 fprintf(stderr, "%s: %d: could not get a status message to write initial status to\n", __FILE__, __LINE__);
328 continue;
329 }
330 status->done_status = local_done_status = 0;
331 status->pid = local_pid;
332 status->count = current_count;
333 status->resource_entity = resource_entity;
334 if(aos_queue_write_msg(status_queue, status, OVERRIDE) < 0)
335 aos_queue_free_msg(status_queue, status);
336
337 try {
338 DoAction(args);
339 } catch (stopexception &e) {
340 fprintf(stderr, "caught stopexception\n");
341 } catch (resourceexception &e) {
342 fprintf(stderr, "caught resourceexception\n");
343 } catch (...) {
344 fprintf(stderr, "caught another exception... (bad news)\n");
345 }
346
347 start = (async_action_start<T> *)aos_queue_read_msg(start_queue, NON_BLOCK);
348 if(start == NULL){
349 fprintf(stderr, "somebody else consumed the start message (at %s: %d)\n", __FILE__, __LINE__);
350 } else {
351 aos_queue_free_msg(start_queue, start);
352 }
353
354 status = (async_action_status<S> *)aos_queue_get_msg(status_queue);
355 if(status == NULL){
356 fprintf(stderr, "couldn't get a message to write that we're finished in to %s: %d\n", __FILE__, __LINE__);
357 continue;
358 }
359 memcpy(&status->status, &local_status, sizeof(S));
360 status->done_status = local_done_status | 0x02;
361 status->pid = local_pid;
362 status->count = current_count;
363 status->resource_entity = resource_entity;
364 if(aos_queue_write_msg(status_queue, status, OVERRIDE) < 0)
365 aos_queue_free_msg(status_queue, status);
366
367 std::map<uint16_t, uint8_t>::iterator it;
368 for(it = resources.begin(); it != resources.end(); ++it){
369 ReleaseResource(it->first);
370 }
371 if(!resources.empty()){
372 fprintf(stderr, "resources isn't empty after releasing everything in it. clearing it\n");
373 resources.clear();
374 }
375
376 if(interrupt & 0x04)
377 break;
378 }
379 OnEnd();
380 return 0;
381}
382
383template<class T, class S> void AsyncAction<T, S>::Sleep(double seconds){
384 timespec ts;
385 clock_gettime(CLOCK_MONOTONIC, &ts);
386 ts.tv_sec += (time_t)floor(seconds);
387 ts.tv_nsec += (long)((seconds - floor(seconds)) * 1000000000);
388 if(ts.tv_nsec > 1000000000){
389 ts.tv_nsec -= 1000000000;
390 ts.tv_sec += 1;
391 }
392 int rv;
393 do {
394 CheckStop();
395 rv = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &ts, NULL);
396 } while(rv);
397}
398
399template<class T, class S> void AsyncAction<T, S>::DoAction(__attribute__((unused)) T &args){
400 fprintf(stderr, "this (at %s: %d) should never get run\n", __FILE__, __LINE__);
401 *((int *)NULL) = 0;
402}
403
404} // namespace aos
405