// Source blob: 3bafed1f0a0bb45fbfd7af71eda4a351469fc8e5
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <string_view>
#include <vector>

#include "gflags/gflags.h"
#include "glog/logging.h"

#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
#include "aos/network/sctp_client.h"
#include "aos/network/sctp_lib.h"
#include "aos/network/sctp_server.h"

12DEFINE_string(config, "aos_config.json", "Path to the config.");
13DEFINE_uint32(port, 1323, "Port to run the sctp test on");
14DEFINE_uint32(payload_size, 1000, "Size of data to send in bytes");
15DEFINE_uint32(ttl, 0, "TTL in milliseconds");
16DEFINE_uint32(rx_size, 1000000,
17 "RX buffer size to set the max size to be in bytes.");
18DEFINE_string(host, "", "Server host (acts as server if unspecified)");
19
20DEFINE_bool(client, false,
21 "If true, then act as a client, otherwise act as a server");
22DEFINE_uint32(skip_first_n, 10,
23 "Skip the first 'n' messages when computing statistics.");
24
Adam Snaider96a0f4b2023-05-18 20:41:19 -070025#if HAS_SCTP_AUTH
26DEFINE_string(sctp_auth_key_file, "",
27 "When set, use the provided key for SCTP authentication as "
28 "defined in RFC 4895");
29#endif
30
Adam Snaidere104ebf2023-06-05 17:42:06 -070031DECLARE_bool(die_on_malloc);
32
33namespace aos::message_bridge::perf {
34
namespace {

// Returns the SCTP authentication key passed to the SctpServer/SctpClient
// constructors.
//
// When the file is built with HAS_SCTP_AUTH and --sctp_auth_key_file is
// non-empty, the result is whatever util::ReadFileToVecOrDie() returns for
// that path. In every other case the returned vector is empty.
std::vector<uint8_t> GetSctpAuthKey() {
#if HAS_SCTP_AUTH
  if (!FLAGS_sctp_auth_key_file.empty()) {
    // Qualified call so the helper is only named when auth support is built.
    return util::ReadFileToVecOrDie(FLAGS_sctp_auth_key_file);
  }
#endif
  return {};
}

}  // namespace
49
// Short alias for std::chrono, used for durations in this file.
namespace chrono = std::chrono;
51
52class Server {
53 public:
54 Server(aos::ShmEventLoop *event_loop)
Adam Snaider96a0f4b2023-05-18 20:41:19 -070055 : event_loop_(event_loop),
56 server_(2, "0.0.0.0", FLAGS_port, GetSctpAuthKey()) {
Adam Snaidere104ebf2023-06-05 17:42:06 -070057 event_loop_->epoll()->OnReadable(server_.fd(),
58 [this]() { MessageReceived(); });
59 server_.SetMaxReadSize(FLAGS_rx_size + 100);
60 server_.SetMaxWriteSize(FLAGS_rx_size + 100);
61
62 event_loop_->SetRuntimeRealtimePriority(5);
63 }
64
65 ~Server() { event_loop_->epoll()->DeleteFd(server_.fd()); }
66
67 void SendMessage(std::string_view message) {
68 if (sac_assoc_id_ == 0) {
69 LOG(INFO) << "Lost connection to client. Not sending";
70 return;
71 }
72 if (server_.Send(message, sac_assoc_id_, 0, FLAGS_ttl)) {
73 LOG(INFO) << "Server reply with " << message.size() << "B";
74 } else {
75 PLOG(FATAL) << "Failed to send";
76 }
77 }
78
79 void MessageReceived() {
80 LOG(INFO) << "Received message";
81 aos::unique_c_ptr<Message> message = server_.Read();
82 if (!message) {
83 return;
84 }
85
86 if (message->message_type == Message::kNotification) {
87 const union sctp_notification *snp =
88 (const union sctp_notification *)message->data();
89
90 if (VLOG_IS_ON(2)) {
91 PrintNotification(message.get());
92 }
93
94 switch (snp->sn_header.sn_type) {
95 case SCTP_ASSOC_CHANGE: {
96 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
97 switch (sac->sac_state) {
98 case SCTP_COMM_UP:
99 NodeConnected(sac->sac_assoc_id);
100 VLOG(1) << "Peer connected";
101 break;
102 case SCTP_COMM_LOST:
103 case SCTP_SHUTDOWN_COMP:
104 case SCTP_CANT_STR_ASSOC:
105 NodeDisconnected(sac->sac_assoc_id);
106 VLOG(1) << "Disconnect";
107 break;
108 case SCTP_RESTART:
109 LOG(FATAL) << "Never seen this before.";
110 break;
111 }
112 } break;
113 }
114 } else if (message->message_type == Message::kMessage) {
115 SendMessage(
116 std::string_view((const char *)message->data(), message->size));
117 }
118 }
119
120 void NodeConnected(sctp_assoc_t assoc_id) {
121 sac_assoc_id_ = assoc_id;
122 server_.SetPriorityScheduler(assoc_id);
123 }
124 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) { sac_assoc_id_ = 0; }
125
126 private:
127 sctp_assoc_t sac_assoc_id_ = 0;
128 aos::ShmEventLoop *event_loop_;
129 SctpServer server_;
130};
131
132class Client {
133 public:
134 Client(aos::ShmEventLoop *event_loop)
Adam Snaider96a0f4b2023-05-18 20:41:19 -0700135 : event_loop_(event_loop),
136 client_(FLAGS_host, FLAGS_port, 2, "0.0.0.0", FLAGS_port,
137 GetSctpAuthKey()) {
Adam Snaidere104ebf2023-06-05 17:42:06 -0700138 client_.SetMaxReadSize(FLAGS_rx_size + 100);
139 client_.SetMaxWriteSize(FLAGS_rx_size + 100);
140
141 timer_ = event_loop_->AddTimer([this]() { Ping(); });
142
143 event_loop_->OnRun([this]() {
144 timer_->Schedule(event_loop_->monotonic_now(),
145 chrono::milliseconds(1000));
146 });
147
148 event_loop_->epoll()->OnReadable(client_.fd(),
149 [this]() { MessageReceived(); });
150 event_loop_->SetRuntimeRealtimePriority(5);
151 }
152
153 ~Client() { event_loop_->epoll()->DeleteFd(client_.fd()); }
154
155 void Ping() {
156 std::string payload(FLAGS_payload_size, 'a');
157 sent_time_ = aos::monotonic_clock::now();
158 if (client_.Send(0, payload, FLAGS_ttl)) {
159 LOG(INFO) << "Sending " << payload.size() << "B";
160 } else {
161 PLOG(ERROR) << "Failed to send";
162 }
163 }
164
165 void MessageReceived() {
166 aos::unique_c_ptr<Message> message = client_.Read();
167 if (!message) {
168 return;
169 }
170
171 if (message->message_type == Message::kNotification) {
172 const union sctp_notification *snp =
173 (const union sctp_notification *)message->data();
174
175 if (VLOG_IS_ON(2)) {
176 PrintNotification(message.get());
177 }
178
179 switch (snp->sn_header.sn_type) {
180 case SCTP_ASSOC_CHANGE: {
181 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
182 switch (sac->sac_state) {
183 case SCTP_COMM_UP:
184 NodeConnected(sac->sac_assoc_id);
185 VLOG(1) << "Peer connected";
186 break;
187 case SCTP_COMM_LOST:
188 case SCTP_SHUTDOWN_COMP:
189 case SCTP_CANT_STR_ASSOC:
190 NodeDisconnected(sac->sac_assoc_id);
191 VLOG(1) << "Disconnect";
192 break;
193 case SCTP_RESTART:
194 LOG(FATAL) << "Never seen this before.";
195 break;
196 }
197 } break;
198 }
199 } else if (message->message_type == Message::kMessage) {
200 HandleData(message.get());
201 }
202 }
203
204 void NodeConnected(sctp_assoc_t assoc_id) {
205 client_.SetPriorityScheduler(assoc_id);
206 }
207 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) {}
208
209 void HandleData(const Message *) {
210 count_++;
211 if (count_ <= 0) {
212 LOG(INFO) << "Got message: Skipping " << -count_;
213 return;
214 }
215 auto elapsed = aos::monotonic_clock::now() - sent_time_;
216 double elapsed_secs =
217 std::chrono::duration_cast<std::chrono::duration<double>>(elapsed)
218 .count();
219 avg_latency_ = (avg_latency_ * (count_ - 1) + elapsed_secs) / count_;
220 // average one-way throughput
221 double throughput = FLAGS_payload_size * 2.0 / elapsed_secs;
222 double avg_throughput = FLAGS_payload_size * 2.0 / avg_latency_;
223 printf(
Adam Snaider96a0f4b2023-05-18 20:41:19 -0700224 "Round trip: %.2fms | %.2f KB/s | Avg RTL: %.2fms | %.2f KB/s | "
225 "Count: %d\n",
Adam Snaidere104ebf2023-06-05 17:42:06 -0700226 elapsed_secs * 1000, throughput / 1024, avg_latency_ * 1000,
227 avg_throughput / 1024, count_);
228 }
229
230 private:
231 aos::ShmEventLoop *event_loop_;
232 SctpClient client_;
233 aos::TimerHandler *timer_;
234 double avg_latency_ = 0.0;
235 int count_ = -FLAGS_skip_first_n;
236
237 aos::monotonic_clock::time_point sent_time_;
238};
239
240int Main() {
241 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
242 aos::configuration::ReadConfig(FLAGS_config);
243
244 aos::ShmEventLoop event_loop(&config.message());
245 if (FLAGS_client) {
246 CHECK(!FLAGS_host.empty()) << "Client Usage: `sctp_perf --client --host "
247 "abc.com --payload_size [bytes] "
248 "[--port PORT] [--config PATH]`";
249
250 Client client(&event_loop);
251 event_loop.Run();
252 } else {
253 CHECK(FLAGS_host.empty()) << "Server Usage: `sctp_perf [--config PATH]`";
254 Server server(&event_loop);
255 event_loop.Run();
256 }
257
258 return EXIT_SUCCESS;
259}
260
261} // namespace aos::message_bridge::perf
262
263int main(int argc, char **argv) {
264 gflags::SetUsageMessage(absl::StrCat(
265 "Measure SCTP performance\n", " Server Usage: `sctp_perf`\n",
266 " Client Usage: `sctp_perf --client --host abc.com`\n"));
267 aos::InitGoogle(&argc, &argv);
268
269 // Client and server need to malloc.
270 FLAGS_die_on_malloc = false;
271 return aos::message_bridge::perf::Main();
272}