Initial message_bridge client and server
These will forward data, and track what made it across and what didn't
when configured correctly. This should be off if nothing is requested
to be logged remotely.
It implements ttl, reconnects, and has a basic smoke test.
We still need to handle forwarding data for logging.
Change-Id: I7daebe8cef54029a5733b7f81ee6b68367c80d82
diff --git a/aos/config.bzl b/aos/config.bzl
index 3f78422..0766329 100644
--- a/aos/config.bzl
+++ b/aos/config.bzl
@@ -2,7 +2,7 @@
AosConfigInfo = provider(fields = ["transitive_flatbuffers", "transitive_src"])
-def aos_config(name, src, flatbuffers, deps = [], visibility = None):
+def aos_config(name, src, flatbuffers = [], deps = [], visibility = None):
_aos_config(
name = name,
src = src,
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 46f07ac..a40c47c 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -7,6 +7,8 @@
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <unistd.h>
+
+#include <set>
#include <string_view>
#include "absl/container/btree_set.h"
@@ -730,5 +732,57 @@
<< static_cast<int>(connection->timestamp_logger());
}
+std::vector<std::string_view> SourceNodeNames(const Configuration *config,
+ const Node *my_node) {
+ std::set<std::string_view> result_set;
+
+ for (const Channel *channel : *config->channels()) {
+ if (channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() ==
+ my_node->name()->string_view()) {
+ result_set.insert(channel->source_node()->string_view());
+ }
+ }
+ }
+ }
+
+ std::vector<std::string_view> result;
+ for (const std::string_view source : result_set) {
+ VLOG(1) << "Found a source node of " << source;
+ result.emplace_back(source);
+ }
+ return result;
+}
+
+std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
+ const Node *my_node) {
+ std::vector<std::string_view> result;
+
+ for (const Channel *channel : *config->channels()) {
+ if (channel->has_source_node() && channel->source_node()->string_view() ==
+ my_node->name()->string_view()) {
+ if (!channel->has_destination_nodes()) continue;
+
+ if (channel->source_node()->string_view() !=
+ my_node->name()->string_view()) {
+ continue;
+ }
+
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (std::find(result.begin(), result.end(),
+ connection->name()->string_view()) == result.end()) {
+ result.emplace_back(connection->name()->string_view());
+ }
+ }
+ }
+ }
+
+ for (const std::string_view destination : result) {
+ VLOG(1) << "Found a destination node of " << destination;
+ }
+ return result;
+}
+
} // namespace configuration
} // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index 7bf8203..55b64c9 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -80,6 +80,14 @@
// Prints a channel to json, but without the schema.
std::string CleanedChannelToString(const Channel *channel);
+// Returns the node names that this node should be forwarding to.
+std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
+ const Node *my_node);
+
+// Returns the node names that this node should be receiving messages from.
+std::vector<std::string_view> SourceNodeNames(const Configuration *config,
+ const Node *my_node);
+
// TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
} // namespace configuration
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index fab6745..b8fc5b6 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -7,6 +7,7 @@
#include "flatbuffers/reflection.h"
#include "glog/logging.h"
#include "gtest/gtest.h"
+#include "gmock/gmock.h"
namespace aos {
namespace configuration {
@@ -567,6 +568,37 @@
&logged_on_both_channel.message(), &baz_node.message(),
&baz_node.message()));
}
+
+// Tests that we can deduce source nodes from a multinode config.
+TEST_F(ConfigurationTest, SourceNodeNames) {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/config1_multinode.json");
+
+ // This is a bit simplistic in that it doesn't test deduplication, but it does
+ // exercise a lot of the logic.
+ EXPECT_THAT(
+ SourceNodeNames(&config.message(), config.message().nodes()->Get(0)),
+ ::testing::ElementsAreArray({"pi2"}));
+ EXPECT_THAT(
+ SourceNodeNames(&config.message(), config.message().nodes()->Get(1)),
+ ::testing::ElementsAreArray({"pi1"}));
+}
+
+// Tests that we can deduce destination nodes from a multinode config.
+TEST_F(ConfigurationTest, DestinationNodeNames) {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/config1_multinode.json");
+
+ // This is a bit simplistic in that it doesn't test deduplication, but it does
+ // exercise a lot of the logic.
+ EXPECT_THAT(
+ DestinationNodeNames(&config.message(), config.message().nodes()->Get(0)),
+ ::testing::ElementsAreArray({"pi2"}));
+ EXPECT_THAT(
+ DestinationNodeNames(&config.message(), config.message().nodes()->Get(1)),
+ ::testing::ElementsAreArray({"pi1"}));
+}
+
} // namespace testing
} // namespace configuration
} // namespace aos
diff --git a/aos/events/BUILD b/aos/events/BUILD
index e40ab99..79200bd 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -121,6 +121,8 @@
flatbuffers = [
":ping_fbs",
":pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
],
deps = [":config"],
)
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 97d5783..b2be972 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,4 +1,5 @@
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
flatbuffer_cc_library(
name = "logger_fbs",
@@ -84,11 +85,21 @@
],
)
+aos_config(
+ name = "multinode_pingpong_config",
+ src = "multinode_pingpong.json",
+ flatbuffers = [
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ ],
+ deps = ["//aos/events:config"],
+)
+
cc_test(
name = "logger_test",
srcs = ["logger_test.cc"],
data = [
- "//aos/events:multinode_pingpong_config.json",
+ ":multinode_pingpong_config.json",
"//aos/events:pingpong_config.json",
],
deps = [
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index 91190e7..70c520d 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -144,7 +144,7 @@
public:
MultinodeLoggerTest()
: config_(aos::configuration::ReadConfig(
- "aos/events/multinode_pingpong_config.json")),
+ "aos/events/logging/multinode_pingpong_config.json")),
event_loop_factory_(&config_.message(), "pi1"),
ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
ping_(ping_event_loop_.get()) {}
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
new file mode 100644
index 0000000..f85a4e1
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong.json
@@ -0,0 +1,93 @@
+{
+ "channels": [
+ /* Logged on pi1 locally */
+ {
+ "name": "/aos/pi1",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ /* Forwarded to pi2.
+ * Doesn't matter where timestamps are logged for the test.
+ */
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1",
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ /* Forwarded back to pi1.
+ * The message is logged both on the sending node and the receiving node
+ * (to make it easier to look at the results for now).
+ *
+ * The timestamps are logged on the receiving node.
+ */
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ]
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index f0e532e..c0c5087 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -2,6 +2,54 @@
"channels": [
{
"name": "/aos/pi1",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi3",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi3",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/roborio",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "roborio",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi1",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi3",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi3",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/roborio",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "roborio",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi1",
"type": "aos.timing.Report",
"source_node": "pi1",
"frequency": 50,
@@ -25,6 +73,14 @@
"max_size": 2048
},
{
+ "name": "/aos/roborio",
+ "type": "aos.timing.Report",
+ "source_node": "roborio",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "pi1",
@@ -32,8 +88,8 @@
{
"name": "pi2",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_node": "pi1"
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
}
]
},
@@ -41,14 +97,12 @@
"name": "/test",
"type": "aos.examples.Pong",
"source_node": "pi2",
- "logger": "LOCAL_AND_REMOTE_LOGGER",
- "logger_node": "pi1",
"destination_nodes": [
{
"name": "pi1",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_node": "pi1"
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
}
]
},
@@ -60,8 +114,8 @@
{
"name": "pi3",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_node": "pi1"
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
}
]
},
@@ -69,14 +123,12 @@
"name": "/test2",
"type": "aos.examples.Pong",
"source_node": "pi3",
- "logger": "LOCAL_AND_REMOTE_LOGGER",
- "logger_node": "pi1",
"destination_nodes": [
{
"name": "pi1",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_node": "pi1"
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
}
]
}
@@ -111,6 +163,96 @@
"rename": {
"name": "/aos/pi3"
}
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "roborio"
+ },
+ "rename": {
+ "name": "/aos/roborio"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/aos/pi3"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "roborio"
+ },
+ "rename": {
+ "name": "/aos/roborio"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/aos/pi3"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "roborio"
+ },
+ "rename": {
+ "name": "/aos/roborio"
+ }
}
],
"nodes": [
@@ -128,6 +270,11 @@
"name": "pi3",
"hostname": "raspberrypi3",
"port": 9971
+ },
+ {
+ "name": "roborio",
+ "hostname": "roboRIO-6971-FRC",
+ "port": 9971
}
],
"applications": [
@@ -156,6 +303,32 @@
}
}
]
+ },
+ {
+ "name": "ping3",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test3"
+ }
+ }
+ ]
+ },
+ {
+ "name": "pong3",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test3"
+ }
+ }
+ ]
}
]
}
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 3ced933..bdee4f2 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -21,10 +21,26 @@
#include "aos/util/phased_loop.h"
#include "glog/logging.h"
+namespace {
+
+// Returns the portion of the path after the last /. This very much assumes
+// that the application name is null terminated.
+const char *Filename(const char *path) {
+ const std::string_view path_string_view = path;
+ auto last_slash_pos = path_string_view.find_last_of("/");
+
+ return last_slash_pos == std::string_view::npos ? path
+ : path + last_slash_pos + 1;
+}
+
+} // namespace
+
DEFINE_string(shm_base, "/dev/shm/aos",
"Directory to place queue backing mmaped files in.");
DEFINE_uint32(permissions, 0770,
"Permissions to make shared memory files and folders.");
+DEFINE_string(application_name, Filename(program_invocation_name),
+ "The application name");
namespace aos {
@@ -135,15 +151,6 @@
namespace {
-// Returns the portion of the path after the last /.
-std::string_view Filename(std::string_view path) {
- auto last_slash_pos = path.find_last_of("/");
-
- return last_slash_pos == std::string_view::npos
- ? path
- : path.substr(last_slash_pos + 1, path.size());
-}
-
const Node *MaybeMyNode(const Configuration *configuration) {
if (!configuration->has_nodes()) {
return nullptr;
@@ -158,7 +165,7 @@
ShmEventLoop::ShmEventLoop(const Configuration *configuration)
: EventLoop(configuration),
- name_(Filename(program_invocation_name)),
+ name_(FLAGS_application_name),
node_(MaybeMyNode(configuration)) {
if (configuration->has_nodes()) {
CHECK(node_ != nullptr) << ": Couldn't find node in config.";
@@ -802,15 +809,16 @@
}
SignalHandler::global()->Unregister(this);
+
+ // Trigger any remaining senders or fetchers to be cleared before destroying
+ // the event loop so the book keeping matches. Do this in the thread that
+ // created the timing reporter.
+ timing_report_sender_.reset();
}
void ShmEventLoop::Exit() { epoll_.Quit(); }
ShmEventLoop::~ShmEventLoop() {
- // Trigger any remaining senders or fetchers to be cleared before destroying
- // the event loop so the book keeping matches.
- timing_report_sender_.reset();
-
// Force everything with a registered fd with epoll to be destroyed now.
timers_.clear();
phased_loops_.clear();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index d10989e..bb4ad77 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -71,6 +71,8 @@
int priority() const override { return priority_; }
+ internal::EPoll *epoll() { return &epoll_; }
+
private:
friend class internal::WatcherState;
friend class internal::TimerHandlerState;
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 4e4520c..4388687 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -5,6 +5,7 @@
#include <string_view>
#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
namespace aos {
@@ -31,15 +32,19 @@
// This class is a fixed memory allocator which holds the data for a flatbuffer
// in an array.
-template <size_t S>
class FixedAllocator : public FixedAllocatorBase {
public:
+ FixedAllocator(size_t size) : buffer_(size, 0) {}
+
uint8_t *data() override { return &buffer_[0]; }
const uint8_t *data() const override { return &buffer_[0]; }
size_t size() const override { return buffer_.size(); }
+ // Releases the data in the buffer.
+ std::vector<uint8_t> release() { return std::move(buffer_); }
+
private:
- std::array<uint8_t, S> buffer_;
+ std::vector<uint8_t> buffer_;
};
// This class adapts a preallocated memory region to an Allocator.
@@ -82,46 +87,6 @@
virtual size_t size() const = 0;
};
-// Array backed flatbuffer.
-template <typename T>
-class FlatbufferArray : public Flatbuffer<T> {
- public:
- // Builds a Flatbuffer by copying the data from the other flatbuffer.
- FlatbufferArray(const Flatbuffer<T> &other) {
- CHECK_LE(other.size(), data_.size());
-
- memcpy(data_.data(), other.data(), other.size());
- size_ = other.size();
- }
-
- // Coppies the data from the other flatbuffer.
- FlatbufferArray &operator=(const Flatbuffer<T> &other) {
- CHECK_LE(other.size(), data_.size());
-
- memcpy(data_.data(), other.data(), other.size());
- size_ = other.size();
- return *this;
- }
-
- virtual ~FlatbufferArray() override {}
-
- // Creates a builder wrapping the underlying data.
- flatbuffers::FlatBufferBuilder FlatBufferBuilder() {
- data_.deallocate(data_.data(), data_.size());
- flatbuffers::FlatBufferBuilder fbb(data_.size(), &data_);
- fbb.ForceDefaults(1);
- return fbb;
- }
-
- const uint8_t *data() const override { return data_.data(); }
- uint8_t *data() override { return data_.data(); }
- size_t size() const override { return size_; }
-
- private:
- FixedAllocator<8 * 1024> data_;
- size_t size_ = data_.size();
-};
-
// String backed flatbuffer.
template <typename T>
class FlatbufferString : public Flatbuffer<T> {
@@ -228,6 +193,48 @@
flatbuffers::DetachedBuffer buffer_;
};
+// This object associates the message type with the memory storing the
+// flatbuffer. This only stores root tables.
+//
+// From a usage point of view, pointers to the data are very different than
+// pointers to the tables.
+template <typename T>
+class SizePrefixedFlatbufferDetachedBuffer final : public Flatbuffer<T> {
+ public:
+ // Builds a Flatbuffer by taking ownership of the buffer.
+ SizePrefixedFlatbufferDetachedBuffer(flatbuffers::DetachedBuffer &&buffer)
+ : buffer_(::std::move(buffer)) {
+ CHECK_GE(buffer_.size(), sizeof(flatbuffers::uoffset_t));
+ }
+
+ // Builds a flatbuffer by taking ownership of the buffer from the other
+ // flatbuffer.
+ SizePrefixedFlatbufferDetachedBuffer(
+ SizePrefixedFlatbufferDetachedBuffer &&fb)
+ : buffer_(::std::move(fb.buffer_)) {}
+ SizePrefixedFlatbufferDetachedBuffer &operator=(
+ SizePrefixedFlatbufferDetachedBuffer &&fb) {
+ ::std::swap(buffer_, fb.buffer_);
+ return *this;
+ }
+
+ virtual ~SizePrefixedFlatbufferDetachedBuffer() override {}
+
+ // Returns references to the buffer, and the data.
+ const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
+ const uint8_t *data() const override {
+ return buffer_.data() + sizeof(flatbuffers::uoffset_t);
+ }
+ uint8_t *data() override {
+ return buffer_.data() + sizeof(flatbuffers::uoffset_t);
+ }
+ size_t size() const override {
+ return buffer_.size() - sizeof(flatbuffers::uoffset_t);
+ }
+
+ private:
+ flatbuffers::DetachedBuffer buffer_;
+};
// TODO(austin): Need a way to get our hands on the max size. Can start with
// "large" for now.
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 5d96183..9d3b3c4 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -1,5 +1,36 @@
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
+
package(default_visibility = ["//visibility:public"])
+flatbuffer_cc_library(
+ name = "connect_fbs",
+ srcs = ["connect.fbs"],
+ gen_reflections = 1,
+ includes = [
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
+flatbuffer_cc_library(
+ name = "message_bridge_client_fbs",
+ srcs = ["message_bridge_client.fbs"],
+ gen_reflections = 1,
+ includes = [
+ ":message_bridge_server_fbs_includes",
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
+flatbuffer_cc_library(
+ name = "message_bridge_server_fbs",
+ srcs = ["message_bridge_server.fbs"],
+ gen_reflections = 1,
+ includes = [
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
cc_library(
name = "team_number",
srcs = [
@@ -25,3 +56,182 @@
"//aos/testing:googletest",
],
)
+
+cc_library(
+ name = "sctp_lib",
+ srcs = [
+ "sctp_lib.cc",
+ ],
+ hdrs = [
+ "sctp_lib.h",
+ ],
+ copts = [
+ # The casts required to read datastructures from sockets trip -Wcast-align.
+ "-Wno-cast-align",
+ ],
+ deps = [
+ "//aos:unique_malloc_ptr",
+ "//third_party/lksctp-tools:sctp",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+cc_library(
+ name = "sctp_server",
+ srcs = [
+ "sctp_server.cc",
+ ],
+ hdrs = [
+ "sctp_server.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":sctp_lib",
+ "//third_party/lksctp-tools:sctp",
+ ],
+)
+
+cc_library(
+ name = "message_bridge_protocol",
+ hdrs = [
+ "message_bridge_protocol.h",
+ ],
+)
+
+cc_library(
+ name = "message_bridge_server_lib",
+ srcs = [
+ "message_bridge_server_lib.cc",
+ ],
+ hdrs = [
+ "message_bridge_server_lib.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":connect_fbs",
+ ":message_bridge_protocol",
+ ":message_bridge_server_fbs",
+ ":sctp_lib",
+ ":sctp_server",
+ "//aos:unique_malloc_ptr",
+ "//aos/events:shm_event_loop",
+ "//aos/events/logging:logger",
+ "//third_party/lksctp-tools:sctp",
+ ],
+)
+
+cc_binary(
+ name = "message_bridge_server",
+ srcs = [
+ "message_bridge_server.cc",
+ ],
+ deps = [
+ ":message_bridge_server_lib",
+ "//aos:init",
+ "//aos:json_to_flatbuffer",
+ "//aos/events:shm_event_loop",
+ ],
+)
+
+cc_library(
+ name = "sctp_client",
+ srcs = [
+ "sctp_client.cc",
+ ],
+ hdrs = [
+ "sctp_client.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":sctp_lib",
+ "//third_party/lksctp-tools:sctp",
+ ],
+)
+
+cc_library(
+ name = "message_bridge_client_lib",
+ srcs = [
+ "message_bridge_client_lib.cc",
+ ],
+ hdrs = [
+ "message_bridge_client_lib.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":connect_fbs",
+ ":message_bridge_client_fbs",
+ ":message_bridge_protocol",
+ ":message_bridge_server_fbs",
+ ":sctp_client",
+ "//aos/events:shm_event_loop",
+ "//aos/events/logging:logger",
+ ],
+)
+
+cc_binary(
+ name = "message_bridge_client",
+ srcs = [
+ "message_bridge_client.cc",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":message_bridge_client_lib",
+ "//aos:init",
+ "//aos:json_to_flatbuffer",
+ "//aos/events:shm_event_loop",
+ ],
+)
+
+aos_config(
+ name = "message_bridge_test_common_config",
+ src = "message_bridge_test_common.json",
+ flatbuffers = [
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ ],
+ deps = ["//aos/events:config"],
+)
+
+aos_config(
+ name = "message_bridge_test_server_config",
+ src = "message_bridge_test_server.json",
+ deps = [":message_bridge_test_common_config"],
+)
+
+aos_config(
+ name = "message_bridge_test_client_config",
+ src = "message_bridge_test_client.json",
+ deps = [":message_bridge_test_common_config"],
+)
+
+cc_test(
+ name = "message_bridge_test",
+ srcs = [
+ "message_bridge_test.cc",
+ ],
+ data = [
+ ":message_bridge_test_client_config.json",
+ ":message_bridge_test_server_config.json",
+ ],
+ deps = [
+ ":message_bridge_client_lib",
+ ":message_bridge_server_lib",
+ "//aos:json_to_flatbuffer",
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ "//aos/events:shm_event_loop",
+ "//aos/testing:googletest",
+ ],
+)
diff --git a/aos/network/connect.fbs b/aos/network/connect.fbs
new file mode 100644
index 0000000..32893b8
--- /dev/null
+++ b/aos/network/connect.fbs
@@ -0,0 +1,13 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+// This is the message sent to initiate a connection to a message_bridge.
+// It communicates the channels that need to be forwarded back.
+table Connect {
+ // The node making the request.
+ node:aos.Node;
+
+ // The channels that we want transfered to this client.
+ channels_to_transfer:[Channel];
+}
diff --git a/aos/network/message_bridge_client.cc b/aos/network/message_bridge_client.cc
new file mode 100644
index 0000000..c44eee0
--- /dev/null
+++ b/aos/network/message_bridge_client.cc
@@ -0,0 +1,36 @@
+#include "aos/network/message_bridge_client_lib.h"
+
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+
+DEFINE_string(config, "multinode_pingpong_config.json", "Path to the config.");
+
+namespace aos {
+namespace message_bridge {
+
+int Main() {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
+
+ aos::ShmEventLoop event_loop(&config.message());
+
+ MessageBridgeClient app(&event_loop);
+
+ // TODO(austin): Save messages into a vector to be logged. One file per
+ // channel? Need to sort out ordering.
+ //
+ // TODO(austin): Low priority, "reliable" logging channel.
+
+ event_loop.Run();
+
+ return EXIT_SUCCESS;
+}
+
+} // namespace message_bridge
+} // namespace aos
+
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+
+ return aos::message_bridge::Main();
+}
diff --git a/aos/network/message_bridge_client.fbs b/aos/network/message_bridge_client.fbs
new file mode 100644
index 0000000..58b653a
--- /dev/null
+++ b/aos/network/message_bridge_client.fbs
@@ -0,0 +1,24 @@
+include "aos/network/message_bridge_server.fbs";
+
+namespace aos.message_bridge;
+
+// Statistics from a single client connection to a server.
+table ClientConnection {
+ // The node that we are connected to.
+ node:Node;
+
+ // Health of this connection. Connected or not?
+ state:State;
+
+ // Number of packets received on all channels.
+ received_packets:uint;
+
+ // TODO(austin): Per channel counts?
+}
+
+// Statistics for all clients.
+table ClientStatistics {
+ connections:[ClientConnection];
+}
+
+root_type ClientStatistics;
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
new file mode 100644
index 0000000..3652db2
--- /dev/null
+++ b/aos/network/message_bridge_client_lib.cc
@@ -0,0 +1,361 @@
+#include "aos/network/message_bridge_client_lib.h"
+
+#include <chrono>
+#include <string_view>
+
+#include "aos/events/logging/logger.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/sctp_client.h"
+#include "aos/unique_malloc_ptr.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+// This application receives messages from another node and re-publishes them on
+// this node.
+//
+// To simulate packet loss for testing, run:
+// tc qdisc add dev eth0 root netem loss random 10
+// To restore it, run:
+// tc qdisc del dev eth0 root netem
+
+namespace aos {
+namespace message_bridge {
+namespace {
+namespace chrono = std::chrono;
+
+aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
+ const Configuration *config, const Node *my_node,
+ std::string_view remote_name) {
+ CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
+
+ flatbuffers::FlatBufferBuilder fbb;
+
+ flatbuffers::Offset<Node> node_offset = CopyFlatBuffer<Node>(my_node, &fbb);
+ const std::string_view node_name = my_node->name()->string_view();
+
+ std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+ for (const Channel *channel : *config->channels()) {
+ if (channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() == node_name &&
+ channel->source_node()->string_view() == remote_name) {
+ channel_offsets.emplace_back(CopyFlatBuffer<Channel>(channel, &fbb));
+ }
+ }
+ }
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ channels_offset = fbb.CreateVector(channel_offsets);
+
+ Connect::Builder connect_builder(fbb);
+ connect_builder.add_channels_to_transfer(channels_offset);
+ connect_builder.add_node(node_offset);
+ fbb.Finish(connect_builder.Finish());
+
+ return fbb.Release();
+}
+
+std::vector<int> StreamToChannel(const Configuration *config,
+ const Node *my_node, const Node *other_node) {
+ std::vector<int> stream_to_channel;
+ int channel_index = 0;
+ for (const Channel *channel : *config->channels()) {
+ if (configuration::ChannelIsSendableOnNode(channel, other_node)) {
+ const Connection *connection =
+ configuration::ConnectionToNode(channel, my_node);
+ if (connection != nullptr) {
+ stream_to_channel.emplace_back(channel_index);
+ }
+ }
+ ++channel_index;
+ }
+
+ return stream_to_channel;
+}
+
+std::vector<bool> StreamReplyWithTimestamp(const Configuration *config,
+ const Node *my_node,
+ const Node *other_node) {
+ std::vector<bool> stream_reply_with_timestamp;
+ int channel_index = 0;
+ for (const Channel *channel : *config->channels()) {
+ if (configuration::ChannelIsSendableOnNode(channel, other_node)) {
+ const Connection *connection =
+ configuration::ConnectionToNode(channel, my_node);
+ if (connection != nullptr) {
+ // We want to reply with a timestamp if the other node is logging the
+ // timestamp (and it therefore needs the timestamp), or if we are
+ // logging the message and it needs to know if we received it so it can
+ // log (in the future) it through different mechanisms on failure.
+ stream_reply_with_timestamp.emplace_back(
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ other_node) ||
+ configuration::ChannelMessageIsLoggedOnNode(channel, my_node));
+ }
+ }
+ ++channel_index;
+ }
+
+ return stream_reply_with_timestamp;
+}
+
+aos::FlatbufferDetachedBuffer<aos::logger::MessageHeader>
+MakeMessageHeaderReply() {
+ flatbuffers::FlatBufferBuilder fbb;
+ logger::MessageHeader::Builder message_header_builder(fbb);
+ message_header_builder.add_channel_index(0);
+ message_header_builder.add_monotonic_sent_time(0);
+ message_header_builder.add_monotonic_remote_time(0);
+ message_header_builder.add_realtime_remote_time(0);
+ message_header_builder.add_remote_queue_index(0);
+ fbb.Finish(message_header_builder.Finish());
+
+ return fbb.Release();
+}
+
+FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
+ const std::vector<std::string_view> &source_node_names,
+ const Configuration *configuration) {
+ flatbuffers::FlatBufferBuilder fbb;
+
+ std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
+ for (const std::string_view node_name : source_node_names) {
+ flatbuffers::Offset<Node> node_offset =
+ CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+ ClientConnection::Builder connection_builder(fbb);
+ connection_builder.add_node(node_offset);
+ connection_builder.add_state(State::DISCONNECTED);
+ // TODO(austin): Track dropped packets.
+ connection_builder.add_received_packets(0);
+ connection_offsets.emplace_back(connection_builder.Finish());
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+ connections_offset = fbb.CreateVector(connection_offsets);
+
+ ClientStatistics::Builder client_statistics_builder(fbb);
+ client_statistics_builder.add_connections(connections_offset);
+ fbb.Finish(client_statistics_builder.Finish());
+
+ return fbb.Release();
+}
+
+} // namespace
+
+SctpClientConnection::SctpClientConnection(
+ aos::ShmEventLoop *const event_loop, std::string_view remote_name,
+ const Node *my_node, std::string_view local_host,
+ std::vector<std::unique_ptr<aos::RawSender>> *channels,
+ ClientConnection *connection)
+ : event_loop_(event_loop),
+ connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
+ remote_name)),
+ message_reception_reply_(MakeMessageHeaderReply()),
+ remote_node_(CHECK_NOTNULL(
+ configuration::GetNode(event_loop->configuration(), remote_name))),
+ client_(remote_node_->hostname()->string_view(), remote_node_->port(),
+ connect_message_.message().channels_to_transfer()->size() +
+ kControlStreams(),
+ local_host, 0),
+ channels_(channels),
+ stream_to_channel_(
+ StreamToChannel(event_loop->configuration(), my_node, remote_node_)),
+ stream_reply_with_timestamp_(StreamReplyWithTimestamp(
+ event_loop->configuration(), my_node, remote_node_)),
+ connection_(connection) {
+ VLOG(1) << "Connect request for " << remote_node_->name()->string_view()
+ << ": " << FlatbufferToJson(connect_message_);
+
+ connect_timer_ = event_loop_->AddTimer([this]() { SendConnect(); });
+ event_loop_->OnRun(
+ [this]() { connect_timer_->Setup(event_loop_->monotonic_now()); });
+
+ event_loop_->epoll()->OnReadable(client_.fd(),
+ [this]() { MessageReceived(); });
+}
+
+void SctpClientConnection::MessageReceived() {
+ // Dispatch the message to the correct receiver.
+ aos::unique_c_ptr<Message> message = client_.Read();
+
+ if (message->message_type == Message::kNotification) {
+ const union sctp_notification *snp =
+ (const union sctp_notification *)message->data();
+
+ switch (snp->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE: {
+ const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ NodeConnected(sac->sac_assoc_id);
+
+ VLOG(1) << "Received up from " << message->PeerAddress() << " on "
+ << sac->sac_assoc_id;
+ break;
+ case SCTP_COMM_LOST:
+ case SCTP_SHUTDOWN_COMP:
+ case SCTP_CANT_STR_ASSOC: {
+ NodeDisconnected();
+ } break;
+ case SCTP_RESTART:
+ LOG(FATAL) << "Never seen this before.";
+ break;
+ }
+ } break;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ PrintNotification(message.get());
+ }
+ } else if (message->message_type == Message::kMessage) {
+ HandleData(message.get());
+ }
+}
+
+void SctpClientConnection::SendConnect() {
+ // Try to send the connect message. If that fails, retry.
+ if (!client_.Send(kConnectStream(),
+ std::string_view(
+ reinterpret_cast<const char *>(connect_message_.data()),
+ connect_message_.size()),
+ 0)) {
+ NodeDisconnected();
+ }
+}
+
+void SctpClientConnection::NodeConnected(sctp_assoc_t assoc_id) {
+ connect_timer_->Disable();
+
+ // We want to tell the kernel to schedule the packets on this new stream with
+ // the priority scheduler. This only needs to be done once per stream.
+ client_.SetPriorityScheduler(assoc_id);
+
+ remote_assoc_id_ = assoc_id;
+ connection_->mutate_state(State::CONNECTED);
+}
+
+void SctpClientConnection::NodeDisconnected() {
+ connect_timer_->Setup(
+ event_loop_->monotonic_now() + chrono::milliseconds(100),
+ chrono::milliseconds(100));
+ remote_assoc_id_ = 0;
+ connection_->mutate_state(State::DISCONNECTED);
+}
+
+void SctpClientConnection::HandleData(const Message *message) {
+ const logger::MessageHeader *message_header =
+ flatbuffers::GetSizePrefixedRoot<logger::MessageHeader>(message->data());
+
+ connection_->mutate_received_packets(connection_->received_packets() + 1);
+
+ const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
+
+ // Publish the message.
+ RawSender *sender = (*channels_)[stream_to_channel_[stream]].get();
+ sender->Send(message_header->data()->data(), message_header->data()->size(),
+ aos::monotonic_clock::time_point(
+ chrono::nanoseconds(message_header->monotonic_sent_time())),
+ aos::realtime_clock::time_point(
+ chrono::nanoseconds(message_header->realtime_sent_time())),
+ message_header->queue_index());
+
+ if (stream_reply_with_timestamp_[stream]) {
+ // TODO(austin): Send back less if we are only acking. Maybe only a
+ // stream id? Nothing if we are only forwarding?
+
+ // Now fill out the message received reply. This uses a MessageHeader
+ // container so it can be directly logged.
+ message_reception_reply_.mutable_message()->mutate_channel_index(
+ message_header->channel_index());
+ message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
+ message_header->monotonic_sent_time());
+
+ // And capture the relevant data needed to generate the forwarding
+ // MessageHeader.
+ message_reception_reply_.mutable_message()->mutate_monotonic_remote_time(
+ sender->monotonic_sent_time().time_since_epoch().count());
+ message_reception_reply_.mutable_message()->mutate_realtime_remote_time(
+ sender->realtime_sent_time().time_since_epoch().count());
+ message_reception_reply_.mutable_message()->mutate_remote_queue_index(
+ sender->sent_queue_index());
+
+ // Unique ID is channel_index and monotonic clock.
+ // TODO(austin): Depending on if we are the logger node or not, we need to
+ // guarentee that this ack gets received too... Same path as the logger.
+ client_.Send(kTimestampStream(),
+ std::string_view(reinterpret_cast<const char *>(
+ message_reception_reply_.data()),
+ message_reception_reply_.size()),
+ 0);
+ }
+
+ VLOG(1) << "Received data of length " << message->size << " from "
+ << message->PeerAddress();
+
+ if (VLOG_IS_ON(1)) {
+ client_.LogSctpStatus(message->header.rcvinfo.rcv_assoc_id);
+ }
+
+ VLOG(2) << "\tSNDRCV (stream=" << message->header.rcvinfo.rcv_sid
+ << " ssn=" << message->header.rcvinfo.rcv_ssn
+ << " tsn=" << message->header.rcvinfo.rcv_tsn << " flags=0x"
+ << std::hex << message->header.rcvinfo.rcv_flags << std::dec
+ << " ppid=" << message->header.rcvinfo.rcv_ppid
+ << " cumtsn=" << message->header.rcvinfo.rcv_cumtsn << ")";
+}
+
+MessageBridgeClient::MessageBridgeClient(aos::ShmEventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
+ source_node_names_(configuration::SourceNodeNames(
+ event_loop->configuration(), event_loop->node())),
+ statistics_(MakeClientStatistics(source_node_names_,
+ event_loop->configuration())) {
+ std::string_view node_name = event_loop->node()->name()->string_view();
+
+ // Find all the channels which are supposed to be delivered to us.
+ channels_.resize(event_loop_->configuration()->channels()->size());
+ int channel_index = 0;
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ if (channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() == node_name) {
+ // Give the config a chance to remap us. This helps with testing on a
+ // single node.
+ const Channel *mapped_channel = configuration::GetChannel(
+ event_loop_->configuration(), channel->name()->string_view(),
+ channel->type()->string_view(), event_loop_->name(),
+ event_loop_->node());
+ channels_[channel_index] = event_loop_->MakeRawSender(mapped_channel);
+ break;
+ }
+ }
+ }
+ ++channel_index;
+ }
+
+ // Now, for each source node, build a connection.
+ int node_index = 0;
+ for (const std::string_view source_node : source_node_names_) {
+ // Open an unspecified connection (:: in ipv6 terminology)
+ connections_.emplace_back(new SctpClientConnection(
+ event_loop, source_node, event_loop->node(), "::", &channels_,
+ statistics_.mutable_message()->mutable_connections()->GetMutableObject(
+ node_index)));
+ ++node_index;
+ }
+
+ // And kick it all off.
+ statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+ event_loop_->OnRun([this]() {
+ statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
+ chrono::seconds(1));
+ });
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
new file mode 100644
index 0000000..77bf2b7
--- /dev/null
+++ b/aos/network/message_bridge_client_lib.h
@@ -0,0 +1,116 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
+
+#include <string_view>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/sctp_client.h"
+#include "aos/network/sctp_lib.h"
+
+namespace aos {
+namespace message_bridge {
+
+// See message_bridge_protocol.h for more details about the protocol.
+
+// This class encapsulates all the state required to connect to a server and
+// transmit messages.
+class SctpClientConnection {
+ public:
+ SctpClientConnection(aos::ShmEventLoop *const event_loop,
+ std::string_view remote_name, const Node *my_node,
+ std::string_view local_host,
+ std::vector<std::unique_ptr<aos::RawSender>> *channels,
+ ClientConnection *connection);
+
+ ~SctpClientConnection() { event_loop_->epoll()->DeleteFd(client_.fd()); }
+
+ private:
+ // Reads a message from the socket. Could be a notification.
+ void MessageReceived();
+
+ // Sends a connection request message.
+ void SendConnect();
+
+ // Called when the server connection succeeds.
+ void NodeConnected(sctp_assoc_t assoc_id);
+ // Called when the server connection disconnects.
+ void NodeDisconnected();
+ void HandleData(const Message *message);
+
+ // Event loop to register the server on.
+ aos::ShmEventLoop *const event_loop_;
+
+ // Message to send on connect.
+ const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
+ connect_message_;
+
+ // Starting point for the message reception reply (including timestamps).
+ aos::FlatbufferDetachedBuffer<aos::logger::MessageHeader>
+ message_reception_reply_;
+
+ // Node we are sending to.
+ const aos::Node *const remote_node_;
+
+ // SCTP client. There is a client per connection so we don't have to deal
+ // with association ids nearly as badly.
+ SctpClient client_;
+
+ // Channels to send received messages on.
+ std::vector<std::unique_ptr<aos::RawSender>> *channels_;
+ // Stream number -> channel lookup.
+ std::vector<int> stream_to_channel_;
+ // Bitmask signaling if we should be replying back with delivery times.
+ std::vector<bool> stream_reply_with_timestamp_;
+
+ // Timer which fires to handle reconnections.
+ aos::TimerHandler *connect_timer_;
+
+ // ClientConnection statistics message to modify. This will be published
+ // periodicially.
+ ClientConnection *connection_;
+
+ // id of the server once known. This is only valid if connection_ says
+ // connected.
+ sctp_assoc_t remote_assoc_id_ = 0;
+};
+
+// This encapsulates the state required to talk to *all* the servers from this
+// node.
+class MessageBridgeClient {
+ public:
+ MessageBridgeClient(aos::ShmEventLoop *event_loop);
+
+ ~MessageBridgeClient() {}
+
+ private:
+ // Sends out the statistics that are continually updated by the
+ // SctpClientConnections.
+ void SendStatistics() { sender_.Send(statistics_); }
+
+ // Event loop to schedule everything on.
+ aos::ShmEventLoop *event_loop_;
+ // Sender to publish statistics on.
+ aos::Sender<ClientStatistics> sender_;
+ aos::TimerHandler *statistics_timer_;
+
+ // Nodes to receive data from.
+ const std::vector<std::string_view> source_node_names_;
+
+ // Data to publish.
+ FlatbufferDetachedBuffer<ClientStatistics> statistics_;
+
+ // Channels to send data over.
+ std::vector<std::unique_ptr<aos::RawSender>> channels_;
+
+ // List of connections. These correspond to the nodes in source_node_names_
+ std::vector<std::unique_ptr<SctpClientConnection>> connections_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
diff --git a/aos/network/message_bridge_protocol.h b/aos/network/message_bridge_protocol.h
new file mode 100644
index 0000000..1136188
--- /dev/null
+++ b/aos/network/message_bridge_protocol.h
@@ -0,0 +1,31 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+
+namespace aos {
+namespace message_bridge {
+
+// The protocol between the message_bridge_client and server is pretty simple.
+// The overarching design philosophy is that the server sends data to the
+// client, and the client (optionally) sends timestamps back.
+//
+// 1) A connection is established by the client sending the server a Connect
+// flatbuffer on stream 0.
+// 2) The server then replies with the data, as it is available, on streams 2 +
+// channel_id in the Connect message.
+// 3) The client (optionally) replies on stream 1 with MessageHeader flatbuffers
+// with the timestamps that the messages were received.
+//
+// Most of the complexity from there is handling multiple clients and servers
+// and persuading SCTP to do what we want.
+
+// Number of streams reserved for control messages.
+constexpr size_t kControlStreams() { return 2; }
+// The stream on which Connect messages are sent.
+constexpr size_t kConnectStream() { return 0; }
+// The stream on which timestamp replies are sent.
+constexpr size_t kTimestampStream() { return 1; }
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
diff --git a/aos/network/message_bridge_server.cc b/aos/network/message_bridge_server.cc
new file mode 100644
index 0000000..fa5e7c1
--- /dev/null
+++ b/aos/network/message_bridge_server.cc
@@ -0,0 +1,35 @@
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/network/message_bridge_server_lib.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+DEFINE_string(config, "multinode_pingpong_config.json", "Path to the config.");
+
+namespace aos {
+namespace message_bridge {
+
+int Main() {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
+
+ aos::ShmEventLoop event_loop(&config.message());
+
+ MessageBridgeServer app(&event_loop);
+
+ // TODO(austin): Track which messages didn't make it in time and need to be
+ // logged locally and forwarded.
+
+ event_loop.Run();
+
+ return EXIT_SUCCESS;
+}
+
+} // namespace message_bridge
+} // namespace aos
+
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+
+ return aos::message_bridge::Main();
+}
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
new file mode 100644
index 0000000..75a6a15
--- /dev/null
+++ b/aos/network/message_bridge_server.fbs
@@ -0,0 +1,33 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+// State of the connection.
+enum State: ubyte {
+ CONNECTED,
+ DISCONNECTED,
+}
+
+// Statistics from a single connection to a client from this server.
+table ServerConnection {
+ // The node that we are connected to.
+ node:Node;
+
+ // Health of this connection. Connected or not?
+ state:State;
+
+ // Number of packets that have been dropped (if known).
+ dropped_packets:uint;
+
+ // Number of packets received on all channels.
+ sent_packets:uint;
+
+ // TODO(austin): Per channel counts?
+}
+
+// Statistics for all connections to all the clients.
+table ServerStatistics {
+ connections:[ServerConnection];
+}
+
+root_type ServerStatistics;
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
new file mode 100644
index 0000000..38958ba
--- /dev/null
+++ b/aos/network/message_bridge_server_lib.cc
@@ -0,0 +1,367 @@
+#include "aos/network/message_bridge_server_lib.h"
+
+#include "absl/types/span.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/sctp_server.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+namespace {
+
+namespace chrono = std::chrono;
+
+// Builds up the "empty" server statistics message to be pointed to by all the
+// connections, updated at runtime, and periodically sent.
+FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
+ const std::vector<std::string_view> &source_node_names,
+ const Configuration *configuration) {
+ flatbuffers::FlatBufferBuilder fbb;
+
+ std::vector<flatbuffers::Offset<ServerConnection>> connection_offsets;
+ for (const std::string_view node_name : source_node_names) {
+ flatbuffers::Offset<Node> node_offset =
+ CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+ ServerConnection::Builder connection_builder(fbb);
+ connection_builder.add_node(node_offset);
+ connection_builder.add_state(State::DISCONNECTED);
+ connection_builder.add_dropped_packets(0);
+ connection_builder.add_sent_packets(0);
+ connection_offsets.emplace_back(connection_builder.Finish());
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+ connections_offset = fbb.CreateVector(connection_offsets);
+
+ ServerStatistics::Builder server_statistics_builder(fbb);
+ server_statistics_builder.add_connections(connections_offset);
+ fbb.Finish(server_statistics_builder.Finish());
+
+ return fbb.Release();
+}
+
+// Finds the statistics for the provided node name.
+ServerConnection *FindServerConnection(ServerStatistics *statistics,
+ std::string_view node_name) {
+ ServerConnection *matching_server_connection = nullptr;
+ for (size_t i = 0; i < statistics->mutable_connections()->size(); ++i) {
+ ServerConnection *server_connection =
+ statistics->mutable_connections()->GetMutableObject(i);
+ if (server_connection->node()->name()->string_view() == node_name) {
+ matching_server_connection = server_connection;
+ break;
+ }
+ }
+
+ CHECK(matching_server_connection != nullptr) << ": Unknown client";
+
+ return matching_server_connection;
+}
+
+} // namespace
+
+bool ChannelState::Matches(const Channel *other_channel) {
+ // Confirm the normal tuple, plus make sure that the other side isn't going to
+ // send more data over than we expect with a mismatching size.
+ return (
+ channel_->name()->string_view() == other_channel->name()->string_view() &&
+ channel_->type()->string_view() == other_channel->type()->string_view() &&
+ channel_->max_size() == other_channel->max_size());
+}
+
+void ChannelState::SendData(SctpServer *server, const Context &context) {
+ // TODO(austin): I don't like allocating this buffer when we are just freeing
+ // it at the end of the function.
+ flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
+ VLOG(1) << "Found " << peers_.size() << " peers on channel "
+ << channel_->name()->string_view() << " size " << context.size;
+
+ // TODO(austin): Use an iovec to build it up in 3 parts to avoid the copy?
+ // Only useful when not logging.
+ fbb.FinishSizePrefixed(logger::PackMessage(&fbb, context, channel_index_,
+ logger::LogType::kLogMessage));
+
+ // TODO(austin): Track which connections need to be reliable and handle
+ // resending properly.
+ size_t sent_count = 0;
+ bool logged_remotely = false;
+ for (Peer &peer : peers_) {
+ logged_remotely = logged_remotely || peer.logged_remotely;
+
+ if (peer.sac_assoc_id != 0) {
+ server->Send(std::string_view(
+ reinterpret_cast<const char *>(fbb.GetBufferPointer()),
+ fbb.GetSize()),
+ peer.sac_assoc_id, peer.stream,
+ peer.connection->time_to_live() / 1000000);
+ peer.server_connection_statistics->mutate_sent_packets(
+ peer.server_connection_statistics->sent_packets() + 1);
+ if (peer.logged_remotely) {
+ ++sent_count;
+ }
+ } else {
+ peer.server_connection_statistics->mutate_dropped_packets(
+ peer.server_connection_statistics->dropped_packets() + 1);
+ }
+ }
+
+ if (logged_remotely) {
+ if (sent_count == 0) {
+ VLOG(1) << "No clients, rejecting";
+ HandleFailure(fbb.Release());
+ } else {
+ sent_messages_.emplace_back(fbb.Release());
+ }
+ } else {
+ VLOG(1) << "Not bothering to track this message since nobody cares.";
+ }
+
+ // TODO(austin): Limit the size of this queue. Flush messages to disk
+ // which are too old. We really care about messages which didn't make it
+ // to a logger... Which is a new concept or two.
+
+ // Need to handle logging and disk in another thread. Need other thread
+ // since we sometimes want to skip disk, and synchronization is easier.
+ // This thread then spins on the queue until empty, then polls at 1-10 hz.
+
+ // TODO(austin): ~10 MB chunks on disk and push them over the logging
+ // channel? Threadsafe disk backed queue object which can handle restarts
+ // and flushes. Whee.
+}
+
+void ChannelState::HandleDelivery(sctp_assoc_t /*rcv_assoc_id*/,
+ uint16_t /*ssn*/,
+ absl::Span<const uint8_t> data) {
+ const logger::MessageHeader *message_header =
+ flatbuffers::GetRoot<logger::MessageHeader>(data.data());
+ while (sent_messages_.size() > 0u) {
+ if (sent_messages_.begin()->message().monotonic_sent_time() ==
+ message_header->monotonic_sent_time()) {
+ sent_messages_.pop_front();
+ continue;
+ }
+
+ if (sent_messages_.begin()->message().monotonic_sent_time() <
+ message_header->monotonic_sent_time()) {
+ VLOG(1) << "Delivery looks wrong, rejecting";
+ HandleFailure(std::move(sent_messages_.front()));
+ sent_messages_.pop_front();
+ continue;
+ }
+
+ break;
+ }
+}
+
+void ChannelState::HandleFailure(
+ SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message) {
+ // TODO(austin): Put it in the log queue.
+ LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
+
+ // Note: this may be really out of order when we avoid the queue... We
+ // have the ones we know didn't make it immediately, and the ones which
+ // time out eventually. Need to sort that out.
+}
+
+void ChannelState::AddPeer(const Connection *connection,
+ ServerConnection *server_connection_statistics,
+ bool logged_remotely) {
+ peers_.emplace_back(0, 0, connection, server_connection_statistics,
+ logged_remotely);
+}
+
+void ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
+ for (ChannelState::Peer &peer : peers_) {
+ if (peer.sac_assoc_id == assoc_id) {
+ // TODO(austin): This will not handle multiple clients from
+ // a single node. But that should be rare.
+ peer.server_connection_statistics->mutate_state(State::DISCONNECTED);
+ peer.sac_assoc_id = 0;
+ peer.stream = 0;
+ break;
+ }
+ }
+}
+
+void ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
+ int stream, SctpServer *server) {
+ for (ChannelState::Peer &peer : peers_) {
+ if (peer.connection->name()->string_view() == node->name()->string_view()) {
+ peer.sac_assoc_id = assoc_id;
+ peer.stream = stream;
+ peer.server_connection_statistics->mutate_state(State::CONNECTED);
+ server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
+
+ break;
+ }
+ }
+}
+
+MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
+ statistics_(MakeServerStatistics(
+ configuration::DestinationNodeNames(event_loop->configuration(),
+ event_loop->node()),
+ event_loop->configuration())),
+ server_("::", event_loop->node()->port()) {
+ CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
+
+ // TODO(austin): Time sync. sctp gives us filtered round trip time, not
+ // target time.
+
+ // TODO(austin): Logging synchronization.
+ //
+ // TODO(austin): How do we handle parameter channels? The oldest value
+ // needs to be sent regardless on connection (though probably only if it has
+ // changed).
+ event_loop_->epoll()->OnReadable(server_.fd(),
+ [this]() { MessageReceived(); });
+
+ LOG(INFO) << "Hostname: " << event_loop_->node()->hostname()->string_view();
+
+ int channel_index = 0;
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ CHECK(channel->has_source_node());
+ if (channel->source_node()->string_view() ==
+ event_loop_->node()->name()->string_view() &&
+ channel->has_destination_nodes()) {
+ std::unique_ptr<ChannelState> state(
+ new ChannelState{channel, channel_index});
+
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const Node *other_node = configuration::GetNode(
+ event_loop_->configuration(), connection->name()->string_view());
+ state->AddPeer(
+ connection,
+ FindServerConnection(statistics_.mutable_message(),
+ connection->name()->string_view()),
+ configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
+ }
+
+ // Call SendData for every message.
+ ChannelState *state_ptr = state.get();
+ event_loop_->MakeRawWatcher(
+ channel,
+ [this, state_ptr](const Context &context, const void * /*message*/) {
+ state_ptr->SendData(&server_, context);
+ });
+ channels_.emplace_back(std::move(state));
+ } else {
+ channels_.emplace_back(nullptr);
+ }
+ ++channel_index;
+ }
+
+ statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+ event_loop_->OnRun([this]() {
+ statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
+ chrono::seconds(1));
+ });
+}
+
+void MessageBridgeServer::NodeConnected(sctp_assoc_t assoc_id) {
+ server_.SetPriorityScheduler(assoc_id);
+}
+
+void MessageBridgeServer::NodeDisconnected(sctp_assoc_t assoc_id) {
+ // Find any matching peers and remove them.
+ for (std::unique_ptr<ChannelState> &channel_state : channels_) {
+ if (channel_state.get() == nullptr) {
+ continue;
+ }
+
+ channel_state->NodeDisconnected(assoc_id);
+ }
+}
+
+void MessageBridgeServer::MessageReceived() {
+ aos::unique_c_ptr<Message> message = server_.Read();
+
+ if (message->message_type == Message::kNotification) {
+ const union sctp_notification *snp =
+ (const union sctp_notification *)message->data();
+
+ switch (snp->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE: {
+ const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ NodeConnected(sac->sac_assoc_id);
+ VLOG(1) << "Peer connected";
+ break;
+ case SCTP_COMM_LOST:
+ case SCTP_SHUTDOWN_COMP:
+ case SCTP_CANT_STR_ASSOC:
+ NodeDisconnected(sac->sac_assoc_id);
+ VLOG(1) << "Disconnect";
+ break;
+ case SCTP_RESTART:
+ LOG(FATAL) << "Never seen this before.";
+ break;
+ }
+ } break;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ PrintNotification(message.get());
+ }
+ } else if (message->message_type == Message::kMessage) {
+ HandleData(message.get());
+ }
+}
+
+void MessageBridgeServer::HandleData(const Message *message) {
+ VLOG(1) << "Received data of length " << message->size;
+
+ if (message->header.rcvinfo.rcv_sid == kConnectStream()) {
+ // Control channel!
+ const Connect *connect = flatbuffers::GetRoot<Connect>(message->data());
+ VLOG(1) << FlatbufferToJson(connect);
+
+ // Account for the control channel and delivery times channel.
+ size_t channel_index = kControlStreams();
+ for (const Channel *channel : *connect->channels_to_transfer()) {
+ bool matched = false;
+ for (std::unique_ptr<ChannelState> &channel_state : channels_) {
+ if (channel_state.get() == nullptr) {
+ continue;
+ }
+ if (channel_state->Matches(channel)) {
+ channel_state->NodeConnected(connect->node(),
+ message->header.rcvinfo.rcv_assoc_id,
+ channel_index, &server_);
+
+ matched = true;
+ break;
+ }
+ }
+ if (!matched) {
+ LOG(ERROR) << "Remote tried registering for unknown channel "
+ << FlatbufferToJson(channel);
+ } else {
+ ++channel_index;
+ }
+ }
+ } else if (message->header.rcvinfo.rcv_sid == kTimestampStream()) {
+ // Message delivery
+ const logger::MessageHeader *message_header =
+ flatbuffers::GetRoot<logger::MessageHeader>(message->data());
+
+ channels_[message_header->channel_index()]->HandleDelivery(
+ message->header.rcvinfo.rcv_assoc_id, message->header.rcvinfo.rcv_ssn,
+ absl::Span<const uint8_t>(message->data(), message->size));
+ }
+
+ if (VLOG_IS_ON(1)) {
+ message->LogRcvInfo();
+ }
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
new file mode 100644
index 0000000..f202fc9
--- /dev/null
+++ b/aos/network/message_bridge_server_lib.h
@@ -0,0 +1,130 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
+
+#include <deque>
+
+#include "absl/types/span.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/sctp_server.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// See message_bridge_protocol.h for more details about the protocol.
+
+// Class to encapsulate all the state per channel. This is the dispatcher for a
+// new message from the event loop.
+class ChannelState {
+ public:
+ ChannelState(const Channel *channel, int channel_index)
+ : channel_index_(channel_index), channel_(channel) {}
+
+ // Class to encapsulate all the state per client on a channel. A client may
+ // be subscribed to multiple channels.
+ struct Peer {
+ Peer(sctp_assoc_t new_sac_assoc_id, size_t new_stream,
+ const Connection *new_connection,
+ ServerConnection *new_server_connection_statistics,
+ bool new_logged_remotely)
+ : sac_assoc_id(new_sac_assoc_id),
+ stream(new_stream),
+ connection(new_connection),
+ server_connection_statistics(new_server_connection_statistics),
+ logged_remotely(new_logged_remotely) {}
+
+ // Valid if != 0.
+ sctp_assoc_t sac_assoc_id = 0;
+
+ size_t stream;
+ const aos::Connection *connection;
+ ServerConnection *server_connection_statistics;
+
+ // If true, this message will be logged on a receiving node. We need to
+ // keep it around to log it locally if that fails.
+ bool logged_remotely = false;
+ };
+
+ // Needs to be called when a node (might have) disconnected.
+ void NodeDisconnected(sctp_assoc_t assoc_id);
+ void NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
+ SctpServer *server);
+
+ // Adds a new peer.
+ void AddPeer(const Connection *connection,
+ ServerConnection *server_connection_statistics,
+ bool logged_remotely);
+
+ // Returns true if this channel has the same name and type as the other
+ // channel.
+ bool Matches(const Channel *other_channel);
+
+ // Sends the data in context using the provided server.
+ void SendData(SctpServer *server, const Context &context);
+
+ // Handles reception of delivery times.
+ void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
+ absl::Span<const uint8_t> data);
+
+ // Handles (by consuming) failure to deliver a message.
+ void HandleFailure(
+ SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message);
+
+ private:
+ const int channel_index_;
+ const Channel *const channel_;
+
+ std::vector<Peer> peers_;
+
+ std::deque<SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader>>
+ sent_messages_;
+};
+
+// This encapsulates the state required to talk to *all* the clients from this
+// node. It handles the session and dispatches data to the ChannelState.
+class MessageBridgeServer {
+ public:
+ MessageBridgeServer(aos::ShmEventLoop *event_loop);
+
+ ~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
+
+ private:
+ // Reads a message from the socket. Could be a notification.
+ void MessageReceived();
+
+ // Called when the server connection succeeds.
+ void NodeConnected(sctp_assoc_t assoc_id);
+ // Called when the server connection disconnects.
+ void NodeDisconnected(sctp_assoc_t assoc_id);
+
+ // Called when data (either a connection request or delivery timestamps) is
+ // received.
+ void HandleData(const Message *message);
+
+ // Sends out the statistics that are continually updated by the
+ // ChannelState's.
+ void SendStatistics() { sender_.Send(statistics_); }
+
+ // Event loop to schedule everything on.
+ aos::ShmEventLoop *event_loop_;
+
+ // Statistics, timer, and associated sender.
+ aos::Sender<ServerStatistics> sender_;
+ aos::TimerHandler *statistics_timer_;
+ FlatbufferDetachedBuffer<ServerStatistics> statistics_;
+
+ SctpServer server_;
+
+ // List of channels. The entries that aren't sent from this node are left
+ // null.
+ std::vector<std::unique_ptr<ChannelState>> channels_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
new file mode 100644
index 0000000..383c1c4
--- /dev/null
+++ b/aos/network/message_bridge_test.cc
@@ -0,0 +1,154 @@
+#include "gtest/gtest.h"
+
+#include <chrono>
+#include <thread>
+
+#include "aos/events/ping_generated.h"
+#include "aos/events/pong_generated.h"
+#include "aos/network/message_bridge_client_lib.h"
+#include "aos/network/message_bridge_server_lib.h"
+
+DECLARE_string(override_hostname);
+DECLARE_string(application_name);
+
+namespace aos {
+namespace message_bridge {
+namespace testing {
+
+namespace chrono = std::chrono;
+
+// Test that we can send a ping message over sctp and receive it.
+TEST(MessageBridgeTest, PingPong) {
+ // This is rather annoying to set up. We need to start up a client and
+ // server, on the same node, but get them to think that they are on different
+ // nodes.
+ //
+ // We then get to wait until they are connected.
+ //
+ // After they are connected, we send a Ping message.
+ //
+ // On the other end, we receive a Pong message.
+ //
+ // But, we need the client to not post directly to "/test" like it would in a
+ // real system, otherwise we will re-send the ping message... So, use an
+ // application specific map to have the client post somewhere else.
+ //
+ // To top this all off, each of these needs to be done with a ShmEventLoop,
+ // which needs to run in a separate thread... And it is really hard to get
+ // everything started up reliably. So just be super generous on timeouts and
+ // hope for the best. We can be more generous in the future if we need to.
+ //
+ // We are faking the application names by passing in --application_name=foo
+ aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+ aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_server_config.json");
+ aos::FlatbufferDetachedBuffer<aos::Configuration> client_config =
+ aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_client_config.json");
+
+ FLAGS_application_name = "pi1_message_bridge_server";
+ // Force ourselves to be "raspberrypi" and allocate everything.
+ FLAGS_override_hostname = "raspberrypi";
+ aos::ShmEventLoop server_event_loop(&server_config.message());
+ MessageBridgeServer message_bridge_server(&server_event_loop);
+
+ // And build the app which sends the pings.
+ FLAGS_application_name = "ping";
+ aos::ShmEventLoop ping_event_loop(&server_config.message());
+ aos::Sender<examples::Ping> ping_sender =
+ ping_event_loop.MakeSender<examples::Ping>("/test");
+
+ // Now do it for "raspberrypi2", the client.
+ FLAGS_application_name = "pi2_message_bridge_client";
+ FLAGS_override_hostname = "raspberrypi2";
+ aos::ShmEventLoop client_event_loop(&client_config.message());
+ MessageBridgeClient message_bridge_client(&client_event_loop);
+
+ // And build the app which sends the pongs.
+ FLAGS_application_name = "pong";
+ aos::ShmEventLoop pong_event_loop(&client_config.message());
+
+ // Count the pongs.
+ int pong_count = 0;
+ pong_event_loop.MakeWatcher(
+ "/test2", [&pong_count, &ping_event_loop](const examples::Ping &ping) {
+ ++pong_count;
+ LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
+ if (pong_count >= 2) {
+ LOG(INFO) << "That's enough bailing early.";
+ // And Exit is async safe, so thread safe is easy.
+ ping_event_loop.Exit();
+ }
+ });
+
+ FLAGS_override_hostname = "";
+
+ // Start everything up. Pong is the only thing we don't know how to wait on,
+ // so start it first.
+ std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+
+ std::thread server_thread(
+ [&server_event_loop]() { server_event_loop.Run(); });
+ std::thread client_thread(
+ [&client_event_loop]() { client_event_loop.Run(); });
+
+ // Wait until we are connected, then send.
+ int ping_count = 0;
+ ping_event_loop.MakeWatcher(
+ "/aos/pi1", [&ping_count, &client_event_loop,
+ &ping_sender](const ServerStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
+
+ ASSERT_TRUE(stats.has_connections());
+ EXPECT_EQ(stats.connections()->size(), 1);
+
+ bool connected = false;
+ for (const ServerConnection *connection : *stats.connections()) {
+ if (connection->node()->name()->string_view() ==
+ client_event_loop.node()->name()->string_view()) {
+ if (connection->state() == State::CONNECTED) {
+ connected = true;
+ }
+ break;
+ }
+ }
+
+ if (connected) {
+ LOG(INFO) << "Connected! Sent ping.";
+ auto builder = ping_sender.MakeBuilder();
+ examples::Ping::Builder ping_builder =
+ builder.MakeBuilder<examples::Ping>();
+ ping_builder.add_value(ping_count + 971);
+ builder.Send(ping_builder.Finish());
+ ++ping_count;
+ }
+ });
+
+ // Time ourselves out after a while if Pong doesn't do it for us.
+ aos::TimerHandler *quit = ping_event_loop.AddTimer(
+ [&ping_event_loop]() { ping_event_loop.Exit(); });
+ ping_event_loop.OnRun([quit, &ping_event_loop]() {
+ quit->Setup(ping_event_loop.monotonic_now() + chrono::seconds(10));
+ });
+
+
+ // And go!
+ ping_event_loop.Run();
+
+ // Shut everyone else down
+ server_event_loop.Exit();
+ client_event_loop.Exit();
+ pong_event_loop.Exit();
+ server_thread.join();
+ client_thread.join();
+ pong_thread.join();
+
+ // Make sure we sent something.
+ EXPECT_GE(ping_count, 1);
+ // And got something back.
+ EXPECT_GE(pong_count, 1);
+}
+
+} // namespace testing
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_test_client.json b/aos/network/message_bridge_test_client.json
new file mode 100644
index 0000000..65d28d2
--- /dev/null
+++ b/aos/network/message_bridge_test_client.json
@@ -0,0 +1,17 @@
+{
+ "imports": [
+ "message_bridge_test_common.json"
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "localhost",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
new file mode 100644
index 0000000..d0085c2
--- /dev/null
+++ b/aos/network/message_bridge_test_common.json
@@ -0,0 +1,113 @@
+{
+ "channels": [
+ {
+ "name": "/aos/pi1",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi1",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi1",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1"
+ }
+ ]
+ },
+ {
+ "name": "/test2",
+ "type": "aos.examples.Ping",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1"
+ }
+ ]
+ }
+ ],
+ "applications": [
+ {
+ "name": "pi2_message_bridge_client",
+ "maps": [
+ {
+ "match": {
+ "name": "/test",
+ "type": "aos.examples.Ping"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ }
+ ]
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ }
+ ]
+}
diff --git a/aos/network/message_bridge_test_server.json b/aos/network/message_bridge_test_server.json
new file mode 100644
index 0000000..eea92e7
--- /dev/null
+++ b/aos/network/message_bridge_test_server.json
@@ -0,0 +1,17 @@
+{
+ "imports": [
+ "message_bridge_test_common.json"
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "localhost",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/network/sctp_client.cc b/aos/network/sctp_client.cc
new file mode 100644
index 0000000..9e7f6c1
--- /dev/null
+++ b/aos/network/sctp_client.cc
@@ -0,0 +1,144 @@
+#include "aos/network/sctp_client.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netinet/sctp.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <string_view>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+SctpClient::SctpClient(std::string_view remote_host, int remote_port,
+ int streams, std::string_view local_host, int local_port)
+ : sockaddr_remote_(ResolveSocket(remote_host, remote_port)),
+ sockaddr_local_(ResolveSocket(local_host, local_port)),
+ fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
+ LOG(INFO) << "socket(" << Family(sockaddr_local_)
+ << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+ PCHECK(fd_ != -1);
+
+ {
+ // Allow the kernel to deliver messages from different streams in any order.
+ int full_interleaving = 2;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+ &full_interleaving, sizeof(full_interleaving)) == 0);
+ }
+
+ {
+ struct sctp_initmsg initmsg;
+ memset(&initmsg, 0, sizeof(struct sctp_initmsg));
+ initmsg.sinit_num_ostreams = streams;
+ initmsg.sinit_max_instreams = streams;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+ sizeof(struct sctp_initmsg)) == 0);
+ }
+
+ {
+ int on = 1;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+ 0);
+ }
+ {
+ // Servers send promptly. Clients don't.
+ // TODO(austin): Revisit this assumption when we have time sync.
+ int on = 0;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+ }
+
+ {
+ // TODO(austin): This is the old style registration... But, the sctp
+ // stack out in the wild for linux is old and primitive.
+ struct sctp_event_subscribe subscribe;
+ memset(&subscribe, 0, sizeof(subscribe));
+ subscribe.sctp_data_io_event = 1;
+ subscribe.sctp_association_event = 1;
+ PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+ sizeof(subscribe)) == 0);
+ }
+
+ PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+ sockaddr_local_.ss_family == AF_INET6
+ ? sizeof(struct sockaddr_in6)
+ : sizeof(struct sockaddr_in)) == 0);
+ VLOG(1) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+}
+
+aos::unique_c_ptr<Message> SctpClient::Read() {
+ return ReadSctpMessage(fd_, max_size_);
+}
+
+bool SctpClient::Send(int stream, std::string_view data, int time_to_live) {
+ struct iovec iov;
+ iov.iov_base = const_cast<char *>(data.data());
+ iov.iov_len = data.size();
+
+ struct msghdr outmsg;
+ // Target to send to.
+ outmsg.msg_name = &sockaddr_remote_;
+ outmsg.msg_namelen = sizeof(struct sockaddr_storage);
+ VLOG(1) << "Sending to " << Address(sockaddr_remote_);
+
+ // Data to send.
+ outmsg.msg_iov = &iov;
+ outmsg.msg_iovlen = 1;
+
+ // Build up the sndinfo message.
+ char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+ outmsg.msg_control = outcmsg;
+ outmsg.msg_controllen = sizeof(outcmsg);
+ outmsg.msg_flags = 0;
+
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+ cmsg->cmsg_level = IPPROTO_SCTP;
+ cmsg->cmsg_type = SCTP_SNDRCV;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+ outmsg.msg_controllen = cmsg->cmsg_len;
+ struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+ sinfo->sinfo_ppid = rand();
+ sinfo->sinfo_stream = stream;
+ sinfo->sinfo_context = 19;
+ sinfo->sinfo_flags = 0;
+ sinfo->sinfo_timetolive = time_to_live;
+
+ // And send.
+ const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (size == -1) {
+ if (errno != EPIPE && errno != EAGAIN) {
+ PCHECK(size == static_cast<ssize_t>(data.size()));
+ } else {
+ return false;
+ }
+ } else {
+ CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+ }
+
+ VLOG(1) << "Sent " << data.size();
+ return true;
+}
+
+void SctpClient::LogSctpStatus(sctp_assoc_t assoc_id) {
+ message_bridge::LogSctpStatus(fd(), assoc_id);
+}
+
+void SctpClient::SetPriorityScheduler(sctp_assoc_t assoc_id) {
+ struct sctp_assoc_value scheduler;
+ memset(&scheduler, 0, sizeof(scheduler));
+ scheduler.assoc_id = assoc_id;
+ scheduler.assoc_value = SCTP_SS_PRIO;
+ if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER, &scheduler,
+ sizeof(scheduler)) != 0) {
+ PLOG(WARNING) << "Failed to set scheduler";
+ }
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
new file mode 100644
index 0000000..926e59b
--- /dev/null
+++ b/aos/network/sctp_client.h
@@ -0,0 +1,57 @@
+#ifndef AOS_NETWORK_SCTP_CLIENT_H_
+#define AOS_NETWORK_SCTP_CLIENT_H_
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string_view>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Class to encapsulate everything needed to be a SCTP client.
+class SctpClient {
+ public:
+ SctpClient(std::string_view remote_host, int remote_port, int streams,
+ std::string_view local_host = "0.0.0.0", int local_port = 9971);
+
+ ~SctpClient() {
+ LOG(INFO) << "close(" << fd_ << ")";
+ PCHECK(close(fd_) == 0);
+ }
+
+ // Receives the next packet from the remote.
+ aos::unique_c_ptr<Message> Read();
+
+ // Sends a block of data on a stream with a TTL.
+ bool Send(int stream, std::string_view data, int time_to_live);
+
+ int fd() { return fd_; }
+
+ // Enables the priority scheduler. This is a SCTP feature which lets us
+ // configure the priority per stream so that higher priority packets don't get
+ // backed up behind lower priority packets in the networking queues.
+ void SetPriorityScheduler(sctp_assoc_t assoc_id);
+
+ // Remote to send to.
+ struct sockaddr_storage sockaddr_remote() const {
+ return sockaddr_remote_;
+ }
+
+ void LogSctpStatus(sctp_assoc_t assoc_id);
+
+ private:
+ struct sockaddr_storage sockaddr_remote_;
+ struct sockaddr_storage sockaddr_local_;
+ int fd_;
+
+ size_t max_size_ = 1000;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_SCTP_CLIENT_H_
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
new file mode 100644
index 0000000..1ed816b
--- /dev/null
+++ b/aos/network/sctp_lib.cc
@@ -0,0 +1,229 @@
+#include "aos/network/sctp_lib.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/sctp.h>
+
+#include <string_view>
+
+DEFINE_string(interface, "", "ipv6 interface");
+
+namespace aos {
+namespace message_bridge {
+
+namespace {
+const char *sac_state_tbl[] = {"COMMUNICATION_UP", "COMMUNICATION_LOST",
+ "RESTART", "SHUTDOWN_COMPLETE",
+ "CANT_START_ASSOCICATION"};
+
+typedef union {
+ struct sctp_initmsg init;
+ struct sctp_sndrcvinfo sndrcvinfo;
+} _sctp_cmsg_data_t;
+
+} // namespace
+
+struct sockaddr_storage ResolveSocket(std::string_view host, int port) {
+ struct sockaddr_storage result;
+ struct addrinfo *addrinfo_result;
+ struct sockaddr_in *t_addr = (struct sockaddr_in *)&result;
+ struct sockaddr_in6 *t_addr6 = (struct sockaddr_in6 *)&result;
+
+ PCHECK(getaddrinfo(std::string(host).c_str(), 0, NULL, &addrinfo_result) ==
+ 0);
+
+ switch (addrinfo_result->ai_family) {
+ case AF_INET:
+ memcpy(t_addr, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
+ t_addr->sin_family = addrinfo_result->ai_family;
+ t_addr->sin_port = htons(port);
+
+ break;
+ case AF_INET6:
+ memcpy(t_addr6, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
+ t_addr6->sin6_family = addrinfo_result->ai_family;
+ t_addr6->sin6_port = htons(port);
+
+ if (FLAGS_interface.size() > 0) {
+ t_addr6->sin6_scope_id = if_nametoindex(FLAGS_interface.c_str());
+ }
+
+ break;
+ }
+
+ // Now print it back out nicely.
+ char host_string[NI_MAXHOST];
+ char service_string[NI_MAXSERV];
+
+ int error = getnameinfo((struct sockaddr *)&result,
+ addrinfo_result->ai_addrlen, host_string, NI_MAXHOST,
+ service_string, NI_MAXSERV, NI_NUMERICHOST);
+
+ if (error) {
+ LOG(ERROR) << "Reverse lookup failed ... " << gai_strerror(error);
+ }
+
+ LOG(INFO) << "remote:addr=" << host_string << ", port=" << service_string
+ << ", family=" << addrinfo_result->ai_family;
+
+ freeaddrinfo(addrinfo_result);
+
+ return result;
+}
+
+std::string_view Family(const struct sockaddr_storage &sockaddr) {
+ if (sockaddr.ss_family == AF_INET) {
+ return "AF_INET";
+ } else if (sockaddr.ss_family == AF_INET6) {
+ return "AF_INET6";
+ } else {
+ return "unknown";
+ }
+}
+std::string Address(const struct sockaddr_storage &sockaddr) {
+ char addrbuf[INET6_ADDRSTRLEN];
+ if (sockaddr.ss_family == AF_INET) {
+ const struct sockaddr_in *sin = (const struct sockaddr_in *)&sockaddr;
+ return std::string(
+ inet_ntop(AF_INET, &sin->sin_addr, addrbuf, INET6_ADDRSTRLEN));
+ } else {
+ const struct sockaddr_in6 *sin6 = (const struct sockaddr_in6 *)&sockaddr;
+ return std::string(
+ inet_ntop(AF_INET6, &sin6->sin6_addr, addrbuf, INET6_ADDRSTRLEN));
+ }
+}
+
+void PrintNotification(const Message *msg) {
+ const union sctp_notification *snp =
+ (const union sctp_notification *)msg->data();
+
+ LOG(INFO) << "Notification:";
+
+ switch (snp->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE: {
+ const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+ LOG(INFO) << "SCTP_ASSOC_CHANGE(" << sac_state_tbl[sac->sac_state] << ")";
+ VLOG(1) << " (assoc_change: state=" << sac->sac_state
+ << ", error=" << sac->sac_error
+ << ", instr=" << sac->sac_inbound_streams
+ << " outstr=" << sac->sac_outbound_streams
+ << ", assoc=" << sac->sac_assoc_id << ")";
+ } break;
+ case SCTP_PEER_ADDR_CHANGE: {
+ const struct sctp_paddr_change *spc = &snp->sn_paddr_change;
+ LOG(INFO) << " SlCTP_PEER_ADDR_CHANGE";
+ VLOG(1) << "\t\t(peer_addr_change: " << Address(spc->spc_aaddr)
+ << " state=" << spc->spc_state << ", error=" << spc->spc_error
+ << ")";
+ } break;
+ case SCTP_SEND_FAILED: {
+ const struct sctp_send_failed *ssf = &snp->sn_send_failed;
+ LOG(INFO) << " SCTP_SEND_FAILED";
+ VLOG(1) << "\t\t(sendfailed: len=" << ssf->ssf_length
+ << " err=" << ssf->ssf_error << ")";
+ } break;
+ case SCTP_REMOTE_ERROR: {
+ const struct sctp_remote_error *sre = &snp->sn_remote_error;
+ LOG(INFO) << " SCTP_REMOTE_ERROR";
+ VLOG(1) << "\t\t(remote_error: err=" << ntohs(sre->sre_error) << ")";
+ } break;
+ case SCTP_SHUTDOWN_EVENT: {
+ LOG(INFO) << " SCTP_SHUTDOWN_EVENT";
+ } break;
+ default:
+ LOG(INFO) << " Unknown type: " << snp->sn_header.sn_type;
+ break;
+ }
+}
+
+std::string GetHostname() {
+ char buf[256];
+ buf[sizeof(buf) - 1] = '\0';
+ PCHECK(gethostname(buf, sizeof(buf) - 1) == 0);
+ return buf;
+}
+
+std::string Message::PeerAddress() const { return Address(sin); }
+
+void LogSctpStatus(int fd, sctp_assoc_t assoc_id) {
+ struct sctp_status status;
+ memset(&status, 0, sizeof(status));
+ status.sstat_assoc_id = assoc_id;
+
+ socklen_t size = sizeof(status);
+ PCHECK(getsockopt(fd, SOL_SCTP, SCTP_STATUS,
+ reinterpret_cast<void *>(&status), &size) == 0);
+
+ LOG(INFO) << "sctp_status) sstat_assoc_id:" << status.sstat_assoc_id
+ << " sstat_state:" << status.sstat_state
+ << " sstat_rwnd:" << status.sstat_rwnd
+ << " sstat_unackdata:" << status.sstat_unackdata
+ << " sstat_penddata:" << status.sstat_penddata
+ << " sstat_instrms:" << status.sstat_instrms
+ << " sstat_outstrms:" << status.sstat_outstrms
+ << " sstat_fragmentation_point:" << status.sstat_fragmentation_point
+ << " sstat_primary.spinfo_srtt:" << status.sstat_primary.spinfo_srtt
+ << " sstat_primary.spinfo_rto:" << status.sstat_primary.spinfo_rto;
+}
+
+aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size) {
+ char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
+ struct iovec iov;
+ struct msghdr inmessage;
+
+ memset(&inmessage, 0, sizeof(struct msghdr));
+
+ aos::unique_c_ptr<Message> result(
+ reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size)));
+
+ iov.iov_len = max_size;
+ iov.iov_base = result->mutable_data();
+
+ inmessage.msg_iov = &iov;
+ inmessage.msg_iovlen = 1;
+
+ inmessage.msg_control = incmsg;
+ inmessage.msg_controllen = sizeof(incmsg);
+
+ inmessage.msg_namelen = sizeof(struct sockaddr_storage);
+ inmessage.msg_name = &result->sin;
+
+ ssize_t size;
+ PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
+
+ result->size = size;
+
+ if ((MSG_NOTIFICATION & inmessage.msg_flags)) {
+ result->message_type = Message::kNotification;
+ } else {
+ result->message_type = Message::kMessage;
+ }
+
+ for (struct cmsghdr *scmsg = CMSG_FIRSTHDR(&inmessage); scmsg != NULL;
+ scmsg = CMSG_NXTHDR(&inmessage, scmsg)) {
+ switch (scmsg->cmsg_type) {
+ case SCTP_RCVINFO: {
+ struct sctp_rcvinfo *data = (struct sctp_rcvinfo *)CMSG_DATA(scmsg);
+ result->header.rcvinfo = *data;
+ } break;
+ default:
+ LOG(INFO) << "\tUnknown type: " << scmsg->cmsg_type;
+ break;
+ }
+ }
+
+ return result;
+}
+
+void Message::LogRcvInfo() const {
+ LOG(INFO) << "\tSNDRCV (stream=" << header.rcvinfo.rcv_sid
+ << " ssn=" << header.rcvinfo.rcv_ssn
+ << " tsn=" << header.rcvinfo.rcv_tsn << " flags=0x" << std::hex
+ << header.rcvinfo.rcv_flags << std::dec
+ << " ppid=" << header.rcvinfo.rcv_ppid
+ << " cumtsn=" << header.rcvinfo.rcv_cumtsn << ")";
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
new file mode 100644
index 0000000..0f90f87
--- /dev/null
+++ b/aos/network/sctp_lib.h
@@ -0,0 +1,78 @@
+#ifndef AOS_NETWORK_SCTP_LIB_H_
+#define AOS_NETWORK_SCTP_LIB_H_
+
+#include <arpa/inet.h>
+#include <netinet/sctp.h>
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "aos/unique_malloc_ptr.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Resolves a socket and returns the address. This can be either an ipv4 or
+// ipv6 address.
+struct sockaddr_storage ResolveSocket(std::string_view host, int port);
+
+// Returns a formatted version of the address.
+std::string Address(const struct sockaddr_storage &sockaddr);
+// Returns a formatted version of the address family.
+std::string_view Family(const struct sockaddr_storage &sockaddr);
+
+// Message received.
+// This message is malloced bigger than needed and the extra space after it is
+// the data.
+struct Message {
+ // Struct to let us force data to be well aligned.
+ struct OveralignedChar {
+ uint8_t data alignas(32);
+ };
+
+ // Headers.
+ struct {
+ struct sctp_rcvinfo rcvinfo;
+ } header;
+
+ // Address of the sender.
+ struct sockaddr_storage sin;
+
+ // Data type. Is it a block of data, or is it a struct sctp_notification?
+ enum MessageType { kMessage, kNotification } message_type;
+
+ size_t size = 0u;
+ uint8_t *mutable_data() {
+ return reinterpret_cast<uint8_t *>(&actual_data[0].data);
+ }
+ const uint8_t *data() const {
+ return reinterpret_cast<const uint8_t *>(&actual_data[0].data);
+ }
+
+ // Returns a human readable peer IP address.
+ std::string PeerAddress() const;
+
+ // Prints out the RcvInfo structure.
+ void LogRcvInfo() const;
+
+ // The start of the data.
+ OveralignedChar actual_data[];
+};
+
+void PrintNotification(const Message *msg);
+
+std::string GetHostname();
+
+// Gets and logs the contents of the sctp_status message.
+void LogSctpStatus(int fd, sctp_assoc_t assoc_id);
+
+// Read and allocate a message.
+aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size);
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_SCTP_LIB_H_
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
new file mode 100644
index 0000000..70d5b28
--- /dev/null
+++ b/aos/network/sctp_server.cc
@@ -0,0 +1,143 @@
+#include "aos/network/sctp_server.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <memory>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+SctpServer::SctpServer(std::string_view local_host, int local_port)
+ : sockaddr_local_(ResolveSocket(local_host, local_port)),
+ fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
+ LOG(INFO) << "socket(" << Family(sockaddr_local_)
+ << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+ PCHECK(fd_ != -1);
+
+ {
+ struct sctp_event_subscribe subscribe;
+ memset(&subscribe, 0, sizeof(subscribe));
+ subscribe.sctp_data_io_event = 1;
+ subscribe.sctp_association_event = 1;
+ subscribe.sctp_send_failure_event = 1;
+ subscribe.sctp_partial_delivery_event = 1;
+
+ PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+ sizeof(subscribe)) == 0);
+ }
+ {
+ // Enable recvinfo when a packet arrives.
+ int on = 1;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+ 0);
+ }
+ {
+ // Allow one packet on the wire to have multiple source packets.
+ int full_interleaving = 2;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+ &full_interleaving, sizeof(full_interleaving)) == 0);
+ }
+ {
+ // Turn off the NAGLE algorithm.
+ int on = 1;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+ }
+
+ // And go!
+ PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+ sockaddr_local_.ss_family == AF_INET6
+ ? sizeof(struct sockaddr_in6)
+ : sizeof(struct sockaddr_in)) == 0);
+ LOG(INFO) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+
+ PCHECK(listen(fd_, 100) == 0);
+
+ PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size_,
+ sizeof(max_size_)) == 0);
+}
+
+aos::unique_c_ptr<Message> SctpServer::Read() {
+ return ReadSctpMessage(fd_, max_size_);
+}
+
+void SctpServer::Send(std::string_view data, sctp_assoc_t snd_assoc_id,
+ int stream, int timetolive) {
+ struct iovec iov;
+ iov.iov_base = const_cast<char *>(data.data());
+ iov.iov_len = data.size();
+
+ // Use the assoc_id for the destination instead of the msg_name.
+ struct msghdr outmsg;
+ outmsg.msg_namelen = 0;
+
+ // Data to send.
+ outmsg.msg_iov = &iov;
+ outmsg.msg_iovlen = 1;
+
+ // Build up the sndinfo message.
+ char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+ outmsg.msg_control = outcmsg;
+ outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
+ outmsg.msg_flags = 0;
+
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+ cmsg->cmsg_level = IPPROTO_SCTP;
+ cmsg->cmsg_type = SCTP_SNDRCV;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+ struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+ sinfo->sinfo_ppid = ++ppid_;
+ sinfo->sinfo_stream = stream;
+ sinfo->sinfo_flags = 0;
+ sinfo->sinfo_assoc_id = snd_assoc_id;
+ sinfo->sinfo_timetolive = timetolive;
+
+ // And send.
+ const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (size == -1) {
+ if (errno != EPIPE) {
+ PCHECK(size == static_cast<ssize_t>(data.size()));
+ }
+ } else {
+ CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+ }
+}
+
+void SctpServer::SetPriorityScheduler(sctp_assoc_t assoc_id) {
+ struct sctp_assoc_value scheduler;
+ memset(&scheduler, 0, sizeof(scheduler));
+ scheduler.assoc_id = assoc_id;
+ scheduler.assoc_value = SCTP_SS_PRIO;
+ if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER, &scheduler,
+ sizeof(scheduler)) != 0) {
+ PLOG(WARNING) << "Failed to set scheduler";
+ }
+}
+
+void SctpServer::SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
+ uint16_t priority) {
+ struct sctp_stream_value sctp_priority;
+ memset(&sctp_priority, 0, sizeof(sctp_priority));
+ sctp_priority.assoc_id = assoc_id;
+ sctp_priority.stream_id = stream_id;
+ sctp_priority.stream_value = priority;
+ if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER_VALUE,
+ &sctp_priority, sizeof(sctp_priority)) != 0) {
+ PLOG(WARNING) << "Failed to set scheduler";
+ }
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
new file mode 100644
index 0000000..a3086d9
--- /dev/null
+++ b/aos/network/sctp_server.h
@@ -0,0 +1,63 @@
+#ifndef AOS_NETWORK_SCTP_SERVER_H_
+#define AOS_NETWORK_SCTP_SERVER_H_
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <memory>
+#include <sys/socket.h>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+class SctpServer {
+ public:
+ SctpServer(std::string_view local_host = "0.0.0.0", int local_port = 9971);
+
+ ~SctpServer() {
+ LOG(INFO) << "close(" << fd_ << ")";
+ PCHECK(close(fd_) == 0);
+ }
+
+ // Receives the next packet from the remote.
+ aos::unique_c_ptr<Message> Read();
+
+ // Sends a block of data to a client on a stream with a TTL.
+ void Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
+ int timetolive);
+
+ int fd() { return fd_; }
+
+ // Enables the priority scheduler. This is a SCTP feature which lets us
+ // configure the priority per stream so that higher priority packets don't get
+ // backed up behind lower priority packets in the networking queues.
+ void SetPriorityScheduler(sctp_assoc_t assoc_id);
+
+ // Sets the priority of a specific stream.
+ void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
+ uint16_t priority);
+
+ private:
+ struct sockaddr_storage sockaddr_local_;
+ int fd_;
+
+ // TODO(austin): Configure this.
+ size_t max_size_ = 1000;
+
+ int ppid_ = 1;
+};
+
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_SCTP_SERVER_H_
diff --git a/aos/testdata/config1_multinode.json b/aos/testdata/config1_multinode.json
index 32bce5c..f12d927 100644
--- a/aos/testdata/config1_multinode.json
+++ b/aos/testdata/config1_multinode.json
@@ -4,12 +4,22 @@
"name": "/foo",
"type": ".aos.bar",
"max_size": 5,
- "source_node": "pi2"
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1"
+ }
+ ]
},
{
"name": "/foo2",
"type": ".aos.bar",
- "source_node": "pi1"
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2"
+ }
+ ]
}
],
"applications": [
diff --git a/aos/testdata/expected_multinode.json b/aos/testdata/expected_multinode.json
index 46d052e..0946007 100644
--- a/aos/testdata/expected_multinode.json
+++ b/aos/testdata/expected_multinode.json
@@ -4,12 +4,22 @@
"name": "/foo",
"type": ".aos.bar",
"max_size": 5,
- "source_node": "pi2"
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1"
+ }
+ ]
},
{
"name": "/foo2",
"type": ".aos.bar",
- "source_node": "pi1"
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2"
+ }
+ ]
},
{
"name": "/foo3",
diff --git a/compilers/linaro_linux_gcc.BUILD b/compilers/linaro_linux_gcc.BUILD
index db2e9bb..aea261d 100644
--- a/compilers/linaro_linux_gcc.BUILD
+++ b/compilers/linaro_linux_gcc.BUILD
@@ -63,7 +63,10 @@
"libexec/**",
"lib/gcc/arm-linux-gnueabihf/**",
"include/**",
- ]),
+ ], exclude=["arm-linux-gnueabihf/libc/usr/include/linux/sctp.h"]) +
+ [
+ "@org_frc971//third_party/linux:sctp",
+ ],
)
filegroup(
diff --git a/third_party/linux/BUILD b/third_party/linux/BUILD
new file mode 100644
index 0000000..007b913
--- /dev/null
+++ b/third_party/linux/BUILD
@@ -0,0 +1,10 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+filegroup(
+ name = "sctp",
+ srcs = [
+ "sctp.h",
+ ],
+)
diff --git a/third_party/linux/sctp.h b/third_party/linux/sctp.h
new file mode 100644
index 0000000..bd82046
--- /dev/null
+++ b/third_party/linux/sctp.h
@@ -0,0 +1,1105 @@
+/* SPDX-License-Identifier: GPL-2.0+ WITH Linux-syscall-note */
+/* SCTP kernel implementation
+ * (C) Copyright IBM Corp. 2001, 2004
+ * Copyright (c) 1999-2000 Cisco, Inc.
+ * Copyright (c) 1999-2001 Motorola, Inc.
+ * Copyright (c) 2002 Intel Corp.
+ *
+ * This file is part of the SCTP kernel implementation
+ *
+ * This header represents the structures and constants needed to support
+ * the SCTP Extension to the Sockets API.
+ *
+ * This SCTP implementation is free software;
+ * you can redistribute it and/or modify it under the terms of
+ * the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This SCTP implementation is distributed in the hope that it
+ * will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * ************************
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU CC; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Please send any bug reports or fixes you make to the
+ * email address(es):
+ * lksctp developers <linux-sctp@vger.kernel.org>
+ *
+ * Or submit a bug report through the following website:
+ * http://www.sf.net/projects/lksctp
+ *
+ * Written or modified by:
+ * La Monte H.P. Yarroll <piggy@acm.org>
+ * R. Stewart <randall@sctp.chicago.il.us>
+ * K. Morneau <kmorneau@cisco.com>
+ * Q. Xie <qxie1@email.mot.com>
+ * Karl Knutson <karl@athena.chicago.il.us>
+ * Jon Grimm <jgrimm@us.ibm.com>
+ * Daisy Chang <daisyc@us.ibm.com>
+ * Ryan Layer <rmlayer@us.ibm.com>
+ * Ardelle Fan <ardelle.fan@intel.com>
+ * Sridhar Samudrala <sri@us.ibm.com>
+ * Inaky Perez-Gonzalez <inaky.gonzalez@intel.com>
+ * Vlad Yasevich <vladislav.yasevich@hp.com>
+ *
+ * Any bugs reported given to us we will try to fix... any fixes shared will
+ * be incorporated into the next SCTP release.
+ */
+
+#ifndef _SCTP_H
+#define _SCTP_H
+
+#include <linux/types.h>
+#include <linux/socket.h>
+
+typedef __s32 sctp_assoc_t;
+
+/* The following symbols come from the Sockets API Extensions for
+ * SCTP <draft-ietf-tsvwg-sctpsocket-07.txt>.
+ */
+#define SCTP_RTOINFO 0
+#define SCTP_ASSOCINFO 1
+#define SCTP_INITMSG 2
+#define SCTP_NODELAY 3 /* Get/set nodelay option. */
+#define SCTP_AUTOCLOSE 4
+#define SCTP_SET_PEER_PRIMARY_ADDR 5
+#define SCTP_PRIMARY_ADDR 6
+#define SCTP_ADAPTATION_LAYER 7
+#define SCTP_DISABLE_FRAGMENTS 8
+#define SCTP_PEER_ADDR_PARAMS 9
+#define SCTP_DEFAULT_SEND_PARAM 10
+#define SCTP_EVENTS 11
+#define SCTP_I_WANT_MAPPED_V4_ADDR 12 /* Turn on/off mapped v4 addresses */
+#define SCTP_MAXSEG 13 /* Get/set maximum fragment. */
+#define SCTP_STATUS 14
+#define SCTP_GET_PEER_ADDR_INFO 15
+#define SCTP_DELAYED_ACK_TIME 16
+#define SCTP_DELAYED_ACK SCTP_DELAYED_ACK_TIME
+#define SCTP_DELAYED_SACK SCTP_DELAYED_ACK_TIME
+#define SCTP_CONTEXT 17
+#define SCTP_FRAGMENT_INTERLEAVE 18
+#define SCTP_PARTIAL_DELIVERY_POINT 19 /* Set/Get partial delivery point */
+#define SCTP_MAX_BURST 20 /* Set/Get max burst */
+#define SCTP_AUTH_CHUNK 21 /* Set only: add a chunk type to authenticate */
+#define SCTP_HMAC_IDENT 22
+#define SCTP_AUTH_KEY 23
+#define SCTP_AUTH_ACTIVE_KEY 24
+#define SCTP_AUTH_DELETE_KEY 25
+#define SCTP_PEER_AUTH_CHUNKS 26 /* Read only */
+#define SCTP_LOCAL_AUTH_CHUNKS 27 /* Read only */
+#define SCTP_GET_ASSOC_NUMBER 28 /* Read only */
+#define SCTP_GET_ASSOC_ID_LIST 29 /* Read only */
+#define SCTP_AUTO_ASCONF 30
+#define SCTP_PEER_ADDR_THLDS 31
+#define SCTP_RECVRCVINFO 32
+#define SCTP_RECVNXTINFO 33
+#define SCTP_DEFAULT_SNDINFO 34
+
+/* Internal Socket Options. Some of the sctp library functions are
+ * implemented using these socket options.
+ */
+#define SCTP_SOCKOPT_BINDX_ADD 100 /* BINDX requests for adding addrs */
+#define SCTP_SOCKOPT_BINDX_REM 101 /* BINDX requests for removing addrs. */
+#define SCTP_SOCKOPT_PEELOFF 102 /* peel off association. */
+/* Options 104-106 are deprecated and removed. Do not use this space */
+#define SCTP_SOCKOPT_CONNECTX_OLD 107 /* CONNECTX old requests. */
+#define SCTP_GET_PEER_ADDRS 108 /* Get all peer address. */
+#define SCTP_GET_LOCAL_ADDRS 109 /* Get all local address. */
+#define SCTP_SOCKOPT_CONNECTX 110 /* CONNECTX requests. */
+#define SCTP_SOCKOPT_CONNECTX3 111 /* CONNECTX requests (updated) */
+#define SCTP_GET_ASSOC_STATS 112 /* Read only */
+#define SCTP_PR_SUPPORTED 113
+#define SCTP_DEFAULT_PRINFO 114
+#define SCTP_PR_ASSOC_STATUS 115
+#define SCTP_PR_STREAM_STATUS 116
+#define SCTP_RECONFIG_SUPPORTED 117
+#define SCTP_ENABLE_STREAM_RESET 118
+#define SCTP_RESET_STREAMS 119
+#define SCTP_RESET_ASSOC 120
+#define SCTP_ADD_STREAMS 121
+#define SCTP_SOCKOPT_PEELOFF_FLAGS 122
+#define SCTP_STREAM_SCHEDULER 123
+#define SCTP_STREAM_SCHEDULER_VALUE 124
+
+/* PR-SCTP policies */
+#define SCTP_PR_SCTP_NONE 0x0000
+#define SCTP_PR_SCTP_TTL 0x0010
+#define SCTP_PR_SCTP_RTX 0x0020
+#define SCTP_PR_SCTP_PRIO 0x0030
+#define SCTP_PR_SCTP_MAX SCTP_PR_SCTP_PRIO
+#define SCTP_PR_SCTP_MASK 0x0030
+
+#define __SCTP_PR_INDEX(x) ((x >> 4) - 1)
+#define SCTP_PR_INDEX(x) __SCTP_PR_INDEX(SCTP_PR_SCTP_ ## x)
+
+#define SCTP_PR_POLICY(x) ((x) & SCTP_PR_SCTP_MASK)
+#define SCTP_PR_SET_POLICY(flags, x) \
+ do { \
+ flags &= ~SCTP_PR_SCTP_MASK; \
+ flags |= x; \
+ } while (0)
+
+#define SCTP_PR_TTL_ENABLED(x) (SCTP_PR_POLICY(x) == SCTP_PR_SCTP_TTL)
+#define SCTP_PR_RTX_ENABLED(x) (SCTP_PR_POLICY(x) == SCTP_PR_SCTP_RTX)
+#define SCTP_PR_PRIO_ENABLED(x) (SCTP_PR_POLICY(x) == SCTP_PR_SCTP_PRIO)
+
+/* For enable stream reset */
+#define SCTP_ENABLE_RESET_STREAM_REQ 0x01
+#define SCTP_ENABLE_RESET_ASSOC_REQ 0x02
+#define SCTP_ENABLE_CHANGE_ASSOC_REQ 0x04
+#define SCTP_ENABLE_STRRESET_MASK 0x07
+
+#define SCTP_STREAM_RESET_INCOMING 0x01
+#define SCTP_STREAM_RESET_OUTGOING 0x02
+
+/* These are bit fields for msghdr->msg_flags. See section 5.1. */
+/* On user space Linux, these live in <bits/socket.h> as an enum. */
+enum sctp_msg_flags {
+ MSG_NOTIFICATION = 0x8000,
+#define MSG_NOTIFICATION MSG_NOTIFICATION
+};
+
+/* 5.3.1 SCTP Initiation Structure (SCTP_INIT)
+ *
+ * This cmsghdr structure provides information for initializing new
+ * SCTP associations with sendmsg(). The SCTP_INITMSG socket option
+ * uses this same data structure. This structure is not used for
+ * recvmsg().
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ ----------------------
+ * IPPROTO_SCTP SCTP_INIT struct sctp_initmsg
+ */
+struct sctp_initmsg {
+ __u16 sinit_num_ostreams;
+ __u16 sinit_max_instreams;
+ __u16 sinit_max_attempts;
+ __u16 sinit_max_init_timeo;
+};
+
+/* 5.3.2 SCTP Header Information Structure (SCTP_SNDRCV)
+ *
+ * This cmsghdr structure specifies SCTP options for sendmsg() and
+ * describes SCTP header information about a received message through
+ * recvmsg().
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ ----------------------
+ * IPPROTO_SCTP SCTP_SNDRCV struct sctp_sndrcvinfo
+ */
+struct sctp_sndrcvinfo {
+ __u16 sinfo_stream;
+ __u16 sinfo_ssn;
+ __u16 sinfo_flags;
+ __u32 sinfo_ppid;
+ __u32 sinfo_context;
+ __u32 sinfo_timetolive;
+ __u32 sinfo_tsn;
+ __u32 sinfo_cumtsn;
+ sctp_assoc_t sinfo_assoc_id;
+};
+
+/* 5.3.4 SCTP Send Information Structure (SCTP_SNDINFO)
+ *
+ * This cmsghdr structure specifies SCTP options for sendmsg().
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ -------------------
+ * IPPROTO_SCTP SCTP_SNDINFO struct sctp_sndinfo
+ */
+struct sctp_sndinfo {
+ __u16 snd_sid;
+ __u16 snd_flags;
+ __u32 snd_ppid;
+ __u32 snd_context;
+ sctp_assoc_t snd_assoc_id;
+};
+
+/* 5.3.5 SCTP Receive Information Structure (SCTP_RCVINFO)
+ *
+ * This cmsghdr structure describes SCTP receive information
+ * about a received message through recvmsg().
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ -------------------
+ * IPPROTO_SCTP SCTP_RCVINFO struct sctp_rcvinfo
+ */
+struct sctp_rcvinfo {
+ __u16 rcv_sid;
+ __u16 rcv_ssn;
+ __u16 rcv_flags;
+ __u32 rcv_ppid;
+ __u32 rcv_tsn;
+ __u32 rcv_cumtsn;
+ __u32 rcv_context;
+ sctp_assoc_t rcv_assoc_id;
+};
+
+/* 5.3.6 SCTP Next Receive Information Structure (SCTP_NXTINFO)
+ *
+ * This cmsghdr structure describes SCTP receive information
+ * of the next message that will be delivered through recvmsg()
+ * if this information is already available when delivering
+ * the current message.
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ -------------------
+ * IPPROTO_SCTP SCTP_NXTINFO struct sctp_nxtinfo
+ */
+struct sctp_nxtinfo {
+ __u16 nxt_sid;
+ __u16 nxt_flags;
+ __u32 nxt_ppid;
+ __u32 nxt_length;
+ sctp_assoc_t nxt_assoc_id;
+};
+
+/*
+ * sinfo_flags: 16 bits (unsigned integer)
+ *
+ * This field may contain any of the following flags and is composed of
+ * a bitwise OR of these values.
+ */
+enum sctp_sinfo_flags {
+ SCTP_UNORDERED = (1 << 0), /* Send/receive message unordered. */
+ SCTP_ADDR_OVER = (1 << 1), /* Override the primary destination. */
+ SCTP_ABORT = (1 << 2), /* Send an ABORT message to the peer. */
+ SCTP_SACK_IMMEDIATELY = (1 << 3), /* SACK should be sent without delay. */
+ SCTP_NOTIFICATION = MSG_NOTIFICATION, /* Next message is not user msg but notification. */
+ SCTP_EOF = MSG_FIN, /* Initiate graceful shutdown process. */
+};
+
+typedef union {
+ __u8 raw;
+ struct sctp_initmsg init;
+ struct sctp_sndrcvinfo sndrcv;
+} sctp_cmsg_data_t;
+
+/* These are cmsg_types. */
+typedef enum sctp_cmsg_type {
+ SCTP_INIT, /* 5.2.1 SCTP Initiation Structure */
+#define SCTP_INIT SCTP_INIT
+ SCTP_SNDRCV, /* 5.2.2 SCTP Header Information Structure */
+#define SCTP_SNDRCV SCTP_SNDRCV
+ SCTP_SNDINFO, /* 5.3.4 SCTP Send Information Structure */
+#define SCTP_SNDINFO SCTP_SNDINFO
+ SCTP_RCVINFO, /* 5.3.5 SCTP Receive Information Structure */
+#define SCTP_RCVINFO SCTP_RCVINFO
+ SCTP_NXTINFO, /* 5.3.6 SCTP Next Receive Information Structure */
+#define SCTP_NXTINFO SCTP_NXTINFO
+} sctp_cmsg_t;
+
+/*
+ * 5.3.1.1 SCTP_ASSOC_CHANGE
+ *
+ * Communication notifications inform the ULP that an SCTP association
+ * has either begun or ended. The identifier for a new association is
+ * provided by this notificaion. The notification information has the
+ * following format:
+ *
+ */
+struct sctp_assoc_change {
+ __u16 sac_type;
+ __u16 sac_flags;
+ __u32 sac_length;
+ __u16 sac_state;
+ __u16 sac_error;
+ __u16 sac_outbound_streams;
+ __u16 sac_inbound_streams;
+ sctp_assoc_t sac_assoc_id;
+ __u8 sac_info[0];
+};
+
+/*
+ * sac_state: 32 bits (signed integer)
+ *
+ * This field holds one of a number of values that communicate the
+ * event that happened to the association. They include:
+ *
+ * Note: The following state names deviate from the API draft as
+ * the names clash too easily with other kernel symbols.
+ */
+enum sctp_sac_state {
+ SCTP_COMM_UP,
+ SCTP_COMM_LOST,
+ SCTP_RESTART,
+ SCTP_SHUTDOWN_COMP,
+ SCTP_CANT_STR_ASSOC,
+};
+
+/*
+ * 5.3.1.2 SCTP_PEER_ADDR_CHANGE
+ *
+ * When a destination address on a multi-homed peer encounters a change
+ * an interface details event is sent. The information has the
+ * following structure:
+ */
+struct sctp_paddr_change {
+ __u16 spc_type;
+ __u16 spc_flags;
+ __u32 spc_length;
+ struct sockaddr_storage spc_aaddr;
+ int spc_state;
+ int spc_error;
+ sctp_assoc_t spc_assoc_id;
+} __attribute__((packed, aligned(4)));
+
+/*
+ * spc_state: 32 bits (signed integer)
+ *
+ * This field holds one of a number of values that communicate the
+ * event that happened to the address. They include:
+ */
+enum sctp_spc_state {
+ SCTP_ADDR_AVAILABLE,
+ SCTP_ADDR_UNREACHABLE,
+ SCTP_ADDR_REMOVED,
+ SCTP_ADDR_ADDED,
+ SCTP_ADDR_MADE_PRIM,
+ SCTP_ADDR_CONFIRMED,
+};
+
+
+/*
+ * 5.3.1.3 SCTP_REMOTE_ERROR
+ *
+ * A remote peer may send an Operational Error message to its peer.
+ * This message indicates a variety of error conditions on an
+ * association. The entire error TLV as it appears on the wire is
+ * included in a SCTP_REMOTE_ERROR event. Please refer to the SCTP
+ * specification [SCTP] and any extensions for a list of possible
+ * error formats. SCTP error TLVs have the format:
+ */
+struct sctp_remote_error {
+ __u16 sre_type;
+ __u16 sre_flags;
+ __u32 sre_length;
+ __be16 sre_error;
+ sctp_assoc_t sre_assoc_id;
+ __u8 sre_data[0];
+};
+
+
+/*
+ * 5.3.1.4 SCTP_SEND_FAILED
+ *
+ * If SCTP cannot deliver a message it may return the message as a
+ * notification.
+ */
+struct sctp_send_failed {
+ __u16 ssf_type;
+ __u16 ssf_flags;
+ __u32 ssf_length;
+ __u32 ssf_error;
+ struct sctp_sndrcvinfo ssf_info;
+ sctp_assoc_t ssf_assoc_id;
+ __u8 ssf_data[0];
+};
+
+/*
+ * ssf_flags: 16 bits (unsigned integer)
+ *
+ * The flag value will take one of the following values
+ *
+ * SCTP_DATA_UNSENT - Indicates that the data was never put on
+ * the wire.
+ *
+ * SCTP_DATA_SENT - Indicates that the data was put on the wire.
+ * Note that this does not necessarily mean that the
+ * data was (or was not) successfully delivered.
+ */
+enum sctp_ssf_flags {
+ SCTP_DATA_UNSENT,
+ SCTP_DATA_SENT,
+};
+
+/*
+ * 5.3.1.5 SCTP_SHUTDOWN_EVENT
+ *
+ * When a peer sends a SHUTDOWN, SCTP delivers this notification to
+ * inform the application that it should cease sending data.
+ */
+struct sctp_shutdown_event {
+ __u16 sse_type;
+ __u16 sse_flags;
+ __u32 sse_length;
+ sctp_assoc_t sse_assoc_id;
+};
+
+/*
+ * 5.3.1.6 SCTP_ADAPTATION_INDICATION
+ *
+ * When a peer sends a Adaptation Layer Indication parameter , SCTP
+ * delivers this notification to inform the application
+ * that of the peers requested adaptation layer.
+ */
+struct sctp_adaptation_event {
+ __u16 sai_type;
+ __u16 sai_flags;
+ __u32 sai_length;
+ __u32 sai_adaptation_ind;
+ sctp_assoc_t sai_assoc_id;
+};
+
+/*
+ * 5.3.1.7 SCTP_PARTIAL_DELIVERY_EVENT
+ *
+ * When a receiver is engaged in a partial delivery of a
+ * message this notification will be used to indicate
+ * various events.
+ */
+struct sctp_pdapi_event {
+ __u16 pdapi_type;
+ __u16 pdapi_flags;
+ __u32 pdapi_length;
+ __u32 pdapi_indication;
+ sctp_assoc_t pdapi_assoc_id;
+};
+
+enum { SCTP_PARTIAL_DELIVERY_ABORTED=0, };
+
+/*
+ * 5.3.1.8. SCTP_AUTHENTICATION_EVENT
+ *
+ * When a receiver is using authentication this message will provide
+ * notifications regarding new keys being made active as well as errors.
+ */
+struct sctp_authkey_event {
+ __u16 auth_type;
+ __u16 auth_flags;
+ __u32 auth_length;
+ __u16 auth_keynumber;
+ __u16 auth_altkeynumber;
+ __u32 auth_indication;
+ sctp_assoc_t auth_assoc_id;
+};
+
+enum { SCTP_AUTH_NEWKEY = 0, };
+
+/*
+ * 6.1.9. SCTP_SENDER_DRY_EVENT
+ *
+ * When the SCTP stack has no more user data to send or retransmit, this
+ * notification is given to the user. Also, at the time when a user app
+ * subscribes to this event, if there is no data to be sent or
+ * retransmit, the stack will immediately send up this notification.
+ */
+struct sctp_sender_dry_event {
+ __u16 sender_dry_type;
+ __u16 sender_dry_flags;
+ __u32 sender_dry_length;
+ sctp_assoc_t sender_dry_assoc_id;
+};
+
+#define SCTP_STREAM_RESET_INCOMING_SSN 0x0001
+#define SCTP_STREAM_RESET_OUTGOING_SSN 0x0002
+#define SCTP_STREAM_RESET_DENIED 0x0004
+#define SCTP_STREAM_RESET_FAILED 0x0008
+struct sctp_stream_reset_event {
+ __u16 strreset_type;
+ __u16 strreset_flags;
+ __u32 strreset_length;
+ sctp_assoc_t strreset_assoc_id;
+ __u16 strreset_stream_list[];
+};
+
+#define SCTP_ASSOC_RESET_DENIED 0x0004
+#define SCTP_ASSOC_RESET_FAILED 0x0008
+struct sctp_assoc_reset_event {
+ __u16 assocreset_type;
+ __u16 assocreset_flags;
+ __u32 assocreset_length;
+ sctp_assoc_t assocreset_assoc_id;
+ __u32 assocreset_local_tsn;
+ __u32 assocreset_remote_tsn;
+};
+
+#define SCTP_ASSOC_CHANGE_DENIED 0x0004
+#define SCTP_ASSOC_CHANGE_FAILED 0x0008
+#define SCTP_STREAM_CHANGE_DENIED SCTP_ASSOC_CHANGE_DENIED
+#define SCTP_STREAM_CHANGE_FAILED SCTP_ASSOC_CHANGE_FAILED
+struct sctp_stream_change_event {
+ __u16 strchange_type;
+ __u16 strchange_flags;
+ __u32 strchange_length;
+ sctp_assoc_t strchange_assoc_id;
+ __u16 strchange_instrms;
+ __u16 strchange_outstrms;
+};
+
+/*
+ * Described in Section 7.3
+ * Ancillary Data and Notification Interest Options
+ */
+struct sctp_event_subscribe {
+ __u8 sctp_data_io_event;
+ __u8 sctp_association_event;
+ __u8 sctp_address_event;
+ __u8 sctp_send_failure_event;
+ __u8 sctp_peer_error_event;
+ __u8 sctp_shutdown_event;
+ __u8 sctp_partial_delivery_event;
+ __u8 sctp_adaptation_layer_event;
+ __u8 sctp_authentication_event;
+ __u8 sctp_sender_dry_event;
+ __u8 sctp_stream_reset_event;
+ __u8 sctp_assoc_reset_event;
+ __u8 sctp_stream_change_event;
+};
+
+/*
+ * 5.3.1 SCTP Notification Structure
+ *
+ * The notification structure is defined as the union of all
+ * notification types.
+ *
+ */
+union sctp_notification {
+ struct {
+ __u16 sn_type; /* Notification type. */
+ __u16 sn_flags;
+ __u32 sn_length;
+ } sn_header;
+ struct sctp_assoc_change sn_assoc_change;
+ struct sctp_paddr_change sn_paddr_change;
+ struct sctp_remote_error sn_remote_error;
+ struct sctp_send_failed sn_send_failed;
+ struct sctp_shutdown_event sn_shutdown_event;
+ struct sctp_adaptation_event sn_adaptation_event;
+ struct sctp_pdapi_event sn_pdapi_event;
+ struct sctp_authkey_event sn_authkey_event;
+ struct sctp_sender_dry_event sn_sender_dry_event;
+ struct sctp_stream_reset_event sn_strreset_event;
+ struct sctp_assoc_reset_event sn_assocreset_event;
+ struct sctp_stream_change_event sn_strchange_event;
+};
+
+/* Section 5.3.1
+ * All standard values for sn_type flags are greater than 2^15.
+ * Values from 2^15 and down are reserved.
+ */
+
+enum sctp_sn_type {
+ SCTP_SN_TYPE_BASE = (1<<15),
+ SCTP_ASSOC_CHANGE,
+#define SCTP_ASSOC_CHANGE SCTP_ASSOC_CHANGE
+ SCTP_PEER_ADDR_CHANGE,
+#define SCTP_PEER_ADDR_CHANGE SCTP_PEER_ADDR_CHANGE
+ SCTP_SEND_FAILED,
+#define SCTP_SEND_FAILED SCTP_SEND_FAILED
+ SCTP_REMOTE_ERROR,
+#define SCTP_REMOTE_ERROR SCTP_REMOTE_ERROR
+ SCTP_SHUTDOWN_EVENT,
+#define SCTP_SHUTDOWN_EVENT SCTP_SHUTDOWN_EVENT
+ SCTP_PARTIAL_DELIVERY_EVENT,
+#define SCTP_PARTIAL_DELIVERY_EVENT SCTP_PARTIAL_DELIVERY_EVENT
+ SCTP_ADAPTATION_INDICATION,
+#define SCTP_ADAPTATION_INDICATION SCTP_ADAPTATION_INDICATION
+ SCTP_AUTHENTICATION_EVENT,
+#define SCTP_AUTHENTICATION_INDICATION SCTP_AUTHENTICATION_EVENT
+ SCTP_SENDER_DRY_EVENT,
+#define SCTP_SENDER_DRY_EVENT SCTP_SENDER_DRY_EVENT
+ SCTP_STREAM_RESET_EVENT,
+#define SCTP_STREAM_RESET_EVENT SCTP_STREAM_RESET_EVENT
+ SCTP_ASSOC_RESET_EVENT,
+#define SCTP_ASSOC_RESET_EVENT SCTP_ASSOC_RESET_EVENT
+ SCTP_STREAM_CHANGE_EVENT,
+#define SCTP_STREAM_CHANGE_EVENT SCTP_STREAM_CHANGE_EVENT
+};
+
+/* Notification error codes used to fill up the error fields in some
+ * notifications.
+ * SCTP_PEER_ADDRESS_CHAGE : spc_error
+ * SCTP_ASSOC_CHANGE : sac_error
+ * These names should be potentially included in the draft 04 of the SCTP
+ * sockets API specification.
+ */
+typedef enum sctp_sn_error {
+ SCTP_FAILED_THRESHOLD,
+ SCTP_RECEIVED_SACK,
+ SCTP_HEARTBEAT_SUCCESS,
+ SCTP_RESPONSE_TO_USER_REQ,
+ SCTP_INTERNAL_ERROR,
+ SCTP_SHUTDOWN_GUARD_EXPIRES,
+ SCTP_PEER_FAULTY,
+} sctp_sn_error_t;
+
+/*
+ * 7.1.1 Retransmission Timeout Parameters (SCTP_RTOINFO)
+ *
+ * The protocol parameters used to initialize and bound retransmission
+ * timeout (RTO) are tunable. See [SCTP] for more information on how
+ * these parameters are used in RTO calculation.
+ */
+struct sctp_rtoinfo {
+ sctp_assoc_t srto_assoc_id;
+ __u32 srto_initial;
+ __u32 srto_max;
+ __u32 srto_min;
+};
+
+/*
+ * 7.1.2 Association Parameters (SCTP_ASSOCINFO)
+ *
+ * This option is used to both examine and set various association and
+ * endpoint parameters.
+ */
+struct sctp_assocparams {
+ sctp_assoc_t sasoc_assoc_id;
+ __u16 sasoc_asocmaxrxt;
+ __u16 sasoc_number_peer_destinations;
+ __u32 sasoc_peer_rwnd;
+ __u32 sasoc_local_rwnd;
+ __u32 sasoc_cookie_life;
+};
+
+/*
+ * 7.1.9 Set Peer Primary Address (SCTP_SET_PEER_PRIMARY_ADDR)
+ *
+ * Requests that the peer mark the enclosed address as the association
+ * primary. The enclosed address must be one of the association's
+ * locally bound addresses. The following structure is used to make a
+ * set primary request:
+ */
+struct sctp_setpeerprim {
+ sctp_assoc_t sspp_assoc_id;
+ struct sockaddr_storage sspp_addr;
+} __attribute__((packed, aligned(4)));
+
+/*
+ * 7.1.10 Set Primary Address (SCTP_PRIMARY_ADDR)
+ *
+ * Requests that the local SCTP stack use the enclosed peer address as
+ * the association primary. The enclosed address must be one of the
+ * association peer's addresses. The following structure is used to
+ * make a set peer primary request:
+ */
+struct sctp_prim {
+ sctp_assoc_t ssp_assoc_id;
+ struct sockaddr_storage ssp_addr;
+} __attribute__((packed, aligned(4)));
+
+/* For backward compatibility use, define the old name too */
+#define sctp_setprim sctp_prim
+
+/*
+ * 7.1.11 Set Adaptation Layer Indicator (SCTP_ADAPTATION_LAYER)
+ *
+ * Requests that the local endpoint set the specified Adaptation Layer
+ * Indication parameter for all future INIT and INIT-ACK exchanges.
+ */
+struct sctp_setadaptation {
+ __u32 ssb_adaptation_ind;
+};
+
+/*
+ * 7.1.13 Peer Address Parameters (SCTP_PEER_ADDR_PARAMS)
+ *
+ * Applications can enable or disable heartbeats for any peer address
+ * of an association, modify an address's heartbeat interval, force a
+ * heartbeat to be sent immediately, and adjust the address's maximum
+ * number of retransmissions sent before an address is considered
+ * unreachable. The following structure is used to access and modify an
+ * address's parameters:
+ */
+enum sctp_spp_flags {
+ SPP_HB_ENABLE = 1<<0, /*Enable heartbeats*/
+ SPP_HB_DISABLE = 1<<1, /*Disable heartbeats*/
+ SPP_HB = SPP_HB_ENABLE | SPP_HB_DISABLE,
+ SPP_HB_DEMAND = 1<<2, /*Send heartbeat immediately*/
+ SPP_PMTUD_ENABLE = 1<<3, /*Enable PMTU discovery*/
+ SPP_PMTUD_DISABLE = 1<<4, /*Disable PMTU discovery*/
+ SPP_PMTUD = SPP_PMTUD_ENABLE | SPP_PMTUD_DISABLE,
+ SPP_SACKDELAY_ENABLE = 1<<5, /*Enable SACK*/
+ SPP_SACKDELAY_DISABLE = 1<<6, /*Disable SACK*/
+ SPP_SACKDELAY = SPP_SACKDELAY_ENABLE | SPP_SACKDELAY_DISABLE,
+ SPP_HB_TIME_IS_ZERO = 1<<7, /* Set HB delay to 0 */
+};
+
+struct sctp_paddrparams {
+ sctp_assoc_t spp_assoc_id;
+ struct sockaddr_storage spp_address;
+ __u32 spp_hbinterval;
+ __u16 spp_pathmaxrxt;
+ __u32 spp_pathmtu;
+ __u32 spp_sackdelay;
+ __u32 spp_flags;
+} __attribute__((packed, aligned(4)));
+
+/*
+ * 7.1.18. Add a chunk that must be authenticated (SCTP_AUTH_CHUNK)
+ *
+ * This set option adds a chunk type that the user is requesting to be
+ * received only in an authenticated way. Changes to the list of chunks
+ * will only effect future associations on the socket.
+ */
+struct sctp_authchunk {
+ __u8 sauth_chunk;
+};
+
+/*
+ * 7.1.19. Get or set the list of supported HMAC Identifiers (SCTP_HMAC_IDENT)
+ *
+ * This option gets or sets the list of HMAC algorithms that the local
+ * endpoint requires the peer to use.
+ */
+/* This here is only used by user space as is. It might not be a good idea
+ * to export/reveal the whole structure with reserved fields etc.
+ */
+enum {
+ SCTP_AUTH_HMAC_ID_SHA1 = 1,
+ SCTP_AUTH_HMAC_ID_SHA256 = 3,
+};
+
+struct sctp_hmacalgo {
+ __u32 shmac_num_idents;
+ __u16 shmac_idents[];
+};
+
+/* Sadly, user and kernel space have different names for
+ * this structure member, so this is to not break anything.
+ */
+#define shmac_number_of_idents shmac_num_idents
+
+/*
+ * 7.1.20. Set a shared key (SCTP_AUTH_KEY)
+ *
+ * This option will set a shared secret key which is used to build an
+ * association shared key.
+ */
+struct sctp_authkey {
+ sctp_assoc_t sca_assoc_id;
+ __u16 sca_keynumber;
+ __u16 sca_keylength;
+ __u8 sca_key[];
+};
+
+/*
+ * 7.1.21. Get or set the active shared key (SCTP_AUTH_ACTIVE_KEY)
+ *
+ * This option will get or set the active shared key to be used to build
+ * the association shared key.
+ */
+
+struct sctp_authkeyid {
+ sctp_assoc_t scact_assoc_id;
+ __u16 scact_keynumber;
+};
+
+
+/*
+ * 7.1.23. Get or set delayed ack timer (SCTP_DELAYED_SACK)
+ *
+ * This option will effect the way delayed acks are performed. This
+ * option allows you to get or set the delayed ack time, in
+ * milliseconds. It also allows changing the delayed ack frequency.
+ * Changing the frequency to 1 disables the delayed sack algorithm. If
+ * the assoc_id is 0, then this sets or gets the endpoints default
+ * values. If the assoc_id field is non-zero, then the set or get
+ * effects the specified association for the one to many model (the
+ * assoc_id field is ignored by the one to one model). Note that if
+ * sack_delay or sack_freq are 0 when setting this option, then the
+ * current values will remain unchanged.
+ */
+struct sctp_sack_info {
+ sctp_assoc_t sack_assoc_id;
+ uint32_t sack_delay;
+ uint32_t sack_freq;
+};
+
+struct sctp_assoc_value {
+ sctp_assoc_t assoc_id;
+ uint32_t assoc_value;
+};
+
+struct sctp_stream_value {
+ sctp_assoc_t assoc_id;
+ uint16_t stream_id;
+ uint16_t stream_value;
+};
+
+/*
+ * 7.2.2 Peer Address Information
+ *
+ * Applications can retrieve information about a specific peer address
+ * of an association, including its reachability state, congestion
+ * window, and retransmission timer values. This information is
+ * read-only. The following structure is used to access this
+ * information:
+ */
+struct sctp_paddrinfo {
+ sctp_assoc_t spinfo_assoc_id;
+ struct sockaddr_storage spinfo_address;
+ __s32 spinfo_state;
+ __u32 spinfo_cwnd;
+ __u32 spinfo_srtt;
+ __u32 spinfo_rto;
+ __u32 spinfo_mtu;
+} __attribute__((packed, aligned(4)));
+
+/* Peer addresses's state. */
+/* UNKNOWN: Peer address passed by the upper layer in sendmsg or connect[x]
+ * calls.
+ * UNCONFIRMED: Peer address received in INIT/INIT-ACK address parameters.
+ * Not yet confirmed by a heartbeat and not available for data
+ * transfers.
+ * ACTIVE : Peer address confirmed, active and available for data transfers.
+ * INACTIVE: Peer address inactive and not available for data transfers.
+ */
+enum sctp_spinfo_state {
+ SCTP_INACTIVE,
+ SCTP_PF,
+ SCTP_ACTIVE,
+ SCTP_UNCONFIRMED,
+ SCTP_UNKNOWN = 0xffff /* Value used for transport state unknown */
+};
+
+/*
+ * 7.2.1 Association Status (SCTP_STATUS)
+ *
+ * Applications can retrieve current status information about an
+ * association, including association state, peer receiver window size,
+ * number of unacked data chunks, and number of data chunks pending
+ * receipt. This information is read-only. The following structure is
+ * used to access this information:
+ */
+struct sctp_status {
+ sctp_assoc_t sstat_assoc_id;
+ __s32 sstat_state;
+ __u32 sstat_rwnd;
+ __u16 sstat_unackdata;
+ __u16 sstat_penddata;
+ __u16 sstat_instrms;
+ __u16 sstat_outstrms;
+ __u32 sstat_fragmentation_point;
+ struct sctp_paddrinfo sstat_primary;
+};
+
+/*
+ * 7.2.3. Get the list of chunks the peer requires to be authenticated
+ * (SCTP_PEER_AUTH_CHUNKS)
+ *
+ * This option gets a list of chunks for a specified association that
+ * the peer requires to be received authenticated only.
+ */
+struct sctp_authchunks {
+ sctp_assoc_t gauth_assoc_id;
+ __u32 gauth_number_of_chunks;
+ uint8_t gauth_chunks[];
+};
+
+/* The broken spelling has been released already in lksctp-tools header,
+ * so don't break anyone, now that it's fixed.
+ */
+#define guth_number_of_chunks gauth_number_of_chunks
+
+/* Association states. */
+enum sctp_sstat_state {
+ SCTP_EMPTY = 0,
+ SCTP_CLOSED = 1,
+ SCTP_COOKIE_WAIT = 2,
+ SCTP_COOKIE_ECHOED = 3,
+ SCTP_ESTABLISHED = 4,
+ SCTP_SHUTDOWN_PENDING = 5,
+ SCTP_SHUTDOWN_SENT = 6,
+ SCTP_SHUTDOWN_RECEIVED = 7,
+ SCTP_SHUTDOWN_ACK_SENT = 8,
+};
+
+/*
+ * 8.2.6. Get the Current Identifiers of Associations
+ * (SCTP_GET_ASSOC_ID_LIST)
+ *
+ * This option gets the current list of SCTP association identifiers of
+ * the SCTP associations handled by a one-to-many style socket.
+ */
+struct sctp_assoc_ids {
+ __u32 gaids_number_of_ids;
+ sctp_assoc_t gaids_assoc_id[];
+};
+
+/*
+ * 8.3, 8.5 get all peer/local addresses in an association.
+ * This parameter struct is used by SCTP_GET_PEER_ADDRS and
+ * SCTP_GET_LOCAL_ADDRS socket options used internally to implement
+ * sctp_getpaddrs() and sctp_getladdrs() API.
+ */
+struct sctp_getaddrs_old {
+ sctp_assoc_t assoc_id;
+ int addr_num;
+ struct sockaddr *addrs;
+};
+
+struct sctp_getaddrs {
+ sctp_assoc_t assoc_id; /*input*/
+ __u32 addr_num; /*output*/
+ __u8 addrs[0]; /*output, variable size*/
+};
+
+/* A socket user request obtained via SCTP_GET_ASSOC_STATS that retrieves
+ * association stats. All stats are counts except sas_maxrto and
+ * sas_obs_rto_ipaddr. maxrto is the max observed rto + transport since
+ * the last call. Will return 0 when RTO was not update since last call
+ */
+struct sctp_assoc_stats {
+ sctp_assoc_t sas_assoc_id; /* Input */
+ /* Transport of observed max RTO */
+ struct sockaddr_storage sas_obs_rto_ipaddr;
+ __u64 sas_maxrto; /* Maximum Observed RTO for period */
+ __u64 sas_isacks; /* SACKs received */
+ __u64 sas_osacks; /* SACKs sent */
+ __u64 sas_opackets; /* Packets sent */
+ __u64 sas_ipackets; /* Packets received */
+ __u64 sas_rtxchunks; /* Retransmitted Chunks */
+ __u64 sas_outofseqtsns;/* TSN received > next expected */
+ __u64 sas_idupchunks; /* Dups received (ordered+unordered) */
+ __u64 sas_gapcnt; /* Gap Acknowledgements Received */
+ __u64 sas_ouodchunks; /* Unordered data chunks sent */
+ __u64 sas_iuodchunks; /* Unordered data chunks received */
+ __u64 sas_oodchunks; /* Ordered data chunks sent */
+ __u64 sas_iodchunks; /* Ordered data chunks received */
+ __u64 sas_octrlchunks; /* Control chunks sent */
+ __u64 sas_ictrlchunks; /* Control chunks received */
+};
+
+/*
+ * 8.1 sctp_bindx()
+ *
+ * The flags parameter is formed from the bitwise OR of zero or more of the
+ * following currently defined flags:
+ */
+#define SCTP_BINDX_ADD_ADDR 0x01
+#define SCTP_BINDX_REM_ADDR 0x02
+
+/* This is the structure that is passed as an argument(optval) to
+ * getsockopt(SCTP_SOCKOPT_PEELOFF).
+ */
+typedef struct {
+ sctp_assoc_t associd;
+ int sd;
+} sctp_peeloff_arg_t;
+
+typedef struct {
+ sctp_peeloff_arg_t p_arg;
+ unsigned flags;
+} sctp_peeloff_flags_arg_t;
+
+/*
+ * Peer Address Thresholds socket option
+ */
+struct sctp_paddrthlds {
+ sctp_assoc_t spt_assoc_id;
+ struct sockaddr_storage spt_address;
+ __u16 spt_pathmaxrxt;
+ __u16 spt_pathpfthld;
+};
+
+/*
+ * Socket Option for Getting the Association/Stream-Specific PR-SCTP Status
+ */
+struct sctp_prstatus {
+ sctp_assoc_t sprstat_assoc_id;
+ __u16 sprstat_sid;
+ __u16 sprstat_policy;
+ __u64 sprstat_abandoned_unsent;
+ __u64 sprstat_abandoned_sent;
+};
+
+struct sctp_default_prinfo {
+ sctp_assoc_t pr_assoc_id;
+ __u32 pr_value;
+ __u16 pr_policy;
+};
+
+struct sctp_info {
+ __u32 sctpi_tag;
+ __u32 sctpi_state;
+ __u32 sctpi_rwnd;
+ __u16 sctpi_unackdata;
+ __u16 sctpi_penddata;
+ __u16 sctpi_instrms;
+ __u16 sctpi_outstrms;
+ __u32 sctpi_fragmentation_point;
+ __u32 sctpi_inqueue;
+ __u32 sctpi_outqueue;
+ __u32 sctpi_overall_error;
+ __u32 sctpi_max_burst;
+ __u32 sctpi_maxseg;
+ __u32 sctpi_peer_rwnd;
+ __u32 sctpi_peer_tag;
+ __u8 sctpi_peer_capable;
+ __u8 sctpi_peer_sack;
+ __u16 __reserved1;
+
+ /* assoc status info */
+ __u64 sctpi_isacks;
+ __u64 sctpi_osacks;
+ __u64 sctpi_opackets;
+ __u64 sctpi_ipackets;
+ __u64 sctpi_rtxchunks;
+ __u64 sctpi_outofseqtsns;
+ __u64 sctpi_idupchunks;
+ __u64 sctpi_gapcnt;
+ __u64 sctpi_ouodchunks;
+ __u64 sctpi_iuodchunks;
+ __u64 sctpi_oodchunks;
+ __u64 sctpi_iodchunks;
+ __u64 sctpi_octrlchunks;
+ __u64 sctpi_ictrlchunks;
+
+ /* primary transport info */
+ struct sockaddr_storage sctpi_p_address;
+ __s32 sctpi_p_state;
+ __u32 sctpi_p_cwnd;
+ __u32 sctpi_p_srtt;
+ __u32 sctpi_p_rto;
+ __u32 sctpi_p_hbinterval;
+ __u32 sctpi_p_pathmaxrxt;
+ __u32 sctpi_p_sackdelay;
+ __u32 sctpi_p_sackfreq;
+ __u32 sctpi_p_ssthresh;
+ __u32 sctpi_p_partial_bytes_acked;
+ __u32 sctpi_p_flight_size;
+ __u16 sctpi_p_error;
+ __u16 __reserved2;
+
+ /* sctp sock info */
+ __u32 sctpi_s_autoclose;
+ __u32 sctpi_s_adaptation_ind;
+ __u32 sctpi_s_pd_point;
+ __u8 sctpi_s_nodelay;
+ __u8 sctpi_s_disable_fragments;
+ __u8 sctpi_s_v4mapped;
+ __u8 sctpi_s_frag_interleave;
+ __u32 sctpi_s_type;
+ __u32 __reserved3;
+};
+
+struct sctp_reset_streams {
+ sctp_assoc_t srs_assoc_id;
+ uint16_t srs_flags;
+ uint16_t srs_number_streams; /* 0 == ALL */
+ uint16_t srs_stream_list[]; /* list if srs_num_streams is not 0 */
+};
+
+struct sctp_add_streams {
+ sctp_assoc_t sas_assoc_id;
+ uint16_t sas_instrms;
+ uint16_t sas_outstrms;
+};
+
+/* SCTP Stream schedulers */
+enum sctp_sched_type {
+ SCTP_SS_FCFS,
+ SCTP_SS_DEFAULT = SCTP_SS_FCFS,
+ SCTP_SS_PRIO,
+ SCTP_SS_RR,
+ SCTP_SS_MAX = SCTP_SS_RR
+};
+
+#endif /* _SCTP_H */
diff --git a/third_party/lksctp-tools/BUILD b/third_party/lksctp-tools/BUILD
new file mode 100644
index 0000000..b127b77
--- /dev/null
+++ b/third_party/lksctp-tools/BUILD
@@ -0,0 +1,35 @@
+licenses(["notice"])
+
+genrule(
+ name = "sctp_copy",
+ srcs = [
+ "src/include/netinet/sctp.h.in",
+ ],
+ outs = [
+ "src/include/netinet/sctp.h",
+ ],
+ cmd = "cp $< $@",
+)
+
+cc_library(
+ name = "sctp",
+ srcs = [
+ "src/lib/addrs.c",
+ "src/lib/bindx.c",
+ "src/lib/opt_info.c",
+ "src/lib/peeloff.c",
+ "src/lib/recvmsg.c",
+ "src/lib/sendmsg.c",
+ ],
+ hdrs = [
+ "src/include/netinet/sctp.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ "-Wno-cast-qual",
+ ],
+ includes = [
+ "src/include",
+ ],
+ visibility = ["//visibility:public"],
+)
diff --git a/third_party/lksctp-tools/src/include/netinet/sctp.h.in b/third_party/lksctp-tools/src/include/netinet/sctp.h.in
index 2009f1c..c1d4d2c 100644
--- a/third_party/lksctp-tools/src/include/netinet/sctp.h.in
+++ b/third_party/lksctp-tools/src/include/netinet/sctp.h.in
@@ -60,15 +60,17 @@
#define HAVE_SCTP_ADDIP
#define HAVE_SCTP_CANSET_PRIMARY
-#undef HAVE_SCTP_STREAM_RESET_EVENT
-#undef HAVE_SCTP_STREAM_RECONFIG
-#undef HAVE_SCTP_PEELOFF_FLAGS
-#undef HAVE_SCTP_PDAPI_EVENT_PDAPI_STREAM
-#undef HAVE_SCTP_PDAPI_EVENT_PDAPI_SEQ
-#undef HAVE_SCTP_SENDV
-#undef HAVE_SCTP_AUTH_NO_AUTH
-#undef HAVE_SCTP_SPP_IPV6_FLOWLABEL
-#undef HAVE_SCTP_SPP_DSCP
+#define HAVE_SCTP_STREAM_RESET_EVENT 1
+/* #undef HAVE_SCTP_ASSOC_RESET_EVENT */
+/* #undef HAVE_SCTP_STREAM_CHANGE_EVENT */
+#define HAVE_SCTP_STREAM_RECONFIG 1
+/* #undef HAVE_SCTP_PEELOFF_FLAGS */
+#define HAVE_SCTP_PDAPI_EVENT_PDAPI_STREAM 1
+#define HAVE_SCTP_PDAPI_EVENT_PDAPI_SEQ 1
+/* #undef HAVE_SCTP_SENDV */
+/* #undef HAVE_SCTP_AUTH_NO_AUTH */
+/* #undef HAVE_SCTP_SPP_IPV6_FLOWLABEL */
+/* #undef HAVE_SCTP_SPP_DSCP */
int sctp_bindx(int sd, struct sockaddr *addrs, int addrcnt, int flags);
diff --git a/tools/cpp/CROSSTOOL b/tools/cpp/CROSSTOOL
index 39f29d8..20f47e3 100644
--- a/tools/cpp/CROSSTOOL
+++ b/tools/cpp/CROSSTOOL
@@ -932,6 +932,8 @@
compiler_flag: "external/linaro_linux_gcc_repo/include/c++/7.4.1"
compiler_flag: "-isystem"
compiler_flag: "external/linaro_linux_gcc_repo/arm-linux-gnueabihf/libc/usr/include"
+ compiler_flag: "-isystem"
+ compiler_flag: "external/org_frc971/third_party"
compiler_flag: "-D__STDC_FORMAT_MACROS"
compiler_flag: "-D__STDC_CONSTANT_MACROS"
compiler_flag: "-D__STDC_LIMIT_MACROS"