blob: cf5581e539bdcfcf4b48a969004ccc798e235610 [file] [log] [blame]
Austin Schuh87dc7bb2018-10-29 21:53:23 -07001#include "gflags/gflags.h"
Brian Silvermane4d8b282015-12-24 13:44:48 -08002
Austin Schuhf2a50ba2016-12-24 16:16:26 -08003#include <fcntl.h>
4#include <mqueue.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -08005#include <netinet/in.h>
Brian Silvermanfd788882016-09-10 16:56:20 -04006#include <netinet/tcp.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -08007#include <pthread.h>
8#include <semaphore.h>
9#include <stdint.h>
10#include <sys/eventfd.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080011#include <sys/msg.h>
12#include <sys/sem.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080013#include <sys/socket.h>
14#include <sys/stat.h>
15#include <sys/types.h>
16#include <sys/un.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080017
Austin Schuhf2a50ba2016-12-24 16:16:26 -080018#include <atomic>
19#include <chrono>
Brian Silvermane4d8b282015-12-24 13:44:48 -080020#include <memory>
21#include <string>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080022#include <thread>
Brian Silvermane4d8b282015-12-24 13:44:48 -080023
John Park33858a32018-09-28 23:05:48 -070024#include "aos/condition.h"
25#include "aos/event.h"
James Kuszmaul651fc3f2019-05-15 21:14:25 -070026#include "aos/init.h"
27#include "aos/ipc_lib/queue.h"
John Park33858a32018-09-28 23:05:48 -070028#include "aos/logging/implementations.h"
29#include "aos/logging/logging.h"
30#include "aos/mutex/mutex.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070031#include "aos/realtime.h"
John Park33858a32018-09-28 23:05:48 -070032#include "aos/time/time.h"
Brian Silvermane4d8b282015-12-24 13:44:48 -080033
34DEFINE_string(method, "", "Which IPC method to use");
35DEFINE_int32(messages, 1000000, "How many messages to send back and forth");
36DEFINE_int32(client_cpu, 0, "CPU to pin client to");
37DEFINE_int32(server_cpu, 0, "CPU to pin server to");
38DEFINE_int32(client_priority, 1,
39 "Realtime priority for client. Negative for don't change");
40DEFINE_int32(server_priority, 1,
41 "Realtime priority for server. Negative for don't change");
42
43namespace aos {
44
Austin Schuhf2a50ba2016-12-24 16:16:26 -080045namespace chrono = ::std::chrono;
46
// Abstract transport which carries a fixed-size buffer from one thread to
// another and then carries a reply back.
//
// The "server" side loops forever: Wait for a request, do something with it,
// and then call Pong.
// The "client" side repeatedly calls Ping.
class PingPongerInterface {
 public:
  // The payload type: 1024 bytes with 128-byte alignment, so that it is
  // definitely on its own cache line anywhere sane.
  typedef uint8_t Data[1024] __attribute__((aligned(128)));

  virtual ~PingPongerInterface() = default;

  // Client side: returns the buffer to fill in before sending to the server.
  // The returned pointer stays valid until the next Ping call.
  virtual Data *PingData() = 0;

  // Client side: sends whatever the most recent PingData call returned to the
  // server and returns the server's response.
  // PingData must be called exactly once before each call of this method.
  // The returned pointer stays valid until the next PingData call.
  virtual const Data *Ping() = 0;

  // Server side: blocks for a Ping call and returns the data it sent.
  // The returned pointer stays valid until the beginning of the next Pong
  // call.
  virtual const Data *Wait() = 0;

  // Server side: returns the buffer to fill in before replying to the client.
  // The returned pointer stays valid until the next Pong call.
  virtual Data *PongData() = 0;

  // Server side: sends whatever the most recent PongData call returned back to
  // an in-progress Ping.
  // PongData must be called exactly once before each call of this method.
  virtual void Pong() = 0;
};
86
87// Base class for implementations which simple use a pair of Data objects for
88// all Pings and Pongs.
89class StaticPingPonger : public PingPongerInterface {
90 public:
91 Data *PingData() override { return &ping_data_; }
92 Data *PongData() override { return &pong_data_; }
93
94 private:
95 Data ping_data_, pong_data_;
96};
97
98// Implements ping-pong by sending the data over file descriptors.
99class FDPingPonger : public StaticPingPonger {
100 protected:
101 // Subclasses must override and call Init.
102 FDPingPonger() {}
103
104 // Subclasses must call this in their constructor.
105 // Does not take ownership of any of the file descriptors, any/all of which
106 // may be the same.
107 // {server,client}_read must be open for reading and {server,client}_write
108 // must be open for writing.
109 void Init(int server_read, int server_write, int client_read,
110 int client_write) {
111 server_read_ = server_read;
112 server_write_ = server_write;
113 client_read_ = client_read;
114 client_write_ = client_write;
115 }
116
117 private:
118 const Data *Ping() override {
119 WriteFully(client_write_, *PingData());
120 ReadFully(client_read_, &read_by_client_);
121 return &read_by_client_;
122 }
123
124 const Data *Wait() override {
125 ReadFully(server_read_, &read_by_server_);
126 return &read_by_server_;
127 }
128
129 void Pong() override { WriteFully(server_write_, *PongData()); }
130
131 void ReadFully(int fd, Data *data) {
132 size_t remaining = sizeof(*data);
133 uint8_t *current = &(*data)[0];
134 while (remaining > 0) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700135 const ssize_t result = AOS_PCHECK(read(fd, current, remaining));
136 AOS_CHECK_LE(static_cast<size_t>(result), remaining);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800137 remaining -= result;
138 current += result;
139 }
140 }
141
142 void WriteFully(int fd, const Data &data) {
143 size_t remaining = sizeof(data);
144 const uint8_t *current = &data[0];
145 while (remaining > 0) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700146 const ssize_t result = AOS_PCHECK(write(fd, current, remaining));
147 AOS_CHECK_LE(static_cast<size_t>(result), remaining);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800148 remaining -= result;
149 current += result;
150 }
151 }
152
153 Data read_by_client_, read_by_server_;
154 int server_read_ = -1, server_write_ = -1, client_read_ = -1,
155 client_write_ = -1;
156};
157
158class PipePingPonger : public FDPingPonger {
159 public:
160 PipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700161 AOS_PCHECK(pipe(to_server));
162 AOS_PCHECK(pipe(from_server));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800163 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
164 }
165 ~PipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700166 AOS_PCHECK(close(to_server[0]));
167 AOS_PCHECK(close(to_server[1]));
168 AOS_PCHECK(close(from_server[0]));
169 AOS_PCHECK(close(from_server[1]));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800170 }
171
172 private:
173 int to_server[2], from_server[2];
174};
175
176class NamedPipePingPonger : public FDPingPonger {
177 public:
178 NamedPipePingPonger() {
179 OpenFifo("/tmp/to_server", &client_write_, &server_read_);
180 OpenFifo("/tmp/from_server", &server_write_, &client_read_);
181
182 Init(server_read_, server_write_, client_read_, client_write_);
183 }
184 ~NamedPipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700185 AOS_PCHECK(close(server_read_));
186 AOS_PCHECK(close(client_write_));
187 AOS_PCHECK(close(client_read_));
188 AOS_PCHECK(close(server_write_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800189 }
190
191 private:
192 void OpenFifo(const char *name, int *write, int *read) {
193 {
194 const int ret = unlink(name);
195 if (ret == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700196 AOS_PLOG(FATAL, "unlink(%s)", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800197 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700198 AOS_PCHECK(mkfifo(name, S_IWUSR | S_IRUSR));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800199 // Have to open it nonblocking because the other end isn't open yet...
Austin Schuhf257f3c2019-10-27 21:00:43 -0700200 *read = AOS_PCHECK(open(name, O_RDONLY | O_NONBLOCK));
201 *write = AOS_PCHECK(open(name, O_WRONLY));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800202 {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700203 const int flags = AOS_PCHECK(fcntl(*read, F_GETFL));
204 AOS_PCHECK(fcntl(*read, F_SETFL, flags & ~O_NONBLOCK));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800205 }
206 }
207 }
208
209 int server_read_, server_write_, client_read_, client_write_;
210};
211
212class UnixPingPonger : public FDPingPonger {
213 public:
214 UnixPingPonger(int type) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700215 AOS_PCHECK(socketpair(AF_UNIX, type, 0, to_server));
216 AOS_PCHECK(socketpair(AF_UNIX, type, 0, from_server));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800217 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
218 }
219 ~UnixPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700220 AOS_PCHECK(close(to_server[0]));
221 AOS_PCHECK(close(to_server[1]));
222 AOS_PCHECK(close(from_server[0]));
223 AOS_PCHECK(close(from_server[1]));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800224 }
225
226 private:
227 int to_server[2], from_server[2];
228};
229
230class TCPPingPonger : public FDPingPonger {
231 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400232 TCPPingPonger(bool nodelay) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700233 server_ = AOS_PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400234 if (nodelay) {
235 const int yes = 1;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700236 AOS_PCHECK(
237 setsockopt(server_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
Brian Silvermanfd788882016-09-10 16:56:20 -0400238 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800239 {
240 sockaddr_in server_address;
241 memset(&server_address, 0, sizeof(server_address));
242 server_address.sin_family = AF_INET;
243 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700244 AOS_PCHECK(bind(server_, reinterpret_cast<sockaddr *>(&server_address),
245 sizeof(server_address)));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800246 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700247 AOS_PCHECK(listen(server_, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800248
Austin Schuhf257f3c2019-10-27 21:00:43 -0700249 client_ = AOS_PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400250 if (nodelay) {
251 const int yes = 1;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700252 AOS_PCHECK(
253 setsockopt(client_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
Brian Silvermanfd788882016-09-10 16:56:20 -0400254 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800255 {
256 sockaddr_in client_address;
257 unsigned int length = sizeof(client_address);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700258 AOS_PCHECK(getsockname(
259 server_, reinterpret_cast<sockaddr *>(&client_address), &length));
260 AOS_PCHECK(connect(client_, reinterpret_cast<sockaddr *>(&client_address),
261 length));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800262 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700263 server_connection_ = AOS_PCHECK(accept(server_, nullptr, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800264
265 Init(server_connection_, server_connection_, client_, client_);
266 }
267 ~TCPPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700268 AOS_PCHECK(close(client_));
269 AOS_PCHECK(close(server_connection_));
270 AOS_PCHECK(close(server_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800271 }
272
273 private:
274 int server_, client_, server_connection_;
275};
276
277class UDPPingPonger : public FDPingPonger {
278 public:
279 UDPPingPonger() {
280 CreatePair(&server_read_, &client_write_);
281 CreatePair(&client_read_, &server_write_);
282
283 Init(server_read_, server_write_, client_read_, client_write_);
284 }
285 ~UDPPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700286 AOS_PCHECK(close(server_read_));
287 AOS_PCHECK(close(client_write_));
288 AOS_PCHECK(close(client_read_));
289 AOS_PCHECK(close(server_write_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800290 }
291
292 private:
293 void CreatePair(int *server, int *client) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700294 *server = AOS_PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800295 {
296 sockaddr_in server_address;
297 memset(&server_address, 0, sizeof(server_address));
298 server_address.sin_family = AF_INET;
299 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
300 // server_address.sin_port = htons(server_ + 3000);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700301 AOS_PCHECK(bind(*server, reinterpret_cast<sockaddr *>(&server_address),
302 sizeof(server_address)));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800303 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700304 *client = AOS_PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800305 {
306 sockaddr_in client_address;
307 unsigned int length = sizeof(client_address);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700308 AOS_PCHECK(getsockname(
309 *server, reinterpret_cast<sockaddr *>(&client_address), &length));
310 AOS_PCHECK(connect(*client, reinterpret_cast<sockaddr *>(&client_address),
311 length));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800312 }
313 }
314
315 int server_read_, server_write_, client_read_, client_write_;
316};
317
318// Implements ping-pong without copying the data using a condition variable-like
319// interface.
320class ConditionVariablePingPonger : public StaticPingPonger {
321 protected:
322 // Represents a condition variable bundled with a mutex.
323 //
324 // Wait may return spuriously.
325 class ConditionVariableInterface {
326 public:
327 virtual ~ConditionVariableInterface() {}
328
329 // Locks the mutex.
330 virtual void Lock() = 0;
331
332 // Unlocks the mutex.
333 virtual void Unlock() = 0;
334
335 // Waits on the condition variable.
336 //
337 // The mutex must be locked when this is called.
338 virtual void Wait() = 0;
339
340 // Signals the condition variable.
341 //
342 // The mutex does not have to be locked during this.
343 virtual void Signal() = 0;
344 };
345
346 ConditionVariablePingPonger(
347 ::std::unique_ptr<ConditionVariableInterface> ping,
348 ::std::unique_ptr<ConditionVariableInterface> pong)
349 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
350
351 private:
352 const Data *Ping() override {
353 ping_->Lock();
354 to_server_ = PingData();
355 ping_->Unlock();
356 ping_->Signal();
357 pong_->Lock();
358 while (from_server_ == nullptr) {
359 pong_->Wait();
360 }
361 const Data *r = from_server_;
362 from_server_ = nullptr;
363 pong_->Unlock();
364 return r;
365 }
366
367 const Data *Wait() override {
368 ping_->Lock();
369 while (to_server_ == nullptr) {
370 ping_->Wait();
371 }
372 const Data *r = to_server_;
373 to_server_ = nullptr;
374 ping_->Unlock();
375 return r;
376 }
377
378 void Pong() override {
379 pong_->Lock();
380 from_server_ = PongData();
381 pong_->Unlock();
382 pong_->Signal();
383 }
384
385 const Data *to_server_ = nullptr, *from_server_ = nullptr;
386 const ::std::unique_ptr<ConditionVariableInterface> ping_, pong_;
387};
388
389// Implements ping-pong without copying the data using a semaphore-like
390// interface.
391class SemaphorePingPonger : public StaticPingPonger {
392 protected:
393 // Represents a semaphore, which need only count to 1.
394 //
395 // The behavior when calling Get/Put in anything other than alternating order
396 // is undefined.
397 //
398 // Wait may NOT return spuriously.
399 class SemaphoreInterface {
400 public:
401 virtual ~SemaphoreInterface() {}
402
403 virtual void Get() = 0;
404 virtual void Put() = 0;
405 };
406
407 SemaphorePingPonger(::std::unique_ptr<SemaphoreInterface> ping,
408 ::std::unique_ptr<SemaphoreInterface> pong)
409 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
410
411 private:
412 const Data *Ping() override {
413 to_server_ = PingData();
414 ping_->Put();
415 pong_->Get();
416 return from_server_;
417 }
418
419 const Data *Wait() override {
420 ping_->Get();
421 return to_server_;
422 }
423
424 void Pong() override {
425 from_server_ = PongData();
426 pong_->Put();
427 }
428
429 const Data *to_server_ = nullptr, *from_server_ = nullptr;
430 const ::std::unique_ptr<SemaphoreInterface> ping_, pong_;
431};
432
Brian Silvermane4d8b282015-12-24 13:44:48 -0800433class AOSMutexPingPonger : public ConditionVariablePingPonger {
434 public:
435 AOSMutexPingPonger()
436 : ConditionVariablePingPonger(
437 ::std::unique_ptr<ConditionVariableInterface>(
438 new AOSConditionVariable()),
439 ::std::unique_ptr<ConditionVariableInterface>(
440 new AOSConditionVariable())) {}
441
442 private:
443 class AOSConditionVariable : public ConditionVariableInterface {
444 public:
445 AOSConditionVariable() : condition_(&mutex_) {}
446
447 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700448 void Lock() override { AOS_CHECK(!mutex_.Lock()); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800449 void Unlock() override { mutex_.Unlock(); }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700450 void Wait() override { AOS_CHECK(!condition_.Wait()); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800451 void Signal() override { condition_.Broadcast(); }
452
453 Mutex mutex_;
454 Condition condition_;
455 };
456};
457
458class AOSEventPingPonger : public SemaphorePingPonger {
459 public:
460 AOSEventPingPonger()
461 : SemaphorePingPonger(
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700462 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore()),
463 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore())) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800464
465 private:
466 class AOSEventSemaphore : public SemaphoreInterface {
467 private:
468 void Get() override {
469 event_.Wait();
470 event_.Clear();
471 }
472 void Put() override { event_.Set(); }
473
474 Event event_;
475 };
476};
477
478class PthreadMutexPingPonger : public ConditionVariablePingPonger {
479 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400480 PthreadMutexPingPonger(int pshared, bool pi)
Brian Silvermane4d8b282015-12-24 13:44:48 -0800481 : ConditionVariablePingPonger(
482 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400483 new PthreadConditionVariable(pshared, pi)),
Brian Silvermane4d8b282015-12-24 13:44:48 -0800484 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400485 new PthreadConditionVariable(pshared, pi))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800486
487 private:
488 class PthreadConditionVariable : public ConditionVariableInterface {
489 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400490 PthreadConditionVariable(bool pshared, bool pi) {
491 {
492 pthread_condattr_t cond_attr;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700493 AOS_PRCHECK(pthread_condattr_init(&cond_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400494 if (pshared) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700495 AOS_PRCHECK(
Brian Silvermanfd788882016-09-10 16:56:20 -0400496 pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED));
497 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700498 AOS_PRCHECK(pthread_cond_init(&condition_, &cond_attr));
499 AOS_PRCHECK(pthread_condattr_destroy(&cond_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400500 }
501
502 {
503 pthread_mutexattr_t mutex_attr;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700504 AOS_PRCHECK(pthread_mutexattr_init(&mutex_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400505 if (pshared) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700506 AOS_PRCHECK(pthread_mutexattr_setpshared(&mutex_attr,
507 PTHREAD_PROCESS_SHARED));
Brian Silvermanfd788882016-09-10 16:56:20 -0400508 }
509 if (pi) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700510 AOS_PRCHECK(
Brian Silvermanfd788882016-09-10 16:56:20 -0400511 pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT));
512 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700513 AOS_PRCHECK(pthread_mutex_init(&mutex_, nullptr));
514 AOS_PRCHECK(pthread_mutexattr_destroy(&mutex_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400515 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800516 }
517 ~PthreadConditionVariable() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700518 AOS_PRCHECK(pthread_mutex_destroy(&mutex_));
519 AOS_PRCHECK(pthread_cond_destroy(&condition_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800520 }
521
522 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700523 void Lock() override { AOS_PRCHECK(pthread_mutex_lock(&mutex_)); }
524 void Unlock() override { AOS_PRCHECK(pthread_mutex_unlock(&mutex_)); }
525 void Wait() override {
526 AOS_PRCHECK(pthread_cond_wait(&condition_, &mutex_));
527 }
528 void Signal() override { AOS_PRCHECK(pthread_cond_broadcast(&condition_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800529
530 pthread_cond_t condition_;
531 pthread_mutex_t mutex_;
532 };
533};
534
535class EventFDPingPonger : public SemaphorePingPonger {
536 public:
537 EventFDPingPonger()
538 : SemaphorePingPonger(
539 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore()),
540 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore())) {}
541
542 private:
543 class EventFDSemaphore : public SemaphoreInterface {
544 public:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700545 EventFDSemaphore() : fd_(AOS_PCHECK(eventfd(0, 0))) {}
546 ~EventFDSemaphore() { AOS_PCHECK(close(fd_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800547
548 private:
549 void Get() override {
550 uint64_t value;
551 if (read(fd_, &value, sizeof(value)) != sizeof(value)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700552 AOS_PLOG(FATAL, "reading from eventfd %d failed\n", fd_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800553 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700554 AOS_CHECK_EQ(1u, value);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800555 }
556 void Put() override {
557 uint64_t value = 1;
558 if (write(fd_, &value, sizeof(value)) != sizeof(value)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700559 AOS_PLOG(FATAL, "writing to eventfd %d failed\n", fd_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800560 }
561 }
562
563 const int fd_;
564 };
565};
566
567class SysvSemaphorePingPonger : public SemaphorePingPonger {
568 public:
569 SysvSemaphorePingPonger()
570 : SemaphorePingPonger(
571 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore()),
572 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore())) {}
573
574 private:
575 class SysvSemaphore : public SemaphoreInterface {
576 public:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700577 SysvSemaphore() : sem_id_(AOS_PCHECK(semget(IPC_PRIVATE, 1, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800578
579 private:
580 void Get() override {
581 struct sembuf op;
582 op.sem_num = 0;
583 op.sem_op = -1;
584 op.sem_flg = 0;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700585 AOS_PCHECK(semop(sem_id_, &op, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800586 }
587 void Put() override {
588 struct sembuf op;
589 op.sem_num = 0;
590 op.sem_op = 1;
591 op.sem_flg = 0;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700592 AOS_PCHECK(semop(sem_id_, &op, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800593 }
594
595 const int sem_id_;
596 };
597};
598
599class PosixSemaphorePingPonger : public SemaphorePingPonger {
600 protected:
601 PosixSemaphorePingPonger(sem_t *ping_sem, sem_t *pong_sem)
602 : SemaphorePingPonger(
603 ::std::unique_ptr<SemaphoreInterface>(new PosixSemaphore(ping_sem)),
604 ::std::unique_ptr<SemaphoreInterface>(
605 new PosixSemaphore(pong_sem))) {}
606
607 private:
608 class PosixSemaphore : public SemaphoreInterface {
609 public:
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700610 PosixSemaphore(sem_t *sem) : sem_(sem) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800611
612 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700613 void Get() override { AOS_PCHECK(sem_wait(sem_)); }
614 void Put() override { AOS_PCHECK(sem_post(sem_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800615
616 sem_t *const sem_;
617 };
618};
619
620class SysvQueuePingPonger : public StaticPingPonger {
621 public:
622 SysvQueuePingPonger()
Austin Schuhf257f3c2019-10-27 21:00:43 -0700623 : ping_(AOS_PCHECK(msgget(IPC_PRIVATE, 0600))),
624 pong_(AOS_PCHECK(msgget(IPC_PRIVATE, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800625
626 const Data *Ping() override {
627 {
628 Message to_send;
629 memcpy(&to_send.data, PingData(), sizeof(Data));
Austin Schuhf257f3c2019-10-27 21:00:43 -0700630 AOS_PCHECK(msgsnd(ping_, &to_send, sizeof(Data), 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800631 }
632 {
633 Message received;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700634 AOS_PCHECK(msgrcv(pong_, &received, sizeof(Data), 1, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800635 memcpy(&pong_received_, &received.data, sizeof(Data));
636 }
637 return &pong_received_;
638 }
639
640 const Data *Wait() override {
641 {
642 Message received;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700643 AOS_PCHECK(msgrcv(ping_, &received, sizeof(Data), 1, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800644 memcpy(&ping_received_, &received.data, sizeof(Data));
645 }
646 return &ping_received_;
647 }
648
649 virtual void Pong() override {
650 Message to_send;
651 memcpy(&to_send.data, PongData(), sizeof(Data));
Austin Schuhf257f3c2019-10-27 21:00:43 -0700652 AOS_PCHECK(msgsnd(pong_, &to_send, sizeof(Data), 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800653 }
654
655 private:
656 struct Message {
657 long mtype = 1;
658 char data[sizeof(Data)];
659 };
660
661 Data ping_received_, pong_received_;
662
663 const int ping_, pong_;
664};
665
666class PosixQueuePingPonger : public StaticPingPonger {
667 public:
668 PosixQueuePingPonger() : ping_(Open("/ping")), pong_(Open("/pong")) {}
669 ~PosixQueuePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700670 AOS_PCHECK(mq_close(ping_));
671 AOS_PCHECK(mq_close(pong_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800672 }
673
674 const Data *Ping() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700675 AOS_PCHECK(mq_send(ping_,
676 static_cast<char *>(static_cast<void *>(PingData())),
677 sizeof(Data), 1));
678 AOS_PCHECK(mq_receive(
679 pong_, static_cast<char *>(static_cast<void *>(&pong_received_)),
680 sizeof(Data), nullptr));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800681 return &pong_received_;
682 }
683
684 const Data *Wait() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700685 AOS_PCHECK(mq_receive(
686 ping_, static_cast<char *>(static_cast<void *>(&ping_received_)),
687 sizeof(Data), nullptr));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800688 return &ping_received_;
689 }
690
691 virtual void Pong() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700692 AOS_PCHECK(mq_send(pong_,
693 static_cast<char *>(static_cast<void *>(PongData())),
694 sizeof(Data), 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800695 }
696
697 private:
698 mqd_t Open(const char *name) {
699 if (mq_unlink(name) == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700700 AOS_PLOG(FATAL, "mq_unlink(%s) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800701 }
702 struct mq_attr attr;
703 attr.mq_flags = 0;
704 attr.mq_maxmsg = 1;
705 attr.mq_msgsize = sizeof(Data);
706 attr.mq_curmsgs = 0;
707 const mqd_t r = mq_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, &attr);
708 if (r == reinterpret_cast<mqd_t>(-1)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700709 AOS_PLOG(FATAL, "mq_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800710 }
711 return r;
712 }
713
714 const mqd_t ping_, pong_;
715 Data ping_received_, pong_received_;
716};
717
718class PosixUnnamedSemaphorePingPonger : public PosixSemaphorePingPonger {
719 public:
720 PosixUnnamedSemaphorePingPonger(int pshared)
721 : PosixSemaphorePingPonger(&ping_sem_, &pong_sem_) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700722 AOS_PCHECK(sem_init(&ping_sem_, pshared, 0));
723 AOS_PCHECK(sem_init(&pong_sem_, pshared, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800724 }
725 ~PosixUnnamedSemaphorePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700726 AOS_PCHECK(sem_destroy(&ping_sem_));
727 AOS_PCHECK(sem_destroy(&pong_sem_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800728 }
729
730 private:
731 sem_t ping_sem_, pong_sem_;
732};
733
734class PosixNamedSemaphorePingPonger : public PosixSemaphorePingPonger {
735 public:
736 PosixNamedSemaphorePingPonger()
737 : PosixSemaphorePingPonger(ping_sem_ = Open("/ping"),
738 pong_sem_ = Open("/pong")) {}
739 ~PosixNamedSemaphorePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700740 AOS_PCHECK(sem_close(ping_sem_));
741 AOS_PCHECK(sem_close(pong_sem_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800742 }
743
744 private:
745 sem_t *Open(const char *name) {
746 if (sem_unlink(name) == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700747 AOS_PLOG(FATAL, "shm_unlink(%s) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800748 }
749 sem_t *const r = sem_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, 0);
750 if (r == SEM_FAILED) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700751 AOS_PLOG(FATAL, "sem_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800752 }
753 return r;
754 }
755
756 sem_t *ping_sem_, *pong_sem_;
757};
758
759class AOSQueuePingPonger : public PingPongerInterface {
760 public:
761 AOSQueuePingPonger()
762 : ping_queue_(RawQueue::Fetch("ping", sizeof(Data), 0, 1)),
763 pong_queue_(RawQueue::Fetch("pong", sizeof(Data), 0, 1)) {}
764
765 Data *PingData() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700766 AOS_CHECK_EQ(nullptr, ping_to_send_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800767 ping_to_send_ = static_cast<Data *>(ping_queue_->GetMessage());
768 return ping_to_send_;
769 }
770
771 const Data *Ping() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700772 AOS_CHECK_NE(nullptr, ping_to_send_);
773 AOS_CHECK(ping_queue_->WriteMessage(ping_to_send_, RawQueue::kBlock));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800774 ping_to_send_ = nullptr;
775 pong_queue_->FreeMessage(pong_received_);
776 pong_received_ =
777 static_cast<const Data *>(pong_queue_->ReadMessage(RawQueue::kBlock));
778 return pong_received_;
779 }
780
781 const Data *Wait() override {
782 ping_queue_->FreeMessage(ping_received_);
783 ping_received_ =
784 static_cast<const Data *>(ping_queue_->ReadMessage(RawQueue::kBlock));
785 return ping_received_;
786 }
787
788 Data *PongData() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700789 AOS_CHECK_EQ(nullptr, pong_to_send_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800790 pong_to_send_ = static_cast<Data *>(pong_queue_->GetMessage());
791 return pong_to_send_;
792 }
793
794 void Pong() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700795 AOS_CHECK_NE(nullptr, pong_to_send_);
796 AOS_CHECK(pong_queue_->WriteMessage(pong_to_send_, RawQueue::kBlock));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800797 pong_to_send_ = nullptr;
798 }
799
800 private:
801 RawQueue *const ping_queue_;
802 RawQueue *const pong_queue_;
803
804 Data *ping_to_send_ = nullptr, *pong_to_send_ = nullptr;
805 const Data *ping_received_ = nullptr, *pong_received_ = nullptr;
806};
807
808int Main(int /*argc*/, char **argv) {
809 ::std::unique_ptr<PingPongerInterface> ping_ponger;
810 if (FLAGS_method == "pipe") {
811 ping_ponger.reset(new PipePingPonger());
812 } else if (FLAGS_method == "named_pipe") {
813 ping_ponger.reset(new NamedPipePingPonger());
814 } else if (FLAGS_method == "aos_mutex") {
815 ping_ponger.reset(new AOSMutexPingPonger());
816 } else if (FLAGS_method == "aos_event") {
817 ping_ponger.reset(new AOSEventPingPonger());
818 } else if (FLAGS_method == "pthread_mutex") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400819 ping_ponger.reset(new PthreadMutexPingPonger(false, false));
820 } else if (FLAGS_method == "pthread_mutex_pshared") {
821 ping_ponger.reset(new PthreadMutexPingPonger(true, false));
822 } else if (FLAGS_method == "pthread_mutex_pshared_pi") {
823 ping_ponger.reset(new PthreadMutexPingPonger(true, true));
824 } else if (FLAGS_method == "pthread_mutex_pi") {
825 ping_ponger.reset(new PthreadMutexPingPonger(false, true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800826 } else if (FLAGS_method == "aos_queue") {
827 ping_ponger.reset(new AOSQueuePingPonger());
828 } else if (FLAGS_method == "eventfd") {
829 ping_ponger.reset(new EventFDPingPonger());
830 } else if (FLAGS_method == "sysv_semaphore") {
831 ping_ponger.reset(new SysvSemaphorePingPonger());
832 } else if (FLAGS_method == "sysv_queue") {
833 ping_ponger.reset(new SysvQueuePingPonger());
834 } else if (FLAGS_method == "posix_semaphore_unnamed_shared") {
835 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(1));
836 } else if (FLAGS_method == "posix_semaphore_unnamed_unshared") {
837 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(0));
838 } else if (FLAGS_method == "posix_semaphore_named") {
839 ping_ponger.reset(new PosixNamedSemaphorePingPonger());
840 } else if (FLAGS_method == "posix_queue") {
841 ping_ponger.reset(new PosixQueuePingPonger());
842 } else if (FLAGS_method == "unix_stream") {
843 ping_ponger.reset(new UnixPingPonger(SOCK_STREAM));
844 } else if (FLAGS_method == "unix_datagram") {
845 ping_ponger.reset(new UnixPingPonger(SOCK_DGRAM));
846 } else if (FLAGS_method == "unix_seqpacket") {
847 ping_ponger.reset(new UnixPingPonger(SOCK_SEQPACKET));
848 } else if (FLAGS_method == "tcp") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400849 ping_ponger.reset(new TCPPingPonger(false));
850 } else if (FLAGS_method == "tcp_nodelay") {
851 ping_ponger.reset(new TCPPingPonger(true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800852 } else if (FLAGS_method == "udp") {
853 ping_ponger.reset(new UDPPingPonger());
854 } else {
855 fprintf(stderr, "Unknown IPC method to test '%s'\n", FLAGS_method.c_str());
856 ::gflags::ShowUsageWithFlags(argv[0]);
857 return 1;
858 }
859
Brian Silverman1d42ce22016-09-10 16:55:40 -0400860 ::std::atomic<bool> done{false};
Brian Silvermane4d8b282015-12-24 13:44:48 -0800861
862 ::std::thread server([&ping_ponger, &done]() {
863 if (FLAGS_server_priority > 0) {
864 SetCurrentThreadRealtimePriority(FLAGS_server_priority);
865 }
866 PinCurrentThreadToCPU(FLAGS_server_cpu);
867
868 while (!done) {
869 const PingPongerInterface::Data *data = ping_ponger->Wait();
870 PingPongerInterface::Data *response = ping_ponger->PongData();
871 for (size_t i = 0; i < sizeof(*data); ++i) {
872 (*response)[i] = (*data)[i] + 1;
873 }
874 ping_ponger->Pong();
875 }
876 });
877
878 if (FLAGS_client_priority > 0) {
879 SetCurrentThreadRealtimePriority(FLAGS_client_priority);
880 }
881 PinCurrentThreadToCPU(FLAGS_client_cpu);
882
883 // Warm everything up.
884 for (int i = 0; i < 1000; ++i) {
885 PingPongerInterface::Data *warmup_data = ping_ponger->PingData();
886 memset(*warmup_data, i % 255, sizeof(*warmup_data));
887 ping_ponger->Ping();
888 }
889
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800890 const monotonic_clock::time_point start = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800891
892 for (int32_t i = 0; i < FLAGS_messages; ++i) {
893 PingPongerInterface::Data *to_send = ping_ponger->PingData();
894 memset(*to_send, i % 123, sizeof(*to_send));
895 const PingPongerInterface::Data *received = ping_ponger->Ping();
896 for (size_t ii = 0; ii < sizeof(*received); ++ii) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700897 AOS_CHECK_EQ(((i % 123) + 1) % 255, (*received)[ii]);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800898 }
899 }
900
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800901 const monotonic_clock::time_point end = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800902
Brian Silverman1d42ce22016-09-10 16:55:40 -0400903 // Try to make sure the server thread gets past its check of done so our
904 // Ping() down below doesn't hang. Kind of lame, but doing better would
905 // require complicating the interface to each implementation which isn't worth
906 // it here.
907 ::std::this_thread::sleep_for(::std::chrono::milliseconds(200));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800908 done = true;
909 ping_ponger->PingData();
910 ping_ponger->Ping();
911 server.join();
912
Austin Schuhf257f3c2019-10-27 21:00:43 -0700913 AOS_LOG(INFO, "Took %f seconds to send %" PRId32 " messages\n",
914 ::aos::time::DurationInSeconds(end - start), FLAGS_messages);
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800915 const chrono::nanoseconds per_message = (end - start) / FLAGS_messages;
916 if (per_message >= chrono::seconds(1)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700917 AOS_LOG(INFO, "More than 1 second per message ?!?\n");
Brian Silvermane4d8b282015-12-24 13:44:48 -0800918 } else {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700919 AOS_LOG(INFO, "That is %" PRId32 " nanoseconds per message\n",
920 static_cast<int>(per_message.count()));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800921 }
922
923 return 0;
924}
925
926} // namespace aos
927
928int main(int argc, char **argv) {
929 ::gflags::SetUsageMessage(
930 ::std::string("Compares various forms of IPC. Usage:\n") + argv[0] +
931 " --method=METHOD\n"
932 "METHOD can be one of the following:\n"
933 "\tpipe\n"
934 "\tnamed_pipe\n"
935 "\taos_mutex\n"
936 "\taos_event\n"
937 "\tpthread_mutex\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400938 "\tpthread_mutex_pshared\n"
939 "\tpthread_mutex_pshared_pi\n"
940 "\tpthread_mutex_pi\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800941 "\taos_queue\n"
942 "\teventfd\n"
943 "\tsysv_semaphore\n"
944 "\tsysv_queue\n"
945 "\tposix_semaphore_unnamed_shared\n"
946 "\tposix_semaphore_unnamed_unshared\n"
947 "\tposix_semaphore_named\n"
948 "\tposix_queue\n"
949 "\tunix_stream\n"
950 "\tunix_datagram\n"
951 "\tunix_seqpacket\n"
952 "\ttcp\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400953 "\ttcp_nodelay\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800954 "\tudp\n");
955 ::gflags::ParseCommandLineFlags(&argc, &argv, true);
956
957 ::aos::InitNRT();
Tyler Chatow4b471e12020-01-05 20:19:36 -0800958 ::aos::logging::SetImplementation(
Brian Silvermane4d8b282015-12-24 13:44:48 -0800959 new ::aos::logging::StreamLogImplementation(stdout));
960
961 return ::aos::Main(argc, argv);
962}