blob: fd1ff933d85849562f46e8b56b4ba8aa5891a3bb [file] [log] [blame]
Austin Schuhf2a50ba2016-12-24 16:16:26 -08001#include <fcntl.h>
2#include <mqueue.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -08003#include <netinet/in.h>
Brian Silvermanfd788882016-09-10 16:56:20 -04004#include <netinet/tcp.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -08005#include <pthread.h>
6#include <semaphore.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -08007#include <sys/eventfd.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -08008#include <sys/msg.h>
9#include <sys/sem.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080010#include <sys/socket.h>
11#include <sys/stat.h>
12#include <sys/types.h>
13#include <sys/un.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080014
Austin Schuhf2a50ba2016-12-24 16:16:26 -080015#include <atomic>
16#include <chrono>
Tyler Chatowbf0609c2021-07-31 16:13:27 -070017#include <cstdint>
Brian Silvermane4d8b282015-12-24 13:44:48 -080018#include <memory>
19#include <string>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080020#include <thread>
Brian Silvermane4d8b282015-12-24 13:44:48 -080021
John Park33858a32018-09-28 23:05:48 -070022#include "aos/condition.h"
James Kuszmaul651fc3f2019-05-15 21:14:25 -070023#include "aos/init.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080024#include "aos/ipc_lib/event.h"
John Park33858a32018-09-28 23:05:48 -070025#include "aos/logging/implementations.h"
26#include "aos/logging/logging.h"
27#include "aos/mutex/mutex.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070028#include "aos/realtime.h"
John Park33858a32018-09-28 23:05:48 -070029#include "aos/time/time.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080030#include "gflags/gflags.h"
Brian Silvermane4d8b282015-12-24 13:44:48 -080031
32DEFINE_string(method, "", "Which IPC method to use");
33DEFINE_int32(messages, 1000000, "How many messages to send back and forth");
34DEFINE_int32(client_cpu, 0, "CPU to pin client to");
35DEFINE_int32(server_cpu, 0, "CPU to pin server to");
36DEFINE_int32(client_priority, 1,
37 "Realtime priority for client. Negative for don't change");
38DEFINE_int32(server_priority, 1,
39 "Realtime priority for server. Negative for don't change");
40
41namespace aos {
42
Austin Schuhf2a50ba2016-12-24 16:16:26 -080043namespace chrono = ::std::chrono;
44
// Abstract transport which carries a fixed-size buffer to another thread and
// brings a reply back.
//
// The "server" side should loop: call Wait, do something with the result, and
// then call Pong.
// The "client" side should repeatedly call Ping.
class PingPongerInterface {
 public:
  // A chunk of memory definitely on its own cache line anywhere sane.
  typedef uint8_t Data[1024] __attribute__((aligned(128)));

  virtual ~PingPongerInterface() = default;

  // Client side: returns the buffer to fill in before sending to the server.
  // The result is valid until the next Ping call.
  virtual Data *PingData() = 0;

  // Client side: sends the buffer returned by the most recent PingData call to
  // the server and returns the server's response.
  // PingData must be called exactly once before each call of this method.
  // The result is valid until the next PingData call.
  virtual const Data *Ping() = 0;

  // Server side: waits for a Ping call and then returns the associated data.
  // The result is valid until the beginning of the next Pong call.
  virtual const Data *Wait() = 0;

  // Server side: returns the buffer to fill in before replying to the client.
  // The result is valid until the next Pong call.
  virtual Data *PongData() = 0;

  // Server side: sends the buffer returned by the most recent PongData call
  // back to an in-progress Ping.
  // PongData must be called exactly once before each call of this method.
  virtual void Pong() = 0;
};
84
85// Base class for implementations which simple use a pair of Data objects for
86// all Pings and Pongs.
87class StaticPingPonger : public PingPongerInterface {
88 public:
89 Data *PingData() override { return &ping_data_; }
90 Data *PongData() override { return &pong_data_; }
91
92 private:
93 Data ping_data_, pong_data_;
94};
95
96// Implements ping-pong by sending the data over file descriptors.
97class FDPingPonger : public StaticPingPonger {
98 protected:
99 // Subclasses must override and call Init.
100 FDPingPonger() {}
101
102 // Subclasses must call this in their constructor.
103 // Does not take ownership of any of the file descriptors, any/all of which
104 // may be the same.
105 // {server,client}_read must be open for reading and {server,client}_write
106 // must be open for writing.
107 void Init(int server_read, int server_write, int client_read,
108 int client_write) {
109 server_read_ = server_read;
110 server_write_ = server_write;
111 client_read_ = client_read;
112 client_write_ = client_write;
113 }
114
115 private:
116 const Data *Ping() override {
117 WriteFully(client_write_, *PingData());
118 ReadFully(client_read_, &read_by_client_);
119 return &read_by_client_;
120 }
121
122 const Data *Wait() override {
123 ReadFully(server_read_, &read_by_server_);
124 return &read_by_server_;
125 }
126
127 void Pong() override { WriteFully(server_write_, *PongData()); }
128
129 void ReadFully(int fd, Data *data) {
130 size_t remaining = sizeof(*data);
131 uint8_t *current = &(*data)[0];
132 while (remaining > 0) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700133 const ssize_t result = AOS_PCHECK(read(fd, current, remaining));
134 AOS_CHECK_LE(static_cast<size_t>(result), remaining);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800135 remaining -= result;
136 current += result;
137 }
138 }
139
140 void WriteFully(int fd, const Data &data) {
141 size_t remaining = sizeof(data);
142 const uint8_t *current = &data[0];
143 while (remaining > 0) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700144 const ssize_t result = AOS_PCHECK(write(fd, current, remaining));
145 AOS_CHECK_LE(static_cast<size_t>(result), remaining);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800146 remaining -= result;
147 current += result;
148 }
149 }
150
151 Data read_by_client_, read_by_server_;
152 int server_read_ = -1, server_write_ = -1, client_read_ = -1,
153 client_write_ = -1;
154};
155
156class PipePingPonger : public FDPingPonger {
157 public:
158 PipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700159 AOS_PCHECK(pipe(to_server));
160 AOS_PCHECK(pipe(from_server));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800161 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
162 }
163 ~PipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700164 AOS_PCHECK(close(to_server[0]));
165 AOS_PCHECK(close(to_server[1]));
166 AOS_PCHECK(close(from_server[0]));
167 AOS_PCHECK(close(from_server[1]));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800168 }
169
170 private:
171 int to_server[2], from_server[2];
172};
173
174class NamedPipePingPonger : public FDPingPonger {
175 public:
176 NamedPipePingPonger() {
177 OpenFifo("/tmp/to_server", &client_write_, &server_read_);
178 OpenFifo("/tmp/from_server", &server_write_, &client_read_);
179
180 Init(server_read_, server_write_, client_read_, client_write_);
181 }
182 ~NamedPipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700183 AOS_PCHECK(close(server_read_));
184 AOS_PCHECK(close(client_write_));
185 AOS_PCHECK(close(client_read_));
186 AOS_PCHECK(close(server_write_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800187 }
188
189 private:
190 void OpenFifo(const char *name, int *write, int *read) {
191 {
192 const int ret = unlink(name);
193 if (ret == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700194 AOS_PLOG(FATAL, "unlink(%s)", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800195 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700196 AOS_PCHECK(mkfifo(name, S_IWUSR | S_IRUSR));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800197 // Have to open it nonblocking because the other end isn't open yet...
Austin Schuhf257f3c2019-10-27 21:00:43 -0700198 *read = AOS_PCHECK(open(name, O_RDONLY | O_NONBLOCK));
199 *write = AOS_PCHECK(open(name, O_WRONLY));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800200 {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700201 const int flags = AOS_PCHECK(fcntl(*read, F_GETFL));
202 AOS_PCHECK(fcntl(*read, F_SETFL, flags & ~O_NONBLOCK));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800203 }
204 }
205 }
206
207 int server_read_, server_write_, client_read_, client_write_;
208};
209
210class UnixPingPonger : public FDPingPonger {
211 public:
212 UnixPingPonger(int type) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700213 AOS_PCHECK(socketpair(AF_UNIX, type, 0, to_server));
214 AOS_PCHECK(socketpair(AF_UNIX, type, 0, from_server));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800215 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
216 }
217 ~UnixPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700218 AOS_PCHECK(close(to_server[0]));
219 AOS_PCHECK(close(to_server[1]));
220 AOS_PCHECK(close(from_server[0]));
221 AOS_PCHECK(close(from_server[1]));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800222 }
223
224 private:
225 int to_server[2], from_server[2];
226};
227
228class TCPPingPonger : public FDPingPonger {
229 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400230 TCPPingPonger(bool nodelay) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700231 server_ = AOS_PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400232 if (nodelay) {
233 const int yes = 1;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700234 AOS_PCHECK(
235 setsockopt(server_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
Brian Silvermanfd788882016-09-10 16:56:20 -0400236 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800237 {
238 sockaddr_in server_address;
239 memset(&server_address, 0, sizeof(server_address));
240 server_address.sin_family = AF_INET;
241 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700242 AOS_PCHECK(bind(server_, reinterpret_cast<sockaddr *>(&server_address),
243 sizeof(server_address)));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800244 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700245 AOS_PCHECK(listen(server_, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800246
Austin Schuhf257f3c2019-10-27 21:00:43 -0700247 client_ = AOS_PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400248 if (nodelay) {
249 const int yes = 1;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700250 AOS_PCHECK(
251 setsockopt(client_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
Brian Silvermanfd788882016-09-10 16:56:20 -0400252 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800253 {
254 sockaddr_in client_address;
255 unsigned int length = sizeof(client_address);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700256 AOS_PCHECK(getsockname(
257 server_, reinterpret_cast<sockaddr *>(&client_address), &length));
258 AOS_PCHECK(connect(client_, reinterpret_cast<sockaddr *>(&client_address),
259 length));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800260 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700261 server_connection_ = AOS_PCHECK(accept(server_, nullptr, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800262
263 Init(server_connection_, server_connection_, client_, client_);
264 }
265 ~TCPPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700266 AOS_PCHECK(close(client_));
267 AOS_PCHECK(close(server_connection_));
268 AOS_PCHECK(close(server_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800269 }
270
271 private:
272 int server_, client_, server_connection_;
273};
274
275class UDPPingPonger : public FDPingPonger {
276 public:
277 UDPPingPonger() {
278 CreatePair(&server_read_, &client_write_);
279 CreatePair(&client_read_, &server_write_);
280
281 Init(server_read_, server_write_, client_read_, client_write_);
282 }
283 ~UDPPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700284 AOS_PCHECK(close(server_read_));
285 AOS_PCHECK(close(client_write_));
286 AOS_PCHECK(close(client_read_));
287 AOS_PCHECK(close(server_write_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800288 }
289
290 private:
291 void CreatePair(int *server, int *client) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700292 *server = AOS_PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800293 {
294 sockaddr_in server_address;
295 memset(&server_address, 0, sizeof(server_address));
296 server_address.sin_family = AF_INET;
297 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
298 // server_address.sin_port = htons(server_ + 3000);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700299 AOS_PCHECK(bind(*server, reinterpret_cast<sockaddr *>(&server_address),
300 sizeof(server_address)));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800301 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700302 *client = AOS_PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800303 {
304 sockaddr_in client_address;
305 unsigned int length = sizeof(client_address);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700306 AOS_PCHECK(getsockname(
307 *server, reinterpret_cast<sockaddr *>(&client_address), &length));
308 AOS_PCHECK(connect(*client, reinterpret_cast<sockaddr *>(&client_address),
309 length));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800310 }
311 }
312
313 int server_read_, server_write_, client_read_, client_write_;
314};
315
316// Implements ping-pong without copying the data using a condition variable-like
317// interface.
318class ConditionVariablePingPonger : public StaticPingPonger {
319 protected:
320 // Represents a condition variable bundled with a mutex.
321 //
322 // Wait may return spuriously.
323 class ConditionVariableInterface {
324 public:
325 virtual ~ConditionVariableInterface() {}
326
327 // Locks the mutex.
328 virtual void Lock() = 0;
329
330 // Unlocks the mutex.
331 virtual void Unlock() = 0;
332
333 // Waits on the condition variable.
334 //
335 // The mutex must be locked when this is called.
336 virtual void Wait() = 0;
337
338 // Signals the condition variable.
339 //
340 // The mutex does not have to be locked during this.
341 virtual void Signal() = 0;
342 };
343
344 ConditionVariablePingPonger(
345 ::std::unique_ptr<ConditionVariableInterface> ping,
346 ::std::unique_ptr<ConditionVariableInterface> pong)
347 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
348
349 private:
350 const Data *Ping() override {
351 ping_->Lock();
352 to_server_ = PingData();
353 ping_->Unlock();
354 ping_->Signal();
355 pong_->Lock();
356 while (from_server_ == nullptr) {
357 pong_->Wait();
358 }
359 const Data *r = from_server_;
360 from_server_ = nullptr;
361 pong_->Unlock();
362 return r;
363 }
364
365 const Data *Wait() override {
366 ping_->Lock();
367 while (to_server_ == nullptr) {
368 ping_->Wait();
369 }
370 const Data *r = to_server_;
371 to_server_ = nullptr;
372 ping_->Unlock();
373 return r;
374 }
375
376 void Pong() override {
377 pong_->Lock();
378 from_server_ = PongData();
379 pong_->Unlock();
380 pong_->Signal();
381 }
382
383 const Data *to_server_ = nullptr, *from_server_ = nullptr;
384 const ::std::unique_ptr<ConditionVariableInterface> ping_, pong_;
385};
386
387// Implements ping-pong without copying the data using a semaphore-like
388// interface.
389class SemaphorePingPonger : public StaticPingPonger {
390 protected:
391 // Represents a semaphore, which need only count to 1.
392 //
393 // The behavior when calling Get/Put in anything other than alternating order
394 // is undefined.
395 //
396 // Wait may NOT return spuriously.
397 class SemaphoreInterface {
398 public:
399 virtual ~SemaphoreInterface() {}
400
401 virtual void Get() = 0;
402 virtual void Put() = 0;
403 };
404
405 SemaphorePingPonger(::std::unique_ptr<SemaphoreInterface> ping,
406 ::std::unique_ptr<SemaphoreInterface> pong)
407 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
408
409 private:
410 const Data *Ping() override {
411 to_server_ = PingData();
412 ping_->Put();
413 pong_->Get();
414 return from_server_;
415 }
416
417 const Data *Wait() override {
418 ping_->Get();
419 return to_server_;
420 }
421
422 void Pong() override {
423 from_server_ = PongData();
424 pong_->Put();
425 }
426
427 const Data *to_server_ = nullptr, *from_server_ = nullptr;
428 const ::std::unique_ptr<SemaphoreInterface> ping_, pong_;
429};
430
Brian Silvermane4d8b282015-12-24 13:44:48 -0800431class AOSMutexPingPonger : public ConditionVariablePingPonger {
432 public:
433 AOSMutexPingPonger()
434 : ConditionVariablePingPonger(
435 ::std::unique_ptr<ConditionVariableInterface>(
436 new AOSConditionVariable()),
437 ::std::unique_ptr<ConditionVariableInterface>(
438 new AOSConditionVariable())) {}
439
440 private:
441 class AOSConditionVariable : public ConditionVariableInterface {
442 public:
443 AOSConditionVariable() : condition_(&mutex_) {}
444
445 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700446 void Lock() override { AOS_CHECK(!mutex_.Lock()); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800447 void Unlock() override { mutex_.Unlock(); }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700448 void Wait() override { AOS_CHECK(!condition_.Wait()); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800449 void Signal() override { condition_.Broadcast(); }
450
451 Mutex mutex_;
452 Condition condition_;
453 };
454};
455
456class AOSEventPingPonger : public SemaphorePingPonger {
457 public:
458 AOSEventPingPonger()
459 : SemaphorePingPonger(
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700460 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore()),
461 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore())) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800462
463 private:
464 class AOSEventSemaphore : public SemaphoreInterface {
465 private:
466 void Get() override {
467 event_.Wait();
468 event_.Clear();
469 }
470 void Put() override { event_.Set(); }
471
472 Event event_;
473 };
474};
475
476class PthreadMutexPingPonger : public ConditionVariablePingPonger {
477 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400478 PthreadMutexPingPonger(int pshared, bool pi)
Brian Silvermane4d8b282015-12-24 13:44:48 -0800479 : ConditionVariablePingPonger(
480 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400481 new PthreadConditionVariable(pshared, pi)),
Brian Silvermane4d8b282015-12-24 13:44:48 -0800482 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400483 new PthreadConditionVariable(pshared, pi))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800484
485 private:
486 class PthreadConditionVariable : public ConditionVariableInterface {
487 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400488 PthreadConditionVariable(bool pshared, bool pi) {
489 {
490 pthread_condattr_t cond_attr;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700491 AOS_PRCHECK(pthread_condattr_init(&cond_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400492 if (pshared) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700493 AOS_PRCHECK(
Brian Silvermanfd788882016-09-10 16:56:20 -0400494 pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED));
495 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700496 AOS_PRCHECK(pthread_cond_init(&condition_, &cond_attr));
497 AOS_PRCHECK(pthread_condattr_destroy(&cond_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400498 }
499
500 {
501 pthread_mutexattr_t mutex_attr;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700502 AOS_PRCHECK(pthread_mutexattr_init(&mutex_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400503 if (pshared) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700504 AOS_PRCHECK(pthread_mutexattr_setpshared(&mutex_attr,
505 PTHREAD_PROCESS_SHARED));
Brian Silvermanfd788882016-09-10 16:56:20 -0400506 }
507 if (pi) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700508 AOS_PRCHECK(
Brian Silvermanfd788882016-09-10 16:56:20 -0400509 pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT));
510 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700511 AOS_PRCHECK(pthread_mutex_init(&mutex_, nullptr));
512 AOS_PRCHECK(pthread_mutexattr_destroy(&mutex_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400513 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800514 }
515 ~PthreadConditionVariable() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700516 AOS_PRCHECK(pthread_mutex_destroy(&mutex_));
517 AOS_PRCHECK(pthread_cond_destroy(&condition_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800518 }
519
520 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700521 void Lock() override { AOS_PRCHECK(pthread_mutex_lock(&mutex_)); }
522 void Unlock() override { AOS_PRCHECK(pthread_mutex_unlock(&mutex_)); }
523 void Wait() override {
524 AOS_PRCHECK(pthread_cond_wait(&condition_, &mutex_));
525 }
526 void Signal() override { AOS_PRCHECK(pthread_cond_broadcast(&condition_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800527
528 pthread_cond_t condition_;
529 pthread_mutex_t mutex_;
530 };
531};
532
533class EventFDPingPonger : public SemaphorePingPonger {
534 public:
535 EventFDPingPonger()
536 : SemaphorePingPonger(
537 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore()),
538 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore())) {}
539
540 private:
541 class EventFDSemaphore : public SemaphoreInterface {
542 public:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700543 EventFDSemaphore() : fd_(AOS_PCHECK(eventfd(0, 0))) {}
544 ~EventFDSemaphore() { AOS_PCHECK(close(fd_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800545
546 private:
547 void Get() override {
548 uint64_t value;
549 if (read(fd_, &value, sizeof(value)) != sizeof(value)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700550 AOS_PLOG(FATAL, "reading from eventfd %d failed\n", fd_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800551 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700552 AOS_CHECK_EQ(1u, value);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800553 }
554 void Put() override {
555 uint64_t value = 1;
556 if (write(fd_, &value, sizeof(value)) != sizeof(value)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700557 AOS_PLOG(FATAL, "writing to eventfd %d failed\n", fd_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800558 }
559 }
560
561 const int fd_;
562 };
563};
564
565class SysvSemaphorePingPonger : public SemaphorePingPonger {
566 public:
567 SysvSemaphorePingPonger()
568 : SemaphorePingPonger(
569 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore()),
570 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore())) {}
571
572 private:
573 class SysvSemaphore : public SemaphoreInterface {
574 public:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700575 SysvSemaphore() : sem_id_(AOS_PCHECK(semget(IPC_PRIVATE, 1, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800576
577 private:
578 void Get() override {
579 struct sembuf op;
580 op.sem_num = 0;
581 op.sem_op = -1;
582 op.sem_flg = 0;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700583 AOS_PCHECK(semop(sem_id_, &op, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800584 }
585 void Put() override {
586 struct sembuf op;
587 op.sem_num = 0;
588 op.sem_op = 1;
589 op.sem_flg = 0;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700590 AOS_PCHECK(semop(sem_id_, &op, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800591 }
592
593 const int sem_id_;
594 };
595};
596
597class PosixSemaphorePingPonger : public SemaphorePingPonger {
598 protected:
599 PosixSemaphorePingPonger(sem_t *ping_sem, sem_t *pong_sem)
600 : SemaphorePingPonger(
601 ::std::unique_ptr<SemaphoreInterface>(new PosixSemaphore(ping_sem)),
602 ::std::unique_ptr<SemaphoreInterface>(
603 new PosixSemaphore(pong_sem))) {}
604
605 private:
606 class PosixSemaphore : public SemaphoreInterface {
607 public:
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700608 PosixSemaphore(sem_t *sem) : sem_(sem) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800609
610 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700611 void Get() override { AOS_PCHECK(sem_wait(sem_)); }
612 void Put() override { AOS_PCHECK(sem_post(sem_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800613
614 sem_t *const sem_;
615 };
616};
617
618class SysvQueuePingPonger : public StaticPingPonger {
619 public:
620 SysvQueuePingPonger()
Austin Schuhf257f3c2019-10-27 21:00:43 -0700621 : ping_(AOS_PCHECK(msgget(IPC_PRIVATE, 0600))),
622 pong_(AOS_PCHECK(msgget(IPC_PRIVATE, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800623
624 const Data *Ping() override {
625 {
626 Message to_send;
627 memcpy(&to_send.data, PingData(), sizeof(Data));
Austin Schuhf257f3c2019-10-27 21:00:43 -0700628 AOS_PCHECK(msgsnd(ping_, &to_send, sizeof(Data), 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800629 }
630 {
631 Message received;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700632 AOS_PCHECK(msgrcv(pong_, &received, sizeof(Data), 1, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800633 memcpy(&pong_received_, &received.data, sizeof(Data));
634 }
635 return &pong_received_;
636 }
637
638 const Data *Wait() override {
639 {
640 Message received;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700641 AOS_PCHECK(msgrcv(ping_, &received, sizeof(Data), 1, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800642 memcpy(&ping_received_, &received.data, sizeof(Data));
643 }
644 return &ping_received_;
645 }
646
647 virtual void Pong() override {
648 Message to_send;
649 memcpy(&to_send.data, PongData(), sizeof(Data));
Austin Schuhf257f3c2019-10-27 21:00:43 -0700650 AOS_PCHECK(msgsnd(pong_, &to_send, sizeof(Data), 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800651 }
652
653 private:
654 struct Message {
655 long mtype = 1;
656 char data[sizeof(Data)];
657 };
658
659 Data ping_received_, pong_received_;
660
661 const int ping_, pong_;
662};
663
664class PosixQueuePingPonger : public StaticPingPonger {
665 public:
666 PosixQueuePingPonger() : ping_(Open("/ping")), pong_(Open("/pong")) {}
667 ~PosixQueuePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700668 AOS_PCHECK(mq_close(ping_));
669 AOS_PCHECK(mq_close(pong_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800670 }
671
672 const Data *Ping() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700673 AOS_PCHECK(mq_send(ping_,
674 static_cast<char *>(static_cast<void *>(PingData())),
675 sizeof(Data), 1));
676 AOS_PCHECK(mq_receive(
677 pong_, static_cast<char *>(static_cast<void *>(&pong_received_)),
678 sizeof(Data), nullptr));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800679 return &pong_received_;
680 }
681
682 const Data *Wait() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700683 AOS_PCHECK(mq_receive(
684 ping_, static_cast<char *>(static_cast<void *>(&ping_received_)),
685 sizeof(Data), nullptr));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800686 return &ping_received_;
687 }
688
689 virtual void Pong() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700690 AOS_PCHECK(mq_send(pong_,
691 static_cast<char *>(static_cast<void *>(PongData())),
692 sizeof(Data), 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800693 }
694
695 private:
696 mqd_t Open(const char *name) {
697 if (mq_unlink(name) == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700698 AOS_PLOG(FATAL, "mq_unlink(%s) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800699 }
700 struct mq_attr attr;
701 attr.mq_flags = 0;
702 attr.mq_maxmsg = 1;
703 attr.mq_msgsize = sizeof(Data);
704 attr.mq_curmsgs = 0;
705 const mqd_t r = mq_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, &attr);
706 if (r == reinterpret_cast<mqd_t>(-1)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700707 AOS_PLOG(FATAL, "mq_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800708 }
709 return r;
710 }
711
712 const mqd_t ping_, pong_;
713 Data ping_received_, pong_received_;
714};
715
716class PosixUnnamedSemaphorePingPonger : public PosixSemaphorePingPonger {
717 public:
718 PosixUnnamedSemaphorePingPonger(int pshared)
719 : PosixSemaphorePingPonger(&ping_sem_, &pong_sem_) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700720 AOS_PCHECK(sem_init(&ping_sem_, pshared, 0));
721 AOS_PCHECK(sem_init(&pong_sem_, pshared, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800722 }
723 ~PosixUnnamedSemaphorePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700724 AOS_PCHECK(sem_destroy(&ping_sem_));
725 AOS_PCHECK(sem_destroy(&pong_sem_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800726 }
727
728 private:
729 sem_t ping_sem_, pong_sem_;
730};
731
732class PosixNamedSemaphorePingPonger : public PosixSemaphorePingPonger {
733 public:
734 PosixNamedSemaphorePingPonger()
735 : PosixSemaphorePingPonger(ping_sem_ = Open("/ping"),
736 pong_sem_ = Open("/pong")) {}
737 ~PosixNamedSemaphorePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700738 AOS_PCHECK(sem_close(ping_sem_));
739 AOS_PCHECK(sem_close(pong_sem_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800740 }
741
742 private:
743 sem_t *Open(const char *name) {
744 if (sem_unlink(name) == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700745 AOS_PLOG(FATAL, "shm_unlink(%s) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800746 }
747 sem_t *const r = sem_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, 0);
748 if (r == SEM_FAILED) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700749 AOS_PLOG(FATAL, "sem_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800750 }
751 return r;
752 }
753
754 sem_t *ping_sem_, *pong_sem_;
755};
756
Brian Silvermane4d8b282015-12-24 13:44:48 -0800757int Main(int /*argc*/, char **argv) {
758 ::std::unique_ptr<PingPongerInterface> ping_ponger;
759 if (FLAGS_method == "pipe") {
760 ping_ponger.reset(new PipePingPonger());
761 } else if (FLAGS_method == "named_pipe") {
762 ping_ponger.reset(new NamedPipePingPonger());
763 } else if (FLAGS_method == "aos_mutex") {
764 ping_ponger.reset(new AOSMutexPingPonger());
765 } else if (FLAGS_method == "aos_event") {
766 ping_ponger.reset(new AOSEventPingPonger());
767 } else if (FLAGS_method == "pthread_mutex") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400768 ping_ponger.reset(new PthreadMutexPingPonger(false, false));
769 } else if (FLAGS_method == "pthread_mutex_pshared") {
770 ping_ponger.reset(new PthreadMutexPingPonger(true, false));
771 } else if (FLAGS_method == "pthread_mutex_pshared_pi") {
772 ping_ponger.reset(new PthreadMutexPingPonger(true, true));
773 } else if (FLAGS_method == "pthread_mutex_pi") {
774 ping_ponger.reset(new PthreadMutexPingPonger(false, true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800775 } else if (FLAGS_method == "eventfd") {
776 ping_ponger.reset(new EventFDPingPonger());
777 } else if (FLAGS_method == "sysv_semaphore") {
778 ping_ponger.reset(new SysvSemaphorePingPonger());
779 } else if (FLAGS_method == "sysv_queue") {
780 ping_ponger.reset(new SysvQueuePingPonger());
781 } else if (FLAGS_method == "posix_semaphore_unnamed_shared") {
782 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(1));
783 } else if (FLAGS_method == "posix_semaphore_unnamed_unshared") {
784 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(0));
785 } else if (FLAGS_method == "posix_semaphore_named") {
786 ping_ponger.reset(new PosixNamedSemaphorePingPonger());
787 } else if (FLAGS_method == "posix_queue") {
788 ping_ponger.reset(new PosixQueuePingPonger());
789 } else if (FLAGS_method == "unix_stream") {
790 ping_ponger.reset(new UnixPingPonger(SOCK_STREAM));
791 } else if (FLAGS_method == "unix_datagram") {
792 ping_ponger.reset(new UnixPingPonger(SOCK_DGRAM));
793 } else if (FLAGS_method == "unix_seqpacket") {
794 ping_ponger.reset(new UnixPingPonger(SOCK_SEQPACKET));
795 } else if (FLAGS_method == "tcp") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400796 ping_ponger.reset(new TCPPingPonger(false));
797 } else if (FLAGS_method == "tcp_nodelay") {
798 ping_ponger.reset(new TCPPingPonger(true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800799 } else if (FLAGS_method == "udp") {
800 ping_ponger.reset(new UDPPingPonger());
801 } else {
802 fprintf(stderr, "Unknown IPC method to test '%s'\n", FLAGS_method.c_str());
803 ::gflags::ShowUsageWithFlags(argv[0]);
804 return 1;
805 }
806
Brian Silverman1d42ce22016-09-10 16:55:40 -0400807 ::std::atomic<bool> done{false};
Brian Silvermane4d8b282015-12-24 13:44:48 -0800808
809 ::std::thread server([&ping_ponger, &done]() {
810 if (FLAGS_server_priority > 0) {
811 SetCurrentThreadRealtimePriority(FLAGS_server_priority);
812 }
Austin Schuh9014e3b2020-11-21 14:26:07 -0800813 SetCurrentThreadAffinity(MakeCpusetFromCpus({FLAGS_server_cpu}));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800814
815 while (!done) {
816 const PingPongerInterface::Data *data = ping_ponger->Wait();
817 PingPongerInterface::Data *response = ping_ponger->PongData();
818 for (size_t i = 0; i < sizeof(*data); ++i) {
819 (*response)[i] = (*data)[i] + 1;
820 }
821 ping_ponger->Pong();
822 }
823 });
824
825 if (FLAGS_client_priority > 0) {
826 SetCurrentThreadRealtimePriority(FLAGS_client_priority);
827 }
Austin Schuh9014e3b2020-11-21 14:26:07 -0800828 SetCurrentThreadAffinity(MakeCpusetFromCpus({FLAGS_client_cpu}));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800829
830 // Warm everything up.
831 for (int i = 0; i < 1000; ++i) {
832 PingPongerInterface::Data *warmup_data = ping_ponger->PingData();
833 memset(*warmup_data, i % 255, sizeof(*warmup_data));
834 ping_ponger->Ping();
835 }
836
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800837 const monotonic_clock::time_point start = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800838
839 for (int32_t i = 0; i < FLAGS_messages; ++i) {
840 PingPongerInterface::Data *to_send = ping_ponger->PingData();
841 memset(*to_send, i % 123, sizeof(*to_send));
842 const PingPongerInterface::Data *received = ping_ponger->Ping();
843 for (size_t ii = 0; ii < sizeof(*received); ++ii) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700844 AOS_CHECK_EQ(((i % 123) + 1) % 255, (*received)[ii]);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800845 }
846 }
847
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800848 const monotonic_clock::time_point end = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800849
Brian Silverman1d42ce22016-09-10 16:55:40 -0400850 // Try to make sure the server thread gets past its check of done so our
851 // Ping() down below doesn't hang. Kind of lame, but doing better would
852 // require complicating the interface to each implementation which isn't worth
853 // it here.
854 ::std::this_thread::sleep_for(::std::chrono::milliseconds(200));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800855 done = true;
856 ping_ponger->PingData();
857 ping_ponger->Ping();
858 server.join();
859
Austin Schuhf257f3c2019-10-27 21:00:43 -0700860 AOS_LOG(INFO, "Took %f seconds to send %" PRId32 " messages\n",
861 ::aos::time::DurationInSeconds(end - start), FLAGS_messages);
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800862 const chrono::nanoseconds per_message = (end - start) / FLAGS_messages;
863 if (per_message >= chrono::seconds(1)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700864 AOS_LOG(INFO, "More than 1 second per message ?!?\n");
Brian Silvermane4d8b282015-12-24 13:44:48 -0800865 } else {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700866 AOS_LOG(INFO, "That is %" PRId32 " nanoseconds per message\n",
867 static_cast<int>(per_message.count()));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800868 }
869
870 return 0;
871}
872
873} // namespace aos
874
875int main(int argc, char **argv) {
876 ::gflags::SetUsageMessage(
877 ::std::string("Compares various forms of IPC. Usage:\n") + argv[0] +
878 " --method=METHOD\n"
879 "METHOD can be one of the following:\n"
880 "\tpipe\n"
881 "\tnamed_pipe\n"
882 "\taos_mutex\n"
883 "\taos_event\n"
884 "\tpthread_mutex\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400885 "\tpthread_mutex_pshared\n"
886 "\tpthread_mutex_pshared_pi\n"
887 "\tpthread_mutex_pi\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800888 "\teventfd\n"
889 "\tsysv_semaphore\n"
890 "\tsysv_queue\n"
891 "\tposix_semaphore_unnamed_shared\n"
892 "\tposix_semaphore_unnamed_unshared\n"
893 "\tposix_semaphore_named\n"
894 "\tposix_queue\n"
895 "\tunix_stream\n"
896 "\tunix_datagram\n"
897 "\tunix_seqpacket\n"
898 "\ttcp\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400899 "\ttcp_nodelay\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800900 "\tudp\n");
Austin Schuh094d09b2020-11-20 23:26:52 -0800901 ::aos::InitGoogle(&argc, &argv);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800902
903 return ::aos::Main(argc, argv);
904}