blob: 6683f590655b529356b8ea5bb4dadcd32a61e32e [file] [log] [blame]
Austin Schuh5af45eb2019-09-16 20:54:18 -07001#include "gflags/gflags.h"
2
3#include <sys/stat.h>
4#include <sys/types.h>
5#include <chrono>
6#include <random>
7#include <thread>
8
9#include "aos/events/epoll.h"
10#include "aos/init.h"
11#include "aos/ipc_lib/latency_lib.h"
12#include "aos/logging/implementations.h"
13#include "aos/logging/logging.h"
14#include "aos/realtime.h"
15#include "aos/time/time.h"
16
17// This is a demo program which uses named pipes to communicate.
18// It measures both latency of a random timer thread, and latency of the
19// pipe.
20
21DEFINE_bool(sender, true, "If true, send signals to the other process.");
22DEFINE_string(fifo, "/dev/shm/aos/named_pipe_latency", "FIFO to use for the test.");
23DEFINE_int32(seconds, 10, "Duration of the test to run");
24DEFINE_int32(
25 latency_threshold, 1000,
26 "Disable tracing when anything takes more than this many microseoncds");
27DEFINE_int32(core, 7, "Core to pin to");
28DEFINE_int32(sender_priority, 53, "RT priority to send at");
29DEFINE_int32(receiver_priority, 52, "RT priority to receive at");
30DEFINE_int32(timer_priority, 51, "RT priority to spin the timer at");
31
32DEFINE_bool(log_latency, false, "If true, log the latency");
33
34namespace chrono = ::std::chrono;
35
36namespace aos {
37
38void SenderThread() {
39 int pipefd = open(FLAGS_fifo.c_str(), FD_CLOEXEC | O_NONBLOCK | O_WRONLY | O_NOATIME);
40 const monotonic_clock::time_point end_time =
41 monotonic_clock::now() + chrono::seconds(FLAGS_seconds);
42 // Standard mersenne_twister_engine seeded with 0
43 ::std::mt19937 generator(0);
44
45 // Sleep between 1 and 15 ms.
46 ::std::uniform_int_distribution<> distribution(1000, 15000);
47
48 PinCurrentThreadToCPU(FLAGS_core);
49 SetCurrentThreadRealtimePriority(FLAGS_sender_priority);
50 while (true) {
51 const monotonic_clock::time_point wakeup_time =
52 monotonic_clock::now() + chrono::microseconds(distribution(generator));
53
54 ::std::this_thread::sleep_until(wakeup_time);
55 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
56 char sent_time_buffer[8];
57 memcpy(sent_time_buffer, &monotonic_now, sizeof(sent_time_buffer));
58 PCHECK(write(pipefd, static_cast<void *>(sent_time_buffer),
59 sizeof(sent_time_buffer)));
60
61 if (monotonic_now > end_time) {
62 break;
63 }
64 }
65
66 {
67 char sent_time_buffer[8];
68 memset(sent_time_buffer, 0, sizeof(sent_time_buffer));
69 PCHECK(write(pipefd, static_cast<void *>(sent_time_buffer),
70 sizeof(sent_time_buffer)));
71 }
72 UnsetCurrentThreadRealtimePriority();
73
74 PCHECK(close(pipefd));
75}
76
77void ReceiverThread() {
78 int pipefd = open(FLAGS_fifo.c_str(), O_CLOEXEC | O_NONBLOCK | O_RDONLY | O_NOATIME);
79 Tracing t;
80 t.Start();
81
82 chrono::nanoseconds max_wakeup_latency = chrono::nanoseconds(0);
83
84 chrono::nanoseconds sum_latency = chrono::nanoseconds(0);
85 int latency_count = 0;
86
87 internal::EPoll epoll;
88
89 epoll.OnReadable(pipefd, [&t, &epoll, &max_wakeup_latency, &sum_latency,
90 &latency_count, pipefd]() {
91 char sent_time_buffer[8];
92 const int ret = read(pipefd, static_cast<void *>(sent_time_buffer),
93 sizeof(sent_time_buffer));
94 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
95 CHECK_EQ(ret, 8);
96
97 monotonic_clock::time_point sent_time;
98 memcpy(&sent_time, sent_time_buffer, sizeof(sent_time_buffer));
99
100 if (sent_time == monotonic_clock::epoch()) {
101 epoll.Quit();
102 return;
103 }
104
105 const chrono::nanoseconds wakeup_latency = monotonic_now - sent_time;
106
107 sum_latency += wakeup_latency;
108 ++latency_count;
109
110 max_wakeup_latency = ::std::max(wakeup_latency, max_wakeup_latency);
111
112 if (wakeup_latency > chrono::microseconds(FLAGS_latency_threshold)) {
113 t.Stop();
114 AOS_LOG(INFO, "Stopped tracing, latency %" PRId64 "\n",
115 static_cast<int64_t>(wakeup_latency.count()));
116 }
117
118 if (FLAGS_log_latency) {
119 AOS_LOG(INFO, "dt: %8d.%03d\n",
120 static_cast<int>(wakeup_latency.count() / 1000),
121 static_cast<int>(wakeup_latency.count() % 1000));
122 }
123 });
124
125 PinCurrentThreadToCPU(FLAGS_core);
126 SetCurrentThreadRealtimePriority(FLAGS_receiver_priority);
127 epoll.Run();
128 UnsetCurrentThreadRealtimePriority();
129 epoll.DeleteFd(pipefd);
130
131 const chrono::nanoseconds average_latency = sum_latency / latency_count;
132
133 AOS_LOG(
134 INFO,
135 "Max named pip wakeup latency: %d.%03d microseconds, average: %d.%03d "
136 "microseconds\n",
137 static_cast<int>(max_wakeup_latency.count() / 1000),
138 static_cast<int>(max_wakeup_latency.count() % 1000),
139 static_cast<int>(average_latency.count() / 1000),
140 static_cast<int>(average_latency.count() % 1000));
141
142 PCHECK(close(pipefd));
143}
144
145int Main(int /*argc*/, char ** /*argv*/) {
146 mkfifo(FLAGS_fifo.c_str(), 0777);
147
148 AOS_LOG(INFO, "Main!\n");
149 ::std::thread t([]() {
150 TimerThread(monotonic_clock::now() + chrono::seconds(FLAGS_seconds),
151 FLAGS_timer_priority);
152 });
153
154 ::std::thread st(
155 []() { SenderThread(); });
156
157 ReceiverThread();
158 st.join();
159
160 t.join();
161 return 0;
162}
163
164} // namespace aos
165
166int main(int argc, char **argv) {
167 ::gflags::ParseCommandLineFlags(&argc, &argv, true);
168
169 ::aos::logging::Init();
170 ::aos::logging::SetImplementation(
171 new ::aos::logging::StreamLogImplementation(stdout));
172
173 return ::aos::Main(argc, argv);
174}