blob: 29253ec563c5576344bfbe86db7d7a2b5327164a [file] [log] [blame]
Austin Schuhd8d23612021-10-05 18:26:45 -07001#include <chrono>
2
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003#include "absl/flags/flag.h"
4#include "absl/log/log.h"
Austin Schuhd8d23612021-10-05 18:26:45 -07005
6#include "aos/events/shm_event_loop.h"
7#include "aos/init.h"
8#include "aos/network/sctp_client.h"
9
Austin Schuh99f7c6a2024-06-25 22:07:44 -070010ABSL_FLAG(std::string, config, "aos_config.json", "Path to the config.");
11ABSL_FLAG(uint32_t, port, 1323, "Port to pingpong on");
12ABSL_FLAG(std::string, target, "vpu0-0a", "Host to connect to");
13ABSL_FLAG(uint32_t, rx_size, 1000000,
14 "RX buffer size to set the max size to be in bytes.");
15ABSL_FLAG(uint32_t, size, 1000, "Size of data to send in bytes");
16ABSL_FLAG(uint32_t, ttl, 0, "TTL in milliseconds");
Austin Schuhd8d23612021-10-05 18:26:45 -070017
18namespace aos {
19namespace message_bridge {
20
21namespace chrono = std::chrono;
22
23class PingClient {
24 public:
25 PingClient(aos::ShmEventLoop *event_loop)
Austin Schuh99f7c6a2024-06-25 22:07:44 -070026 : event_loop_(event_loop),
27 client_(absl::GetFlag(FLAGS_target), absl::GetFlag(FLAGS_port), 2,
28 "::", 0) {
29 client_.SetMaxReadSize(
30 std::max(absl::GetFlag(FLAGS_rx_size), absl::GetFlag(FLAGS_size)) +
31 100);
32 client_.SetMaxWriteSize(
33 std::max(absl::GetFlag(FLAGS_rx_size), absl::GetFlag(FLAGS_size)) +
34 100);
Austin Schuhd8d23612021-10-05 18:26:45 -070035
36 timer_ = event_loop_->AddTimer([this]() { Timer(); });
37
38 event_loop_->OnRun([this]() {
39 timer_->Schedule(event_loop_->monotonic_now(),
40 chrono::milliseconds(1000));
41 });
42
43 event_loop_->epoll()->OnReadable(client_.fd(),
44 [this]() { MessageReceived(); });
45 event_loop_->SetRuntimeRealtimePriority(5);
46 }
47
48 ~PingClient() { event_loop_->epoll()->DeleteFd(client_.fd()); }
49
50 aos::TimerHandler *timer_;
51 sctp_assoc_t sac_assoc_id_ = 0;
52
53 void Timer() {
Austin Schuh99f7c6a2024-06-25 22:07:44 -070054 std::string data(absl::GetFlag(FLAGS_size), 'a');
Austin Schuhd8d23612021-10-05 18:26:45 -070055
Austin Schuh99f7c6a2024-06-25 22:07:44 -070056 if (client_.Send(0, data, absl::GetFlag(FLAGS_ttl))) {
Austin Schuhd8d23612021-10-05 18:26:45 -070057 LOG(INFO) << "Sent " << data.size();
58 } else {
59 PLOG(ERROR) << "Failed to send";
60 }
61 }
62
63 void MessageReceived() {
64 aos::unique_c_ptr<Message> message = client_.Read();
65 ++count_;
66 if (!message) {
67 return;
68 }
69
70 if (message->message_type == Message::kNotification) {
71 const union sctp_notification *snp =
72 (const union sctp_notification *)message->data();
73
74 if (VLOG_IS_ON(2)) {
75 PrintNotification(message.get());
76 }
77
78 switch (snp->sn_header.sn_type) {
79 case SCTP_ASSOC_CHANGE: {
80 const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
81 switch (sac->sac_state) {
82 case SCTP_COMM_UP:
83 NodeConnected(sac->sac_assoc_id);
84 VLOG(1) << "Peer connected";
85 break;
86 case SCTP_COMM_LOST:
87 case SCTP_SHUTDOWN_COMP:
88 case SCTP_CANT_STR_ASSOC:
89 NodeDisconnected(sac->sac_assoc_id);
90 VLOG(1) << "Disconnect";
91 break;
92 case SCTP_RESTART:
93 LOG(FATAL) << "Never seen this before.";
94 break;
95 }
96 } break;
97 }
98 } else if (message->message_type == Message::kMessage) {
99 HandleData(message.get());
100 }
101 }
102
103 void NodeConnected(sctp_assoc_t assoc_id) {
104 client_.SetPriorityScheduler(assoc_id);
105 }
106 void NodeDisconnected(sctp_assoc_t /*assoc_id*/) {}
107
108 void HandleData(const Message *message) {
109 LOG(INFO) << "Received data of length " << message->size << " total "
110 << size_ << " count " << count_;
111 size_ += message->size;
112
113 if (VLOG_IS_ON(1)) {
114 message->LogRcvInfo();
115 }
116 }
117
118 private:
119 aos::ShmEventLoop *event_loop_;
120 SctpClient client_;
121
122 size_t count_ = 0;
123 size_t size_ = 0;
124};
125
126int Main() {
127 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700128 aos::configuration::ReadConfig(absl::GetFlag(FLAGS_config));
Austin Schuhd8d23612021-10-05 18:26:45 -0700129
130 aos::ShmEventLoop event_loop(&config.message());
131 PingClient server(&event_loop);
132 event_loop.Run();
133
134 return EXIT_SUCCESS;
135}
136
137} // namespace message_bridge
138} // namespace aos
139
140int main(int argc, char **argv) {
141 aos::InitGoogle(&argc, &argv);
142
143 return aos::message_bridge::Main();
144}