blob: 6aceaeeff1e7686fd250aa3fb3ddf6e99b41117e [file] [log] [blame]
Austin Schuhd8d23612021-10-05 18:26:45 -07001#include <chrono>
2
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003#include "absl/flags/flag.h"
4#include "absl/log/log.h"
Austin Schuhd8d23612021-10-05 18:26:45 -07005
6#include "aos/events/shm_event_loop.h"
7#include "aos/init.h"
8#include "aos/network/sctp_server.h"
9
Austin Schuh99f7c6a2024-06-25 22:07:44 -070010ABSL_FLAG(std::string, config, "aos_config.json", "Path to the config.");
11ABSL_FLAG(uint32_t, port, 1323, "Port to pingpong on");
12ABSL_FLAG(uint32_t, size, 1000000, "Size of data to send in bytes");
13ABSL_FLAG(uint32_t, duration, 1000, "Period to send at in milliseconds");
14ABSL_FLAG(uint32_t, ttl, 0, "TTL in milliseconds");
Austin Schuhd8d23612021-10-05 18:26:45 -070015
16namespace aos {
17namespace message_bridge {
18
19namespace chrono = std::chrono;
20
21class PingServer {
22 public:
23 PingServer(aos::ShmEventLoop *event_loop)
Austin Schuh99f7c6a2024-06-25 22:07:44 -070024 : event_loop_(event_loop), server_(2, "::", absl::GetFlag(FLAGS_port)) {
Austin Schuhd8d23612021-10-05 18:26:45 -070025 event_loop_->epoll()->OnReadable(server_.fd(),
26 [this]() { MessageReceived(); });
Austin Schuh99f7c6a2024-06-25 22:07:44 -070027 server_.SetMaxReadSize(absl::GetFlag(FLAGS_size) + 100);
28 server_.SetMaxWriteSize(absl::GetFlag(FLAGS_size) + 100);
Austin Schuhd8d23612021-10-05 18:26:45 -070029
30 timer_ = event_loop_->AddTimer([this]() { Timer(); });
31
32 event_loop_->OnRun([this]() {
33 timer_->Schedule(event_loop_->monotonic_now(),
Austin Schuh99f7c6a2024-06-25 22:07:44 -070034 chrono::milliseconds(absl::GetFlag(FLAGS_duration)));
Austin Schuhd8d23612021-10-05 18:26:45 -070035 });
36
37 event_loop_->SetRuntimeRealtimePriority(5);
38 }
39
40 ~PingServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
41
42 aos::TimerHandler *timer_;
43 sctp_assoc_t sac_assoc_id_ = 0;
44
45 void Timer() {
46 if (sac_assoc_id_ == 0) {
47 LOG(INFO) << "No client, not sending";
48 return;
49 }
50
Austin Schuh99f7c6a2024-06-25 22:07:44 -070051 std::string data(absl::GetFlag(FLAGS_size), 'a');
Austin Schuhd8d23612021-10-05 18:26:45 -070052
Austin Schuh99f7c6a2024-06-25 22:07:44 -070053 if (server_.Send(data, sac_assoc_id_, 0, absl::GetFlag(FLAGS_ttl))) {
Austin Schuhd8d23612021-10-05 18:26:45 -070054 LOG(INFO) << "Sent " << data.size();
55 } else {
56 PLOG(ERROR) << "Failed to send";
57 }
58 }
59
60 void MessageReceived() {
61 aos::unique_c_ptr<Message> message = server_.Read();
62 if (!message) {
63 return;
64 }
65
66 if (message->message_type == Message::kNotification) {
67 const union sctp_notification *snp =
68 (const union sctp_notification *)message->data();
69
70 if (VLOG_IS_ON(2)) {
71 PrintNotification(message.get());
72 }
73
74 switch (snp->sn_header.sn_type) {
75 case SCTP_ASSOC_CHANGE: {
76 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
77 switch (sac->sac_state) {
78 case SCTP_COMM_UP:
79 NodeConnected(sac->sac_assoc_id);
80 VLOG(1) << "Peer connected";
81 break;
82 case SCTP_COMM_LOST:
83 case SCTP_SHUTDOWN_COMP:
84 case SCTP_CANT_STR_ASSOC:
85 NodeDisconnected(sac->sac_assoc_id);
86 VLOG(1) << "Disconnect";
87 break;
88 case SCTP_RESTART:
89 LOG(FATAL) << "Never seen this before.";
90 break;
91 }
92 } break;
93 }
94 } else if (message->message_type == Message::kMessage) {
95 HandleData(message.get());
96 }
97 }
98
99 void NodeConnected(sctp_assoc_t assoc_id) {
100 sac_assoc_id_ = assoc_id;
101 server_.SetPriorityScheduler(assoc_id);
102 }
103 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) { sac_assoc_id_ = 0; }
104
105 void HandleData(const Message *message) {
106 VLOG(1) << "Received data of length " << message->size;
107
108 if (VLOG_IS_ON(1)) {
109 message->LogRcvInfo();
110 }
111 }
112
113 private:
114 aos::ShmEventLoop *event_loop_;
115 SctpServer server_;
116};
117
118int Main() {
119 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700120 aos::configuration::ReadConfig(absl::GetFlag(FLAGS_config));
Austin Schuhd8d23612021-10-05 18:26:45 -0700121
122 aos::ShmEventLoop event_loop(&config.message());
123 PingServer server(&event_loop);
124 event_loop.Run();
125
126 return EXIT_SUCCESS;
127}
128
129} // namespace message_bridge
130} // namespace aos
131
132int main(int argc, char **argv) {
133 aos::InitGoogle(&argc, &argv);
134
135 return aos::message_bridge::Main();
136}