blob: 8f088364d8dad3c606d0a9a4d4aa15284bec7998 [file] [log] [blame]
Austin Schuhd8d23612021-10-05 18:26:45 -07001#include <chrono>
2
3#include "gflags/gflags.h"
4#include "glog/logging.h"
5
6#include "aos/events/shm_event_loop.h"
7#include "aos/init.h"
8#include "aos/network/sctp_client.h"
9
// Command-line flags (gflags) for the SCTP ping client.
//   --config:  path handed to aos::configuration::ReadConfig() in Main().
//   --target / --port: host and port passed to the SctpClient constructor.
//   --rx_size / --size: the larger of the two, plus 100 bytes, is used as both
//       the max read size and the max write size of the client. --size is also
//       the length of the payload built on every timer callback.
//   --ttl:     forwarded as the last argument of SctpClient::Send().
10DEFINE_string(config, "aos_config.json", "Path to the config.");
11DEFINE_uint32(port, 1323, "Port to pingpong on");
12DEFINE_string(target, "vpu0-0a", "Host to connect to");
13DEFINE_uint32(rx_size, 1000000,
14 "RX buffer size to set the max size to be in bytes.");
15DEFINE_uint32(size, 1000, "Size of data to send in bytes");
16DEFINE_uint32(ttl, 0, "TTL in milliseconds");
17
18namespace aos {
19namespace message_bridge {
20
21namespace chrono = std::chrono;
22
23class PingClient {
24 public:
25 PingClient(aos::ShmEventLoop *event_loop)
26 : event_loop_(event_loop), client_(FLAGS_target, FLAGS_port, 2, "::", 0) {
27 client_.SetMaxReadSize(std::max(FLAGS_rx_size, FLAGS_size) + 100);
28 client_.SetMaxWriteSize(std::max(FLAGS_rx_size, FLAGS_size) + 100);
29
30 timer_ = event_loop_->AddTimer([this]() { Timer(); });
31
32 event_loop_->OnRun([this]() {
33 timer_->Schedule(event_loop_->monotonic_now(),
34 chrono::milliseconds(1000));
35 });
36
37 event_loop_->epoll()->OnReadable(client_.fd(),
38 [this]() { MessageReceived(); });
39 event_loop_->SetRuntimeRealtimePriority(5);
40 }
41
42 ~PingClient() { event_loop_->epoll()->DeleteFd(client_.fd()); }
43
44 aos::TimerHandler *timer_;
45 sctp_assoc_t sac_assoc_id_ = 0;
46
47 void Timer() {
48 std::string data(FLAGS_size, 'a');
49
50 if (client_.Send(0, data, FLAGS_ttl)) {
51 LOG(INFO) << "Sent " << data.size();
52 } else {
53 PLOG(ERROR) << "Failed to send";
54 }
55 }
56
57 void MessageReceived() {
58 aos::unique_c_ptr<Message> message = client_.Read();
59 ++count_;
60 if (!message) {
61 return;
62 }
63
64 if (message->message_type == Message::kNotification) {
65 const union sctp_notification *snp =
66 (const union sctp_notification *)message->data();
67
68 if (VLOG_IS_ON(2)) {
69 PrintNotification(message.get());
70 }
71
72 switch (snp->sn_header.sn_type) {
73 case SCTP_ASSOC_CHANGE: {
74 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
75 switch (sac->sac_state) {
76 case SCTP_COMM_UP:
77 NodeConnected(sac->sac_assoc_id);
78 VLOG(1) << "Peer connected";
79 break;
80 case SCTP_COMM_LOST:
81 case SCTP_SHUTDOWN_COMP:
82 case SCTP_CANT_STR_ASSOC:
83 NodeDisconnected(sac->sac_assoc_id);
84 VLOG(1) << "Disconnect";
85 break;
86 case SCTP_RESTART:
87 LOG(FATAL) << "Never seen this before.";
88 break;
89 }
90 } break;
91 }
92 } else if (message->message_type == Message::kMessage) {
93 HandleData(message.get());
94 }
95 }
96
97 void NodeConnected(sctp_assoc_t assoc_id) {
98 client_.SetPriorityScheduler(assoc_id);
99 }
100 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) {}
101
102 void HandleData(const Message *message) {
103 LOG(INFO) << "Received data of length " << message->size << " total "
104 << size_ << " count " << count_;
105 size_ += message->size;
106
107 if (VLOG_IS_ON(1)) {
108 message->LogRcvInfo();
109 }
110 }
111
112 private:
113 aos::ShmEventLoop *event_loop_;
114 SctpClient client_;
115
116 size_t count_ = 0;
117 size_t size_ = 0;
118};
119
120int Main() {
121 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
122 aos::configuration::ReadConfig(FLAGS_config);
123
124 aos::ShmEventLoop event_loop(&config.message());
125 PingClient server(&event_loop);
126 event_loop.Run();
127
128 return EXIT_SUCCESS;
129}
130
131} // namespace message_bridge
132} // namespace aos
133
134int main(int argc, char **argv) {
135 aos::InitGoogle(&argc, &argv);
136
137 return aos::message_bridge::Main();
138}