blob: f1051331adc31d2059744fa98776fd06b2732c30 [file] [log] [blame]
Austin Schuhd8d23612021-10-05 18:26:45 -07001#include <chrono>
2
3#include "gflags/gflags.h"
4#include "glog/logging.h"
5
6#include "aos/events/shm_event_loop.h"
7#include "aos/init.h"
8#include "aos/network/sctp_server.h"
9
10DEFINE_string(config, "aos_config.json", "Path to the config.");
11DEFINE_uint32(port, 1323, "Port to pingpong on");
12DEFINE_uint32(size, 1000000, "Size of data to send in bytes");
13DEFINE_uint32(duration, 1000, "Period to send at in milliseconds");
14DEFINE_uint32(ttl, 0, "TTL in milliseconds");
15
16namespace aos {
17namespace message_bridge {
18
19namespace chrono = std::chrono;
20
21class PingServer {
22 public:
23 PingServer(aos::ShmEventLoop *event_loop)
24 : event_loop_(event_loop), server_(2, "::", FLAGS_port) {
25 event_loop_->epoll()->OnReadable(server_.fd(),
26 [this]() { MessageReceived(); });
27 server_.SetMaxReadSize(FLAGS_size + 100);
28 server_.SetMaxWriteSize(FLAGS_size + 100);
29
30 timer_ = event_loop_->AddTimer([this]() { Timer(); });
31
32 event_loop_->OnRun([this]() {
33 timer_->Schedule(event_loop_->monotonic_now(),
34 chrono::milliseconds(FLAGS_duration));
35 });
36
37 event_loop_->SetRuntimeRealtimePriority(5);
38 }
39
40 ~PingServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
41
42 aos::TimerHandler *timer_;
43 sctp_assoc_t sac_assoc_id_ = 0;
44
45 void Timer() {
46 if (sac_assoc_id_ == 0) {
47 LOG(INFO) << "No client, not sending";
48 return;
49 }
50
51 std::string data(FLAGS_size, 'a');
52
53 if (server_.Send(data, sac_assoc_id_, 0, FLAGS_ttl)) {
54 LOG(INFO) << "Sent " << data.size();
55 } else {
56 PLOG(ERROR) << "Failed to send";
57 }
58 }
59
60 void MessageReceived() {
61 aos::unique_c_ptr<Message> message = server_.Read();
62 if (!message) {
63 return;
64 }
65
66 if (message->message_type == Message::kNotification) {
67 const union sctp_notification *snp =
68 (const union sctp_notification *)message->data();
69
70 if (VLOG_IS_ON(2)) {
71 PrintNotification(message.get());
72 }
73
74 switch (snp->sn_header.sn_type) {
75 case SCTP_ASSOC_CHANGE: {
76 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
77 switch (sac->sac_state) {
78 case SCTP_COMM_UP:
79 NodeConnected(sac->sac_assoc_id);
80 VLOG(1) << "Peer connected";
81 break;
82 case SCTP_COMM_LOST:
83 case SCTP_SHUTDOWN_COMP:
84 case SCTP_CANT_STR_ASSOC:
85 NodeDisconnected(sac->sac_assoc_id);
86 VLOG(1) << "Disconnect";
87 break;
88 case SCTP_RESTART:
89 LOG(FATAL) << "Never seen this before.";
90 break;
91 }
92 } break;
93 }
94 } else if (message->message_type == Message::kMessage) {
95 HandleData(message.get());
96 }
97 }
98
99 void NodeConnected(sctp_assoc_t assoc_id) {
100 sac_assoc_id_ = assoc_id;
101 server_.SetPriorityScheduler(assoc_id);
102 }
103 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) { sac_assoc_id_ = 0; }
104
105 void HandleData(const Message *message) {
106 VLOG(1) << "Received data of length " << message->size;
107
108 if (VLOG_IS_ON(1)) {
109 message->LogRcvInfo();
110 }
111 }
112
113 private:
114 aos::ShmEventLoop *event_loop_;
115 SctpServer server_;
116};
117
118int Main() {
119 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
120 aos::configuration::ReadConfig(FLAGS_config);
121
122 aos::ShmEventLoop event_loop(&config.message());
123 PingServer server(&event_loop);
124 event_loop.Run();
125
126 return EXIT_SUCCESS;
127}
128
129} // namespace message_bridge
130} // namespace aos
131
132int main(int argc, char **argv) {
133 aos::InitGoogle(&argc, &argv);
134
135 return aos::message_bridge::Main();
136}