blob: ffbfb06c5b8d36d405bed1eefe94c8fff18445fc [file] [log] [blame]
Austin Schuh5af45eb2019-09-16 20:54:18 -07001#include "gflags/gflags.h"
2
3#include <signal.h>
4#include <sys/signalfd.h>
5#include <chrono>
6#include <random>
7#include <thread>
8
9#include "aos/events/epoll.h"
10#include "aos/init.h"
11#include "aos/ipc_lib/latency_lib.h"
12#include "aos/logging/implementations.h"
13#include "aos/logging/logging.h"
14#include "aos/realtime.h"
15#include "aos/time/time.h"
16
17// This is a demo program which uses Real-Time posix signals to communicate.
18// It measures both latency of a random timer thread, and latency of the
19// signals.
20//
21// To enable function graph:
22// echo "function_graph" > current_tracer
23
24DEFINE_bool(sender, true, "If true, send signals to the other process.");
25DEFINE_int32(other_pid, -1, "PID of other process to ping");
26DEFINE_int32(seconds, 10, "Duration of the test to run");
27DEFINE_int32(
28 latency_threshold, 1000,
29 "Disable tracing when anything takes more than this many microseoncds");
30DEFINE_int32(core, 7, "Core to pin to");
31DEFINE_int32(sender_priority, 53, "RT priority to send at");
32DEFINE_int32(receiver_priority, 52, "RT priority to receive at");
33DEFINE_int32(timer_priority, 51, "RT priority to spin the timer at");
34
35DEFINE_bool(log_latency, false, "If true, log the latency");
36
37const uint32_t kSignalNumber = SIGRTMIN + 1;
38const uint32_t kQuitSignalNumber = SIGRTMIN + 2;
39
40namespace chrono = ::std::chrono;
41
42namespace aos {
43
44void SenderThread() {
45 const monotonic_clock::time_point end_time =
46 monotonic_clock::now() + chrono::seconds(FLAGS_seconds);
47 // Standard mersenne_twister_engine seeded with 0
48 ::std::mt19937 generator(0);
49
50 // Sleep between 1 and 15 ms.
51 ::std::uniform_int_distribution<> distribution(1000, 15000);
52
53 int pid = getpid();
54 if (FLAGS_other_pid != -1) {
55 pid = FLAGS_other_pid;
56 }
57 AOS_LOG(INFO, "Current PID: %d\n", pid);
58
59 PinCurrentThreadToCPU(FLAGS_core);
60 SetCurrentThreadRealtimePriority(FLAGS_sender_priority);
61 while (true) {
62 const monotonic_clock::time_point wakeup_time =
63 monotonic_clock::now() + chrono::microseconds(distribution(generator));
64
65 ::std::this_thread::sleep_until(wakeup_time);
66 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
67 sigval s;
68 s.sival_int = static_cast<uint32_t>(
69 static_cast<uint64_t>(monotonic_now.time_since_epoch().count()) &
70 0xfffffffful);
71
72 PCHECK(sigqueue(pid, kSignalNumber, s));
73
74 if (monotonic_now > end_time) {
75 break;
76 }
77 }
78
79 {
80 sigval s;
81 s.sival_int = 0;
82 PCHECK(sigqueue(pid, kQuitSignalNumber, s));
83 }
84 UnsetCurrentThreadRealtimePriority();
85}
86
87void ReceiverThread() {
88 int signalfd_fd;
89 Tracing t;
90 t.Start();
91
92 sigset_t x;
93 sigemptyset(&x);
94 sigaddset(&x, kSignalNumber);
95 sigaddset(&x, kQuitSignalNumber);
96
97 PCHECK(signalfd_fd = signalfd(-1, &x, SFD_NONBLOCK | SFD_CLOEXEC));
98 chrono::nanoseconds max_wakeup_latency = chrono::nanoseconds(0);
99
100 chrono::nanoseconds sum_latency = chrono::nanoseconds(0);
101 int latency_count = 0;
102
103 internal::EPoll epoll;
104
105 epoll.OnReadable(signalfd_fd, [&t, &epoll, &max_wakeup_latency, &sum_latency,
106 &latency_count, signalfd_fd]() {
107 const monotonic_clock::time_point monotonic_now = monotonic_clock::now();
108 signalfd_siginfo si;
109 const int ret =
110 read(signalfd_fd, static_cast<void *>(&si), sizeof(signalfd_siginfo));
111 CHECK_EQ(ret, static_cast<int>(sizeof(signalfd_siginfo)));
112
113 if (si.ssi_signo == kQuitSignalNumber) {
114 epoll.Quit();
115 return;
116 }
117
118 int64_t wakeup_latency_int64 =
119 static_cast<int64_t>(monotonic_now.time_since_epoch().count()) &
120 0xfffffffful;
121
122 wakeup_latency_int64 -= static_cast<int64_t>(si.ssi_int);
123
124 if (wakeup_latency_int64 > 0x80000000l) {
125 wakeup_latency_int64 -= 0x100000000l;
126 }
127
128 const chrono::nanoseconds wakeup_latency(wakeup_latency_int64);
129
130 sum_latency += wakeup_latency;
131 ++latency_count;
132
133 max_wakeup_latency = ::std::max(wakeup_latency, max_wakeup_latency);
134
135 if (wakeup_latency > chrono::microseconds(FLAGS_latency_threshold)) {
136 t.Stop();
137 AOS_LOG(INFO, "Stopped tracing, latency %" PRId64 "\n",
138 static_cast<int64_t>(wakeup_latency.count()));
139 }
140
141 if (FLAGS_log_latency) {
142 AOS_LOG(INFO, "signo: %d, sending pid: %d, dt: %8d.%03d\n", si.ssi_signo,
143 si.ssi_pid, static_cast<int>(wakeup_latency_int64 / 1000),
144 static_cast<int>(wakeup_latency_int64 % 1000));
145 }
146 });
147
148 PinCurrentThreadToCPU(FLAGS_core);
149 SetCurrentThreadRealtimePriority(FLAGS_receiver_priority);
150 epoll.Run();
151 UnsetCurrentThreadRealtimePriority();
152 epoll.DeleteFd(signalfd_fd);
153
154 const chrono::nanoseconds average_latency = sum_latency / latency_count;
155
156 AOS_LOG(INFO,
157 "Max signal wakeup latency: %d.%03d microseconds, average: %d.%03d "
158 "microseconds\n",
159 static_cast<int>(max_wakeup_latency.count() / 1000),
160 static_cast<int>(max_wakeup_latency.count() % 1000),
161 static_cast<int>(average_latency.count() / 1000),
162 static_cast<int>(average_latency.count() % 1000));
163
164 PCHECK(close(signalfd_fd));
165}
166
167int Main(int /*argc*/, char ** /*argv*/) {
168 sigset_t x;
169 sigemptyset(&x);
170 sigaddset(&x, kSignalNumber);
171 sigaddset(&x, kQuitSignalNumber);
172 pthread_sigmask(SIG_BLOCK, &x, NULL);
173
174 AOS_LOG(INFO, "Main!\n");
175 ::std::thread t([]() {
176 TimerThread(monotonic_clock::now() + chrono::seconds(FLAGS_seconds),
177 FLAGS_timer_priority);
178 });
179
180 ::std::thread st([]() { SenderThread(); });
181
182 ReceiverThread();
183 st.join();
184
185 t.join();
186 return 0;
187}
188
189} // namespace aos
190
191int main(int argc, char **argv) {
192 ::gflags::ParseCommandLineFlags(&argc, &argv, true);
193
194 ::aos::logging::Init();
195 ::aos::logging::SetImplementation(
196 new ::aos::logging::StreamLogImplementation(stdout));
197
198 return ::aos::Main(argc, argv);
199}