blob: cce4bed6019f94da335e7f6aa1afac202024a6ea [file] [log] [blame]
Adam Snaidere104ebf2023-06-05 17:42:06 -07001#include <chrono>
2
3#include "gflags/gflags.h"
4#include "glog/logging.h"
5
6#include "aos/events/shm_event_loop.h"
7#include "aos/init.h"
8#include "aos/network/sctp_client.h"
9#include "aos/network/sctp_server.h"
10
11DEFINE_string(config, "aos_config.json", "Path to the config.");
12DEFINE_uint32(port, 1323, "Port to run the sctp test on");
13DEFINE_uint32(payload_size, 1000, "Size of data to send in bytes");
14DEFINE_uint32(ttl, 0, "TTL in milliseconds");
15DEFINE_uint32(rx_size, 1000000,
16 "RX buffer size to set the max size to be in bytes.");
17DEFINE_string(host, "", "Server host (acts as server if unspecified)");
18
19DEFINE_bool(client, false,
20 "If true, then act as a client, otherwise act as a server");
21DEFINE_uint32(skip_first_n, 10,
22 "Skip the first 'n' messages when computing statistics.");
23
24DECLARE_bool(die_on_malloc);
25
26namespace aos::message_bridge::perf {
27
28namespace chrono = std::chrono;
29
30class Server {
31 public:
32 Server(aos::ShmEventLoop *event_loop)
33 : event_loop_(event_loop), server_(2, "0.0.0.0", FLAGS_port) {
34 event_loop_->epoll()->OnReadable(server_.fd(),
35 [this]() { MessageReceived(); });
36 server_.SetMaxReadSize(FLAGS_rx_size + 100);
37 server_.SetMaxWriteSize(FLAGS_rx_size + 100);
38
39 event_loop_->SetRuntimeRealtimePriority(5);
40 }
41
42 ~Server() { event_loop_->epoll()->DeleteFd(server_.fd()); }
43
44 void SendMessage(std::string_view message) {
45 if (sac_assoc_id_ == 0) {
46 LOG(INFO) << "Lost connection to client. Not sending";
47 return;
48 }
49 if (server_.Send(message, sac_assoc_id_, 0, FLAGS_ttl)) {
50 LOG(INFO) << "Server reply with " << message.size() << "B";
51 } else {
52 PLOG(FATAL) << "Failed to send";
53 }
54 }
55
56 void MessageReceived() {
57 LOG(INFO) << "Received message";
58 aos::unique_c_ptr<Message> message = server_.Read();
59 if (!message) {
60 return;
61 }
62
63 if (message->message_type == Message::kNotification) {
64 const union sctp_notification *snp =
65 (const union sctp_notification *)message->data();
66
67 if (VLOG_IS_ON(2)) {
68 PrintNotification(message.get());
69 }
70
71 switch (snp->sn_header.sn_type) {
72 case SCTP_ASSOC_CHANGE: {
73 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
74 switch (sac->sac_state) {
75 case SCTP_COMM_UP:
76 NodeConnected(sac->sac_assoc_id);
77 VLOG(1) << "Peer connected";
78 break;
79 case SCTP_COMM_LOST:
80 case SCTP_SHUTDOWN_COMP:
81 case SCTP_CANT_STR_ASSOC:
82 NodeDisconnected(sac->sac_assoc_id);
83 VLOG(1) << "Disconnect";
84 break;
85 case SCTP_RESTART:
86 LOG(FATAL) << "Never seen this before.";
87 break;
88 }
89 } break;
90 }
91 } else if (message->message_type == Message::kMessage) {
92 SendMessage(
93 std::string_view((const char *)message->data(), message->size));
94 }
95 }
96
97 void NodeConnected(sctp_assoc_t assoc_id) {
98 sac_assoc_id_ = assoc_id;
99 server_.SetPriorityScheduler(assoc_id);
100 }
101 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) { sac_assoc_id_ = 0; }
102
103 private:
104 sctp_assoc_t sac_assoc_id_ = 0;
105 aos::ShmEventLoop *event_loop_;
106 SctpServer server_;
107};
108
109class Client {
110 public:
111 Client(aos::ShmEventLoop *event_loop)
112 : event_loop_(event_loop), client_(FLAGS_host, FLAGS_port, 2) {
113 client_.SetMaxReadSize(FLAGS_rx_size + 100);
114 client_.SetMaxWriteSize(FLAGS_rx_size + 100);
115
116 timer_ = event_loop_->AddTimer([this]() { Ping(); });
117
118 event_loop_->OnRun([this]() {
119 timer_->Schedule(event_loop_->monotonic_now(),
120 chrono::milliseconds(1000));
121 });
122
123 event_loop_->epoll()->OnReadable(client_.fd(),
124 [this]() { MessageReceived(); });
125 event_loop_->SetRuntimeRealtimePriority(5);
126 }
127
128 ~Client() { event_loop_->epoll()->DeleteFd(client_.fd()); }
129
130 void Ping() {
131 std::string payload(FLAGS_payload_size, 'a');
132 sent_time_ = aos::monotonic_clock::now();
133 if (client_.Send(0, payload, FLAGS_ttl)) {
134 LOG(INFO) << "Sending " << payload.size() << "B";
135 } else {
136 PLOG(ERROR) << "Failed to send";
137 }
138 }
139
140 void MessageReceived() {
141 aos::unique_c_ptr<Message> message = client_.Read();
142 if (!message) {
143 return;
144 }
145
146 if (message->message_type == Message::kNotification) {
147 const union sctp_notification *snp =
148 (const union sctp_notification *)message->data();
149
150 if (VLOG_IS_ON(2)) {
151 PrintNotification(message.get());
152 }
153
154 switch (snp->sn_header.sn_type) {
155 case SCTP_ASSOC_CHANGE: {
156 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
157 switch (sac->sac_state) {
158 case SCTP_COMM_UP:
159 NodeConnected(sac->sac_assoc_id);
160 VLOG(1) << "Peer connected";
161 break;
162 case SCTP_COMM_LOST:
163 case SCTP_SHUTDOWN_COMP:
164 case SCTP_CANT_STR_ASSOC:
165 NodeDisconnected(sac->sac_assoc_id);
166 VLOG(1) << "Disconnect";
167 break;
168 case SCTP_RESTART:
169 LOG(FATAL) << "Never seen this before.";
170 break;
171 }
172 } break;
173 }
174 } else if (message->message_type == Message::kMessage) {
175 HandleData(message.get());
176 }
177 }
178
179 void NodeConnected(sctp_assoc_t assoc_id) {
180 client_.SetPriorityScheduler(assoc_id);
181 }
182 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) {}
183
184 void HandleData(const Message *) {
185 count_++;
186 if (count_ <= 0) {
187 LOG(INFO) << "Got message: Skipping " << -count_;
188 return;
189 }
190 auto elapsed = aos::monotonic_clock::now() - sent_time_;
191 double elapsed_secs =
192 std::chrono::duration_cast<std::chrono::duration<double>>(elapsed)
193 .count();
194 avg_latency_ = (avg_latency_ * (count_ - 1) + elapsed_secs) / count_;
195 // average one-way throughput
196 double throughput = FLAGS_payload_size * 2.0 / elapsed_secs;
197 double avg_throughput = FLAGS_payload_size * 2.0 / avg_latency_;
198 printf(
199 "Round trip: %.2fms | %.2f KB/s | Avg RTL: %.2fms | %.2f KB/s | Count: "
200 "%d\n",
201 elapsed_secs * 1000, throughput / 1024, avg_latency_ * 1000,
202 avg_throughput / 1024, count_);
203 }
204
205 private:
206 aos::ShmEventLoop *event_loop_;
207 SctpClient client_;
208 aos::TimerHandler *timer_;
209 double avg_latency_ = 0.0;
210 int count_ = -FLAGS_skip_first_n;
211
212 aos::monotonic_clock::time_point sent_time_;
213};
214
215int Main() {
216 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
217 aos::configuration::ReadConfig(FLAGS_config);
218
219 aos::ShmEventLoop event_loop(&config.message());
220 if (FLAGS_client) {
221 CHECK(!FLAGS_host.empty()) << "Client Usage: `sctp_perf --client --host "
222 "abc.com --payload_size [bytes] "
223 "[--port PORT] [--config PATH]`";
224
225 Client client(&event_loop);
226 event_loop.Run();
227 } else {
228 CHECK(FLAGS_host.empty()) << "Server Usage: `sctp_perf [--config PATH]`";
229 Server server(&event_loop);
230 event_loop.Run();
231 }
232
233 return EXIT_SUCCESS;
234}
235
236} // namespace aos::message_bridge::perf
237
238int main(int argc, char **argv) {
239 gflags::SetUsageMessage(absl::StrCat(
240 "Measure SCTP performance\n", " Server Usage: `sctp_perf`\n",
241 " Client Usage: `sctp_perf --client --host abc.com`\n"));
242 aos::InitGoogle(&argc, &argv);
243
244 // Client and server need to malloc.
245 FLAGS_die_on_malloc = false;
246 return aos::message_bridge::perf::Main();
247}