Add simple ping pong apps for testing SCTP
We are seeing that when we send a large hunk of data (over 13 ish MB),
we hit SIGXCPU on message_bridge. Rather than debug with the full AOS
stack in play, I wrote a simple demo app which sends a configurable
message at a configurable period. It reproduces the issue quite nicely,
for better or worse.
Change-Id: I80681f149efcb21e856e7f65cb0caa4d890c0107
Signed-off-by: James Kuszmaul <james.kuszmaul@bluerivertech.com>
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 8186abb..96f8b50 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -876,6 +876,30 @@
],
)
+cc_binary(
+ name = "ping",
+ srcs = [
+ "ping.cc",
+ ],
+ deps = [
+ ":sctp_server",
+ "//aos:init",
+ "//aos/events:shm_event_loop",
+ ],
+)
+
+cc_binary(
+ name = "pong",
+ srcs = [
+ "pong.cc",
+ ],
+ deps = [
+ ":sctp_client",
+ "//aos:init",
+ "//aos/events:shm_event_loop",
+ ],
+)
+
cc_test(
name = "timestamp_channel_test",
srcs = ["timestamp_channel_test.cc"],
diff --git a/aos/network/ping.cc b/aos/network/ping.cc
new file mode 100644
index 0000000..f105133
--- /dev/null
+++ b/aos/network/ping.cc
@@ -0,0 +1,136 @@
+#include <chrono>
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/network/sctp_server.h"
+
+DEFINE_string(config, "aos_config.json", "Path to the config.");
+DEFINE_uint32(port, 1323, "Port to pingpong on");
+DEFINE_uint32(size, 1000000, "Size of data to send in bytes");
+DEFINE_uint32(duration, 1000, "Period to send at in milliseconds");
+DEFINE_uint32(ttl, 0, "TTL in milliseconds");
+
+namespace aos {
+namespace message_bridge {
+
+namespace chrono = std::chrono;
+
+class PingServer {
+ public:
+ PingServer(aos::ShmEventLoop *event_loop)
+ : event_loop_(event_loop), server_(2, "::", FLAGS_port) {
+ event_loop_->epoll()->OnReadable(server_.fd(),
+ [this]() { MessageReceived(); });
+ server_.SetMaxReadSize(FLAGS_size + 100);
+ server_.SetMaxWriteSize(FLAGS_size + 100);
+
+ timer_ = event_loop_->AddTimer([this]() { Timer(); });
+
+ event_loop_->OnRun([this]() {
+ timer_->Schedule(event_loop_->monotonic_now(),
+ chrono::milliseconds(FLAGS_duration));
+ });
+
+ event_loop_->SetRuntimeRealtimePriority(5);
+ }
+
+ ~PingServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
+
+ aos::TimerHandler *timer_;
+ sctp_assoc_t sac_assoc_id_ = 0;
+
+ void Timer() {
+ if (sac_assoc_id_ == 0) {
+ LOG(INFO) << "No client, not sending";
+ return;
+ }
+
+ std::string data(FLAGS_size, 'a');
+
+ if (server_.Send(data, sac_assoc_id_, 0, FLAGS_ttl)) {
+ LOG(INFO) << "Sent " << data.size();
+ } else {
+ PLOG(ERROR) << "Failed to send";
+ }
+ }
+
+ void MessageReceived() {
+ aos::unique_c_ptr<Message> message = server_.Read();
+ if (!message) {
+ return;
+ }
+
+ if (message->message_type == Message::kNotification) {
+ const union sctp_notification *snp =
+ (const union sctp_notification *)message->data();
+
+ if (VLOG_IS_ON(2)) {
+ PrintNotification(message.get());
+ }
+
+ switch (snp->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE: {
+ const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ NodeConnected(sac->sac_assoc_id);
+ VLOG(1) << "Peer connected";
+ break;
+ case SCTP_COMM_LOST:
+ case SCTP_SHUTDOWN_COMP:
+ case SCTP_CANT_STR_ASSOC:
+ NodeDisconnected(sac->sac_assoc_id);
+ VLOG(1) << "Disconnect";
+ break;
+ case SCTP_RESTART:
+ LOG(FATAL) << "Never seen this before.";
+ break;
+ }
+ } break;
+ }
+ } else if (message->message_type == Message::kMessage) {
+ HandleData(message.get());
+ }
+ }
+
+ void NodeConnected(sctp_assoc_t assoc_id) {
+ sac_assoc_id_ = assoc_id;
+ server_.SetPriorityScheduler(assoc_id);
+ }
+ void NodeDisconnected(sctp_assoc_t /*assoc_id*/) { sac_assoc_id_ = 0; }
+
+ void HandleData(const Message *message) {
+ VLOG(1) << "Received data of length " << message->size;
+
+ if (VLOG_IS_ON(1)) {
+ message->LogRcvInfo();
+ }
+ }
+
+ private:
+ aos::ShmEventLoop *event_loop_;
+ SctpServer server_;
+};
+
+int Main() {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
+
+ aos::ShmEventLoop event_loop(&config.message());
+ PingServer server(&event_loop);
+ event_loop.Run();
+
+ return EXIT_SUCCESS;
+}
+
+} // namespace message_bridge
+} // namespace aos
+
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+
+ return aos::message_bridge::Main();
+}
diff --git a/aos/network/pong.cc b/aos/network/pong.cc
new file mode 100644
index 0000000..8f08836
--- /dev/null
+++ b/aos/network/pong.cc
@@ -0,0 +1,138 @@
+#include <chrono>
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/network/sctp_client.h"
+
+DEFINE_string(config, "aos_config.json", "Path to the config.");
+DEFINE_uint32(port, 1323, "Port to pingpong on");
+DEFINE_string(target, "vpu0-0a", "Host to connect to");
+DEFINE_uint32(rx_size, 1000000,
+ "RX buffer size to set the max size to be in bytes.");
+DEFINE_uint32(size, 1000, "Size of data to send in bytes");
+DEFINE_uint32(ttl, 0, "TTL in milliseconds");
+
+namespace aos {
+namespace message_bridge {
+
+namespace chrono = std::chrono;
+
+class PingClient {
+ public:
+ PingClient(aos::ShmEventLoop *event_loop)
+ : event_loop_(event_loop), client_(FLAGS_target, FLAGS_port, 2, "::", 0) {
+ client_.SetMaxReadSize(std::max(FLAGS_rx_size, FLAGS_size) + 100);
+ client_.SetMaxWriteSize(std::max(FLAGS_rx_size, FLAGS_size) + 100);
+
+ timer_ = event_loop_->AddTimer([this]() { Timer(); });
+
+ event_loop_->OnRun([this]() {
+ timer_->Schedule(event_loop_->monotonic_now(),
+ chrono::milliseconds(1000));
+ });
+
+ event_loop_->epoll()->OnReadable(client_.fd(),
+ [this]() { MessageReceived(); });
+ event_loop_->SetRuntimeRealtimePriority(5);
+ }
+
+ ~PingClient() { event_loop_->epoll()->DeleteFd(client_.fd()); }
+
+ aos::TimerHandler *timer_;
+ sctp_assoc_t sac_assoc_id_ = 0;
+
+ void Timer() {
+ std::string data(FLAGS_size, 'a');
+
+ if (client_.Send(0, data, FLAGS_ttl)) {
+ LOG(INFO) << "Sent " << data.size();
+ } else {
+ PLOG(ERROR) << "Failed to send";
+ }
+ }
+
+ void MessageReceived() {
+ aos::unique_c_ptr<Message> message = client_.Read();
+ ++count_;
+ if (!message) {
+ return;
+ }
+
+ if (message->message_type == Message::kNotification) {
+ const union sctp_notification *snp =
+ (const union sctp_notification *)message->data();
+
+ if (VLOG_IS_ON(2)) {
+ PrintNotification(message.get());
+ }
+
+ switch (snp->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE: {
+ const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ NodeConnected(sac->sac_assoc_id);
+ VLOG(1) << "Peer connected";
+ break;
+ case SCTP_COMM_LOST:
+ case SCTP_SHUTDOWN_COMP:
+ case SCTP_CANT_STR_ASSOC:
+ NodeDisconnected(sac->sac_assoc_id);
+ VLOG(1) << "Disconnect";
+ break;
+ case SCTP_RESTART:
+ LOG(FATAL) << "Never seen this before.";
+ break;
+ }
+ } break;
+ }
+ } else if (message->message_type == Message::kMessage) {
+ HandleData(message.get());
+ }
+ }
+
+ void NodeConnected(sctp_assoc_t assoc_id) {
+ client_.SetPriorityScheduler(assoc_id);
+ }
+ void NodeDisconnected(sctp_assoc_t /*assoc_id*/) {}
+
+ void HandleData(const Message *message) {
+ LOG(INFO) << "Received data of length " << message->size << " total "
+ << size_ << " count " << count_;
+ size_ += message->size;
+
+ if (VLOG_IS_ON(1)) {
+ message->LogRcvInfo();
+ }
+ }
+
+ private:
+ aos::ShmEventLoop *event_loop_;
+ SctpClient client_;
+
+ size_t count_ = 0;
+ size_t size_ = 0;
+};
+
+int Main() {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
+
+ aos::ShmEventLoop event_loop(&config.message());
+ PingClient server(&event_loop);
+ event_loop.Run();
+
+ return EXIT_SUCCESS;
+}
+
+} // namespace message_bridge
+} // namespace aos
+
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+
+ return aos::message_bridge::Main();
+}