blob: 5201f47f04d359b158a5850918e9f4acace61f05 [file] [log] [blame]
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <string_view>
#include <vector>

#include "gflags/gflags.h"
#include "glog/logging.h"

#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
#include "aos/network/sctp_client.h"
#include "aos/network/sctp_lib.h"
#include "aos/network/sctp_server.h"
11
12DEFINE_string(config, "aos_config.json", "Path to the config.");
13DEFINE_uint32(port, 1323, "Port to run the sctp test on");
14DEFINE_uint32(payload_size, 1000, "Size of data to send in bytes");
15DEFINE_uint32(ttl, 0, "TTL in milliseconds");
16DEFINE_uint32(rx_size, 1000000,
17 "RX buffer size to set the max size to be in bytes.");
18DEFINE_string(host, "", "Server host (acts as server if unspecified)");
19
20DEFINE_bool(client, false,
21 "If true, then act as a client, otherwise act as a server");
22DEFINE_uint32(skip_first_n, 10,
23 "Skip the first 'n' messages when computing statistics.");
24
Adam Snaider96a0f4b2023-05-18 20:41:19 -070025DEFINE_string(sctp_auth_key_file, "",
26 "When set, use the provided key for SCTP authentication as "
27 "defined in RFC 4895");
Adam Snaider96a0f4b2023-05-18 20:41:19 -070028
Adam Snaidere104ebf2023-06-05 17:42:06 -070029DECLARE_bool(die_on_malloc);
30
31namespace aos::message_bridge::perf {
32
Adam Snaider96a0f4b2023-05-18 20:41:19 -070033namespace {
34
35using util::ReadFileToVecOrDie;
36
Adam Snaider9bb33442023-06-26 16:31:37 -070037SctpAuthMethod SctpAuthMethod() {
38 return FLAGS_sctp_auth_key_file.empty() ? SctpAuthMethod::kNoAuth
39 : SctpAuthMethod::kAuth;
40}
41
Adam Snaider96a0f4b2023-05-18 20:41:19 -070042std::vector<uint8_t> GetSctpAuthKey() {
Adam Snaider9bb33442023-06-26 16:31:37 -070043 if (SctpAuthMethod() == SctpAuthMethod::kNoAuth) {
44 return {};
Adam Snaider96a0f4b2023-05-18 20:41:19 -070045 }
Adam Snaider9bb33442023-06-26 16:31:37 -070046 return ReadFileToVecOrDie(FLAGS_sctp_auth_key_file);
Adam Snaider96a0f4b2023-05-18 20:41:19 -070047}
48
49} // namespace
50
Adam Snaidere104ebf2023-06-05 17:42:06 -070051namespace chrono = std::chrono;
52
53class Server {
54 public:
55 Server(aos::ShmEventLoop *event_loop)
Adam Snaider96a0f4b2023-05-18 20:41:19 -070056 : event_loop_(event_loop),
Adam Snaider9bb33442023-06-26 16:31:37 -070057 server_(2, "0.0.0.0", FLAGS_port, SctpAuthMethod()) {
58 server_.SetAuthKey(GetSctpAuthKey());
Adam Snaidere104ebf2023-06-05 17:42:06 -070059 event_loop_->epoll()->OnReadable(server_.fd(),
60 [this]() { MessageReceived(); });
61 server_.SetMaxReadSize(FLAGS_rx_size + 100);
62 server_.SetMaxWriteSize(FLAGS_rx_size + 100);
63
64 event_loop_->SetRuntimeRealtimePriority(5);
65 }
66
67 ~Server() { event_loop_->epoll()->DeleteFd(server_.fd()); }
68
69 void SendMessage(std::string_view message) {
70 if (sac_assoc_id_ == 0) {
71 LOG(INFO) << "Lost connection to client. Not sending";
72 return;
73 }
74 if (server_.Send(message, sac_assoc_id_, 0, FLAGS_ttl)) {
75 LOG(INFO) << "Server reply with " << message.size() << "B";
76 } else {
77 PLOG(FATAL) << "Failed to send";
78 }
79 }
80
81 void MessageReceived() {
82 LOG(INFO) << "Received message";
83 aos::unique_c_ptr<Message> message = server_.Read();
84 if (!message) {
85 return;
86 }
87
88 if (message->message_type == Message::kNotification) {
89 const union sctp_notification *snp =
90 (const union sctp_notification *)message->data();
91
92 if (VLOG_IS_ON(2)) {
93 PrintNotification(message.get());
94 }
95
96 switch (snp->sn_header.sn_type) {
97 case SCTP_ASSOC_CHANGE: {
98 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
99 switch (sac->sac_state) {
100 case SCTP_COMM_UP:
101 NodeConnected(sac->sac_assoc_id);
102 VLOG(1) << "Peer connected";
103 break;
104 case SCTP_COMM_LOST:
105 case SCTP_SHUTDOWN_COMP:
106 case SCTP_CANT_STR_ASSOC:
107 NodeDisconnected(sac->sac_assoc_id);
108 VLOG(1) << "Disconnect";
109 break;
110 case SCTP_RESTART:
111 LOG(FATAL) << "Never seen this before.";
112 break;
113 }
114 } break;
115 }
116 } else if (message->message_type == Message::kMessage) {
117 SendMessage(
118 std::string_view((const char *)message->data(), message->size));
119 }
120 }
121
122 void NodeConnected(sctp_assoc_t assoc_id) {
123 sac_assoc_id_ = assoc_id;
124 server_.SetPriorityScheduler(assoc_id);
125 }
126 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) { sac_assoc_id_ = 0; }
127
128 private:
129 sctp_assoc_t sac_assoc_id_ = 0;
130 aos::ShmEventLoop *event_loop_;
131 SctpServer server_;
132};
133
134class Client {
135 public:
136 Client(aos::ShmEventLoop *event_loop)
Adam Snaider96a0f4b2023-05-18 20:41:19 -0700137 : event_loop_(event_loop),
138 client_(FLAGS_host, FLAGS_port, 2, "0.0.0.0", FLAGS_port,
Adam Snaider9bb33442023-06-26 16:31:37 -0700139 SctpAuthMethod()) {
140 client_.SetAuthKey(GetSctpAuthKey());
Adam Snaidere104ebf2023-06-05 17:42:06 -0700141 client_.SetMaxReadSize(FLAGS_rx_size + 100);
142 client_.SetMaxWriteSize(FLAGS_rx_size + 100);
143
144 timer_ = event_loop_->AddTimer([this]() { Ping(); });
145
146 event_loop_->OnRun([this]() {
147 timer_->Schedule(event_loop_->monotonic_now(),
148 chrono::milliseconds(1000));
149 });
150
151 event_loop_->epoll()->OnReadable(client_.fd(),
152 [this]() { MessageReceived(); });
153 event_loop_->SetRuntimeRealtimePriority(5);
154 }
155
156 ~Client() { event_loop_->epoll()->DeleteFd(client_.fd()); }
157
158 void Ping() {
159 std::string payload(FLAGS_payload_size, 'a');
160 sent_time_ = aos::monotonic_clock::now();
161 if (client_.Send(0, payload, FLAGS_ttl)) {
162 LOG(INFO) << "Sending " << payload.size() << "B";
163 } else {
164 PLOG(ERROR) << "Failed to send";
165 }
166 }
167
168 void MessageReceived() {
169 aos::unique_c_ptr<Message> message = client_.Read();
170 if (!message) {
171 return;
172 }
173
174 if (message->message_type == Message::kNotification) {
175 const union sctp_notification *snp =
176 (const union sctp_notification *)message->data();
177
178 if (VLOG_IS_ON(2)) {
179 PrintNotification(message.get());
180 }
181
182 switch (snp->sn_header.sn_type) {
183 case SCTP_ASSOC_CHANGE: {
184 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
185 switch (sac->sac_state) {
186 case SCTP_COMM_UP:
187 NodeConnected(sac->sac_assoc_id);
188 VLOG(1) << "Peer connected";
189 break;
190 case SCTP_COMM_LOST:
191 case SCTP_SHUTDOWN_COMP:
192 case SCTP_CANT_STR_ASSOC:
193 NodeDisconnected(sac->sac_assoc_id);
194 VLOG(1) << "Disconnect";
195 break;
196 case SCTP_RESTART:
197 LOG(FATAL) << "Never seen this before.";
198 break;
199 }
200 } break;
201 }
202 } else if (message->message_type == Message::kMessage) {
203 HandleData(message.get());
204 }
205 }
206
207 void NodeConnected(sctp_assoc_t assoc_id) {
208 client_.SetPriorityScheduler(assoc_id);
209 }
210 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) {}
211
212 void HandleData(const Message *) {
213 count_++;
214 if (count_ <= 0) {
215 LOG(INFO) << "Got message: Skipping " << -count_;
216 return;
217 }
218 auto elapsed = aos::monotonic_clock::now() - sent_time_;
219 double elapsed_secs =
220 std::chrono::duration_cast<std::chrono::duration<double>>(elapsed)
221 .count();
222 avg_latency_ = (avg_latency_ * (count_ - 1) + elapsed_secs) / count_;
223 // average one-way throughput
224 double throughput = FLAGS_payload_size * 2.0 / elapsed_secs;
225 double avg_throughput = FLAGS_payload_size * 2.0 / avg_latency_;
226 printf(
Adam Snaider96a0f4b2023-05-18 20:41:19 -0700227 "Round trip: %.2fms | %.2f KB/s | Avg RTL: %.2fms | %.2f KB/s | "
228 "Count: %d\n",
Adam Snaidere104ebf2023-06-05 17:42:06 -0700229 elapsed_secs * 1000, throughput / 1024, avg_latency_ * 1000,
230 avg_throughput / 1024, count_);
231 }
232
233 private:
234 aos::ShmEventLoop *event_loop_;
235 SctpClient client_;
236 aos::TimerHandler *timer_;
237 double avg_latency_ = 0.0;
238 int count_ = -FLAGS_skip_first_n;
239
240 aos::monotonic_clock::time_point sent_time_;
241};
242
243int Main() {
244 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
245 aos::configuration::ReadConfig(FLAGS_config);
246
247 aos::ShmEventLoop event_loop(&config.message());
248 if (FLAGS_client) {
249 CHECK(!FLAGS_host.empty()) << "Client Usage: `sctp_perf --client --host "
250 "abc.com --payload_size [bytes] "
251 "[--port PORT] [--config PATH]`";
252
253 Client client(&event_loop);
254 event_loop.Run();
255 } else {
256 CHECK(FLAGS_host.empty()) << "Server Usage: `sctp_perf [--config PATH]`";
257 Server server(&event_loop);
258 event_loop.Run();
259 }
260
261 return EXIT_SUCCESS;
262}
263
264} // namespace aos::message_bridge::perf
265
266int main(int argc, char **argv) {
267 gflags::SetUsageMessage(absl::StrCat(
268 "Measure SCTP performance\n", " Server Usage: `sctp_perf`\n",
269 " Client Usage: `sctp_perf --client --host abc.com`\n"));
270 aos::InitGoogle(&argc, &argv);
271
272 // Client and server need to malloc.
273 FLAGS_die_on_malloc = false;
274 return aos::message_bridge::perf::Main();
275}