blob: 5ea1c8b1415d6e3e9d922cfadd9d8c1f1b4a5a54 [file] [log] [blame]
Austin Schuh87dc7bb2018-10-29 21:53:23 -07001#include "gflags/gflags.h"
Brian Silvermane4d8b282015-12-24 13:44:48 -08002
Austin Schuhf2a50ba2016-12-24 16:16:26 -08003#include <fcntl.h>
4#include <mqueue.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -08005#include <netinet/in.h>
Brian Silvermanfd788882016-09-10 16:56:20 -04006#include <netinet/tcp.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -08007#include <pthread.h>
8#include <semaphore.h>
9#include <stdint.h>
10#include <sys/eventfd.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080011#include <sys/msg.h>
12#include <sys/sem.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080013#include <sys/socket.h>
14#include <sys/stat.h>
15#include <sys/types.h>
16#include <sys/un.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080017
Austin Schuhf2a50ba2016-12-24 16:16:26 -080018#include <atomic>
19#include <chrono>
Brian Silvermane4d8b282015-12-24 13:44:48 -080020#include <memory>
21#include <string>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080022#include <thread>
Brian Silvermane4d8b282015-12-24 13:44:48 -080023
John Park33858a32018-09-28 23:05:48 -070024#include "aos/condition.h"
25#include "aos/event.h"
James Kuszmaul651fc3f2019-05-15 21:14:25 -070026#include "aos/init.h"
27#include "aos/ipc_lib/queue.h"
John Park33858a32018-09-28 23:05:48 -070028#include "aos/logging/implementations.h"
29#include "aos/logging/logging.h"
30#include "aos/mutex/mutex.h"
31#include "aos/time/time.h"
Brian Silvermane4d8b282015-12-24 13:44:48 -080032
33DEFINE_string(method, "", "Which IPC method to use");
34DEFINE_int32(messages, 1000000, "How many messages to send back and forth");
35DEFINE_int32(client_cpu, 0, "CPU to pin client to");
36DEFINE_int32(server_cpu, 0, "CPU to pin server to");
37DEFINE_int32(client_priority, 1,
38 "Realtime priority for client. Negative for don't change");
39DEFINE_int32(server_priority, 1,
40 "Realtime priority for server. Negative for don't change");
41
42namespace aos {
43
Austin Schuhf2a50ba2016-12-24 16:16:26 -080044namespace chrono = ::std::chrono;
45
Brian Silvermane4d8b282015-12-24 13:44:48 -080046// A generic interface for an object which can send some data to another thread
47// and back.
48//
49// One side is called the "server". It should constantly Wait, do something with
50// the result, and then call Pong.
51// The other side is called the "client". It should repeatedly call Ping.
52class PingPongerInterface {
53 public:
54 // A chunk of memory definitely on its own cache line anywhere sane.
55 typedef uint8_t Data[1024] __attribute__((aligned(128)));
56
57 virtual ~PingPongerInterface() {}
58
59 // Returns where the "client" side should write data in preparation to send to
60 // the server.
61 // The result is valid until the next Ping call.
62 virtual Data *PingData() = 0;
63
64 // Sends the data returned from the most recent PingData call to the "server"
65 // side and returns its response.
66 // PingData must be called exactly once before each call of this method.
67 // The result is valid until the next PingData call.
68 virtual const Data *Ping() = 0;
69
70 // Waits for a Ping call and then returns the associated data.
71 // The result is valid until the beginning of the next Pong call.
72 virtual const Data *Wait() = 0;
73
74 // Returns where the "server" side should write data in preparation to send
75 // back to the "client".
76 // The result is valid until the next Pong call.
77 virtual Data *PongData() = 0;
78
79 // Sends data back to an in-progress Ping.
80 // Sends the data returned from the most recent PongData call back to an
81 // in-progress Ping.
82 // PongData must be called exactly once before each call of this method.
83 virtual void Pong() = 0;
84};
85
86// Base class for implementations which simple use a pair of Data objects for
87// all Pings and Pongs.
88class StaticPingPonger : public PingPongerInterface {
89 public:
90 Data *PingData() override { return &ping_data_; }
91 Data *PongData() override { return &pong_data_; }
92
93 private:
94 Data ping_data_, pong_data_;
95};
96
97// Implements ping-pong by sending the data over file descriptors.
98class FDPingPonger : public StaticPingPonger {
99 protected:
100 // Subclasses must override and call Init.
101 FDPingPonger() {}
102
103 // Subclasses must call this in their constructor.
104 // Does not take ownership of any of the file descriptors, any/all of which
105 // may be the same.
106 // {server,client}_read must be open for reading and {server,client}_write
107 // must be open for writing.
108 void Init(int server_read, int server_write, int client_read,
109 int client_write) {
110 server_read_ = server_read;
111 server_write_ = server_write;
112 client_read_ = client_read;
113 client_write_ = client_write;
114 }
115
116 private:
117 const Data *Ping() override {
118 WriteFully(client_write_, *PingData());
119 ReadFully(client_read_, &read_by_client_);
120 return &read_by_client_;
121 }
122
123 const Data *Wait() override {
124 ReadFully(server_read_, &read_by_server_);
125 return &read_by_server_;
126 }
127
128 void Pong() override { WriteFully(server_write_, *PongData()); }
129
130 void ReadFully(int fd, Data *data) {
131 size_t remaining = sizeof(*data);
132 uint8_t *current = &(*data)[0];
133 while (remaining > 0) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700134 const ssize_t result = AOS_PCHECK(read(fd, current, remaining));
135 AOS_CHECK_LE(static_cast<size_t>(result), remaining);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800136 remaining -= result;
137 current += result;
138 }
139 }
140
141 void WriteFully(int fd, const Data &data) {
142 size_t remaining = sizeof(data);
143 const uint8_t *current = &data[0];
144 while (remaining > 0) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700145 const ssize_t result = AOS_PCHECK(write(fd, current, remaining));
146 AOS_CHECK_LE(static_cast<size_t>(result), remaining);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800147 remaining -= result;
148 current += result;
149 }
150 }
151
152 Data read_by_client_, read_by_server_;
153 int server_read_ = -1, server_write_ = -1, client_read_ = -1,
154 client_write_ = -1;
155};
156
157class PipePingPonger : public FDPingPonger {
158 public:
159 PipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700160 AOS_PCHECK(pipe(to_server));
161 AOS_PCHECK(pipe(from_server));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800162 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
163 }
164 ~PipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700165 AOS_PCHECK(close(to_server[0]));
166 AOS_PCHECK(close(to_server[1]));
167 AOS_PCHECK(close(from_server[0]));
168 AOS_PCHECK(close(from_server[1]));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800169 }
170
171 private:
172 int to_server[2], from_server[2];
173};
174
175class NamedPipePingPonger : public FDPingPonger {
176 public:
177 NamedPipePingPonger() {
178 OpenFifo("/tmp/to_server", &client_write_, &server_read_);
179 OpenFifo("/tmp/from_server", &server_write_, &client_read_);
180
181 Init(server_read_, server_write_, client_read_, client_write_);
182 }
183 ~NamedPipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700184 AOS_PCHECK(close(server_read_));
185 AOS_PCHECK(close(client_write_));
186 AOS_PCHECK(close(client_read_));
187 AOS_PCHECK(close(server_write_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800188 }
189
190 private:
191 void OpenFifo(const char *name, int *write, int *read) {
192 {
193 const int ret = unlink(name);
194 if (ret == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700195 AOS_PLOG(FATAL, "unlink(%s)", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800196 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700197 AOS_PCHECK(mkfifo(name, S_IWUSR | S_IRUSR));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800198 // Have to open it nonblocking because the other end isn't open yet...
Austin Schuhf257f3c2019-10-27 21:00:43 -0700199 *read = AOS_PCHECK(open(name, O_RDONLY | O_NONBLOCK));
200 *write = AOS_PCHECK(open(name, O_WRONLY));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800201 {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700202 const int flags = AOS_PCHECK(fcntl(*read, F_GETFL));
203 AOS_PCHECK(fcntl(*read, F_SETFL, flags & ~O_NONBLOCK));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800204 }
205 }
206 }
207
208 int server_read_, server_write_, client_read_, client_write_;
209};
210
211class UnixPingPonger : public FDPingPonger {
212 public:
213 UnixPingPonger(int type) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700214 AOS_PCHECK(socketpair(AF_UNIX, type, 0, to_server));
215 AOS_PCHECK(socketpair(AF_UNIX, type, 0, from_server));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800216 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
217 }
218 ~UnixPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700219 AOS_PCHECK(close(to_server[0]));
220 AOS_PCHECK(close(to_server[1]));
221 AOS_PCHECK(close(from_server[0]));
222 AOS_PCHECK(close(from_server[1]));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800223 }
224
225 private:
226 int to_server[2], from_server[2];
227};
228
229class TCPPingPonger : public FDPingPonger {
230 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400231 TCPPingPonger(bool nodelay) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700232 server_ = AOS_PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400233 if (nodelay) {
234 const int yes = 1;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700235 AOS_PCHECK(
236 setsockopt(server_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
Brian Silvermanfd788882016-09-10 16:56:20 -0400237 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800238 {
239 sockaddr_in server_address;
240 memset(&server_address, 0, sizeof(server_address));
241 server_address.sin_family = AF_INET;
242 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700243 AOS_PCHECK(bind(server_, reinterpret_cast<sockaddr *>(&server_address),
244 sizeof(server_address)));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800245 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700246 AOS_PCHECK(listen(server_, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800247
Austin Schuhf257f3c2019-10-27 21:00:43 -0700248 client_ = AOS_PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400249 if (nodelay) {
250 const int yes = 1;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700251 AOS_PCHECK(
252 setsockopt(client_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
Brian Silvermanfd788882016-09-10 16:56:20 -0400253 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800254 {
255 sockaddr_in client_address;
256 unsigned int length = sizeof(client_address);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700257 AOS_PCHECK(getsockname(
258 server_, reinterpret_cast<sockaddr *>(&client_address), &length));
259 AOS_PCHECK(connect(client_, reinterpret_cast<sockaddr *>(&client_address),
260 length));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800261 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700262 server_connection_ = AOS_PCHECK(accept(server_, nullptr, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800263
264 Init(server_connection_, server_connection_, client_, client_);
265 }
266 ~TCPPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700267 AOS_PCHECK(close(client_));
268 AOS_PCHECK(close(server_connection_));
269 AOS_PCHECK(close(server_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800270 }
271
272 private:
273 int server_, client_, server_connection_;
274};
275
276class UDPPingPonger : public FDPingPonger {
277 public:
278 UDPPingPonger() {
279 CreatePair(&server_read_, &client_write_);
280 CreatePair(&client_read_, &server_write_);
281
282 Init(server_read_, server_write_, client_read_, client_write_);
283 }
284 ~UDPPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700285 AOS_PCHECK(close(server_read_));
286 AOS_PCHECK(close(client_write_));
287 AOS_PCHECK(close(client_read_));
288 AOS_PCHECK(close(server_write_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800289 }
290
291 private:
292 void CreatePair(int *server, int *client) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700293 *server = AOS_PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800294 {
295 sockaddr_in server_address;
296 memset(&server_address, 0, sizeof(server_address));
297 server_address.sin_family = AF_INET;
298 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
299 // server_address.sin_port = htons(server_ + 3000);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700300 AOS_PCHECK(bind(*server, reinterpret_cast<sockaddr *>(&server_address),
301 sizeof(server_address)));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800302 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700303 *client = AOS_PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800304 {
305 sockaddr_in client_address;
306 unsigned int length = sizeof(client_address);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700307 AOS_PCHECK(getsockname(
308 *server, reinterpret_cast<sockaddr *>(&client_address), &length));
309 AOS_PCHECK(connect(*client, reinterpret_cast<sockaddr *>(&client_address),
310 length));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800311 }
312 }
313
314 int server_read_, server_write_, client_read_, client_write_;
315};
316
317// Implements ping-pong without copying the data using a condition variable-like
318// interface.
319class ConditionVariablePingPonger : public StaticPingPonger {
320 protected:
321 // Represents a condition variable bundled with a mutex.
322 //
323 // Wait may return spuriously.
324 class ConditionVariableInterface {
325 public:
326 virtual ~ConditionVariableInterface() {}
327
328 // Locks the mutex.
329 virtual void Lock() = 0;
330
331 // Unlocks the mutex.
332 virtual void Unlock() = 0;
333
334 // Waits on the condition variable.
335 //
336 // The mutex must be locked when this is called.
337 virtual void Wait() = 0;
338
339 // Signals the condition variable.
340 //
341 // The mutex does not have to be locked during this.
342 virtual void Signal() = 0;
343 };
344
345 ConditionVariablePingPonger(
346 ::std::unique_ptr<ConditionVariableInterface> ping,
347 ::std::unique_ptr<ConditionVariableInterface> pong)
348 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
349
350 private:
351 const Data *Ping() override {
352 ping_->Lock();
353 to_server_ = PingData();
354 ping_->Unlock();
355 ping_->Signal();
356 pong_->Lock();
357 while (from_server_ == nullptr) {
358 pong_->Wait();
359 }
360 const Data *r = from_server_;
361 from_server_ = nullptr;
362 pong_->Unlock();
363 return r;
364 }
365
366 const Data *Wait() override {
367 ping_->Lock();
368 while (to_server_ == nullptr) {
369 ping_->Wait();
370 }
371 const Data *r = to_server_;
372 to_server_ = nullptr;
373 ping_->Unlock();
374 return r;
375 }
376
377 void Pong() override {
378 pong_->Lock();
379 from_server_ = PongData();
380 pong_->Unlock();
381 pong_->Signal();
382 }
383
384 const Data *to_server_ = nullptr, *from_server_ = nullptr;
385 const ::std::unique_ptr<ConditionVariableInterface> ping_, pong_;
386};
387
388// Implements ping-pong without copying the data using a semaphore-like
389// interface.
390class SemaphorePingPonger : public StaticPingPonger {
391 protected:
392 // Represents a semaphore, which need only count to 1.
393 //
394 // The behavior when calling Get/Put in anything other than alternating order
395 // is undefined.
396 //
397 // Wait may NOT return spuriously.
398 class SemaphoreInterface {
399 public:
400 virtual ~SemaphoreInterface() {}
401
402 virtual void Get() = 0;
403 virtual void Put() = 0;
404 };
405
406 SemaphorePingPonger(::std::unique_ptr<SemaphoreInterface> ping,
407 ::std::unique_ptr<SemaphoreInterface> pong)
408 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
409
410 private:
411 const Data *Ping() override {
412 to_server_ = PingData();
413 ping_->Put();
414 pong_->Get();
415 return from_server_;
416 }
417
418 const Data *Wait() override {
419 ping_->Get();
420 return to_server_;
421 }
422
423 void Pong() override {
424 from_server_ = PongData();
425 pong_->Put();
426 }
427
428 const Data *to_server_ = nullptr, *from_server_ = nullptr;
429 const ::std::unique_ptr<SemaphoreInterface> ping_, pong_;
430};
431
Brian Silvermane4d8b282015-12-24 13:44:48 -0800432class AOSMutexPingPonger : public ConditionVariablePingPonger {
433 public:
434 AOSMutexPingPonger()
435 : ConditionVariablePingPonger(
436 ::std::unique_ptr<ConditionVariableInterface>(
437 new AOSConditionVariable()),
438 ::std::unique_ptr<ConditionVariableInterface>(
439 new AOSConditionVariable())) {}
440
441 private:
442 class AOSConditionVariable : public ConditionVariableInterface {
443 public:
444 AOSConditionVariable() : condition_(&mutex_) {}
445
446 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700447 void Lock() override { AOS_CHECK(!mutex_.Lock()); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800448 void Unlock() override { mutex_.Unlock(); }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700449 void Wait() override { AOS_CHECK(!condition_.Wait()); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800450 void Signal() override { condition_.Broadcast(); }
451
452 Mutex mutex_;
453 Condition condition_;
454 };
455};
456
457class AOSEventPingPonger : public SemaphorePingPonger {
458 public:
459 AOSEventPingPonger()
460 : SemaphorePingPonger(
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700461 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore()),
462 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore())) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800463
464 private:
465 class AOSEventSemaphore : public SemaphoreInterface {
466 private:
467 void Get() override {
468 event_.Wait();
469 event_.Clear();
470 }
471 void Put() override { event_.Set(); }
472
473 Event event_;
474 };
475};
476
477class PthreadMutexPingPonger : public ConditionVariablePingPonger {
478 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400479 PthreadMutexPingPonger(int pshared, bool pi)
Brian Silvermane4d8b282015-12-24 13:44:48 -0800480 : ConditionVariablePingPonger(
481 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400482 new PthreadConditionVariable(pshared, pi)),
Brian Silvermane4d8b282015-12-24 13:44:48 -0800483 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400484 new PthreadConditionVariable(pshared, pi))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800485
486 private:
487 class PthreadConditionVariable : public ConditionVariableInterface {
488 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400489 PthreadConditionVariable(bool pshared, bool pi) {
490 {
491 pthread_condattr_t cond_attr;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700492 AOS_PRCHECK(pthread_condattr_init(&cond_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400493 if (pshared) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700494 AOS_PRCHECK(
Brian Silvermanfd788882016-09-10 16:56:20 -0400495 pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED));
496 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700497 AOS_PRCHECK(pthread_cond_init(&condition_, &cond_attr));
498 AOS_PRCHECK(pthread_condattr_destroy(&cond_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400499 }
500
501 {
502 pthread_mutexattr_t mutex_attr;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700503 AOS_PRCHECK(pthread_mutexattr_init(&mutex_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400504 if (pshared) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700505 AOS_PRCHECK(pthread_mutexattr_setpshared(&mutex_attr,
506 PTHREAD_PROCESS_SHARED));
Brian Silvermanfd788882016-09-10 16:56:20 -0400507 }
508 if (pi) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700509 AOS_PRCHECK(
Brian Silvermanfd788882016-09-10 16:56:20 -0400510 pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT));
511 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700512 AOS_PRCHECK(pthread_mutex_init(&mutex_, nullptr));
513 AOS_PRCHECK(pthread_mutexattr_destroy(&mutex_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400514 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800515 }
516 ~PthreadConditionVariable() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700517 AOS_PRCHECK(pthread_mutex_destroy(&mutex_));
518 AOS_PRCHECK(pthread_cond_destroy(&condition_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800519 }
520
521 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700522 void Lock() override { AOS_PRCHECK(pthread_mutex_lock(&mutex_)); }
523 void Unlock() override { AOS_PRCHECK(pthread_mutex_unlock(&mutex_)); }
524 void Wait() override {
525 AOS_PRCHECK(pthread_cond_wait(&condition_, &mutex_));
526 }
527 void Signal() override { AOS_PRCHECK(pthread_cond_broadcast(&condition_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800528
529 pthread_cond_t condition_;
530 pthread_mutex_t mutex_;
531 };
532};
533
534class EventFDPingPonger : public SemaphorePingPonger {
535 public:
536 EventFDPingPonger()
537 : SemaphorePingPonger(
538 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore()),
539 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore())) {}
540
541 private:
542 class EventFDSemaphore : public SemaphoreInterface {
543 public:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700544 EventFDSemaphore() : fd_(AOS_PCHECK(eventfd(0, 0))) {}
545 ~EventFDSemaphore() { AOS_PCHECK(close(fd_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800546
547 private:
548 void Get() override {
549 uint64_t value;
550 if (read(fd_, &value, sizeof(value)) != sizeof(value)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700551 AOS_PLOG(FATAL, "reading from eventfd %d failed\n", fd_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800552 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700553 AOS_CHECK_EQ(1u, value);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800554 }
555 void Put() override {
556 uint64_t value = 1;
557 if (write(fd_, &value, sizeof(value)) != sizeof(value)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700558 AOS_PLOG(FATAL, "writing to eventfd %d failed\n", fd_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800559 }
560 }
561
562 const int fd_;
563 };
564};
565
566class SysvSemaphorePingPonger : public SemaphorePingPonger {
567 public:
568 SysvSemaphorePingPonger()
569 : SemaphorePingPonger(
570 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore()),
571 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore())) {}
572
573 private:
574 class SysvSemaphore : public SemaphoreInterface {
575 public:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700576 SysvSemaphore() : sem_id_(AOS_PCHECK(semget(IPC_PRIVATE, 1, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800577
578 private:
579 void Get() override {
580 struct sembuf op;
581 op.sem_num = 0;
582 op.sem_op = -1;
583 op.sem_flg = 0;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700584 AOS_PCHECK(semop(sem_id_, &op, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800585 }
586 void Put() override {
587 struct sembuf op;
588 op.sem_num = 0;
589 op.sem_op = 1;
590 op.sem_flg = 0;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700591 AOS_PCHECK(semop(sem_id_, &op, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800592 }
593
594 const int sem_id_;
595 };
596};
597
598class PosixSemaphorePingPonger : public SemaphorePingPonger {
599 protected:
600 PosixSemaphorePingPonger(sem_t *ping_sem, sem_t *pong_sem)
601 : SemaphorePingPonger(
602 ::std::unique_ptr<SemaphoreInterface>(new PosixSemaphore(ping_sem)),
603 ::std::unique_ptr<SemaphoreInterface>(
604 new PosixSemaphore(pong_sem))) {}
605
606 private:
607 class PosixSemaphore : public SemaphoreInterface {
608 public:
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700609 PosixSemaphore(sem_t *sem) : sem_(sem) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800610
611 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700612 void Get() override { AOS_PCHECK(sem_wait(sem_)); }
613 void Put() override { AOS_PCHECK(sem_post(sem_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800614
615 sem_t *const sem_;
616 };
617};
618
619class SysvQueuePingPonger : public StaticPingPonger {
620 public:
621 SysvQueuePingPonger()
Austin Schuhf257f3c2019-10-27 21:00:43 -0700622 : ping_(AOS_PCHECK(msgget(IPC_PRIVATE, 0600))),
623 pong_(AOS_PCHECK(msgget(IPC_PRIVATE, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800624
625 const Data *Ping() override {
626 {
627 Message to_send;
628 memcpy(&to_send.data, PingData(), sizeof(Data));
Austin Schuhf257f3c2019-10-27 21:00:43 -0700629 AOS_PCHECK(msgsnd(ping_, &to_send, sizeof(Data), 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800630 }
631 {
632 Message received;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700633 AOS_PCHECK(msgrcv(pong_, &received, sizeof(Data), 1, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800634 memcpy(&pong_received_, &received.data, sizeof(Data));
635 }
636 return &pong_received_;
637 }
638
639 const Data *Wait() override {
640 {
641 Message received;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700642 AOS_PCHECK(msgrcv(ping_, &received, sizeof(Data), 1, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800643 memcpy(&ping_received_, &received.data, sizeof(Data));
644 }
645 return &ping_received_;
646 }
647
648 virtual void Pong() override {
649 Message to_send;
650 memcpy(&to_send.data, PongData(), sizeof(Data));
Austin Schuhf257f3c2019-10-27 21:00:43 -0700651 AOS_PCHECK(msgsnd(pong_, &to_send, sizeof(Data), 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800652 }
653
654 private:
655 struct Message {
656 long mtype = 1;
657 char data[sizeof(Data)];
658 };
659
660 Data ping_received_, pong_received_;
661
662 const int ping_, pong_;
663};
664
665class PosixQueuePingPonger : public StaticPingPonger {
666 public:
667 PosixQueuePingPonger() : ping_(Open("/ping")), pong_(Open("/pong")) {}
668 ~PosixQueuePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700669 AOS_PCHECK(mq_close(ping_));
670 AOS_PCHECK(mq_close(pong_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800671 }
672
673 const Data *Ping() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700674 AOS_PCHECK(mq_send(ping_,
675 static_cast<char *>(static_cast<void *>(PingData())),
676 sizeof(Data), 1));
677 AOS_PCHECK(mq_receive(
678 pong_, static_cast<char *>(static_cast<void *>(&pong_received_)),
679 sizeof(Data), nullptr));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800680 return &pong_received_;
681 }
682
683 const Data *Wait() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700684 AOS_PCHECK(mq_receive(
685 ping_, static_cast<char *>(static_cast<void *>(&ping_received_)),
686 sizeof(Data), nullptr));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800687 return &ping_received_;
688 }
689
690 virtual void Pong() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700691 AOS_PCHECK(mq_send(pong_,
692 static_cast<char *>(static_cast<void *>(PongData())),
693 sizeof(Data), 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800694 }
695
696 private:
697 mqd_t Open(const char *name) {
698 if (mq_unlink(name) == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700699 AOS_PLOG(FATAL, "mq_unlink(%s) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800700 }
701 struct mq_attr attr;
702 attr.mq_flags = 0;
703 attr.mq_maxmsg = 1;
704 attr.mq_msgsize = sizeof(Data);
705 attr.mq_curmsgs = 0;
706 const mqd_t r = mq_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, &attr);
707 if (r == reinterpret_cast<mqd_t>(-1)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700708 AOS_PLOG(FATAL, "mq_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800709 }
710 return r;
711 }
712
713 const mqd_t ping_, pong_;
714 Data ping_received_, pong_received_;
715};
716
717class PosixUnnamedSemaphorePingPonger : public PosixSemaphorePingPonger {
718 public:
719 PosixUnnamedSemaphorePingPonger(int pshared)
720 : PosixSemaphorePingPonger(&ping_sem_, &pong_sem_) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700721 AOS_PCHECK(sem_init(&ping_sem_, pshared, 0));
722 AOS_PCHECK(sem_init(&pong_sem_, pshared, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800723 }
724 ~PosixUnnamedSemaphorePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700725 AOS_PCHECK(sem_destroy(&ping_sem_));
726 AOS_PCHECK(sem_destroy(&pong_sem_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800727 }
728
729 private:
730 sem_t ping_sem_, pong_sem_;
731};
732
733class PosixNamedSemaphorePingPonger : public PosixSemaphorePingPonger {
734 public:
735 PosixNamedSemaphorePingPonger()
736 : PosixSemaphorePingPonger(ping_sem_ = Open("/ping"),
737 pong_sem_ = Open("/pong")) {}
738 ~PosixNamedSemaphorePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700739 AOS_PCHECK(sem_close(ping_sem_));
740 AOS_PCHECK(sem_close(pong_sem_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800741 }
742
743 private:
744 sem_t *Open(const char *name) {
745 if (sem_unlink(name) == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700746 AOS_PLOG(FATAL, "shm_unlink(%s) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800747 }
748 sem_t *const r = sem_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, 0);
749 if (r == SEM_FAILED) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700750 AOS_PLOG(FATAL, "sem_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800751 }
752 return r;
753 }
754
755 sem_t *ping_sem_, *pong_sem_;
756};
757
758class AOSQueuePingPonger : public PingPongerInterface {
759 public:
760 AOSQueuePingPonger()
761 : ping_queue_(RawQueue::Fetch("ping", sizeof(Data), 0, 1)),
762 pong_queue_(RawQueue::Fetch("pong", sizeof(Data), 0, 1)) {}
763
764 Data *PingData() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700765 AOS_CHECK_EQ(nullptr, ping_to_send_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800766 ping_to_send_ = static_cast<Data *>(ping_queue_->GetMessage());
767 return ping_to_send_;
768 }
769
770 const Data *Ping() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700771 AOS_CHECK_NE(nullptr, ping_to_send_);
772 AOS_CHECK(ping_queue_->WriteMessage(ping_to_send_, RawQueue::kBlock));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800773 ping_to_send_ = nullptr;
774 pong_queue_->FreeMessage(pong_received_);
775 pong_received_ =
776 static_cast<const Data *>(pong_queue_->ReadMessage(RawQueue::kBlock));
777 return pong_received_;
778 }
779
780 const Data *Wait() override {
781 ping_queue_->FreeMessage(ping_received_);
782 ping_received_ =
783 static_cast<const Data *>(ping_queue_->ReadMessage(RawQueue::kBlock));
784 return ping_received_;
785 }
786
787 Data *PongData() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700788 AOS_CHECK_EQ(nullptr, pong_to_send_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800789 pong_to_send_ = static_cast<Data *>(pong_queue_->GetMessage());
790 return pong_to_send_;
791 }
792
793 void Pong() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700794 AOS_CHECK_NE(nullptr, pong_to_send_);
795 AOS_CHECK(pong_queue_->WriteMessage(pong_to_send_, RawQueue::kBlock));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800796 pong_to_send_ = nullptr;
797 }
798
799 private:
800 RawQueue *const ping_queue_;
801 RawQueue *const pong_queue_;
802
803 Data *ping_to_send_ = nullptr, *pong_to_send_ = nullptr;
804 const Data *ping_received_ = nullptr, *pong_received_ = nullptr;
805};
806
807int Main(int /*argc*/, char **argv) {
808 ::std::unique_ptr<PingPongerInterface> ping_ponger;
809 if (FLAGS_method == "pipe") {
810 ping_ponger.reset(new PipePingPonger());
811 } else if (FLAGS_method == "named_pipe") {
812 ping_ponger.reset(new NamedPipePingPonger());
813 } else if (FLAGS_method == "aos_mutex") {
814 ping_ponger.reset(new AOSMutexPingPonger());
815 } else if (FLAGS_method == "aos_event") {
816 ping_ponger.reset(new AOSEventPingPonger());
817 } else if (FLAGS_method == "pthread_mutex") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400818 ping_ponger.reset(new PthreadMutexPingPonger(false, false));
819 } else if (FLAGS_method == "pthread_mutex_pshared") {
820 ping_ponger.reset(new PthreadMutexPingPonger(true, false));
821 } else if (FLAGS_method == "pthread_mutex_pshared_pi") {
822 ping_ponger.reset(new PthreadMutexPingPonger(true, true));
823 } else if (FLAGS_method == "pthread_mutex_pi") {
824 ping_ponger.reset(new PthreadMutexPingPonger(false, true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800825 } else if (FLAGS_method == "aos_queue") {
826 ping_ponger.reset(new AOSQueuePingPonger());
827 } else if (FLAGS_method == "eventfd") {
828 ping_ponger.reset(new EventFDPingPonger());
829 } else if (FLAGS_method == "sysv_semaphore") {
830 ping_ponger.reset(new SysvSemaphorePingPonger());
831 } else if (FLAGS_method == "sysv_queue") {
832 ping_ponger.reset(new SysvQueuePingPonger());
833 } else if (FLAGS_method == "posix_semaphore_unnamed_shared") {
834 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(1));
835 } else if (FLAGS_method == "posix_semaphore_unnamed_unshared") {
836 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(0));
837 } else if (FLAGS_method == "posix_semaphore_named") {
838 ping_ponger.reset(new PosixNamedSemaphorePingPonger());
839 } else if (FLAGS_method == "posix_queue") {
840 ping_ponger.reset(new PosixQueuePingPonger());
841 } else if (FLAGS_method == "unix_stream") {
842 ping_ponger.reset(new UnixPingPonger(SOCK_STREAM));
843 } else if (FLAGS_method == "unix_datagram") {
844 ping_ponger.reset(new UnixPingPonger(SOCK_DGRAM));
845 } else if (FLAGS_method == "unix_seqpacket") {
846 ping_ponger.reset(new UnixPingPonger(SOCK_SEQPACKET));
847 } else if (FLAGS_method == "tcp") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400848 ping_ponger.reset(new TCPPingPonger(false));
849 } else if (FLAGS_method == "tcp_nodelay") {
850 ping_ponger.reset(new TCPPingPonger(true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800851 } else if (FLAGS_method == "udp") {
852 ping_ponger.reset(new UDPPingPonger());
853 } else {
854 fprintf(stderr, "Unknown IPC method to test '%s'\n", FLAGS_method.c_str());
855 ::gflags::ShowUsageWithFlags(argv[0]);
856 return 1;
857 }
858
Brian Silverman1d42ce22016-09-10 16:55:40 -0400859 ::std::atomic<bool> done{false};
Brian Silvermane4d8b282015-12-24 13:44:48 -0800860
861 ::std::thread server([&ping_ponger, &done]() {
862 if (FLAGS_server_priority > 0) {
863 SetCurrentThreadRealtimePriority(FLAGS_server_priority);
864 }
865 PinCurrentThreadToCPU(FLAGS_server_cpu);
866
867 while (!done) {
868 const PingPongerInterface::Data *data = ping_ponger->Wait();
869 PingPongerInterface::Data *response = ping_ponger->PongData();
870 for (size_t i = 0; i < sizeof(*data); ++i) {
871 (*response)[i] = (*data)[i] + 1;
872 }
873 ping_ponger->Pong();
874 }
875 });
876
877 if (FLAGS_client_priority > 0) {
878 SetCurrentThreadRealtimePriority(FLAGS_client_priority);
879 }
880 PinCurrentThreadToCPU(FLAGS_client_cpu);
881
882 // Warm everything up.
883 for (int i = 0; i < 1000; ++i) {
884 PingPongerInterface::Data *warmup_data = ping_ponger->PingData();
885 memset(*warmup_data, i % 255, sizeof(*warmup_data));
886 ping_ponger->Ping();
887 }
888
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800889 const monotonic_clock::time_point start = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800890
891 for (int32_t i = 0; i < FLAGS_messages; ++i) {
892 PingPongerInterface::Data *to_send = ping_ponger->PingData();
893 memset(*to_send, i % 123, sizeof(*to_send));
894 const PingPongerInterface::Data *received = ping_ponger->Ping();
895 for (size_t ii = 0; ii < sizeof(*received); ++ii) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700896 AOS_CHECK_EQ(((i % 123) + 1) % 255, (*received)[ii]);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800897 }
898 }
899
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800900 const monotonic_clock::time_point end = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800901
Brian Silverman1d42ce22016-09-10 16:55:40 -0400902 // Try to make sure the server thread gets past its check of done so our
903 // Ping() down below doesn't hang. Kind of lame, but doing better would
904 // require complicating the interface to each implementation which isn't worth
905 // it here.
906 ::std::this_thread::sleep_for(::std::chrono::milliseconds(200));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800907 done = true;
908 ping_ponger->PingData();
909 ping_ponger->Ping();
910 server.join();
911
Austin Schuhf257f3c2019-10-27 21:00:43 -0700912 AOS_LOG(INFO, "Took %f seconds to send %" PRId32 " messages\n",
913 ::aos::time::DurationInSeconds(end - start), FLAGS_messages);
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800914 const chrono::nanoseconds per_message = (end - start) / FLAGS_messages;
915 if (per_message >= chrono::seconds(1)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700916 AOS_LOG(INFO, "More than 1 second per message ?!?\n");
Brian Silvermane4d8b282015-12-24 13:44:48 -0800917 } else {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700918 AOS_LOG(INFO, "That is %" PRId32 " nanoseconds per message\n",
919 static_cast<int>(per_message.count()));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800920 }
921
922 return 0;
923}
924
925} // namespace aos
926
927int main(int argc, char **argv) {
928 ::gflags::SetUsageMessage(
929 ::std::string("Compares various forms of IPC. Usage:\n") + argv[0] +
930 " --method=METHOD\n"
931 "METHOD can be one of the following:\n"
932 "\tpipe\n"
933 "\tnamed_pipe\n"
934 "\taos_mutex\n"
935 "\taos_event\n"
936 "\tpthread_mutex\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400937 "\tpthread_mutex_pshared\n"
938 "\tpthread_mutex_pshared_pi\n"
939 "\tpthread_mutex_pi\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800940 "\taos_queue\n"
941 "\teventfd\n"
942 "\tsysv_semaphore\n"
943 "\tsysv_queue\n"
944 "\tposix_semaphore_unnamed_shared\n"
945 "\tposix_semaphore_unnamed_unshared\n"
946 "\tposix_semaphore_named\n"
947 "\tposix_queue\n"
948 "\tunix_stream\n"
949 "\tunix_datagram\n"
950 "\tunix_seqpacket\n"
951 "\ttcp\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400952 "\ttcp_nodelay\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800953 "\tudp\n");
954 ::gflags::ParseCommandLineFlags(&argc, &argv, true);
955
956 ::aos::InitNRT();
957 ::aos::logging::AddImplementation(
958 new ::aos::logging::StreamLogImplementation(stdout));
959
960 return ::aos::Main(argc, argv);
961}