blob: 29253ec563c5576344bfbe86db7d7a2b5327164a [file] [log] [blame]
#include <chrono>
#include "absl/flags/flag.h"
#include "absl/log/log.h"
#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
#include "aos/network/sctp_client.h"
ABSL_FLAG(std::string, config, "aos_config.json", "Path to the config.");
ABSL_FLAG(uint32_t, port, 1323, "Port to pingpong on");
ABSL_FLAG(std::string, target, "vpu0-0a", "Host to connect to");
ABSL_FLAG(uint32_t, rx_size, 1000000,
"RX buffer size to set the max size to be in bytes.");
ABSL_FLAG(uint32_t, size, 1000, "Size of data to send in bytes");
ABSL_FLAG(uint32_t, ttl, 0, "TTL in milliseconds");
namespace aos {
namespace message_bridge {
namespace chrono = std::chrono;
class PingClient {
public:
PingClient(aos::ShmEventLoop *event_loop)
: event_loop_(event_loop),
client_(absl::GetFlag(FLAGS_target), absl::GetFlag(FLAGS_port), 2,
"::", 0) {
client_.SetMaxReadSize(
std::max(absl::GetFlag(FLAGS_rx_size), absl::GetFlag(FLAGS_size)) +
100);
client_.SetMaxWriteSize(
std::max(absl::GetFlag(FLAGS_rx_size), absl::GetFlag(FLAGS_size)) +
100);
timer_ = event_loop_->AddTimer([this]() { Timer(); });
event_loop_->OnRun([this]() {
timer_->Schedule(event_loop_->monotonic_now(),
chrono::milliseconds(1000));
});
event_loop_->epoll()->OnReadable(client_.fd(),
[this]() { MessageReceived(); });
event_loop_->SetRuntimeRealtimePriority(5);
}
~PingClient() { event_loop_->epoll()->DeleteFd(client_.fd()); }
aos::TimerHandler *timer_;
sctp_assoc_t sac_assoc_id_ = 0;
void Timer() {
std::string data(absl::GetFlag(FLAGS_size), 'a');
if (client_.Send(0, data, absl::GetFlag(FLAGS_ttl))) {
LOG(INFO) << "Sent " << data.size();
} else {
PLOG(ERROR) << "Failed to send";
}
}
void MessageReceived() {
aos::unique_c_ptr<Message> message = client_.Read();
++count_;
if (!message) {
return;
}
if (message->message_type == Message::kNotification) {
const union sctp_notification *snp =
(const union sctp_notification *)message->data();
if (VLOG_IS_ON(2)) {
PrintNotification(message.get());
}
switch (snp->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE: {
const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
switch (sac->sac_state) {
case SCTP_COMM_UP:
NodeConnected(sac->sac_assoc_id);
VLOG(1) << "Peer connected";
break;
case SCTP_COMM_LOST:
case SCTP_SHUTDOWN_COMP:
case SCTP_CANT_STR_ASSOC:
NodeDisconnected(sac->sac_assoc_id);
VLOG(1) << "Disconnect";
break;
case SCTP_RESTART:
LOG(FATAL) << "Never seen this before.";
break;
}
} break;
}
} else if (message->message_type == Message::kMessage) {
HandleData(message.get());
}
}
void NodeConnected(sctp_assoc_t assoc_id) {
client_.SetPriorityScheduler(assoc_id);
}
void NodeDisconnected(sctp_assoc_t /*assoc_id*/) {}
void HandleData(const Message *message) {
LOG(INFO) << "Received data of length " << message->size << " total "
<< size_ << " count " << count_;
size_ += message->size;
if (VLOG_IS_ON(1)) {
message->LogRcvInfo();
}
}
private:
aos::ShmEventLoop *event_loop_;
SctpClient client_;
size_t count_ = 0;
size_t size_ = 0;
};
int Main() {
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(absl::GetFlag(FLAGS_config));
aos::ShmEventLoop event_loop(&config.message());
PingClient server(&event_loop);
event_loop.Run();
return EXIT_SUCCESS;
}
} // namespace message_bridge
} // namespace aos
int main(int argc, char **argv) {
aos::InitGoogle(&argc, &argv);
return aos::message_bridge::Main();
}