blob: db762dedad22a9baaf9455839e7dc6387e704a5a [file] [log] [blame]
Adam Snaidere104ebf2023-06-05 17:42:06 -07001#include <chrono>
2
3#include "gflags/gflags.h"
4#include "glog/logging.h"
5
6#include "aos/events/shm_event_loop.h"
7#include "aos/init.h"
8#include "aos/network/sctp_client.h"
Adam Snaider96a0f4b2023-05-18 20:41:19 -07009#include "aos/network/sctp_lib.h"
Adam Snaidere104ebf2023-06-05 17:42:06 -070010#include "aos/network/sctp_server.h"
11
Adam Snaider13d48d92023-08-03 12:20:15 -070012// The casts required to read datastructures from sockets trip - Wcast - align.
13#ifdef __clang
14#pragma clang diagnostic ignored "-Wcast-align"
15#endif
16
Adam Snaidere104ebf2023-06-05 17:42:06 -070017DEFINE_string(config, "aos_config.json", "Path to the config.");
18DEFINE_uint32(port, 1323, "Port to run the sctp test on");
19DEFINE_uint32(payload_size, 1000, "Size of data to send in bytes");
20DEFINE_uint32(ttl, 0, "TTL in milliseconds");
21DEFINE_uint32(rx_size, 1000000,
22 "RX buffer size to set the max size to be in bytes.");
23DEFINE_string(host, "", "Server host (acts as server if unspecified)");
24
25DEFINE_bool(client, false,
26 "If true, then act as a client, otherwise act as a server");
27DEFINE_uint32(skip_first_n, 10,
28 "Skip the first 'n' messages when computing statistics.");
29
Adam Snaider96a0f4b2023-05-18 20:41:19 -070030DEFINE_string(sctp_auth_key_file, "",
31 "When set, use the provided key for SCTP authentication as "
32 "defined in RFC 4895");
Adam Snaider96a0f4b2023-05-18 20:41:19 -070033
Adam Snaidere104ebf2023-06-05 17:42:06 -070034DECLARE_bool(die_on_malloc);
35
36namespace aos::message_bridge::perf {
37
Adam Snaider96a0f4b2023-05-18 20:41:19 -070038namespace {
39
40using util::ReadFileToVecOrDie;
41
Adam Snaider9bb33442023-06-26 16:31:37 -070042SctpAuthMethod SctpAuthMethod() {
43 return FLAGS_sctp_auth_key_file.empty() ? SctpAuthMethod::kNoAuth
44 : SctpAuthMethod::kAuth;
45}
46
Adam Snaider96a0f4b2023-05-18 20:41:19 -070047std::vector<uint8_t> GetSctpAuthKey() {
Adam Snaider9bb33442023-06-26 16:31:37 -070048 if (SctpAuthMethod() == SctpAuthMethod::kNoAuth) {
49 return {};
Adam Snaider96a0f4b2023-05-18 20:41:19 -070050 }
Adam Snaider9bb33442023-06-26 16:31:37 -070051 return ReadFileToVecOrDie(FLAGS_sctp_auth_key_file);
Adam Snaider96a0f4b2023-05-18 20:41:19 -070052}
53
54} // namespace
55
Adam Snaidere104ebf2023-06-05 17:42:06 -070056namespace chrono = std::chrono;
57
58class Server {
59 public:
60 Server(aos::ShmEventLoop *event_loop)
Adam Snaider96a0f4b2023-05-18 20:41:19 -070061 : event_loop_(event_loop),
Adam Snaider9bb33442023-06-26 16:31:37 -070062 server_(2, "0.0.0.0", FLAGS_port, SctpAuthMethod()) {
63 server_.SetAuthKey(GetSctpAuthKey());
Adam Snaidere104ebf2023-06-05 17:42:06 -070064 event_loop_->epoll()->OnReadable(server_.fd(),
65 [this]() { MessageReceived(); });
66 server_.SetMaxReadSize(FLAGS_rx_size + 100);
67 server_.SetMaxWriteSize(FLAGS_rx_size + 100);
68
69 event_loop_->SetRuntimeRealtimePriority(5);
70 }
71
72 ~Server() { event_loop_->epoll()->DeleteFd(server_.fd()); }
73
74 void SendMessage(std::string_view message) {
75 if (sac_assoc_id_ == 0) {
76 LOG(INFO) << "Lost connection to client. Not sending";
77 return;
78 }
79 if (server_.Send(message, sac_assoc_id_, 0, FLAGS_ttl)) {
80 LOG(INFO) << "Server reply with " << message.size() << "B";
81 } else {
82 PLOG(FATAL) << "Failed to send";
83 }
84 }
85
86 void MessageReceived() {
87 LOG(INFO) << "Received message";
88 aos::unique_c_ptr<Message> message = server_.Read();
89 if (!message) {
90 return;
91 }
92
93 if (message->message_type == Message::kNotification) {
94 const union sctp_notification *snp =
95 (const union sctp_notification *)message->data();
96
97 if (VLOG_IS_ON(2)) {
98 PrintNotification(message.get());
99 }
100
101 switch (snp->sn_header.sn_type) {
102 case SCTP_ASSOC_CHANGE: {
103 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
104 switch (sac->sac_state) {
105 case SCTP_COMM_UP:
106 NodeConnected(sac->sac_assoc_id);
107 VLOG(1) << "Peer connected";
108 break;
109 case SCTP_COMM_LOST:
110 case SCTP_SHUTDOWN_COMP:
111 case SCTP_CANT_STR_ASSOC:
112 NodeDisconnected(sac->sac_assoc_id);
113 VLOG(1) << "Disconnect";
114 break;
115 case SCTP_RESTART:
116 LOG(FATAL) << "Never seen this before.";
117 break;
118 }
119 } break;
120 }
121 } else if (message->message_type == Message::kMessage) {
122 SendMessage(
123 std::string_view((const char *)message->data(), message->size));
124 }
125 }
126
127 void NodeConnected(sctp_assoc_t assoc_id) {
128 sac_assoc_id_ = assoc_id;
129 server_.SetPriorityScheduler(assoc_id);
130 }
131 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) { sac_assoc_id_ = 0; }
132
133 private:
134 sctp_assoc_t sac_assoc_id_ = 0;
135 aos::ShmEventLoop *event_loop_;
136 SctpServer server_;
137};
138
139class Client {
140 public:
141 Client(aos::ShmEventLoop *event_loop)
Adam Snaider96a0f4b2023-05-18 20:41:19 -0700142 : event_loop_(event_loop),
143 client_(FLAGS_host, FLAGS_port, 2, "0.0.0.0", FLAGS_port,
Adam Snaider9bb33442023-06-26 16:31:37 -0700144 SctpAuthMethod()) {
145 client_.SetAuthKey(GetSctpAuthKey());
Adam Snaidere104ebf2023-06-05 17:42:06 -0700146 client_.SetMaxReadSize(FLAGS_rx_size + 100);
147 client_.SetMaxWriteSize(FLAGS_rx_size + 100);
148
149 timer_ = event_loop_->AddTimer([this]() { Ping(); });
150
151 event_loop_->OnRun([this]() {
152 timer_->Schedule(event_loop_->monotonic_now(),
153 chrono::milliseconds(1000));
154 });
155
156 event_loop_->epoll()->OnReadable(client_.fd(),
157 [this]() { MessageReceived(); });
158 event_loop_->SetRuntimeRealtimePriority(5);
159 }
160
161 ~Client() { event_loop_->epoll()->DeleteFd(client_.fd()); }
162
163 void Ping() {
164 std::string payload(FLAGS_payload_size, 'a');
165 sent_time_ = aos::monotonic_clock::now();
166 if (client_.Send(0, payload, FLAGS_ttl)) {
167 LOG(INFO) << "Sending " << payload.size() << "B";
168 } else {
169 PLOG(ERROR) << "Failed to send";
170 }
171 }
172
173 void MessageReceived() {
174 aos::unique_c_ptr<Message> message = client_.Read();
175 if (!message) {
176 return;
177 }
178
179 if (message->message_type == Message::kNotification) {
180 const union sctp_notification *snp =
181 (const union sctp_notification *)message->data();
182
183 if (VLOG_IS_ON(2)) {
184 PrintNotification(message.get());
185 }
186
187 switch (snp->sn_header.sn_type) {
188 case SCTP_ASSOC_CHANGE: {
189 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
190 switch (sac->sac_state) {
191 case SCTP_COMM_UP:
192 NodeConnected(sac->sac_assoc_id);
193 VLOG(1) << "Peer connected";
194 break;
195 case SCTP_COMM_LOST:
196 case SCTP_SHUTDOWN_COMP:
197 case SCTP_CANT_STR_ASSOC:
198 NodeDisconnected(sac->sac_assoc_id);
199 VLOG(1) << "Disconnect";
200 break;
201 case SCTP_RESTART:
202 LOG(FATAL) << "Never seen this before.";
203 break;
204 }
205 } break;
206 }
207 } else if (message->message_type == Message::kMessage) {
208 HandleData(message.get());
209 }
210 }
211
212 void NodeConnected(sctp_assoc_t assoc_id) {
213 client_.SetPriorityScheduler(assoc_id);
214 }
215 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) {}
216
217 void HandleData(const Message *) {
218 count_++;
219 if (count_ <= 0) {
220 LOG(INFO) << "Got message: Skipping " << -count_;
221 return;
222 }
223 auto elapsed = aos::monotonic_clock::now() - sent_time_;
224 double elapsed_secs =
225 std::chrono::duration_cast<std::chrono::duration<double>>(elapsed)
226 .count();
227 avg_latency_ = (avg_latency_ * (count_ - 1) + elapsed_secs) / count_;
228 // average one-way throughput
229 double throughput = FLAGS_payload_size * 2.0 / elapsed_secs;
230 double avg_throughput = FLAGS_payload_size * 2.0 / avg_latency_;
231 printf(
Adam Snaider96a0f4b2023-05-18 20:41:19 -0700232 "Round trip: %.2fms | %.2f KB/s | Avg RTL: %.2fms | %.2f KB/s | "
233 "Count: %d\n",
Adam Snaidere104ebf2023-06-05 17:42:06 -0700234 elapsed_secs * 1000, throughput / 1024, avg_latency_ * 1000,
235 avg_throughput / 1024, count_);
236 }
237
238 private:
239 aos::ShmEventLoop *event_loop_;
240 SctpClient client_;
241 aos::TimerHandler *timer_;
242 double avg_latency_ = 0.0;
243 int count_ = -FLAGS_skip_first_n;
244
245 aos::monotonic_clock::time_point sent_time_;
246};
247
248int Main() {
249 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
250 aos::configuration::ReadConfig(FLAGS_config);
251
252 aos::ShmEventLoop event_loop(&config.message());
253 if (FLAGS_client) {
254 CHECK(!FLAGS_host.empty()) << "Client Usage: `sctp_perf --client --host "
255 "abc.com --payload_size [bytes] "
256 "[--port PORT] [--config PATH]`";
257
258 Client client(&event_loop);
259 event_loop.Run();
260 } else {
261 CHECK(FLAGS_host.empty()) << "Server Usage: `sctp_perf [--config PATH]`";
262 Server server(&event_loop);
263 event_loop.Run();
264 }
265
266 return EXIT_SUCCESS;
267}
268
269} // namespace aos::message_bridge::perf
270
271int main(int argc, char **argv) {
272 gflags::SetUsageMessage(absl::StrCat(
273 "Measure SCTP performance\n", " Server Usage: `sctp_perf`\n",
274 " Client Usage: `sctp_perf --client --host abc.com`\n"));
275 aos::InitGoogle(&argc, &argv);
276
277 // Client and server need to malloc.
278 FLAGS_die_on_malloc = false;
279 return aos::message_bridge::perf::Main();
280}