blob: 2c73397c600107a3c7996a739b11392c1f41eed3 [file] [log] [blame]
Austin Schuh87dc7bb2018-10-29 21:53:23 -07001#include "gflags/gflags.h"
Brian Silvermane4d8b282015-12-24 13:44:48 -08002
Austin Schuhf2a50ba2016-12-24 16:16:26 -08003#include <fcntl.h>
4#include <mqueue.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -08005#include <netinet/in.h>
Brian Silvermanfd788882016-09-10 16:56:20 -04006#include <netinet/tcp.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -08007#include <pthread.h>
8#include <semaphore.h>
9#include <stdint.h>
10#include <sys/eventfd.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080011#include <sys/msg.h>
12#include <sys/sem.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080013#include <sys/socket.h>
14#include <sys/stat.h>
15#include <sys/types.h>
16#include <sys/un.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080017
Austin Schuhf2a50ba2016-12-24 16:16:26 -080018#include <atomic>
19#include <chrono>
Brian Silvermane4d8b282015-12-24 13:44:48 -080020#include <memory>
21#include <string>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080022#include <thread>
Brian Silvermane4d8b282015-12-24 13:44:48 -080023
John Park33858a32018-09-28 23:05:48 -070024#include "aos/condition.h"
25#include "aos/event.h"
James Kuszmaul651fc3f2019-05-15 21:14:25 -070026#include "aos/init.h"
27#include "aos/ipc_lib/queue.h"
John Park33858a32018-09-28 23:05:48 -070028#include "aos/logging/implementations.h"
29#include "aos/logging/logging.h"
30#include "aos/mutex/mutex.h"
31#include "aos/time/time.h"
Brian Silvermane4d8b282015-12-24 13:44:48 -080032
33DEFINE_string(method, "", "Which IPC method to use");
34DEFINE_int32(messages, 1000000, "How many messages to send back and forth");
35DEFINE_int32(client_cpu, 0, "CPU to pin client to");
36DEFINE_int32(server_cpu, 0, "CPU to pin server to");
37DEFINE_int32(client_priority, 1,
38 "Realtime priority for client. Negative for don't change");
39DEFINE_int32(server_priority, 1,
40 "Realtime priority for server. Negative for don't change");
41
42namespace aos {
43
Austin Schuhf2a50ba2016-12-24 16:16:26 -080044namespace chrono = ::std::chrono;
45
Brian Silvermane4d8b282015-12-24 13:44:48 -080046// A generic interface for an object which can send some data to another thread
47// and back.
48//
49// One side is called the "server". It should constantly Wait, do something with
50// the result, and then call Pong.
51// The other side is called the "client". It should repeatedly call Ping.
52class PingPongerInterface {
53 public:
54 // A chunk of memory definitely on its own cache line anywhere sane.
55 typedef uint8_t Data[1024] __attribute__((aligned(128)));
56
57 virtual ~PingPongerInterface() {}
58
59 // Returns where the "client" side should write data in preparation to send to
60 // the server.
61 // The result is valid until the next Ping call.
62 virtual Data *PingData() = 0;
63
64 // Sends the data returned from the most recent PingData call to the "server"
65 // side and returns its response.
66 // PingData must be called exactly once before each call of this method.
67 // The result is valid until the next PingData call.
68 virtual const Data *Ping() = 0;
69
70 // Waits for a Ping call and then returns the associated data.
71 // The result is valid until the beginning of the next Pong call.
72 virtual const Data *Wait() = 0;
73
74 // Returns where the "server" side should write data in preparation to send
75 // back to the "client".
76 // The result is valid until the next Pong call.
77 virtual Data *PongData() = 0;
78
79 // Sends data back to an in-progress Ping.
80 // Sends the data returned from the most recent PongData call back to an
81 // in-progress Ping.
82 // PongData must be called exactly once before each call of this method.
83 virtual void Pong() = 0;
84};
85
86// Base class for implementations which simple use a pair of Data objects for
87// all Pings and Pongs.
88class StaticPingPonger : public PingPongerInterface {
89 public:
90 Data *PingData() override { return &ping_data_; }
91 Data *PongData() override { return &pong_data_; }
92
93 private:
94 Data ping_data_, pong_data_;
95};
96
97// Implements ping-pong by sending the data over file descriptors.
98class FDPingPonger : public StaticPingPonger {
99 protected:
100 // Subclasses must override and call Init.
101 FDPingPonger() {}
102
103 // Subclasses must call this in their constructor.
104 // Does not take ownership of any of the file descriptors, any/all of which
105 // may be the same.
106 // {server,client}_read must be open for reading and {server,client}_write
107 // must be open for writing.
108 void Init(int server_read, int server_write, int client_read,
109 int client_write) {
110 server_read_ = server_read;
111 server_write_ = server_write;
112 client_read_ = client_read;
113 client_write_ = client_write;
114 }
115
116 private:
117 const Data *Ping() override {
118 WriteFully(client_write_, *PingData());
119 ReadFully(client_read_, &read_by_client_);
120 return &read_by_client_;
121 }
122
123 const Data *Wait() override {
124 ReadFully(server_read_, &read_by_server_);
125 return &read_by_server_;
126 }
127
128 void Pong() override { WriteFully(server_write_, *PongData()); }
129
130 void ReadFully(int fd, Data *data) {
131 size_t remaining = sizeof(*data);
132 uint8_t *current = &(*data)[0];
133 while (remaining > 0) {
134 const ssize_t result = PCHECK(read(fd, current, remaining));
135 CHECK_LE(static_cast<size_t>(result), remaining);
136 remaining -= result;
137 current += result;
138 }
139 }
140
141 void WriteFully(int fd, const Data &data) {
142 size_t remaining = sizeof(data);
143 const uint8_t *current = &data[0];
144 while (remaining > 0) {
145 const ssize_t result = PCHECK(write(fd, current, remaining));
146 CHECK_LE(static_cast<size_t>(result), remaining);
147 remaining -= result;
148 current += result;
149 }
150 }
151
152 Data read_by_client_, read_by_server_;
153 int server_read_ = -1, server_write_ = -1, client_read_ = -1,
154 client_write_ = -1;
155};
156
157class PipePingPonger : public FDPingPonger {
158 public:
159 PipePingPonger() {
160 PCHECK(pipe(to_server));
161 PCHECK(pipe(from_server));
162 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
163 }
164 ~PipePingPonger() {
165 PCHECK(close(to_server[0]));
166 PCHECK(close(to_server[1]));
167 PCHECK(close(from_server[0]));
168 PCHECK(close(from_server[1]));
169 }
170
171 private:
172 int to_server[2], from_server[2];
173};
174
175class NamedPipePingPonger : public FDPingPonger {
176 public:
177 NamedPipePingPonger() {
178 OpenFifo("/tmp/to_server", &client_write_, &server_read_);
179 OpenFifo("/tmp/from_server", &server_write_, &client_read_);
180
181 Init(server_read_, server_write_, client_read_, client_write_);
182 }
183 ~NamedPipePingPonger() {
184 PCHECK(close(server_read_));
185 PCHECK(close(client_write_));
186 PCHECK(close(client_read_));
187 PCHECK(close(server_write_));
188 }
189
190 private:
191 void OpenFifo(const char *name, int *write, int *read) {
192 {
193 const int ret = unlink(name);
194 if (ret == -1 && errno != ENOENT) {
195 PLOG(FATAL, "unlink(%s)", name);
196 }
197 PCHECK(mkfifo(name, S_IWUSR | S_IRUSR));
198 // Have to open it nonblocking because the other end isn't open yet...
199 *read = PCHECK(open(name, O_RDONLY | O_NONBLOCK));
200 *write = PCHECK(open(name, O_WRONLY));
201 {
202 const int flags = PCHECK(fcntl(*read, F_GETFL));
203 PCHECK(fcntl(*read, F_SETFL, flags & ~O_NONBLOCK));
204 }
205 }
206 }
207
208 int server_read_, server_write_, client_read_, client_write_;
209};
210
211class UnixPingPonger : public FDPingPonger {
212 public:
213 UnixPingPonger(int type) {
214 PCHECK(socketpair(AF_UNIX, type, 0, to_server));
215 PCHECK(socketpair(AF_UNIX, type, 0, from_server));
216 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
217 }
218 ~UnixPingPonger() {
219 PCHECK(close(to_server[0]));
220 PCHECK(close(to_server[1]));
221 PCHECK(close(from_server[0]));
222 PCHECK(close(from_server[1]));
223 }
224
225 private:
226 int to_server[2], from_server[2];
227};
228
229class TCPPingPonger : public FDPingPonger {
230 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400231 TCPPingPonger(bool nodelay) {
Brian Silvermane4d8b282015-12-24 13:44:48 -0800232 server_ = PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400233 if (nodelay) {
234 const int yes = 1;
235 PCHECK(setsockopt(server_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
236 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800237 {
238 sockaddr_in server_address;
239 memset(&server_address, 0, sizeof(server_address));
240 server_address.sin_family = AF_INET;
241 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
242 PCHECK(bind(server_, reinterpret_cast<sockaddr *>(&server_address),
243 sizeof(server_address)));
244 }
245 PCHECK(listen(server_, 1));
246
247 client_ = PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400248 if (nodelay) {
249 const int yes = 1;
250 PCHECK(setsockopt(client_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
251 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800252 {
253 sockaddr_in client_address;
254 unsigned int length = sizeof(client_address);
255 PCHECK(getsockname(server_, reinterpret_cast<sockaddr *>(&client_address),
256 &length));
257 PCHECK(connect(client_, reinterpret_cast<sockaddr *>(&client_address),
258 length));
259 }
260 server_connection_ = PCHECK(accept(server_, nullptr, 0));
261
262 Init(server_connection_, server_connection_, client_, client_);
263 }
264 ~TCPPingPonger() {
265 PCHECK(close(client_));
266 PCHECK(close(server_connection_));
267 PCHECK(close(server_));
268 }
269
270 private:
271 int server_, client_, server_connection_;
272};
273
274class UDPPingPonger : public FDPingPonger {
275 public:
276 UDPPingPonger() {
277 CreatePair(&server_read_, &client_write_);
278 CreatePair(&client_read_, &server_write_);
279
280 Init(server_read_, server_write_, client_read_, client_write_);
281 }
282 ~UDPPingPonger() {
283 PCHECK(close(server_read_));
284 PCHECK(close(client_write_));
285 PCHECK(close(client_read_));
286 PCHECK(close(server_write_));
287 }
288
289 private:
290 void CreatePair(int *server, int *client) {
291 *server = PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
292 {
293 sockaddr_in server_address;
294 memset(&server_address, 0, sizeof(server_address));
295 server_address.sin_family = AF_INET;
296 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
297 // server_address.sin_port = htons(server_ + 3000);
298 PCHECK(bind(*server, reinterpret_cast<sockaddr *>(&server_address),
299 sizeof(server_address)));
300 }
301 *client = PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
302 {
303 sockaddr_in client_address;
304 unsigned int length = sizeof(client_address);
305 PCHECK(getsockname(*server, reinterpret_cast<sockaddr *>(&client_address),
306 &length));
307 PCHECK(connect(*client, reinterpret_cast<sockaddr *>(&client_address),
308 length));
309 }
310 }
311
312 int server_read_, server_write_, client_read_, client_write_;
313};
314
315// Implements ping-pong without copying the data using a condition variable-like
316// interface.
317class ConditionVariablePingPonger : public StaticPingPonger {
318 protected:
319 // Represents a condition variable bundled with a mutex.
320 //
321 // Wait may return spuriously.
322 class ConditionVariableInterface {
323 public:
324 virtual ~ConditionVariableInterface() {}
325
326 // Locks the mutex.
327 virtual void Lock() = 0;
328
329 // Unlocks the mutex.
330 virtual void Unlock() = 0;
331
332 // Waits on the condition variable.
333 //
334 // The mutex must be locked when this is called.
335 virtual void Wait() = 0;
336
337 // Signals the condition variable.
338 //
339 // The mutex does not have to be locked during this.
340 virtual void Signal() = 0;
341 };
342
343 ConditionVariablePingPonger(
344 ::std::unique_ptr<ConditionVariableInterface> ping,
345 ::std::unique_ptr<ConditionVariableInterface> pong)
346 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
347
348 private:
349 const Data *Ping() override {
350 ping_->Lock();
351 to_server_ = PingData();
352 ping_->Unlock();
353 ping_->Signal();
354 pong_->Lock();
355 while (from_server_ == nullptr) {
356 pong_->Wait();
357 }
358 const Data *r = from_server_;
359 from_server_ = nullptr;
360 pong_->Unlock();
361 return r;
362 }
363
364 const Data *Wait() override {
365 ping_->Lock();
366 while (to_server_ == nullptr) {
367 ping_->Wait();
368 }
369 const Data *r = to_server_;
370 to_server_ = nullptr;
371 ping_->Unlock();
372 return r;
373 }
374
375 void Pong() override {
376 pong_->Lock();
377 from_server_ = PongData();
378 pong_->Unlock();
379 pong_->Signal();
380 }
381
382 const Data *to_server_ = nullptr, *from_server_ = nullptr;
383 const ::std::unique_ptr<ConditionVariableInterface> ping_, pong_;
384};
385
386// Implements ping-pong without copying the data using a semaphore-like
387// interface.
388class SemaphorePingPonger : public StaticPingPonger {
389 protected:
390 // Represents a semaphore, which need only count to 1.
391 //
392 // The behavior when calling Get/Put in anything other than alternating order
393 // is undefined.
394 //
395 // Wait may NOT return spuriously.
396 class SemaphoreInterface {
397 public:
398 virtual ~SemaphoreInterface() {}
399
400 virtual void Get() = 0;
401 virtual void Put() = 0;
402 };
403
404 SemaphorePingPonger(::std::unique_ptr<SemaphoreInterface> ping,
405 ::std::unique_ptr<SemaphoreInterface> pong)
406 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
407
408 private:
409 const Data *Ping() override {
410 to_server_ = PingData();
411 ping_->Put();
412 pong_->Get();
413 return from_server_;
414 }
415
416 const Data *Wait() override {
417 ping_->Get();
418 return to_server_;
419 }
420
421 void Pong() override {
422 from_server_ = PongData();
423 pong_->Put();
424 }
425
426 const Data *to_server_ = nullptr, *from_server_ = nullptr;
427 const ::std::unique_ptr<SemaphoreInterface> ping_, pong_;
428};
429
Brian Silvermane4d8b282015-12-24 13:44:48 -0800430class AOSMutexPingPonger : public ConditionVariablePingPonger {
431 public:
432 AOSMutexPingPonger()
433 : ConditionVariablePingPonger(
434 ::std::unique_ptr<ConditionVariableInterface>(
435 new AOSConditionVariable()),
436 ::std::unique_ptr<ConditionVariableInterface>(
437 new AOSConditionVariable())) {}
438
439 private:
440 class AOSConditionVariable : public ConditionVariableInterface {
441 public:
442 AOSConditionVariable() : condition_(&mutex_) {}
443
444 private:
445 void Lock() override { CHECK(!mutex_.Lock()); }
446 void Unlock() override { mutex_.Unlock(); }
447 void Wait() override { CHECK(!condition_.Wait()); }
448 void Signal() override { condition_.Broadcast(); }
449
450 Mutex mutex_;
451 Condition condition_;
452 };
453};
454
455class AOSEventPingPonger : public SemaphorePingPonger {
456 public:
457 AOSEventPingPonger()
458 : SemaphorePingPonger(
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700459 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore()),
460 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore())) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800461
462 private:
463 class AOSEventSemaphore : public SemaphoreInterface {
464 private:
465 void Get() override {
466 event_.Wait();
467 event_.Clear();
468 }
469 void Put() override { event_.Set(); }
470
471 Event event_;
472 };
473};
474
475class PthreadMutexPingPonger : public ConditionVariablePingPonger {
476 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400477 PthreadMutexPingPonger(int pshared, bool pi)
Brian Silvermane4d8b282015-12-24 13:44:48 -0800478 : ConditionVariablePingPonger(
479 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400480 new PthreadConditionVariable(pshared, pi)),
Brian Silvermane4d8b282015-12-24 13:44:48 -0800481 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400482 new PthreadConditionVariable(pshared, pi))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800483
484 private:
485 class PthreadConditionVariable : public ConditionVariableInterface {
486 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400487 PthreadConditionVariable(bool pshared, bool pi) {
488 {
489 pthread_condattr_t cond_attr;
490 PRCHECK(pthread_condattr_init(&cond_attr));
491 if (pshared) {
492 PRCHECK(
493 pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED));
494 }
495 PRCHECK(pthread_cond_init(&condition_, &cond_attr));
496 PRCHECK(pthread_condattr_destroy(&cond_attr));
497 }
498
499 {
500 pthread_mutexattr_t mutex_attr;
501 PRCHECK(pthread_mutexattr_init(&mutex_attr));
502 if (pshared) {
503 PRCHECK(pthread_mutexattr_setpshared(&mutex_attr,
504 PTHREAD_PROCESS_SHARED));
505 }
506 if (pi) {
507 PRCHECK(
508 pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT));
509 }
510 PRCHECK(pthread_mutex_init(&mutex_, nullptr));
511 PRCHECK(pthread_mutexattr_destroy(&mutex_attr));
512 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800513 }
514 ~PthreadConditionVariable() {
515 PRCHECK(pthread_mutex_destroy(&mutex_));
516 PRCHECK(pthread_cond_destroy(&condition_));
517 }
518
519 private:
520 void Lock() override { PRCHECK(pthread_mutex_lock(&mutex_)); }
521 void Unlock() override { PRCHECK(pthread_mutex_unlock(&mutex_)); }
522 void Wait() override { PRCHECK(pthread_cond_wait(&condition_, &mutex_)); }
523 void Signal() override { PRCHECK(pthread_cond_broadcast(&condition_)); }
524
525 pthread_cond_t condition_;
526 pthread_mutex_t mutex_;
527 };
528};
529
530class EventFDPingPonger : public SemaphorePingPonger {
531 public:
532 EventFDPingPonger()
533 : SemaphorePingPonger(
534 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore()),
535 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore())) {}
536
537 private:
538 class EventFDSemaphore : public SemaphoreInterface {
539 public:
540 EventFDSemaphore() : fd_(PCHECK(eventfd(0, 0))) {}
541 ~EventFDSemaphore() { PCHECK(close(fd_)); }
542
543 private:
544 void Get() override {
545 uint64_t value;
546 if (read(fd_, &value, sizeof(value)) != sizeof(value)) {
547 PLOG(FATAL, "reading from eventfd %d failed\n", fd_);
548 }
549 CHECK_EQ(1u, value);
550 }
551 void Put() override {
552 uint64_t value = 1;
553 if (write(fd_, &value, sizeof(value)) != sizeof(value)) {
554 PLOG(FATAL, "writing to eventfd %d failed\n", fd_);
555 }
556 }
557
558 const int fd_;
559 };
560};
561
562class SysvSemaphorePingPonger : public SemaphorePingPonger {
563 public:
564 SysvSemaphorePingPonger()
565 : SemaphorePingPonger(
566 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore()),
567 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore())) {}
568
569 private:
570 class SysvSemaphore : public SemaphoreInterface {
571 public:
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700572 SysvSemaphore() : sem_id_(PCHECK(semget(IPC_PRIVATE, 1, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800573
574 private:
575 void Get() override {
576 struct sembuf op;
577 op.sem_num = 0;
578 op.sem_op = -1;
579 op.sem_flg = 0;
580 PCHECK(semop(sem_id_, &op, 1));
581 }
582 void Put() override {
583 struct sembuf op;
584 op.sem_num = 0;
585 op.sem_op = 1;
586 op.sem_flg = 0;
587 PCHECK(semop(sem_id_, &op, 1));
588 }
589
590 const int sem_id_;
591 };
592};
593
594class PosixSemaphorePingPonger : public SemaphorePingPonger {
595 protected:
596 PosixSemaphorePingPonger(sem_t *ping_sem, sem_t *pong_sem)
597 : SemaphorePingPonger(
598 ::std::unique_ptr<SemaphoreInterface>(new PosixSemaphore(ping_sem)),
599 ::std::unique_ptr<SemaphoreInterface>(
600 new PosixSemaphore(pong_sem))) {}
601
602 private:
603 class PosixSemaphore : public SemaphoreInterface {
604 public:
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700605 PosixSemaphore(sem_t *sem) : sem_(sem) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800606
607 private:
608 void Get() override { PCHECK(sem_wait(sem_)); }
609 void Put() override { PCHECK(sem_post(sem_)); }
610
611 sem_t *const sem_;
612 };
613};
614
615class SysvQueuePingPonger : public StaticPingPonger {
616 public:
617 SysvQueuePingPonger()
618 : ping_(PCHECK(msgget(IPC_PRIVATE, 0600))),
619 pong_(PCHECK(msgget(IPC_PRIVATE, 0600))) {}
620
621 const Data *Ping() override {
622 {
623 Message to_send;
624 memcpy(&to_send.data, PingData(), sizeof(Data));
625 PCHECK(msgsnd(ping_, &to_send, sizeof(Data), 0));
626 }
627 {
628 Message received;
629 PCHECK(msgrcv(pong_, &received, sizeof(Data), 1, 0));
630 memcpy(&pong_received_, &received.data, sizeof(Data));
631 }
632 return &pong_received_;
633 }
634
635 const Data *Wait() override {
636 {
637 Message received;
638 PCHECK(msgrcv(ping_, &received, sizeof(Data), 1, 0));
639 memcpy(&ping_received_, &received.data, sizeof(Data));
640 }
641 return &ping_received_;
642 }
643
644 virtual void Pong() override {
645 Message to_send;
646 memcpy(&to_send.data, PongData(), sizeof(Data));
647 PCHECK(msgsnd(pong_, &to_send, sizeof(Data), 0));
648 }
649
650 private:
651 struct Message {
652 long mtype = 1;
653 char data[sizeof(Data)];
654 };
655
656 Data ping_received_, pong_received_;
657
658 const int ping_, pong_;
659};
660
661class PosixQueuePingPonger : public StaticPingPonger {
662 public:
663 PosixQueuePingPonger() : ping_(Open("/ping")), pong_(Open("/pong")) {}
664 ~PosixQueuePingPonger() {
665 PCHECK(mq_close(ping_));
666 PCHECK(mq_close(pong_));
667 }
668
669 const Data *Ping() override {
670 PCHECK(mq_send(ping_, static_cast<char *>(static_cast<void *>(PingData())),
671 sizeof(Data), 1));
672 PCHECK(mq_receive(pong_,
673 static_cast<char *>(static_cast<void *>(&pong_received_)),
674 sizeof(Data), nullptr));
675 return &pong_received_;
676 }
677
678 const Data *Wait() override {
679 PCHECK(mq_receive(ping_,
680 static_cast<char *>(static_cast<void *>(&ping_received_)),
681 sizeof(Data), nullptr));
682 return &ping_received_;
683 }
684
685 virtual void Pong() override {
686 PCHECK(mq_send(pong_, static_cast<char *>(static_cast<void *>(PongData())),
687 sizeof(Data), 1));
688 }
689
690 private:
691 mqd_t Open(const char *name) {
692 if (mq_unlink(name) == -1 && errno != ENOENT) {
693 PLOG(FATAL, "mq_unlink(%s) failed", name);
694 }
695 struct mq_attr attr;
696 attr.mq_flags = 0;
697 attr.mq_maxmsg = 1;
698 attr.mq_msgsize = sizeof(Data);
699 attr.mq_curmsgs = 0;
700 const mqd_t r = mq_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, &attr);
701 if (r == reinterpret_cast<mqd_t>(-1)) {
702 PLOG(FATAL, "mq_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
703 }
704 return r;
705 }
706
707 const mqd_t ping_, pong_;
708 Data ping_received_, pong_received_;
709};
710
711class PosixUnnamedSemaphorePingPonger : public PosixSemaphorePingPonger {
712 public:
713 PosixUnnamedSemaphorePingPonger(int pshared)
714 : PosixSemaphorePingPonger(&ping_sem_, &pong_sem_) {
715 PCHECK(sem_init(&ping_sem_, pshared, 0));
716 PCHECK(sem_init(&pong_sem_, pshared, 0));
717 }
718 ~PosixUnnamedSemaphorePingPonger() {
719 PCHECK(sem_destroy(&ping_sem_));
720 PCHECK(sem_destroy(&pong_sem_));
721 }
722
723 private:
724 sem_t ping_sem_, pong_sem_;
725};
726
727class PosixNamedSemaphorePingPonger : public PosixSemaphorePingPonger {
728 public:
729 PosixNamedSemaphorePingPonger()
730 : PosixSemaphorePingPonger(ping_sem_ = Open("/ping"),
731 pong_sem_ = Open("/pong")) {}
732 ~PosixNamedSemaphorePingPonger() {
733 PCHECK(sem_close(ping_sem_));
734 PCHECK(sem_close(pong_sem_));
735 }
736
737 private:
738 sem_t *Open(const char *name) {
739 if (sem_unlink(name) == -1 && errno != ENOENT) {
740 PLOG(FATAL, "shm_unlink(%s) failed", name);
741 }
742 sem_t *const r = sem_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, 0);
743 if (r == SEM_FAILED) {
744 PLOG(FATAL, "sem_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
745 }
746 return r;
747 }
748
749 sem_t *ping_sem_, *pong_sem_;
750};
751
752class AOSQueuePingPonger : public PingPongerInterface {
753 public:
754 AOSQueuePingPonger()
755 : ping_queue_(RawQueue::Fetch("ping", sizeof(Data), 0, 1)),
756 pong_queue_(RawQueue::Fetch("pong", sizeof(Data), 0, 1)) {}
757
758 Data *PingData() override {
759 CHECK_EQ(nullptr, ping_to_send_);
760 ping_to_send_ = static_cast<Data *>(ping_queue_->GetMessage());
761 return ping_to_send_;
762 }
763
764 const Data *Ping() override {
765 CHECK_NE(nullptr, ping_to_send_);
766 CHECK(ping_queue_->WriteMessage(ping_to_send_, RawQueue::kBlock));
767 ping_to_send_ = nullptr;
768 pong_queue_->FreeMessage(pong_received_);
769 pong_received_ =
770 static_cast<const Data *>(pong_queue_->ReadMessage(RawQueue::kBlock));
771 return pong_received_;
772 }
773
774 const Data *Wait() override {
775 ping_queue_->FreeMessage(ping_received_);
776 ping_received_ =
777 static_cast<const Data *>(ping_queue_->ReadMessage(RawQueue::kBlock));
778 return ping_received_;
779 }
780
781 Data *PongData() override {
782 CHECK_EQ(nullptr, pong_to_send_);
783 pong_to_send_ = static_cast<Data *>(pong_queue_->GetMessage());
784 return pong_to_send_;
785 }
786
787 void Pong() override {
788 CHECK_NE(nullptr, pong_to_send_);
789 CHECK(pong_queue_->WriteMessage(pong_to_send_, RawQueue::kBlock));
790 pong_to_send_ = nullptr;
791 }
792
793 private:
794 RawQueue *const ping_queue_;
795 RawQueue *const pong_queue_;
796
797 Data *ping_to_send_ = nullptr, *pong_to_send_ = nullptr;
798 const Data *ping_received_ = nullptr, *pong_received_ = nullptr;
799};
800
801int Main(int /*argc*/, char **argv) {
802 ::std::unique_ptr<PingPongerInterface> ping_ponger;
803 if (FLAGS_method == "pipe") {
804 ping_ponger.reset(new PipePingPonger());
805 } else if (FLAGS_method == "named_pipe") {
806 ping_ponger.reset(new NamedPipePingPonger());
807 } else if (FLAGS_method == "aos_mutex") {
808 ping_ponger.reset(new AOSMutexPingPonger());
809 } else if (FLAGS_method == "aos_event") {
810 ping_ponger.reset(new AOSEventPingPonger());
811 } else if (FLAGS_method == "pthread_mutex") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400812 ping_ponger.reset(new PthreadMutexPingPonger(false, false));
813 } else if (FLAGS_method == "pthread_mutex_pshared") {
814 ping_ponger.reset(new PthreadMutexPingPonger(true, false));
815 } else if (FLAGS_method == "pthread_mutex_pshared_pi") {
816 ping_ponger.reset(new PthreadMutexPingPonger(true, true));
817 } else if (FLAGS_method == "pthread_mutex_pi") {
818 ping_ponger.reset(new PthreadMutexPingPonger(false, true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800819 } else if (FLAGS_method == "aos_queue") {
820 ping_ponger.reset(new AOSQueuePingPonger());
821 } else if (FLAGS_method == "eventfd") {
822 ping_ponger.reset(new EventFDPingPonger());
823 } else if (FLAGS_method == "sysv_semaphore") {
824 ping_ponger.reset(new SysvSemaphorePingPonger());
825 } else if (FLAGS_method == "sysv_queue") {
826 ping_ponger.reset(new SysvQueuePingPonger());
827 } else if (FLAGS_method == "posix_semaphore_unnamed_shared") {
828 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(1));
829 } else if (FLAGS_method == "posix_semaphore_unnamed_unshared") {
830 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(0));
831 } else if (FLAGS_method == "posix_semaphore_named") {
832 ping_ponger.reset(new PosixNamedSemaphorePingPonger());
833 } else if (FLAGS_method == "posix_queue") {
834 ping_ponger.reset(new PosixQueuePingPonger());
835 } else if (FLAGS_method == "unix_stream") {
836 ping_ponger.reset(new UnixPingPonger(SOCK_STREAM));
837 } else if (FLAGS_method == "unix_datagram") {
838 ping_ponger.reset(new UnixPingPonger(SOCK_DGRAM));
839 } else if (FLAGS_method == "unix_seqpacket") {
840 ping_ponger.reset(new UnixPingPonger(SOCK_SEQPACKET));
841 } else if (FLAGS_method == "tcp") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400842 ping_ponger.reset(new TCPPingPonger(false));
843 } else if (FLAGS_method == "tcp_nodelay") {
844 ping_ponger.reset(new TCPPingPonger(true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800845 } else if (FLAGS_method == "udp") {
846 ping_ponger.reset(new UDPPingPonger());
847 } else {
848 fprintf(stderr, "Unknown IPC method to test '%s'\n", FLAGS_method.c_str());
849 ::gflags::ShowUsageWithFlags(argv[0]);
850 return 1;
851 }
852
Brian Silverman1d42ce22016-09-10 16:55:40 -0400853 ::std::atomic<bool> done{false};
Brian Silvermane4d8b282015-12-24 13:44:48 -0800854
855 ::std::thread server([&ping_ponger, &done]() {
856 if (FLAGS_server_priority > 0) {
857 SetCurrentThreadRealtimePriority(FLAGS_server_priority);
858 }
859 PinCurrentThreadToCPU(FLAGS_server_cpu);
860
861 while (!done) {
862 const PingPongerInterface::Data *data = ping_ponger->Wait();
863 PingPongerInterface::Data *response = ping_ponger->PongData();
864 for (size_t i = 0; i < sizeof(*data); ++i) {
865 (*response)[i] = (*data)[i] + 1;
866 }
867 ping_ponger->Pong();
868 }
869 });
870
871 if (FLAGS_client_priority > 0) {
872 SetCurrentThreadRealtimePriority(FLAGS_client_priority);
873 }
874 PinCurrentThreadToCPU(FLAGS_client_cpu);
875
876 // Warm everything up.
877 for (int i = 0; i < 1000; ++i) {
878 PingPongerInterface::Data *warmup_data = ping_ponger->PingData();
879 memset(*warmup_data, i % 255, sizeof(*warmup_data));
880 ping_ponger->Ping();
881 }
882
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800883 const monotonic_clock::time_point start = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800884
885 for (int32_t i = 0; i < FLAGS_messages; ++i) {
886 PingPongerInterface::Data *to_send = ping_ponger->PingData();
887 memset(*to_send, i % 123, sizeof(*to_send));
888 const PingPongerInterface::Data *received = ping_ponger->Ping();
889 for (size_t ii = 0; ii < sizeof(*received); ++ii) {
890 CHECK_EQ(((i % 123) + 1) % 255, (*received)[ii]);
891 }
892 }
893
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800894 const monotonic_clock::time_point end = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800895
Brian Silverman1d42ce22016-09-10 16:55:40 -0400896 // Try to make sure the server thread gets past its check of done so our
897 // Ping() down below doesn't hang. Kind of lame, but doing better would
898 // require complicating the interface to each implementation which isn't worth
899 // it here.
900 ::std::this_thread::sleep_for(::std::chrono::milliseconds(200));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800901 done = true;
902 ping_ponger->PingData();
903 ping_ponger->Ping();
904 server.join();
905
906 LOG(INFO, "Took %f seconds to send %" PRId32 " messages\n",
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700907 ::aos::time::DurationInSeconds(end - start), FLAGS_messages);
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800908 const chrono::nanoseconds per_message = (end - start) / FLAGS_messages;
909 if (per_message >= chrono::seconds(1)) {
Brian Silvermane4d8b282015-12-24 13:44:48 -0800910 LOG(INFO, "More than 1 second per message ?!?\n");
911 } else {
912 LOG(INFO, "That is %" PRId32 " nanoseconds per message\n",
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800913 static_cast<int>(per_message.count()));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800914 }
915
916 return 0;
917}
918
919} // namespace aos
920
921int main(int argc, char **argv) {
922 ::gflags::SetUsageMessage(
923 ::std::string("Compares various forms of IPC. Usage:\n") + argv[0] +
924 " --method=METHOD\n"
925 "METHOD can be one of the following:\n"
926 "\tpipe\n"
927 "\tnamed_pipe\n"
928 "\taos_mutex\n"
929 "\taos_event\n"
930 "\tpthread_mutex\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400931 "\tpthread_mutex_pshared\n"
932 "\tpthread_mutex_pshared_pi\n"
933 "\tpthread_mutex_pi\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800934 "\taos_queue\n"
935 "\teventfd\n"
936 "\tsysv_semaphore\n"
937 "\tsysv_queue\n"
938 "\tposix_semaphore_unnamed_shared\n"
939 "\tposix_semaphore_unnamed_unshared\n"
940 "\tposix_semaphore_named\n"
941 "\tposix_queue\n"
942 "\tunix_stream\n"
943 "\tunix_datagram\n"
944 "\tunix_seqpacket\n"
945 "\ttcp\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400946 "\ttcp_nodelay\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800947 "\tudp\n");
948 ::gflags::ParseCommandLineFlags(&argc, &argv, true);
949
950 ::aos::InitNRT();
951 ::aos::logging::AddImplementation(
952 new ::aos::logging::StreamLogImplementation(stdout));
953
954 return ::aos::Main(argc, argv);
955}