blob: 8bb78d7720b2b7a9a7ec3b9e81f91cfed1991250 [file] [log] [blame]
Stephan Pleines682928d2024-05-31 20:43:48 -07001#include <arpa/inet.h>
2#include <errno.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -08003#include <fcntl.h>
Stephan Pleines682928d2024-05-31 20:43:48 -07004#include <inttypes.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -08005#include <mqueue.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -08006#include <netinet/in.h>
Brian Silvermanfd788882016-09-10 16:56:20 -04007#include <netinet/tcp.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -08008#include <pthread.h>
9#include <semaphore.h>
Stephan Pleines682928d2024-05-31 20:43:48 -070010#include <stdio.h>
11#include <string.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080012#include <sys/eventfd.h>
Stephan Pleines682928d2024-05-31 20:43:48 -070013#include <sys/ipc.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080014#include <sys/msg.h>
15#include <sys/sem.h>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080016#include <sys/socket.h>
17#include <sys/stat.h>
Stephan Pleines682928d2024-05-31 20:43:48 -070018#include <unistd.h>
Brian Silvermane4d8b282015-12-24 13:44:48 -080019
Austin Schuhf2a50ba2016-12-24 16:16:26 -080020#include <atomic>
21#include <chrono>
Stephan Pleines682928d2024-05-31 20:43:48 -070022#include <compare>
Tyler Chatowbf0609c2021-07-31 16:13:27 -070023#include <cstdint>
Brian Silvermane4d8b282015-12-24 13:44:48 -080024#include <memory>
25#include <string>
Austin Schuhf2a50ba2016-12-24 16:16:26 -080026#include <thread>
Stephan Pleines682928d2024-05-31 20:43:48 -070027#include <utility>
Brian Silvermane4d8b282015-12-24 13:44:48 -080028
Philipp Schrader790cb542023-07-05 21:06:52 -070029#include "gflags/gflags.h"
30
John Park33858a32018-09-28 23:05:48 -070031#include "aos/condition.h"
James Kuszmaul651fc3f2019-05-15 21:14:25 -070032#include "aos/init.h"
Brian Silverman7b266d92021-02-17 21:24:02 -080033#include "aos/ipc_lib/event.h"
John Park33858a32018-09-28 23:05:48 -070034#include "aos/logging/implementations.h"
John Park33858a32018-09-28 23:05:48 -070035#include "aos/mutex/mutex.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070036#include "aos/realtime.h"
John Park33858a32018-09-28 23:05:48 -070037#include "aos/time/time.h"
Brian Silvermane4d8b282015-12-24 13:44:48 -080038
39DEFINE_string(method, "", "Which IPC method to use");
40DEFINE_int32(messages, 1000000, "How many messages to send back and forth");
41DEFINE_int32(client_cpu, 0, "CPU to pin client to");
42DEFINE_int32(server_cpu, 0, "CPU to pin server to");
43DEFINE_int32(client_priority, 1,
44 "Realtime priority for client. Negative for don't change");
45DEFINE_int32(server_priority, 1,
46 "Realtime priority for server. Negative for don't change");
47
48namespace aos {
49
Austin Schuhf2a50ba2016-12-24 16:16:26 -080050namespace chrono = ::std::chrono;
51
Brian Silvermane4d8b282015-12-24 13:44:48 -080052// A generic interface for an object which can send some data to another thread
53// and back.
54//
55// One side is called the "server". It should constantly Wait, do something with
56// the result, and then call Pong.
57// The other side is called the "client". It should repeatedly call Ping.
58class PingPongerInterface {
59 public:
60 // A chunk of memory definitely on its own cache line anywhere sane.
61 typedef uint8_t Data[1024] __attribute__((aligned(128)));
62
63 virtual ~PingPongerInterface() {}
64
65 // Returns where the "client" side should write data in preparation to send to
66 // the server.
67 // The result is valid until the next Ping call.
68 virtual Data *PingData() = 0;
69
70 // Sends the data returned from the most recent PingData call to the "server"
71 // side and returns its response.
72 // PingData must be called exactly once before each call of this method.
73 // The result is valid until the next PingData call.
74 virtual const Data *Ping() = 0;
75
76 // Waits for a Ping call and then returns the associated data.
77 // The result is valid until the beginning of the next Pong call.
78 virtual const Data *Wait() = 0;
79
80 // Returns where the "server" side should write data in preparation to send
81 // back to the "client".
82 // The result is valid until the next Pong call.
83 virtual Data *PongData() = 0;
84
85 // Sends data back to an in-progress Ping.
86 // Sends the data returned from the most recent PongData call back to an
87 // in-progress Ping.
88 // PongData must be called exactly once before each call of this method.
89 virtual void Pong() = 0;
90};
91
92// Base class for implementations which simple use a pair of Data objects for
93// all Pings and Pongs.
94class StaticPingPonger : public PingPongerInterface {
95 public:
96 Data *PingData() override { return &ping_data_; }
97 Data *PongData() override { return &pong_data_; }
98
99 private:
100 Data ping_data_, pong_data_;
101};
102
103// Implements ping-pong by sending the data over file descriptors.
104class FDPingPonger : public StaticPingPonger {
105 protected:
106 // Subclasses must override and call Init.
107 FDPingPonger() {}
108
109 // Subclasses must call this in their constructor.
110 // Does not take ownership of any of the file descriptors, any/all of which
111 // may be the same.
112 // {server,client}_read must be open for reading and {server,client}_write
113 // must be open for writing.
114 void Init(int server_read, int server_write, int client_read,
115 int client_write) {
116 server_read_ = server_read;
117 server_write_ = server_write;
118 client_read_ = client_read;
119 client_write_ = client_write;
120 }
121
122 private:
123 const Data *Ping() override {
124 WriteFully(client_write_, *PingData());
125 ReadFully(client_read_, &read_by_client_);
126 return &read_by_client_;
127 }
128
129 const Data *Wait() override {
130 ReadFully(server_read_, &read_by_server_);
131 return &read_by_server_;
132 }
133
134 void Pong() override { WriteFully(server_write_, *PongData()); }
135
136 void ReadFully(int fd, Data *data) {
137 size_t remaining = sizeof(*data);
138 uint8_t *current = &(*data)[0];
139 while (remaining > 0) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700140 const ssize_t result = AOS_PCHECK(read(fd, current, remaining));
141 AOS_CHECK_LE(static_cast<size_t>(result), remaining);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800142 remaining -= result;
143 current += result;
144 }
145 }
146
147 void WriteFully(int fd, const Data &data) {
148 size_t remaining = sizeof(data);
149 const uint8_t *current = &data[0];
150 while (remaining > 0) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700151 const ssize_t result = AOS_PCHECK(write(fd, current, remaining));
152 AOS_CHECK_LE(static_cast<size_t>(result), remaining);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800153 remaining -= result;
154 current += result;
155 }
156 }
157
158 Data read_by_client_, read_by_server_;
159 int server_read_ = -1, server_write_ = -1, client_read_ = -1,
160 client_write_ = -1;
161};
162
163class PipePingPonger : public FDPingPonger {
164 public:
165 PipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700166 AOS_PCHECK(pipe(to_server));
167 AOS_PCHECK(pipe(from_server));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800168 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
169 }
170 ~PipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700171 AOS_PCHECK(close(to_server[0]));
172 AOS_PCHECK(close(to_server[1]));
173 AOS_PCHECK(close(from_server[0]));
174 AOS_PCHECK(close(from_server[1]));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800175 }
176
177 private:
178 int to_server[2], from_server[2];
179};
180
181class NamedPipePingPonger : public FDPingPonger {
182 public:
183 NamedPipePingPonger() {
184 OpenFifo("/tmp/to_server", &client_write_, &server_read_);
185 OpenFifo("/tmp/from_server", &server_write_, &client_read_);
186
187 Init(server_read_, server_write_, client_read_, client_write_);
188 }
189 ~NamedPipePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700190 AOS_PCHECK(close(server_read_));
191 AOS_PCHECK(close(client_write_));
192 AOS_PCHECK(close(client_read_));
193 AOS_PCHECK(close(server_write_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800194 }
195
196 private:
197 void OpenFifo(const char *name, int *write, int *read) {
198 {
199 const int ret = unlink(name);
200 if (ret == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700201 AOS_PLOG(FATAL, "unlink(%s)", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800202 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700203 AOS_PCHECK(mkfifo(name, S_IWUSR | S_IRUSR));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800204 // Have to open it nonblocking because the other end isn't open yet...
Austin Schuhf257f3c2019-10-27 21:00:43 -0700205 *read = AOS_PCHECK(open(name, O_RDONLY | O_NONBLOCK));
206 *write = AOS_PCHECK(open(name, O_WRONLY));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800207 {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700208 const int flags = AOS_PCHECK(fcntl(*read, F_GETFL));
209 AOS_PCHECK(fcntl(*read, F_SETFL, flags & ~O_NONBLOCK));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800210 }
211 }
212 }
213
214 int server_read_, server_write_, client_read_, client_write_;
215};
216
217class UnixPingPonger : public FDPingPonger {
218 public:
219 UnixPingPonger(int type) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700220 AOS_PCHECK(socketpair(AF_UNIX, type, 0, to_server));
221 AOS_PCHECK(socketpair(AF_UNIX, type, 0, from_server));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800222 Init(to_server[0], from_server[1], from_server[0], to_server[1]);
223 }
224 ~UnixPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700225 AOS_PCHECK(close(to_server[0]));
226 AOS_PCHECK(close(to_server[1]));
227 AOS_PCHECK(close(from_server[0]));
228 AOS_PCHECK(close(from_server[1]));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800229 }
230
231 private:
232 int to_server[2], from_server[2];
233};
234
235class TCPPingPonger : public FDPingPonger {
236 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400237 TCPPingPonger(bool nodelay) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700238 server_ = AOS_PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400239 if (nodelay) {
240 const int yes = 1;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700241 AOS_PCHECK(
242 setsockopt(server_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
Brian Silvermanfd788882016-09-10 16:56:20 -0400243 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800244 {
245 sockaddr_in server_address;
246 memset(&server_address, 0, sizeof(server_address));
247 server_address.sin_family = AF_INET;
248 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700249 AOS_PCHECK(bind(server_, reinterpret_cast<sockaddr *>(&server_address),
250 sizeof(server_address)));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800251 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700252 AOS_PCHECK(listen(server_, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800253
Austin Schuhf257f3c2019-10-27 21:00:43 -0700254 client_ = AOS_PCHECK(socket(AF_INET, SOCK_STREAM, 0));
Brian Silvermanfd788882016-09-10 16:56:20 -0400255 if (nodelay) {
256 const int yes = 1;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700257 AOS_PCHECK(
258 setsockopt(client_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
Brian Silvermanfd788882016-09-10 16:56:20 -0400259 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800260 {
261 sockaddr_in client_address;
262 unsigned int length = sizeof(client_address);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700263 AOS_PCHECK(getsockname(
264 server_, reinterpret_cast<sockaddr *>(&client_address), &length));
265 AOS_PCHECK(connect(client_, reinterpret_cast<sockaddr *>(&client_address),
266 length));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800267 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700268 server_connection_ = AOS_PCHECK(accept(server_, nullptr, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800269
270 Init(server_connection_, server_connection_, client_, client_);
271 }
272 ~TCPPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700273 AOS_PCHECK(close(client_));
274 AOS_PCHECK(close(server_connection_));
275 AOS_PCHECK(close(server_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800276 }
277
278 private:
279 int server_, client_, server_connection_;
280};
281
282class UDPPingPonger : public FDPingPonger {
283 public:
284 UDPPingPonger() {
285 CreatePair(&server_read_, &client_write_);
286 CreatePair(&client_read_, &server_write_);
287
288 Init(server_read_, server_write_, client_read_, client_write_);
289 }
290 ~UDPPingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700291 AOS_PCHECK(close(server_read_));
292 AOS_PCHECK(close(client_write_));
293 AOS_PCHECK(close(client_read_));
294 AOS_PCHECK(close(server_write_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800295 }
296
297 private:
298 void CreatePair(int *server, int *client) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700299 *server = AOS_PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800300 {
301 sockaddr_in server_address;
302 memset(&server_address, 0, sizeof(server_address));
303 server_address.sin_family = AF_INET;
304 server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
305 // server_address.sin_port = htons(server_ + 3000);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700306 AOS_PCHECK(bind(*server, reinterpret_cast<sockaddr *>(&server_address),
307 sizeof(server_address)));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800308 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700309 *client = AOS_PCHECK(socket(AF_INET, SOCK_DGRAM, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800310 {
311 sockaddr_in client_address;
312 unsigned int length = sizeof(client_address);
Austin Schuhf257f3c2019-10-27 21:00:43 -0700313 AOS_PCHECK(getsockname(
314 *server, reinterpret_cast<sockaddr *>(&client_address), &length));
315 AOS_PCHECK(connect(*client, reinterpret_cast<sockaddr *>(&client_address),
316 length));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800317 }
318 }
319
320 int server_read_, server_write_, client_read_, client_write_;
321};
322
323// Implements ping-pong without copying the data using a condition variable-like
324// interface.
325class ConditionVariablePingPonger : public StaticPingPonger {
326 protected:
327 // Represents a condition variable bundled with a mutex.
328 //
329 // Wait may return spuriously.
330 class ConditionVariableInterface {
331 public:
332 virtual ~ConditionVariableInterface() {}
333
334 // Locks the mutex.
335 virtual void Lock() = 0;
336
337 // Unlocks the mutex.
338 virtual void Unlock() = 0;
339
340 // Waits on the condition variable.
341 //
342 // The mutex must be locked when this is called.
343 virtual void Wait() = 0;
344
345 // Signals the condition variable.
346 //
347 // The mutex does not have to be locked during this.
348 virtual void Signal() = 0;
349 };
350
351 ConditionVariablePingPonger(
352 ::std::unique_ptr<ConditionVariableInterface> ping,
353 ::std::unique_ptr<ConditionVariableInterface> pong)
354 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
355
356 private:
357 const Data *Ping() override {
358 ping_->Lock();
359 to_server_ = PingData();
360 ping_->Unlock();
361 ping_->Signal();
362 pong_->Lock();
363 while (from_server_ == nullptr) {
364 pong_->Wait();
365 }
366 const Data *r = from_server_;
367 from_server_ = nullptr;
368 pong_->Unlock();
369 return r;
370 }
371
372 const Data *Wait() override {
373 ping_->Lock();
374 while (to_server_ == nullptr) {
375 ping_->Wait();
376 }
377 const Data *r = to_server_;
378 to_server_ = nullptr;
379 ping_->Unlock();
380 return r;
381 }
382
383 void Pong() override {
384 pong_->Lock();
385 from_server_ = PongData();
386 pong_->Unlock();
387 pong_->Signal();
388 }
389
390 const Data *to_server_ = nullptr, *from_server_ = nullptr;
391 const ::std::unique_ptr<ConditionVariableInterface> ping_, pong_;
392};
393
394// Implements ping-pong without copying the data using a semaphore-like
395// interface.
396class SemaphorePingPonger : public StaticPingPonger {
397 protected:
398 // Represents a semaphore, which need only count to 1.
399 //
400 // The behavior when calling Get/Put in anything other than alternating order
401 // is undefined.
402 //
403 // Wait may NOT return spuriously.
404 class SemaphoreInterface {
405 public:
406 virtual ~SemaphoreInterface() {}
407
408 virtual void Get() = 0;
409 virtual void Put() = 0;
410 };
411
412 SemaphorePingPonger(::std::unique_ptr<SemaphoreInterface> ping,
413 ::std::unique_ptr<SemaphoreInterface> pong)
414 : ping_(::std::move(ping)), pong_(::std::move(pong)) {}
415
416 private:
417 const Data *Ping() override {
418 to_server_ = PingData();
419 ping_->Put();
420 pong_->Get();
421 return from_server_;
422 }
423
424 const Data *Wait() override {
425 ping_->Get();
426 return to_server_;
427 }
428
429 void Pong() override {
430 from_server_ = PongData();
431 pong_->Put();
432 }
433
434 const Data *to_server_ = nullptr, *from_server_ = nullptr;
435 const ::std::unique_ptr<SemaphoreInterface> ping_, pong_;
436};
437
Brian Silvermane4d8b282015-12-24 13:44:48 -0800438class AOSMutexPingPonger : public ConditionVariablePingPonger {
439 public:
440 AOSMutexPingPonger()
441 : ConditionVariablePingPonger(
442 ::std::unique_ptr<ConditionVariableInterface>(
443 new AOSConditionVariable()),
444 ::std::unique_ptr<ConditionVariableInterface>(
445 new AOSConditionVariable())) {}
446
447 private:
448 class AOSConditionVariable : public ConditionVariableInterface {
449 public:
450 AOSConditionVariable() : condition_(&mutex_) {}
451
452 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700453 void Lock() override { AOS_CHECK(!mutex_.Lock()); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800454 void Unlock() override { mutex_.Unlock(); }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700455 void Wait() override { AOS_CHECK(!condition_.Wait()); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800456 void Signal() override { condition_.Broadcast(); }
457
458 Mutex mutex_;
459 Condition condition_;
460 };
461};
462
463class AOSEventPingPonger : public SemaphorePingPonger {
464 public:
465 AOSEventPingPonger()
466 : SemaphorePingPonger(
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700467 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore()),
468 ::std::unique_ptr<SemaphoreInterface>(new AOSEventSemaphore())) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800469
470 private:
471 class AOSEventSemaphore : public SemaphoreInterface {
472 private:
473 void Get() override {
474 event_.Wait();
475 event_.Clear();
476 }
477 void Put() override { event_.Set(); }
478
479 Event event_;
480 };
481};
482
483class PthreadMutexPingPonger : public ConditionVariablePingPonger {
484 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400485 PthreadMutexPingPonger(int pshared, bool pi)
Brian Silvermane4d8b282015-12-24 13:44:48 -0800486 : ConditionVariablePingPonger(
487 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400488 new PthreadConditionVariable(pshared, pi)),
Brian Silvermane4d8b282015-12-24 13:44:48 -0800489 ::std::unique_ptr<ConditionVariableInterface>(
Brian Silvermanfd788882016-09-10 16:56:20 -0400490 new PthreadConditionVariable(pshared, pi))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800491
492 private:
493 class PthreadConditionVariable : public ConditionVariableInterface {
494 public:
Brian Silvermanfd788882016-09-10 16:56:20 -0400495 PthreadConditionVariable(bool pshared, bool pi) {
496 {
497 pthread_condattr_t cond_attr;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700498 AOS_PRCHECK(pthread_condattr_init(&cond_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400499 if (pshared) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700500 AOS_PRCHECK(
Brian Silvermanfd788882016-09-10 16:56:20 -0400501 pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED));
502 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700503 AOS_PRCHECK(pthread_cond_init(&condition_, &cond_attr));
504 AOS_PRCHECK(pthread_condattr_destroy(&cond_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400505 }
506
507 {
508 pthread_mutexattr_t mutex_attr;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700509 AOS_PRCHECK(pthread_mutexattr_init(&mutex_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400510 if (pshared) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700511 AOS_PRCHECK(pthread_mutexattr_setpshared(&mutex_attr,
512 PTHREAD_PROCESS_SHARED));
Brian Silvermanfd788882016-09-10 16:56:20 -0400513 }
514 if (pi) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700515 AOS_PRCHECK(
Brian Silvermanfd788882016-09-10 16:56:20 -0400516 pthread_mutexattr_setprotocol(&mutex_attr, PTHREAD_PRIO_INHERIT));
517 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700518 AOS_PRCHECK(pthread_mutex_init(&mutex_, nullptr));
519 AOS_PRCHECK(pthread_mutexattr_destroy(&mutex_attr));
Brian Silvermanfd788882016-09-10 16:56:20 -0400520 }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800521 }
522 ~PthreadConditionVariable() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700523 AOS_PRCHECK(pthread_mutex_destroy(&mutex_));
524 AOS_PRCHECK(pthread_cond_destroy(&condition_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800525 }
526
527 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700528 void Lock() override { AOS_PRCHECK(pthread_mutex_lock(&mutex_)); }
529 void Unlock() override { AOS_PRCHECK(pthread_mutex_unlock(&mutex_)); }
530 void Wait() override {
531 AOS_PRCHECK(pthread_cond_wait(&condition_, &mutex_));
532 }
533 void Signal() override { AOS_PRCHECK(pthread_cond_broadcast(&condition_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800534
535 pthread_cond_t condition_;
536 pthread_mutex_t mutex_;
537 };
538};
539
540class EventFDPingPonger : public SemaphorePingPonger {
541 public:
542 EventFDPingPonger()
543 : SemaphorePingPonger(
544 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore()),
545 ::std::unique_ptr<SemaphoreInterface>(new EventFDSemaphore())) {}
546
547 private:
548 class EventFDSemaphore : public SemaphoreInterface {
549 public:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700550 EventFDSemaphore() : fd_(AOS_PCHECK(eventfd(0, 0))) {}
551 ~EventFDSemaphore() { AOS_PCHECK(close(fd_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800552
553 private:
554 void Get() override {
555 uint64_t value;
556 if (read(fd_, &value, sizeof(value)) != sizeof(value)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700557 AOS_PLOG(FATAL, "reading from eventfd %d failed\n", fd_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800558 }
Austin Schuhf257f3c2019-10-27 21:00:43 -0700559 AOS_CHECK_EQ(1u, value);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800560 }
561 void Put() override {
562 uint64_t value = 1;
563 if (write(fd_, &value, sizeof(value)) != sizeof(value)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700564 AOS_PLOG(FATAL, "writing to eventfd %d failed\n", fd_);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800565 }
566 }
567
568 const int fd_;
569 };
570};
571
572class SysvSemaphorePingPonger : public SemaphorePingPonger {
573 public:
574 SysvSemaphorePingPonger()
575 : SemaphorePingPonger(
576 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore()),
577 ::std::unique_ptr<SemaphoreInterface>(new SysvSemaphore())) {}
578
579 private:
580 class SysvSemaphore : public SemaphoreInterface {
581 public:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700582 SysvSemaphore() : sem_id_(AOS_PCHECK(semget(IPC_PRIVATE, 1, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800583
584 private:
585 void Get() override {
586 struct sembuf op;
587 op.sem_num = 0;
588 op.sem_op = -1;
589 op.sem_flg = 0;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700590 AOS_PCHECK(semop(sem_id_, &op, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800591 }
592 void Put() override {
593 struct sembuf op;
594 op.sem_num = 0;
595 op.sem_op = 1;
596 op.sem_flg = 0;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700597 AOS_PCHECK(semop(sem_id_, &op, 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800598 }
599
600 const int sem_id_;
601 };
602};
603
604class PosixSemaphorePingPonger : public SemaphorePingPonger {
605 protected:
606 PosixSemaphorePingPonger(sem_t *ping_sem, sem_t *pong_sem)
607 : SemaphorePingPonger(
608 ::std::unique_ptr<SemaphoreInterface>(new PosixSemaphore(ping_sem)),
609 ::std::unique_ptr<SemaphoreInterface>(
610 new PosixSemaphore(pong_sem))) {}
611
612 private:
613 class PosixSemaphore : public SemaphoreInterface {
614 public:
James Kuszmaul651fc3f2019-05-15 21:14:25 -0700615 PosixSemaphore(sem_t *sem) : sem_(sem) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800616
617 private:
Austin Schuhf257f3c2019-10-27 21:00:43 -0700618 void Get() override { AOS_PCHECK(sem_wait(sem_)); }
619 void Put() override { AOS_PCHECK(sem_post(sem_)); }
Brian Silvermane4d8b282015-12-24 13:44:48 -0800620
621 sem_t *const sem_;
622 };
623};
624
625class SysvQueuePingPonger : public StaticPingPonger {
626 public:
627 SysvQueuePingPonger()
Austin Schuhf257f3c2019-10-27 21:00:43 -0700628 : ping_(AOS_PCHECK(msgget(IPC_PRIVATE, 0600))),
629 pong_(AOS_PCHECK(msgget(IPC_PRIVATE, 0600))) {}
Brian Silvermane4d8b282015-12-24 13:44:48 -0800630
631 const Data *Ping() override {
632 {
633 Message to_send;
634 memcpy(&to_send.data, PingData(), sizeof(Data));
Austin Schuhf257f3c2019-10-27 21:00:43 -0700635 AOS_PCHECK(msgsnd(ping_, &to_send, sizeof(Data), 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800636 }
637 {
638 Message received;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700639 AOS_PCHECK(msgrcv(pong_, &received, sizeof(Data), 1, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800640 memcpy(&pong_received_, &received.data, sizeof(Data));
641 }
642 return &pong_received_;
643 }
644
645 const Data *Wait() override {
646 {
647 Message received;
Austin Schuhf257f3c2019-10-27 21:00:43 -0700648 AOS_PCHECK(msgrcv(ping_, &received, sizeof(Data), 1, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800649 memcpy(&ping_received_, &received.data, sizeof(Data));
650 }
651 return &ping_received_;
652 }
653
654 virtual void Pong() override {
655 Message to_send;
656 memcpy(&to_send.data, PongData(), sizeof(Data));
Austin Schuhf257f3c2019-10-27 21:00:43 -0700657 AOS_PCHECK(msgsnd(pong_, &to_send, sizeof(Data), 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800658 }
659
660 private:
661 struct Message {
662 long mtype = 1;
663 char data[sizeof(Data)];
664 };
665
666 Data ping_received_, pong_received_;
667
668 const int ping_, pong_;
669};
670
671class PosixQueuePingPonger : public StaticPingPonger {
672 public:
673 PosixQueuePingPonger() : ping_(Open("/ping")), pong_(Open("/pong")) {}
674 ~PosixQueuePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700675 AOS_PCHECK(mq_close(ping_));
676 AOS_PCHECK(mq_close(pong_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800677 }
678
679 const Data *Ping() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700680 AOS_PCHECK(mq_send(ping_,
681 static_cast<char *>(static_cast<void *>(PingData())),
682 sizeof(Data), 1));
683 AOS_PCHECK(mq_receive(
684 pong_, static_cast<char *>(static_cast<void *>(&pong_received_)),
685 sizeof(Data), nullptr));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800686 return &pong_received_;
687 }
688
689 const Data *Wait() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700690 AOS_PCHECK(mq_receive(
691 ping_, static_cast<char *>(static_cast<void *>(&ping_received_)),
692 sizeof(Data), nullptr));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800693 return &ping_received_;
694 }
695
696 virtual void Pong() override {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700697 AOS_PCHECK(mq_send(pong_,
698 static_cast<char *>(static_cast<void *>(PongData())),
699 sizeof(Data), 1));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800700 }
701
702 private:
703 mqd_t Open(const char *name) {
704 if (mq_unlink(name) == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700705 AOS_PLOG(FATAL, "mq_unlink(%s) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800706 }
707 struct mq_attr attr;
708 attr.mq_flags = 0;
709 attr.mq_maxmsg = 1;
710 attr.mq_msgsize = sizeof(Data);
711 attr.mq_curmsgs = 0;
712 const mqd_t r = mq_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, &attr);
713 if (r == reinterpret_cast<mqd_t>(-1)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700714 AOS_PLOG(FATAL, "mq_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800715 }
716 return r;
717 }
718
719 const mqd_t ping_, pong_;
720 Data ping_received_, pong_received_;
721};
722
723class PosixUnnamedSemaphorePingPonger : public PosixSemaphorePingPonger {
724 public:
725 PosixUnnamedSemaphorePingPonger(int pshared)
726 : PosixSemaphorePingPonger(&ping_sem_, &pong_sem_) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700727 AOS_PCHECK(sem_init(&ping_sem_, pshared, 0));
728 AOS_PCHECK(sem_init(&pong_sem_, pshared, 0));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800729 }
730 ~PosixUnnamedSemaphorePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700731 AOS_PCHECK(sem_destroy(&ping_sem_));
732 AOS_PCHECK(sem_destroy(&pong_sem_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800733 }
734
735 private:
736 sem_t ping_sem_, pong_sem_;
737};
738
739class PosixNamedSemaphorePingPonger : public PosixSemaphorePingPonger {
740 public:
741 PosixNamedSemaphorePingPonger()
742 : PosixSemaphorePingPonger(ping_sem_ = Open("/ping"),
743 pong_sem_ = Open("/pong")) {}
744 ~PosixNamedSemaphorePingPonger() {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700745 AOS_PCHECK(sem_close(ping_sem_));
746 AOS_PCHECK(sem_close(pong_sem_));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800747 }
748
749 private:
750 sem_t *Open(const char *name) {
751 if (sem_unlink(name) == -1 && errno != ENOENT) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700752 AOS_PLOG(FATAL, "shm_unlink(%s) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800753 }
754 sem_t *const r = sem_open(name, O_CREAT | O_RDWR | O_EXCL, 0600, 0);
755 if (r == SEM_FAILED) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700756 AOS_PLOG(FATAL, "sem_open(%s, O_CREAT | O_RDWR | O_EXCL) failed", name);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800757 }
758 return r;
759 }
760
761 sem_t *ping_sem_, *pong_sem_;
762};
763
Brian Silvermane4d8b282015-12-24 13:44:48 -0800764int Main(int /*argc*/, char **argv) {
765 ::std::unique_ptr<PingPongerInterface> ping_ponger;
766 if (FLAGS_method == "pipe") {
767 ping_ponger.reset(new PipePingPonger());
768 } else if (FLAGS_method == "named_pipe") {
769 ping_ponger.reset(new NamedPipePingPonger());
770 } else if (FLAGS_method == "aos_mutex") {
771 ping_ponger.reset(new AOSMutexPingPonger());
772 } else if (FLAGS_method == "aos_event") {
773 ping_ponger.reset(new AOSEventPingPonger());
774 } else if (FLAGS_method == "pthread_mutex") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400775 ping_ponger.reset(new PthreadMutexPingPonger(false, false));
776 } else if (FLAGS_method == "pthread_mutex_pshared") {
777 ping_ponger.reset(new PthreadMutexPingPonger(true, false));
778 } else if (FLAGS_method == "pthread_mutex_pshared_pi") {
779 ping_ponger.reset(new PthreadMutexPingPonger(true, true));
780 } else if (FLAGS_method == "pthread_mutex_pi") {
781 ping_ponger.reset(new PthreadMutexPingPonger(false, true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800782 } else if (FLAGS_method == "eventfd") {
783 ping_ponger.reset(new EventFDPingPonger());
784 } else if (FLAGS_method == "sysv_semaphore") {
785 ping_ponger.reset(new SysvSemaphorePingPonger());
786 } else if (FLAGS_method == "sysv_queue") {
787 ping_ponger.reset(new SysvQueuePingPonger());
788 } else if (FLAGS_method == "posix_semaphore_unnamed_shared") {
789 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(1));
790 } else if (FLAGS_method == "posix_semaphore_unnamed_unshared") {
791 ping_ponger.reset(new PosixUnnamedSemaphorePingPonger(0));
792 } else if (FLAGS_method == "posix_semaphore_named") {
793 ping_ponger.reset(new PosixNamedSemaphorePingPonger());
794 } else if (FLAGS_method == "posix_queue") {
795 ping_ponger.reset(new PosixQueuePingPonger());
796 } else if (FLAGS_method == "unix_stream") {
797 ping_ponger.reset(new UnixPingPonger(SOCK_STREAM));
798 } else if (FLAGS_method == "unix_datagram") {
799 ping_ponger.reset(new UnixPingPonger(SOCK_DGRAM));
800 } else if (FLAGS_method == "unix_seqpacket") {
801 ping_ponger.reset(new UnixPingPonger(SOCK_SEQPACKET));
802 } else if (FLAGS_method == "tcp") {
Brian Silvermanfd788882016-09-10 16:56:20 -0400803 ping_ponger.reset(new TCPPingPonger(false));
804 } else if (FLAGS_method == "tcp_nodelay") {
805 ping_ponger.reset(new TCPPingPonger(true));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800806 } else if (FLAGS_method == "udp") {
807 ping_ponger.reset(new UDPPingPonger());
808 } else {
809 fprintf(stderr, "Unknown IPC method to test '%s'\n", FLAGS_method.c_str());
810 ::gflags::ShowUsageWithFlags(argv[0]);
811 return 1;
812 }
813
Brian Silverman1d42ce22016-09-10 16:55:40 -0400814 ::std::atomic<bool> done{false};
Brian Silvermane4d8b282015-12-24 13:44:48 -0800815
816 ::std::thread server([&ping_ponger, &done]() {
817 if (FLAGS_server_priority > 0) {
818 SetCurrentThreadRealtimePriority(FLAGS_server_priority);
819 }
Austin Schuh9014e3b2020-11-21 14:26:07 -0800820 SetCurrentThreadAffinity(MakeCpusetFromCpus({FLAGS_server_cpu}));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800821
822 while (!done) {
823 const PingPongerInterface::Data *data = ping_ponger->Wait();
824 PingPongerInterface::Data *response = ping_ponger->PongData();
825 for (size_t i = 0; i < sizeof(*data); ++i) {
826 (*response)[i] = (*data)[i] + 1;
827 }
828 ping_ponger->Pong();
829 }
830 });
831
832 if (FLAGS_client_priority > 0) {
833 SetCurrentThreadRealtimePriority(FLAGS_client_priority);
834 }
Austin Schuh9014e3b2020-11-21 14:26:07 -0800835 SetCurrentThreadAffinity(MakeCpusetFromCpus({FLAGS_client_cpu}));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800836
837 // Warm everything up.
838 for (int i = 0; i < 1000; ++i) {
839 PingPongerInterface::Data *warmup_data = ping_ponger->PingData();
840 memset(*warmup_data, i % 255, sizeof(*warmup_data));
841 ping_ponger->Ping();
842 }
843
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800844 const monotonic_clock::time_point start = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800845
846 for (int32_t i = 0; i < FLAGS_messages; ++i) {
847 PingPongerInterface::Data *to_send = ping_ponger->PingData();
848 memset(*to_send, i % 123, sizeof(*to_send));
849 const PingPongerInterface::Data *received = ping_ponger->Ping();
850 for (size_t ii = 0; ii < sizeof(*received); ++ii) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700851 AOS_CHECK_EQ(((i % 123) + 1) % 255, (*received)[ii]);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800852 }
853 }
854
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800855 const monotonic_clock::time_point end = monotonic_clock::now();
Brian Silvermane4d8b282015-12-24 13:44:48 -0800856
Brian Silverman1d42ce22016-09-10 16:55:40 -0400857 // Try to make sure the server thread gets past its check of done so our
858 // Ping() down below doesn't hang. Kind of lame, but doing better would
859 // require complicating the interface to each implementation which isn't worth
860 // it here.
861 ::std::this_thread::sleep_for(::std::chrono::milliseconds(200));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800862 done = true;
863 ping_ponger->PingData();
864 ping_ponger->Ping();
865 server.join();
866
Austin Schuhf257f3c2019-10-27 21:00:43 -0700867 AOS_LOG(INFO, "Took %f seconds to send %" PRId32 " messages\n",
868 ::aos::time::DurationInSeconds(end - start), FLAGS_messages);
Austin Schuhf2a50ba2016-12-24 16:16:26 -0800869 const chrono::nanoseconds per_message = (end - start) / FLAGS_messages;
870 if (per_message >= chrono::seconds(1)) {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700871 AOS_LOG(INFO, "More than 1 second per message ?!?\n");
Brian Silvermane4d8b282015-12-24 13:44:48 -0800872 } else {
Austin Schuhf257f3c2019-10-27 21:00:43 -0700873 AOS_LOG(INFO, "That is %" PRId32 " nanoseconds per message\n",
874 static_cast<int>(per_message.count()));
Brian Silvermane4d8b282015-12-24 13:44:48 -0800875 }
876
877 return 0;
878}
879
880} // namespace aos
881
882int main(int argc, char **argv) {
883 ::gflags::SetUsageMessage(
884 ::std::string("Compares various forms of IPC. Usage:\n") + argv[0] +
885 " --method=METHOD\n"
886 "METHOD can be one of the following:\n"
887 "\tpipe\n"
888 "\tnamed_pipe\n"
889 "\taos_mutex\n"
890 "\taos_event\n"
891 "\tpthread_mutex\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400892 "\tpthread_mutex_pshared\n"
893 "\tpthread_mutex_pshared_pi\n"
894 "\tpthread_mutex_pi\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800895 "\teventfd\n"
896 "\tsysv_semaphore\n"
897 "\tsysv_queue\n"
898 "\tposix_semaphore_unnamed_shared\n"
899 "\tposix_semaphore_unnamed_unshared\n"
900 "\tposix_semaphore_named\n"
901 "\tposix_queue\n"
902 "\tunix_stream\n"
903 "\tunix_datagram\n"
904 "\tunix_seqpacket\n"
905 "\ttcp\n"
Brian Silvermanfd788882016-09-10 16:56:20 -0400906 "\ttcp_nodelay\n"
Brian Silvermane4d8b282015-12-24 13:44:48 -0800907 "\tudp\n");
Austin Schuh094d09b2020-11-20 23:26:52 -0800908 ::aos::InitGoogle(&argc, &argv);
Brian Silvermane4d8b282015-12-24 13:44:48 -0800909
910 return ::aos::Main(argc, argv);
911}