Merge changes Icd0187f2,I81a0da10
* changes:
Add drivetrain replay application
Enable renaming logged locations
diff --git a/aos/config.bzl b/aos/config.bzl
index 3f78422..0766329 100644
--- a/aos/config.bzl
+++ b/aos/config.bzl
@@ -2,7 +2,7 @@
AosConfigInfo = provider(fields = ["transitive_flatbuffers", "transitive_src"])
-def aos_config(name, src, flatbuffers, deps = [], visibility = None):
+def aos_config(name, src, flatbuffers = [], deps = [], visibility = None):
_aos_config(
name = name,
src = src,
diff --git a/aos/configuration.cc b/aos/configuration.cc
index 46f07ac..a40c47c 100644
--- a/aos/configuration.cc
+++ b/aos/configuration.cc
@@ -7,6 +7,8 @@
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <unistd.h>
+
+#include <set>
#include <string_view>
#include "absl/container/btree_set.h"
@@ -730,5 +732,57 @@
<< static_cast<int>(connection->timestamp_logger());
}
+std::vector<std::string_view> SourceNodeNames(const Configuration *config,
+ const Node *my_node) {
+ std::set<std::string_view> result_set;
+
+ for (const Channel *channel : *config->channels()) {
+ if (channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() ==
+ my_node->name()->string_view()) {
+ result_set.insert(channel->source_node()->string_view());
+ }
+ }
+ }
+ }
+
+ std::vector<std::string_view> result;
+ for (const std::string_view source : result_set) {
+ VLOG(1) << "Found a source node of " << source;
+ result.emplace_back(source);
+ }
+ return result;
+}
+
+std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
+ const Node *my_node) {
+ std::vector<std::string_view> result;
+
+ for (const Channel *channel : *config->channels()) {
+ if (channel->has_source_node() && channel->source_node()->string_view() ==
+ my_node->name()->string_view()) {
+ if (!channel->has_destination_nodes()) continue;
+
+ if (channel->source_node()->string_view() !=
+ my_node->name()->string_view()) {
+ continue;
+ }
+
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (std::find(result.begin(), result.end(),
+ connection->name()->string_view()) == result.end()) {
+ result.emplace_back(connection->name()->string_view());
+ }
+ }
+ }
+ }
+
+ for (const std::string_view destination : result) {
+ VLOG(1) << "Found a destination node of " << destination;
+ }
+ return result;
+}
+
} // namespace configuration
} // namespace aos
diff --git a/aos/configuration.h b/aos/configuration.h
index a078dea..ef30fce 100644
--- a/aos/configuration.h
+++ b/aos/configuration.h
@@ -90,6 +90,14 @@
// Prints a channel to json, but without the schema.
std::string CleanedChannelToString(const Channel *channel);
+// Returns the node names that this node should be forwarding to.
+std::vector<std::string_view> DestinationNodeNames(const Configuration *config,
+ const Node *my_node);
+
+// Returns the node names that this node should be receiving messages from.
+std::vector<std::string_view> SourceNodeNames(const Configuration *config,
+ const Node *my_node);
+
// TODO(austin): GetSchema<T>(const Flatbuffer<Configuration> &config);
} // namespace configuration
diff --git a/aos/configuration_test.cc b/aos/configuration_test.cc
index fab6745..b8fc5b6 100644
--- a/aos/configuration_test.cc
+++ b/aos/configuration_test.cc
@@ -7,6 +7,7 @@
#include "flatbuffers/reflection.h"
#include "glog/logging.h"
#include "gtest/gtest.h"
+#include "gmock/gmock.h"
namespace aos {
namespace configuration {
@@ -567,6 +568,37 @@
&logged_on_both_channel.message(), &baz_node.message(),
&baz_node.message()));
}
+
+// Tests that we can deduce source nodes from a multinode config.
+TEST_F(ConfigurationTest, SourceNodeNames) {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/config1_multinode.json");
+
+ // This is a bit simplistic in that it doesn't test deduplication, but it does
+ // exercise a lot of the logic.
+ EXPECT_THAT(
+ SourceNodeNames(&config.message(), config.message().nodes()->Get(0)),
+ ::testing::ElementsAreArray({"pi2"}));
+ EXPECT_THAT(
+ SourceNodeNames(&config.message(), config.message().nodes()->Get(1)),
+ ::testing::ElementsAreArray({"pi1"}));
+}
+
+// Tests that we can deduce destination nodes from a multinode config.
+TEST_F(ConfigurationTest, DestinationNodeNames) {
+ FlatbufferDetachedBuffer<Configuration> config =
+ ReadConfig("aos/testdata/config1_multinode.json");
+
+ // This is a bit simplistic in that it doesn't test deduplication, but it does
+ // exercise a lot of the logic.
+ EXPECT_THAT(
+ DestinationNodeNames(&config.message(), config.message().nodes()->Get(0)),
+ ::testing::ElementsAreArray({"pi2"}));
+ EXPECT_THAT(
+ DestinationNodeNames(&config.message(), config.message().nodes()->Get(1)),
+ ::testing::ElementsAreArray({"pi1"}));
+}
+
} // namespace testing
} // namespace configuration
} // namespace aos
diff --git a/aos/events/BUILD b/aos/events/BUILD
index e40ab99..79200bd 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -121,6 +121,8 @@
flatbuffers = [
":ping_fbs",
":pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
],
deps = [":config"],
)
diff --git a/aos/events/logging/BUILD b/aos/events/logging/BUILD
index 97d5783..b2be972 100644
--- a/aos/events/logging/BUILD
+++ b/aos/events/logging/BUILD
@@ -1,4 +1,5 @@
load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
flatbuffer_cc_library(
name = "logger_fbs",
@@ -84,11 +85,21 @@
],
)
+aos_config(
+ name = "multinode_pingpong_config",
+ src = "multinode_pingpong.json",
+ flatbuffers = [
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ ],
+ deps = ["//aos/events:config"],
+)
+
cc_test(
name = "logger_test",
srcs = ["logger_test.cc"],
data = [
- "//aos/events:multinode_pingpong_config.json",
+ ":multinode_pingpong_config.json",
"//aos/events:pingpong_config.json",
],
deps = [
diff --git a/aos/events/logging/logger_test.cc b/aos/events/logging/logger_test.cc
index e1808b1..98f6a74 100644
--- a/aos/events/logging/logger_test.cc
+++ b/aos/events/logging/logger_test.cc
@@ -144,7 +144,7 @@
public:
MultinodeLoggerTest()
: config_(aos::configuration::ReadConfig(
- "aos/events/multinode_pingpong_config.json")),
+ "aos/events/logging/multinode_pingpong_config.json")),
event_loop_factory_(&config_.message(), "pi1"),
ping_event_loop_(event_loop_factory_.MakeEventLoop("ping")),
ping_(ping_event_loop_.get()) {}
diff --git a/aos/events/logging/multinode_pingpong.json b/aos/events/logging/multinode_pingpong.json
new file mode 100644
index 0000000..f85a4e1
--- /dev/null
+++ b/aos/events/logging/multinode_pingpong.json
@@ -0,0 +1,93 @@
+{
+ "channels": [
+ /* Logged on pi1 locally */
+ {
+ "name": "/aos/pi1",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ /* Forwarded to pi2.
+ * Doesn't matter where timestamps are logged for the test.
+ */
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1",
+ "time_to_live": 5000000
+ }
+ ]
+ },
+ /* Forwarded back to pi1.
+ * The message is logged both on the sending node and the receiving node
+ * (to make it easier to look at the results for now).
+ *
+ * The timestamps are logged on the receiving node.
+ */
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
+ }
+ ]
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ }
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/events/multinode_pingpong.json b/aos/events/multinode_pingpong.json
index f0e532e..c0c5087 100644
--- a/aos/events/multinode_pingpong.json
+++ b/aos/events/multinode_pingpong.json
@@ -2,6 +2,54 @@
"channels": [
{
"name": "/aos/pi1",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi3",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi3",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/roborio",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "roborio",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi1",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi3",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi3",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/roborio",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "roborio",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi1",
"type": "aos.timing.Report",
"source_node": "pi1",
"frequency": 50,
@@ -25,6 +73,14 @@
"max_size": 2048
},
{
+ "name": "/aos/roborio",
+ "type": "aos.timing.Report",
+ "source_node": "roborio",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
"name": "/test",
"type": "aos.examples.Ping",
"source_node": "pi1",
@@ -32,8 +88,8 @@
{
"name": "pi2",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_node": "pi1"
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
}
]
},
@@ -41,14 +97,12 @@
"name": "/test",
"type": "aos.examples.Pong",
"source_node": "pi2",
- "logger": "LOCAL_AND_REMOTE_LOGGER",
- "logger_node": "pi1",
"destination_nodes": [
{
"name": "pi1",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_node": "pi1"
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
}
]
},
@@ -60,8 +114,8 @@
{
"name": "pi3",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_node": "pi1"
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
}
]
},
@@ -69,14 +123,12 @@
"name": "/test2",
"type": "aos.examples.Pong",
"source_node": "pi3",
- "logger": "LOCAL_AND_REMOTE_LOGGER",
- "logger_node": "pi1",
"destination_nodes": [
{
"name": "pi1",
"priority": 1,
- "timestamp_logger": "REMOTE_LOGGER",
- "timestamp_logger_node": "pi1"
+ "timestamp_logger": "LOCAL_LOGGER",
+ "time_to_live": 5000000
}
]
}
@@ -111,6 +163,96 @@
"rename": {
"name": "/aos/pi3"
}
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.timing.Report",
+ "source_node": "roborio"
+ },
+ "rename": {
+ "name": "/aos/roborio"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/aos/pi3"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "roborio"
+ },
+ "rename": {
+ "name": "/aos/roborio"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi3"
+ },
+ "rename": {
+ "name": "/aos/pi3"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "roborio"
+ },
+ "rename": {
+ "name": "/aos/roborio"
+ }
}
],
"nodes": [
@@ -128,6 +270,11 @@
"name": "pi3",
"hostname": "raspberrypi3",
"port": 9971
+ },
+ {
+ "name": "roborio",
+ "hostname": "roboRIO-6971-FRC",
+ "port": 9971
}
],
"applications": [
@@ -156,6 +303,32 @@
}
}
]
+ },
+ {
+ "name": "ping3",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test3"
+ }
+ }
+ ]
+ },
+ {
+ "name": "pong3",
+ "maps": [
+ {
+ "match": {
+ "name": "/test"
+ },
+ "rename": {
+ "name": "/test3"
+ }
+ }
+ ]
}
]
}
diff --git a/aos/events/shm_event_loop.cc b/aos/events/shm_event_loop.cc
index 3ced933..bdee4f2 100644
--- a/aos/events/shm_event_loop.cc
+++ b/aos/events/shm_event_loop.cc
@@ -21,10 +21,26 @@
#include "aos/util/phased_loop.h"
#include "glog/logging.h"
+namespace {
+
+// Returns the portion of the path after the last /. This very much assumes
+// that the application name is null terminated.
+const char *Filename(const char *path) {
+ const std::string_view path_string_view = path;
+ auto last_slash_pos = path_string_view.find_last_of("/");
+
+ return last_slash_pos == std::string_view::npos ? path
+ : path + last_slash_pos + 1;
+}
+
+} // namespace
+
DEFINE_string(shm_base, "/dev/shm/aos",
"Directory to place queue backing mmaped files in.");
DEFINE_uint32(permissions, 0770,
"Permissions to make shared memory files and folders.");
+DEFINE_string(application_name, Filename(program_invocation_name),
+ "The application name");
namespace aos {
@@ -135,15 +151,6 @@
namespace {
-// Returns the portion of the path after the last /.
-std::string_view Filename(std::string_view path) {
- auto last_slash_pos = path.find_last_of("/");
-
- return last_slash_pos == std::string_view::npos
- ? path
- : path.substr(last_slash_pos + 1, path.size());
-}
-
const Node *MaybeMyNode(const Configuration *configuration) {
if (!configuration->has_nodes()) {
return nullptr;
@@ -158,7 +165,7 @@
ShmEventLoop::ShmEventLoop(const Configuration *configuration)
: EventLoop(configuration),
- name_(Filename(program_invocation_name)),
+ name_(FLAGS_application_name),
node_(MaybeMyNode(configuration)) {
if (configuration->has_nodes()) {
CHECK(node_ != nullptr) << ": Couldn't find node in config.";
@@ -802,15 +809,16 @@
}
SignalHandler::global()->Unregister(this);
+
+ // Trigger any remaining senders or fetchers to be cleared before destroying
+ // the event loop so the book keeping matches. Do this in the thread that
+ // created the timing reporter.
+ timing_report_sender_.reset();
}
void ShmEventLoop::Exit() { epoll_.Quit(); }
ShmEventLoop::~ShmEventLoop() {
- // Trigger any remaining senders or fetchers to be cleared before destroying
- // the event loop so the book keeping matches.
- timing_report_sender_.reset();
-
// Force everything with a registered fd with epoll to be destroyed now.
timers_.clear();
phased_loops_.clear();
diff --git a/aos/events/shm_event_loop.h b/aos/events/shm_event_loop.h
index d10989e..bb4ad77 100644
--- a/aos/events/shm_event_loop.h
+++ b/aos/events/shm_event_loop.h
@@ -71,6 +71,8 @@
int priority() const override { return priority_; }
+ internal::EPoll *epoll() { return &epoll_; }
+
private:
friend class internal::WatcherState;
friend class internal::TimerHandlerState;
diff --git a/aos/flatbuffers.h b/aos/flatbuffers.h
index 4e4520c..4388687 100644
--- a/aos/flatbuffers.h
+++ b/aos/flatbuffers.h
@@ -5,6 +5,7 @@
#include <string_view>
#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
namespace aos {
@@ -31,15 +32,19 @@
// This class is a fixed memory allocator which holds the data for a flatbuffer
// in an array.
-template <size_t S>
class FixedAllocator : public FixedAllocatorBase {
public:
+ FixedAllocator(size_t size) : buffer_(size, 0) {}
+
uint8_t *data() override { return &buffer_[0]; }
const uint8_t *data() const override { return &buffer_[0]; }
size_t size() const override { return buffer_.size(); }
+ // Releases the data in the buffer.
+ std::vector<uint8_t> release() { return std::move(buffer_); }
+
private:
- std::array<uint8_t, S> buffer_;
+ std::vector<uint8_t> buffer_;
};
// This class adapts a preallocated memory region to an Allocator.
@@ -82,46 +87,6 @@
virtual size_t size() const = 0;
};
-// Array backed flatbuffer.
-template <typename T>
-class FlatbufferArray : public Flatbuffer<T> {
- public:
- // Builds a Flatbuffer by copying the data from the other flatbuffer.
- FlatbufferArray(const Flatbuffer<T> &other) {
- CHECK_LE(other.size(), data_.size());
-
- memcpy(data_.data(), other.data(), other.size());
- size_ = other.size();
- }
-
- // Coppies the data from the other flatbuffer.
- FlatbufferArray &operator=(const Flatbuffer<T> &other) {
- CHECK_LE(other.size(), data_.size());
-
- memcpy(data_.data(), other.data(), other.size());
- size_ = other.size();
- return *this;
- }
-
- virtual ~FlatbufferArray() override {}
-
- // Creates a builder wrapping the underlying data.
- flatbuffers::FlatBufferBuilder FlatBufferBuilder() {
- data_.deallocate(data_.data(), data_.size());
- flatbuffers::FlatBufferBuilder fbb(data_.size(), &data_);
- fbb.ForceDefaults(1);
- return fbb;
- }
-
- const uint8_t *data() const override { return data_.data(); }
- uint8_t *data() override { return data_.data(); }
- size_t size() const override { return size_; }
-
- private:
- FixedAllocator<8 * 1024> data_;
- size_t size_ = data_.size();
-};
-
// String backed flatbuffer.
template <typename T>
class FlatbufferString : public Flatbuffer<T> {
@@ -228,6 +193,48 @@
flatbuffers::DetachedBuffer buffer_;
};
+// This object associates the message type with the memory storing the
+// flatbuffer. This only stores root tables.
+//
+// From a usage point of view, pointers to the data are very different than
+// pointers to the tables.
+template <typename T>
+class SizePrefixedFlatbufferDetachedBuffer final : public Flatbuffer<T> {
+ public:
+ // Builds a Flatbuffer by taking ownership of the buffer.
+ SizePrefixedFlatbufferDetachedBuffer(flatbuffers::DetachedBuffer &&buffer)
+ : buffer_(::std::move(buffer)) {
+ CHECK_GE(buffer_.size(), sizeof(flatbuffers::uoffset_t));
+ }
+
+ // Builds a flatbuffer by taking ownership of the buffer from the other
+ // flatbuffer.
+ SizePrefixedFlatbufferDetachedBuffer(
+ SizePrefixedFlatbufferDetachedBuffer &&fb)
+ : buffer_(::std::move(fb.buffer_)) {}
+ SizePrefixedFlatbufferDetachedBuffer &operator=(
+ SizePrefixedFlatbufferDetachedBuffer &&fb) {
+ ::std::swap(buffer_, fb.buffer_);
+ return *this;
+ }
+
+ virtual ~SizePrefixedFlatbufferDetachedBuffer() override {}
+
+ // Returns references to the buffer, and the data.
+ const flatbuffers::DetachedBuffer &buffer() const { return buffer_; }
+ const uint8_t *data() const override {
+ return buffer_.data() + sizeof(flatbuffers::uoffset_t);
+ }
+ uint8_t *data() override {
+ return buffer_.data() + sizeof(flatbuffers::uoffset_t);
+ }
+ size_t size() const override {
+ return buffer_.size() - sizeof(flatbuffers::uoffset_t);
+ }
+
+ private:
+ flatbuffers::DetachedBuffer buffer_;
+};
// TODO(austin): Need a way to get our hands on the max size. Can start with
// "large" for now.
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 5d96183..9d3b3c4 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -1,5 +1,36 @@
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("//aos:config.bzl", "aos_config")
+
package(default_visibility = ["//visibility:public"])
+flatbuffer_cc_library(
+ name = "connect_fbs",
+ srcs = ["connect.fbs"],
+ gen_reflections = 1,
+ includes = [
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
+flatbuffer_cc_library(
+ name = "message_bridge_client_fbs",
+ srcs = ["message_bridge_client.fbs"],
+ gen_reflections = 1,
+ includes = [
+ ":message_bridge_server_fbs_includes",
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
+flatbuffer_cc_library(
+ name = "message_bridge_server_fbs",
+ srcs = ["message_bridge_server.fbs"],
+ gen_reflections = 1,
+ includes = [
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
cc_library(
name = "team_number",
srcs = [
@@ -25,3 +56,182 @@
"//aos/testing:googletest",
],
)
+
+cc_library(
+ name = "sctp_lib",
+ srcs = [
+ "sctp_lib.cc",
+ ],
+ hdrs = [
+ "sctp_lib.h",
+ ],
+ copts = [
+ # The casts required to read datastructures from sockets trip -Wcast-align.
+ "-Wno-cast-align",
+ ],
+ deps = [
+ "//aos:unique_malloc_ptr",
+ "//third_party/lksctp-tools:sctp",
+ "@com_github_google_glog//:glog",
+ ],
+)
+
+cc_library(
+ name = "sctp_server",
+ srcs = [
+ "sctp_server.cc",
+ ],
+ hdrs = [
+ "sctp_server.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":sctp_lib",
+ "//third_party/lksctp-tools:sctp",
+ ],
+)
+
+cc_library(
+ name = "message_bridge_protocol",
+ hdrs = [
+ "message_bridge_protocol.h",
+ ],
+)
+
+cc_library(
+ name = "message_bridge_server_lib",
+ srcs = [
+ "message_bridge_server_lib.cc",
+ ],
+ hdrs = [
+ "message_bridge_server_lib.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":connect_fbs",
+ ":message_bridge_protocol",
+ ":message_bridge_server_fbs",
+ ":sctp_lib",
+ ":sctp_server",
+ "//aos:unique_malloc_ptr",
+ "//aos/events:shm_event_loop",
+ "//aos/events/logging:logger",
+ "//third_party/lksctp-tools:sctp",
+ ],
+)
+
+cc_binary(
+ name = "message_bridge_server",
+ srcs = [
+ "message_bridge_server.cc",
+ ],
+ deps = [
+ ":message_bridge_server_lib",
+ "//aos:init",
+ "//aos:json_to_flatbuffer",
+ "//aos/events:shm_event_loop",
+ ],
+)
+
+cc_library(
+ name = "sctp_client",
+ srcs = [
+ "sctp_client.cc",
+ ],
+ hdrs = [
+ "sctp_client.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":sctp_lib",
+ "//third_party/lksctp-tools:sctp",
+ ],
+)
+
+cc_library(
+ name = "message_bridge_client_lib",
+ srcs = [
+ "message_bridge_client_lib.cc",
+ ],
+ hdrs = [
+ "message_bridge_client_lib.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":connect_fbs",
+ ":message_bridge_client_fbs",
+ ":message_bridge_protocol",
+ ":message_bridge_server_fbs",
+ ":sctp_client",
+ "//aos/events:shm_event_loop",
+ "//aos/events/logging:logger",
+ ],
+)
+
+cc_binary(
+ name = "message_bridge_client",
+ srcs = [
+ "message_bridge_client.cc",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ ],
+ deps = [
+ ":message_bridge_client_lib",
+ "//aos:init",
+ "//aos:json_to_flatbuffer",
+ "//aos/events:shm_event_loop",
+ ],
+)
+
+aos_config(
+ name = "message_bridge_test_common_config",
+ src = "message_bridge_test_common.json",
+ flatbuffers = [
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ "//aos/network:message_bridge_client_fbs",
+ "//aos/network:message_bridge_server_fbs",
+ ],
+ deps = ["//aos/events:config"],
+)
+
+aos_config(
+ name = "message_bridge_test_server_config",
+ src = "message_bridge_test_server.json",
+ deps = [":message_bridge_test_common_config"],
+)
+
+aos_config(
+ name = "message_bridge_test_client_config",
+ src = "message_bridge_test_client.json",
+ deps = [":message_bridge_test_common_config"],
+)
+
+cc_test(
+ name = "message_bridge_test",
+ srcs = [
+ "message_bridge_test.cc",
+ ],
+ data = [
+ ":message_bridge_test_client_config.json",
+ ":message_bridge_test_server_config.json",
+ ],
+ deps = [
+ ":message_bridge_client_lib",
+ ":message_bridge_server_lib",
+ "//aos:json_to_flatbuffer",
+ "//aos/events:ping_fbs",
+ "//aos/events:pong_fbs",
+ "//aos/events:shm_event_loop",
+ "//aos/testing:googletest",
+ ],
+)
diff --git a/aos/network/connect.fbs b/aos/network/connect.fbs
new file mode 100644
index 0000000..32893b8
--- /dev/null
+++ b/aos/network/connect.fbs
@@ -0,0 +1,13 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+// This is the message sent to initiate a connection to a message_bridge.
+// It communicates the channels that need to be forwarded back.
+table Connect {
+ // The node making the request.
+ node:aos.Node;
+
+ // The channels that we want transfered to this client.
+ channels_to_transfer:[Channel];
+}
diff --git a/aos/network/message_bridge_client.cc b/aos/network/message_bridge_client.cc
new file mode 100644
index 0000000..c44eee0
--- /dev/null
+++ b/aos/network/message_bridge_client.cc
@@ -0,0 +1,36 @@
+#include "aos/network/message_bridge_client_lib.h"
+
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+
+DEFINE_string(config, "multinode_pingpong_config.json", "Path to the config.");
+
+namespace aos {
+namespace message_bridge {
+
+int Main() {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
+
+ aos::ShmEventLoop event_loop(&config.message());
+
+ MessageBridgeClient app(&event_loop);
+
+ // TODO(austin): Save messages into a vector to be logged. One file per
+ // channel? Need to sort out ordering.
+ //
+ // TODO(austin): Low priority, "reliable" logging channel.
+
+ event_loop.Run();
+
+ return EXIT_SUCCESS;
+}
+
+} // namespace message_bridge
+} // namespace aos
+
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+
+ return aos::message_bridge::Main();
+}
diff --git a/aos/network/message_bridge_client.fbs b/aos/network/message_bridge_client.fbs
new file mode 100644
index 0000000..58b653a
--- /dev/null
+++ b/aos/network/message_bridge_client.fbs
@@ -0,0 +1,24 @@
+include "aos/network/message_bridge_server.fbs";
+
+namespace aos.message_bridge;
+
+// Statistics from a single client connection to a server.
+table ClientConnection {
+ // The node that we are connected to.
+ node:Node;
+
+ // Health of this connection. Connected or not?
+ state:State;
+
+ // Number of packets received on all channels.
+ received_packets:uint;
+
+ // TODO(austin): Per channel counts?
+}
+
+// Statistics for all clients.
+table ClientStatistics {
+ connections:[ClientConnection];
+}
+
+root_type ClientStatistics;
diff --git a/aos/network/message_bridge_client_lib.cc b/aos/network/message_bridge_client_lib.cc
new file mode 100644
index 0000000..3652db2
--- /dev/null
+++ b/aos/network/message_bridge_client_lib.cc
@@ -0,0 +1,361 @@
+#include "aos/network/message_bridge_client_lib.h"
+
+#include <chrono>
+#include <string_view>
+
+#include "aos/events/logging/logger.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/sctp_client.h"
+#include "aos/unique_malloc_ptr.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+// This application receives messages from another node and re-publishes them on
+// this node.
+//
+// To simulate packet loss for testing, run:
+// tc qdisc add dev eth0 root netem loss random 10
+// To restore it, run:
+// tc qdisc del dev eth0 root netem
+
+namespace aos {
+namespace message_bridge {
+namespace {
+namespace chrono = std::chrono;
+
+aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect> MakeConnectMessage(
+ const Configuration *config, const Node *my_node,
+ std::string_view remote_name) {
+ CHECK(config->has_nodes()) << ": Config must have nodes to transfer.";
+
+ flatbuffers::FlatBufferBuilder fbb;
+
+ flatbuffers::Offset<Node> node_offset = CopyFlatBuffer<Node>(my_node, &fbb);
+ const std::string_view node_name = my_node->name()->string_view();
+
+ std::vector<flatbuffers::Offset<Channel>> channel_offsets;
+ for (const Channel *channel : *config->channels()) {
+ if (channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() == node_name &&
+ channel->source_node()->string_view() == remote_name) {
+ channel_offsets.emplace_back(CopyFlatBuffer<Channel>(channel, &fbb));
+ }
+ }
+ }
+ }
+
+ flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<Channel>>>
+ channels_offset = fbb.CreateVector(channel_offsets);
+
+ Connect::Builder connect_builder(fbb);
+ connect_builder.add_channels_to_transfer(channels_offset);
+ connect_builder.add_node(node_offset);
+ fbb.Finish(connect_builder.Finish());
+
+ return fbb.Release();
+}
+
+std::vector<int> StreamToChannel(const Configuration *config,
+ const Node *my_node, const Node *other_node) {
+ std::vector<int> stream_to_channel;
+ int channel_index = 0;
+ for (const Channel *channel : *config->channels()) {
+ if (configuration::ChannelIsSendableOnNode(channel, other_node)) {
+ const Connection *connection =
+ configuration::ConnectionToNode(channel, my_node);
+ if (connection != nullptr) {
+ stream_to_channel.emplace_back(channel_index);
+ }
+ }
+ ++channel_index;
+ }
+
+ return stream_to_channel;
+}
+
+std::vector<bool> StreamReplyWithTimestamp(const Configuration *config,
+ const Node *my_node,
+ const Node *other_node) {
+ std::vector<bool> stream_reply_with_timestamp;
+ int channel_index = 0;
+ for (const Channel *channel : *config->channels()) {
+ if (configuration::ChannelIsSendableOnNode(channel, other_node)) {
+ const Connection *connection =
+ configuration::ConnectionToNode(channel, my_node);
+ if (connection != nullptr) {
+ // We want to reply with a timestamp if the other node is logging the
+ // timestamp (and it therefore needs the timestamp), or if we are
+ // logging the message and it needs to know if we received it so it can
+ // log (in the future) it through different mechanisms on failure.
+ stream_reply_with_timestamp.emplace_back(
+ configuration::ConnectionDeliveryTimeIsLoggedOnNode(connection,
+ other_node) ||
+ configuration::ChannelMessageIsLoggedOnNode(channel, my_node));
+ }
+ }
+ ++channel_index;
+ }
+
+ return stream_reply_with_timestamp;
+}
+
+aos::FlatbufferDetachedBuffer<aos::logger::MessageHeader>
+MakeMessageHeaderReply() {
+ flatbuffers::FlatBufferBuilder fbb;
+ logger::MessageHeader::Builder message_header_builder(fbb);
+ message_header_builder.add_channel_index(0);
+ message_header_builder.add_monotonic_sent_time(0);
+ message_header_builder.add_monotonic_remote_time(0);
+ message_header_builder.add_realtime_remote_time(0);
+ message_header_builder.add_remote_queue_index(0);
+ fbb.Finish(message_header_builder.Finish());
+
+ return fbb.Release();
+}
+
+FlatbufferDetachedBuffer<ClientStatistics> MakeClientStatistics(
+ const std::vector<std::string_view> &source_node_names,
+ const Configuration *configuration) {
+ flatbuffers::FlatBufferBuilder fbb;
+
+ std::vector<flatbuffers::Offset<ClientConnection>> connection_offsets;
+ for (const std::string_view node_name : source_node_names) {
+ flatbuffers::Offset<Node> node_offset =
+ CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+ ClientConnection::Builder connection_builder(fbb);
+ connection_builder.add_node(node_offset);
+ connection_builder.add_state(State::DISCONNECTED);
+ // TODO(austin): Track dropped packets.
+ connection_builder.add_received_packets(0);
+ connection_offsets.emplace_back(connection_builder.Finish());
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ClientConnection>>>
+ connections_offset = fbb.CreateVector(connection_offsets);
+
+ ClientStatistics::Builder client_statistics_builder(fbb);
+ client_statistics_builder.add_connections(connections_offset);
+ fbb.Finish(client_statistics_builder.Finish());
+
+ return fbb.Release();
+}
+
+} // namespace
+
+SctpClientConnection::SctpClientConnection(
+ aos::ShmEventLoop *const event_loop, std::string_view remote_name,
+ const Node *my_node, std::string_view local_host,
+ std::vector<std::unique_ptr<aos::RawSender>> *channels,
+ ClientConnection *connection)
+ : event_loop_(event_loop),
+ connect_message_(MakeConnectMessage(event_loop->configuration(), my_node,
+ remote_name)),
+ message_reception_reply_(MakeMessageHeaderReply()),
+ remote_node_(CHECK_NOTNULL(
+ configuration::GetNode(event_loop->configuration(), remote_name))),
+ client_(remote_node_->hostname()->string_view(), remote_node_->port(),
+ connect_message_.message().channels_to_transfer()->size() +
+ kControlStreams(),
+ local_host, 0),
+ channels_(channels),
+ stream_to_channel_(
+ StreamToChannel(event_loop->configuration(), my_node, remote_node_)),
+ stream_reply_with_timestamp_(StreamReplyWithTimestamp(
+ event_loop->configuration(), my_node, remote_node_)),
+ connection_(connection) {
+ VLOG(1) << "Connect request for " << remote_node_->name()->string_view()
+ << ": " << FlatbufferToJson(connect_message_);
+
+ connect_timer_ = event_loop_->AddTimer([this]() { SendConnect(); });
+ event_loop_->OnRun(
+ [this]() { connect_timer_->Setup(event_loop_->monotonic_now()); });
+
+ event_loop_->epoll()->OnReadable(client_.fd(),
+ [this]() { MessageReceived(); });
+}
+
+void SctpClientConnection::MessageReceived() {
+ // Dispatch the message to the correct receiver.
+ aos::unique_c_ptr<Message> message = client_.Read();
+
+ if (message->message_type == Message::kNotification) {
+ const union sctp_notification *snp =
+ (const union sctp_notification *)message->data();
+
+ switch (snp->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE: {
+ const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ NodeConnected(sac->sac_assoc_id);
+
+ VLOG(1) << "Received up from " << message->PeerAddress() << " on "
+ << sac->sac_assoc_id;
+ break;
+ case SCTP_COMM_LOST:
+ case SCTP_SHUTDOWN_COMP:
+ case SCTP_CANT_STR_ASSOC: {
+ NodeDisconnected();
+ } break;
+ case SCTP_RESTART:
+ LOG(FATAL) << "Never seen this before.";
+ break;
+ }
+ } break;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ PrintNotification(message.get());
+ }
+ } else if (message->message_type == Message::kMessage) {
+ HandleData(message.get());
+ }
+}
+
+void SctpClientConnection::SendConnect() {
+ // Try to send the connect message. If that fails, retry.
+ if (!client_.Send(kConnectStream(),
+ std::string_view(
+ reinterpret_cast<const char *>(connect_message_.data()),
+ connect_message_.size()),
+ 0)) {
+ NodeDisconnected();
+ }
+}
+
+void SctpClientConnection::NodeConnected(sctp_assoc_t assoc_id) {
+ connect_timer_->Disable();
+
+ // We want to tell the kernel to schedule the packets on this new stream with
+ // the priority scheduler. This only needs to be done once per stream.
+ client_.SetPriorityScheduler(assoc_id);
+
+ remote_assoc_id_ = assoc_id;
+ connection_->mutate_state(State::CONNECTED);
+}
+
+void SctpClientConnection::NodeDisconnected() {
+ connect_timer_->Setup(
+ event_loop_->monotonic_now() + chrono::milliseconds(100),
+ chrono::milliseconds(100));
+ remote_assoc_id_ = 0;
+ connection_->mutate_state(State::DISCONNECTED);
+}
+
+void SctpClientConnection::HandleData(const Message *message) {
+ const logger::MessageHeader *message_header =
+ flatbuffers::GetSizePrefixedRoot<logger::MessageHeader>(message->data());
+
+ connection_->mutate_received_packets(connection_->received_packets() + 1);
+
+ const int stream = message->header.rcvinfo.rcv_sid - kControlStreams();
+
+ // Publish the message.
+ RawSender *sender = (*channels_)[stream_to_channel_[stream]].get();
+ sender->Send(message_header->data()->data(), message_header->data()->size(),
+ aos::monotonic_clock::time_point(
+ chrono::nanoseconds(message_header->monotonic_sent_time())),
+ aos::realtime_clock::time_point(
+ chrono::nanoseconds(message_header->realtime_sent_time())),
+ message_header->queue_index());
+
+ if (stream_reply_with_timestamp_[stream]) {
+ // TODO(austin): Send back less if we are only acking. Maybe only a
+ // stream id? Nothing if we are only forwarding?
+
+ // Now fill out the message received reply. This uses a MessageHeader
+ // container so it can be directly logged.
+ message_reception_reply_.mutable_message()->mutate_channel_index(
+ message_header->channel_index());
+ message_reception_reply_.mutable_message()->mutate_monotonic_sent_time(
+ message_header->monotonic_sent_time());
+
+ // And capture the relevant data needed to generate the forwarding
+ // MessageHeader.
+ message_reception_reply_.mutable_message()->mutate_monotonic_remote_time(
+ sender->monotonic_sent_time().time_since_epoch().count());
+ message_reception_reply_.mutable_message()->mutate_realtime_remote_time(
+ sender->realtime_sent_time().time_since_epoch().count());
+ message_reception_reply_.mutable_message()->mutate_remote_queue_index(
+ sender->sent_queue_index());
+
+ // Unique ID is channel_index and monotonic clock.
+ // TODO(austin): Depending on if we are the logger node or not, we need to
+ // guarentee that this ack gets received too... Same path as the logger.
+ client_.Send(kTimestampStream(),
+ std::string_view(reinterpret_cast<const char *>(
+ message_reception_reply_.data()),
+ message_reception_reply_.size()),
+ 0);
+ }
+
+ VLOG(1) << "Received data of length " << message->size << " from "
+ << message->PeerAddress();
+
+ if (VLOG_IS_ON(1)) {
+ client_.LogSctpStatus(message->header.rcvinfo.rcv_assoc_id);
+ }
+
+ VLOG(2) << "\tSNDRCV (stream=" << message->header.rcvinfo.rcv_sid
+ << " ssn=" << message->header.rcvinfo.rcv_ssn
+ << " tsn=" << message->header.rcvinfo.rcv_tsn << " flags=0x"
+ << std::hex << message->header.rcvinfo.rcv_flags << std::dec
+ << " ppid=" << message->header.rcvinfo.rcv_ppid
+ << " cumtsn=" << message->header.rcvinfo.rcv_cumtsn << ")";
+}
+
+MessageBridgeClient::MessageBridgeClient(aos::ShmEventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(event_loop_->MakeSender<ClientStatistics>("/aos")),
+ source_node_names_(configuration::SourceNodeNames(
+ event_loop->configuration(), event_loop->node())),
+ statistics_(MakeClientStatistics(source_node_names_,
+ event_loop->configuration())) {
+ std::string_view node_name = event_loop->node()->name()->string_view();
+
+ // Find all the channels which are supposed to be delivered to us.
+ channels_.resize(event_loop_->configuration()->channels()->size());
+ int channel_index = 0;
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ if (channel->has_destination_nodes()) {
+ for (const Connection *connection : *channel->destination_nodes()) {
+ if (connection->name()->string_view() == node_name) {
+ // Give the config a chance to remap us. This helps with testing on a
+ // single node.
+ const Channel *mapped_channel = configuration::GetChannel(
+ event_loop_->configuration(), channel->name()->string_view(),
+ channel->type()->string_view(), event_loop_->name(),
+ event_loop_->node());
+ channels_[channel_index] = event_loop_->MakeRawSender(mapped_channel);
+ break;
+ }
+ }
+ }
+ ++channel_index;
+ }
+
+ // Now, for each source node, build a connection.
+ int node_index = 0;
+ for (const std::string_view source_node : source_node_names_) {
+ // Open an unspecified connection (:: in ipv6 terminology)
+ connections_.emplace_back(new SctpClientConnection(
+ event_loop, source_node, event_loop->node(), "::", &channels_,
+ statistics_.mutable_message()->mutable_connections()->GetMutableObject(
+ node_index)));
+ ++node_index;
+ }
+
+ // And kick it all off.
+ statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+ event_loop_->OnRun([this]() {
+ statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
+ chrono::seconds(1));
+ });
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_client_lib.h b/aos/network/message_bridge_client_lib.h
new file mode 100644
index 0000000..77bf2b7
--- /dev/null
+++ b/aos/network/message_bridge_client_lib.h
@@ -0,0 +1,116 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
+
+#include <string_view>
+
+#include "aos/events/event_loop.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_client_generated.h"
+#include "aos/network/sctp_client.h"
+#include "aos/network/sctp_lib.h"
+
+namespace aos {
+namespace message_bridge {
+
+// See message_bridge_protocol.h for more details about the protocol.
+
+// This class encapsulates all the state required to connect to a server and
+// transmit messages.
+class SctpClientConnection {
+ public:
+ SctpClientConnection(aos::ShmEventLoop *const event_loop,
+ std::string_view remote_name, const Node *my_node,
+ std::string_view local_host,
+ std::vector<std::unique_ptr<aos::RawSender>> *channels,
+ ClientConnection *connection);
+
+ ~SctpClientConnection() { event_loop_->epoll()->DeleteFd(client_.fd()); }
+
+ private:
+ // Reads a message from the socket. Could be a notification.
+ void MessageReceived();
+
+ // Sends a connection request message.
+ void SendConnect();
+
+ // Called when the server connection succeeds.
+ void NodeConnected(sctp_assoc_t assoc_id);
+ // Called when the server connection disconnects.
+ void NodeDisconnected();
+ void HandleData(const Message *message);
+
+ // Event loop to register the server on.
+ aos::ShmEventLoop *const event_loop_;
+
+ // Message to send on connect.
+ const aos::FlatbufferDetachedBuffer<aos::message_bridge::Connect>
+ connect_message_;
+
+ // Starting point for the message reception reply (including timestamps).
+ aos::FlatbufferDetachedBuffer<aos::logger::MessageHeader>
+ message_reception_reply_;
+
+ // Node we are sending to.
+ const aos::Node *const remote_node_;
+
+ // SCTP client. There is a client per connection so we don't have to deal
+ // with association ids nearly as badly.
+ SctpClient client_;
+
+ // Channels to send received messages on.
+ std::vector<std::unique_ptr<aos::RawSender>> *channels_;
+ // Stream number -> channel lookup.
+ std::vector<int> stream_to_channel_;
+ // Bitmask signaling if we should be replying back with delivery times.
+ std::vector<bool> stream_reply_with_timestamp_;
+
+ // Timer which fires to handle reconnections.
+ aos::TimerHandler *connect_timer_;
+
+ // ClientConnection statistics message to modify. This will be published
+ // periodicially.
+ ClientConnection *connection_;
+
+ // id of the server once known. This is only valid if connection_ says
+ // connected.
+ sctp_assoc_t remote_assoc_id_ = 0;
+};
+
+// This encapsulates the state required to talk to *all* the servers from this
+// node.
+class MessageBridgeClient {
+ public:
+ MessageBridgeClient(aos::ShmEventLoop *event_loop);
+
+ ~MessageBridgeClient() {}
+
+ private:
+ // Sends out the statistics that are continually updated by the
+ // SctpClientConnections.
+ void SendStatistics() { sender_.Send(statistics_); }
+
+ // Event loop to schedule everything on.
+ aos::ShmEventLoop *event_loop_;
+ // Sender to publish statistics on.
+ aos::Sender<ClientStatistics> sender_;
+ aos::TimerHandler *statistics_timer_;
+
+ // Nodes to receive data from.
+ const std::vector<std::string_view> source_node_names_;
+
+ // Data to publish.
+ FlatbufferDetachedBuffer<ClientStatistics> statistics_;
+
+ // Channels to send data over.
+ std::vector<std::unique_ptr<aos::RawSender>> channels_;
+
+ // List of connections. These correspond to the nodes in source_node_names_
+ std::vector<std::unique_ptr<SctpClientConnection>> connections_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MESSAGE_BRIDGE_CLIENT_LIB_H_
diff --git a/aos/network/message_bridge_protocol.h b/aos/network/message_bridge_protocol.h
new file mode 100644
index 0000000..1136188
--- /dev/null
+++ b/aos/network/message_bridge_protocol.h
@@ -0,0 +1,31 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
+
+namespace aos {
+namespace message_bridge {
+
+// The protocol between the message_bridge_client and server is pretty simple.
+// The overarching design philosophy is that the server sends data to the
+// client, and the client (optionally) sends timestamps back.
+//
+// 1) A connection is established by the client sending the server a Connect
+// flatbuffer on stream 0.
+// 2) The server then replies with the data, as it is available, on streams 2 +
+// channel_id in the Connect message.
+// 3) The client (optionally) replies on stream 1 with MessageHeader flatbuffers
+// with the timestamps that the messages were received.
+//
+// Most of the complexity from there is handling multiple clients and servers
+// and persuading SCTP to do what we want.
+
+// Number of streams reserved for control messages.
+constexpr size_t kControlStreams() { return 2; }
+// The stream on which Connect messages are sent.
+constexpr size_t kConnectStream() { return 0; }
+// The stream on which timestamp replies are sent.
+constexpr size_t kTimestampStream() { return 1; }
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MESSAGE_BRIDGE_PROTOCOL_H_
diff --git a/aos/network/message_bridge_server.cc b/aos/network/message_bridge_server.cc
new file mode 100644
index 0000000..fa5e7c1
--- /dev/null
+++ b/aos/network/message_bridge_server.cc
@@ -0,0 +1,35 @@
+#include "aos/events/shm_event_loop.h"
+#include "aos/init.h"
+#include "aos/network/message_bridge_server_lib.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+DEFINE_string(config, "multinode_pingpong_config.json", "Path to the config.");
+
+namespace aos {
+namespace message_bridge {
+
+int Main() {
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
+
+ aos::ShmEventLoop event_loop(&config.message());
+
+ MessageBridgeServer app(&event_loop);
+
+ // TODO(austin): Track which messages didn't make it in time and need to be
+ // logged locally and forwarded.
+
+ event_loop.Run();
+
+ return EXIT_SUCCESS;
+}
+
+} // namespace message_bridge
+} // namespace aos
+
+int main(int argc, char **argv) {
+ aos::InitGoogle(&argc, &argv);
+
+ return aos::message_bridge::Main();
+}
diff --git a/aos/network/message_bridge_server.fbs b/aos/network/message_bridge_server.fbs
new file mode 100644
index 0000000..75a6a15
--- /dev/null
+++ b/aos/network/message_bridge_server.fbs
@@ -0,0 +1,33 @@
+include "aos/configuration.fbs";
+
+namespace aos.message_bridge;
+
+// State of the connection.
+enum State: ubyte {
+ CONNECTED,
+ DISCONNECTED,
+}
+
+// Statistics from a single connection to a client from this server.
+table ServerConnection {
+ // The node that we are connected to.
+ node:Node;
+
+ // Health of this connection. Connected or not?
+ state:State;
+
+ // Number of packets that have been dropped (if known).
+ dropped_packets:uint;
+
+ // Number of packets received on all channels.
+ sent_packets:uint;
+
+ // TODO(austin): Per channel counts?
+}
+
+// Statistics for all connections to all the clients.
+table ServerStatistics {
+ connections:[ServerConnection];
+}
+
+root_type ServerStatistics;
diff --git a/aos/network/message_bridge_server_lib.cc b/aos/network/message_bridge_server_lib.cc
new file mode 100644
index 0000000..38958ba
--- /dev/null
+++ b/aos/network/message_bridge_server_lib.cc
@@ -0,0 +1,367 @@
+#include "aos/network/message_bridge_server_lib.h"
+
+#include "absl/types/span.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_protocol.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/sctp_server.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+namespace {
+
+namespace chrono = std::chrono;
+
+// Builds up the "empty" server statistics message to be pointed to by all the
+// connections, updated at runtime, and periodically sent.
+FlatbufferDetachedBuffer<ServerStatistics> MakeServerStatistics(
+ const std::vector<std::string_view> &source_node_names,
+ const Configuration *configuration) {
+ flatbuffers::FlatBufferBuilder fbb;
+
+ std::vector<flatbuffers::Offset<ServerConnection>> connection_offsets;
+ for (const std::string_view node_name : source_node_names) {
+ flatbuffers::Offset<Node> node_offset =
+ CopyFlatBuffer(configuration::GetNode(configuration, node_name), &fbb);
+ ServerConnection::Builder connection_builder(fbb);
+ connection_builder.add_node(node_offset);
+ connection_builder.add_state(State::DISCONNECTED);
+ connection_builder.add_dropped_packets(0);
+ connection_builder.add_sent_packets(0);
+ connection_offsets.emplace_back(connection_builder.Finish());
+ }
+ flatbuffers::Offset<
+ flatbuffers::Vector<flatbuffers::Offset<ServerConnection>>>
+ connections_offset = fbb.CreateVector(connection_offsets);
+
+ ServerStatistics::Builder server_statistics_builder(fbb);
+ server_statistics_builder.add_connections(connections_offset);
+ fbb.Finish(server_statistics_builder.Finish());
+
+ return fbb.Release();
+}
+
+// Finds the statistics for the provided node name.
+ServerConnection *FindServerConnection(ServerStatistics *statistics,
+ std::string_view node_name) {
+ ServerConnection *matching_server_connection = nullptr;
+ for (size_t i = 0; i < statistics->mutable_connections()->size(); ++i) {
+ ServerConnection *server_connection =
+ statistics->mutable_connections()->GetMutableObject(i);
+ if (server_connection->node()->name()->string_view() == node_name) {
+ matching_server_connection = server_connection;
+ break;
+ }
+ }
+
+ CHECK(matching_server_connection != nullptr) << ": Unknown client";
+
+ return matching_server_connection;
+}
+
+} // namespace
+
+bool ChannelState::Matches(const Channel *other_channel) {
+ // Confirm the normal tuple, plus make sure that the other side isn't going to
+ // send more data over than we expect with a mismatching size.
+ return (
+ channel_->name()->string_view() == other_channel->name()->string_view() &&
+ channel_->type()->string_view() == other_channel->type()->string_view() &&
+ channel_->max_size() == other_channel->max_size());
+}
+
+void ChannelState::SendData(SctpServer *server, const Context &context) {
+ // TODO(austin): I don't like allocating this buffer when we are just freeing
+ // it at the end of the function.
+ flatbuffers::FlatBufferBuilder fbb(channel_->max_size() + 100);
+ VLOG(1) << "Found " << peers_.size() << " peers on channel "
+ << channel_->name()->string_view() << " size " << context.size;
+
+ // TODO(austin): Use an iovec to build it up in 3 parts to avoid the copy?
+ // Only useful when not logging.
+ fbb.FinishSizePrefixed(logger::PackMessage(&fbb, context, channel_index_,
+ logger::LogType::kLogMessage));
+
+ // TODO(austin): Track which connections need to be reliable and handle
+ // resending properly.
+ size_t sent_count = 0;
+ bool logged_remotely = false;
+ for (Peer &peer : peers_) {
+ logged_remotely = logged_remotely || peer.logged_remotely;
+
+ if (peer.sac_assoc_id != 0) {
+ server->Send(std::string_view(
+ reinterpret_cast<const char *>(fbb.GetBufferPointer()),
+ fbb.GetSize()),
+ peer.sac_assoc_id, peer.stream,
+ peer.connection->time_to_live() / 1000000);
+ peer.server_connection_statistics->mutate_sent_packets(
+ peer.server_connection_statistics->sent_packets() + 1);
+ if (peer.logged_remotely) {
+ ++sent_count;
+ }
+ } else {
+ peer.server_connection_statistics->mutate_dropped_packets(
+ peer.server_connection_statistics->dropped_packets() + 1);
+ }
+ }
+
+ if (logged_remotely) {
+ if (sent_count == 0) {
+ VLOG(1) << "No clients, rejecting";
+ HandleFailure(fbb.Release());
+ } else {
+ sent_messages_.emplace_back(fbb.Release());
+ }
+ } else {
+ VLOG(1) << "Not bothering to track this message since nobody cares.";
+ }
+
+ // TODO(austin): Limit the size of this queue. Flush messages to disk
+ // which are too old. We really care about messages which didn't make it
+ // to a logger... Which is a new concept or two.
+
+ // Need to handle logging and disk in another thread. Need other thread
+ // since we sometimes want to skip disk, and synchronization is easier.
+ // This thread then spins on the queue until empty, then polls at 1-10 hz.
+
+ // TODO(austin): ~10 MB chunks on disk and push them over the logging
+ // channel? Threadsafe disk backed queue object which can handle restarts
+ // and flushes. Whee.
+}
+
+void ChannelState::HandleDelivery(sctp_assoc_t /*rcv_assoc_id*/,
+ uint16_t /*ssn*/,
+ absl::Span<const uint8_t> data) {
+ const logger::MessageHeader *message_header =
+ flatbuffers::GetRoot<logger::MessageHeader>(data.data());
+ while (sent_messages_.size() > 0u) {
+ if (sent_messages_.begin()->message().monotonic_sent_time() ==
+ message_header->monotonic_sent_time()) {
+ sent_messages_.pop_front();
+ continue;
+ }
+
+ if (sent_messages_.begin()->message().monotonic_sent_time() <
+ message_header->monotonic_sent_time()) {
+ VLOG(1) << "Delivery looks wrong, rejecting";
+ HandleFailure(std::move(sent_messages_.front()));
+ sent_messages_.pop_front();
+ continue;
+ }
+
+ break;
+ }
+}
+
+void ChannelState::HandleFailure(
+ SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message) {
+ // TODO(austin): Put it in the log queue.
+ LOG(INFO) << "Failed to send " << FlatbufferToJson(message);
+
+ // Note: this may be really out of order when we avoid the queue... We
+ // have the ones we know didn't make it immediately, and the ones which
+ // time out eventually. Need to sort that out.
+}
+
+void ChannelState::AddPeer(const Connection *connection,
+ ServerConnection *server_connection_statistics,
+ bool logged_remotely) {
+ peers_.emplace_back(0, 0, connection, server_connection_statistics,
+ logged_remotely);
+}
+
+void ChannelState::NodeDisconnected(sctp_assoc_t assoc_id) {
+ for (ChannelState::Peer &peer : peers_) {
+ if (peer.sac_assoc_id == assoc_id) {
+ // TODO(austin): This will not handle multiple clients from
+ // a single node. But that should be rare.
+ peer.server_connection_statistics->mutate_state(State::DISCONNECTED);
+ peer.sac_assoc_id = 0;
+ peer.stream = 0;
+ break;
+ }
+ }
+}
+
+void ChannelState::NodeConnected(const Node *node, sctp_assoc_t assoc_id,
+ int stream, SctpServer *server) {
+ for (ChannelState::Peer &peer : peers_) {
+ if (peer.connection->name()->string_view() == node->name()->string_view()) {
+ peer.sac_assoc_id = assoc_id;
+ peer.stream = stream;
+ peer.server_connection_statistics->mutate_state(State::CONNECTED);
+ server->SetStreamPriority(assoc_id, stream, peer.connection->priority());
+
+ break;
+ }
+ }
+}
+
+MessageBridgeServer::MessageBridgeServer(aos::ShmEventLoop *event_loop)
+ : event_loop_(event_loop),
+ sender_(event_loop_->MakeSender<ServerStatistics>("/aos")),
+ statistics_(MakeServerStatistics(
+ configuration::DestinationNodeNames(event_loop->configuration(),
+ event_loop->node()),
+ event_loop->configuration())),
+ server_("::", event_loop->node()->port()) {
+ CHECK(event_loop_->node() != nullptr) << ": No nodes configured.";
+
+ // TODO(austin): Time sync. sctp gives us filtered round trip time, not
+ // target time.
+
+ // TODO(austin): Logging synchronization.
+ //
+ // TODO(austin): How do we handle parameter channels? The oldest value
+ // needs to be sent regardless on connection (though probably only if it has
+ // changed).
+ event_loop_->epoll()->OnReadable(server_.fd(),
+ [this]() { MessageReceived(); });
+
+ LOG(INFO) << "Hostname: " << event_loop_->node()->hostname()->string_view();
+
+ int channel_index = 0;
+ for (const Channel *channel : *event_loop_->configuration()->channels()) {
+ CHECK(channel->has_source_node());
+ if (channel->source_node()->string_view() ==
+ event_loop_->node()->name()->string_view() &&
+ channel->has_destination_nodes()) {
+ std::unique_ptr<ChannelState> state(
+ new ChannelState{channel, channel_index});
+
+ for (const Connection *connection : *channel->destination_nodes()) {
+ const Node *other_node = configuration::GetNode(
+ event_loop_->configuration(), connection->name()->string_view());
+ state->AddPeer(
+ connection,
+ FindServerConnection(statistics_.mutable_message(),
+ connection->name()->string_view()),
+ configuration::ChannelMessageIsLoggedOnNode(channel, other_node));
+ }
+
+ // Call SendData for every message.
+ ChannelState *state_ptr = state.get();
+ event_loop_->MakeRawWatcher(
+ channel,
+ [this, state_ptr](const Context &context, const void * /*message*/) {
+ state_ptr->SendData(&server_, context);
+ });
+ channels_.emplace_back(std::move(state));
+ } else {
+ channels_.emplace_back(nullptr);
+ }
+ ++channel_index;
+ }
+
+ statistics_timer_ = event_loop_->AddTimer([this]() { SendStatistics(); });
+ event_loop_->OnRun([this]() {
+ statistics_timer_->Setup(event_loop_->monotonic_now() + chrono::seconds(1),
+ chrono::seconds(1));
+ });
+}
+
+void MessageBridgeServer::NodeConnected(sctp_assoc_t assoc_id) {
+ server_.SetPriorityScheduler(assoc_id);
+}
+
+void MessageBridgeServer::NodeDisconnected(sctp_assoc_t assoc_id) {
+ // Find any matching peers and remove them.
+ for (std::unique_ptr<ChannelState> &channel_state : channels_) {
+ if (channel_state.get() == nullptr) {
+ continue;
+ }
+
+ channel_state->NodeDisconnected(assoc_id);
+ }
+}
+
+void MessageBridgeServer::MessageReceived() {
+ aos::unique_c_ptr<Message> message = server_.Read();
+
+ if (message->message_type == Message::kNotification) {
+ const union sctp_notification *snp =
+ (const union sctp_notification *)message->data();
+
+ switch (snp->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE: {
+ const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+ switch (sac->sac_state) {
+ case SCTP_COMM_UP:
+ NodeConnected(sac->sac_assoc_id);
+ VLOG(1) << "Peer connected";
+ break;
+ case SCTP_COMM_LOST:
+ case SCTP_SHUTDOWN_COMP:
+ case SCTP_CANT_STR_ASSOC:
+ NodeDisconnected(sac->sac_assoc_id);
+ VLOG(1) << "Disconnect";
+ break;
+ case SCTP_RESTART:
+ LOG(FATAL) << "Never seen this before.";
+ break;
+ }
+ } break;
+ }
+
+ if (VLOG_IS_ON(1)) {
+ PrintNotification(message.get());
+ }
+ } else if (message->message_type == Message::kMessage) {
+ HandleData(message.get());
+ }
+}
+
+void MessageBridgeServer::HandleData(const Message *message) {
+ VLOG(1) << "Received data of length " << message->size;
+
+ if (message->header.rcvinfo.rcv_sid == kConnectStream()) {
+ // Control channel!
+ const Connect *connect = flatbuffers::GetRoot<Connect>(message->data());
+ VLOG(1) << FlatbufferToJson(connect);
+
+ // Account for the control channel and delivery times channel.
+ size_t channel_index = kControlStreams();
+ for (const Channel *channel : *connect->channels_to_transfer()) {
+ bool matched = false;
+ for (std::unique_ptr<ChannelState> &channel_state : channels_) {
+ if (channel_state.get() == nullptr) {
+ continue;
+ }
+ if (channel_state->Matches(channel)) {
+ channel_state->NodeConnected(connect->node(),
+ message->header.rcvinfo.rcv_assoc_id,
+ channel_index, &server_);
+
+ matched = true;
+ break;
+ }
+ }
+ if (!matched) {
+ LOG(ERROR) << "Remote tried registering for unknown channel "
+ << FlatbufferToJson(channel);
+ } else {
+ ++channel_index;
+ }
+ }
+ } else if (message->header.rcvinfo.rcv_sid == kTimestampStream()) {
+ // Message delivery
+ const logger::MessageHeader *message_header =
+ flatbuffers::GetRoot<logger::MessageHeader>(message->data());
+
+ channels_[message_header->channel_index()]->HandleDelivery(
+ message->header.rcvinfo.rcv_assoc_id, message->header.rcvinfo.rcv_ssn,
+ absl::Span<const uint8_t>(message->data(), message->size));
+ }
+
+ if (VLOG_IS_ON(1)) {
+ message->LogRcvInfo();
+ }
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_server_lib.h b/aos/network/message_bridge_server_lib.h
new file mode 100644
index 0000000..f202fc9
--- /dev/null
+++ b/aos/network/message_bridge_server_lib.h
@@ -0,0 +1,130 @@
+#ifndef AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
+#define AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
+
+#include <deque>
+
+#include "absl/types/span.h"
+#include "aos/events/logging/logger.h"
+#include "aos/events/logging/logger_generated.h"
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/message_bridge_server_generated.h"
+#include "aos/network/sctp_server.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// See message_bridge_protocol.h for more details about the protocol.
+
+// Class to encapsulate all the state per channel. This is the dispatcher for a
+// new message from the event loop.
+class ChannelState {
+ public:
+ ChannelState(const Channel *channel, int channel_index)
+ : channel_index_(channel_index), channel_(channel) {}
+
+ // Class to encapsulate all the state per client on a channel. A client may
+ // be subscribed to multiple channels.
+ struct Peer {
+ Peer(sctp_assoc_t new_sac_assoc_id, size_t new_stream,
+ const Connection *new_connection,
+ ServerConnection *new_server_connection_statistics,
+ bool new_logged_remotely)
+ : sac_assoc_id(new_sac_assoc_id),
+ stream(new_stream),
+ connection(new_connection),
+ server_connection_statistics(new_server_connection_statistics),
+ logged_remotely(new_logged_remotely) {}
+
+ // Valid if != 0.
+ sctp_assoc_t sac_assoc_id = 0;
+
+ size_t stream;
+ const aos::Connection *connection;
+ ServerConnection *server_connection_statistics;
+
+ // If true, this message will be logged on a receiving node. We need to
+ // keep it around to log it locally if that fails.
+ bool logged_remotely = false;
+ };
+
+ // Needs to be called when a node (might have) disconnected.
+ void NodeDisconnected(sctp_assoc_t assoc_id);
+ void NodeConnected(const Node *node, sctp_assoc_t assoc_id, int stream,
+ SctpServer *server);
+
+ // Adds a new peer.
+ void AddPeer(const Connection *connection,
+ ServerConnection *server_connection_statistics,
+ bool logged_remotely);
+
+ // Returns true if this channel has the same name and type as the other
+ // channel.
+ bool Matches(const Channel *other_channel);
+
+ // Sends the data in context using the provided server.
+ void SendData(SctpServer *server, const Context &context);
+
+ // Handles reception of delivery times.
+ void HandleDelivery(sctp_assoc_t rcv_assoc_id, uint16_t ssn,
+ absl::Span<const uint8_t> data);
+
+ // Handles (by consuming) failure to deliver a message.
+ void HandleFailure(
+ SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader> &&message);
+
+ private:
+ const int channel_index_;
+ const Channel *const channel_;
+
+ std::vector<Peer> peers_;
+
+ std::deque<SizePrefixedFlatbufferDetachedBuffer<logger::MessageHeader>>
+ sent_messages_;
+};
+
+// This encapsulates the state required to talk to *all* the clients from this
+// node. It handles the session and dispatches data to the ChannelState.
+class MessageBridgeServer {
+ public:
+ MessageBridgeServer(aos::ShmEventLoop *event_loop);
+
+ ~MessageBridgeServer() { event_loop_->epoll()->DeleteFd(server_.fd()); }
+
+ private:
+ // Reads a message from the socket. Could be a notification.
+ void MessageReceived();
+
+ // Called when the server connection succeeds.
+ void NodeConnected(sctp_assoc_t assoc_id);
+ // Called when the server connection disconnects.
+ void NodeDisconnected(sctp_assoc_t assoc_id);
+
+ // Called when data (either a connection request or delivery timestamps) is
+ // received.
+ void HandleData(const Message *message);
+
+ // Sends out the statistics that are continually updated by the
+ // ChannelState's.
+ void SendStatistics() { sender_.Send(statistics_); }
+
+ // Event loop to schedule everything on.
+ aos::ShmEventLoop *event_loop_;
+
+ // Statistics, timer, and associated sender.
+ aos::Sender<ServerStatistics> sender_;
+ aos::TimerHandler *statistics_timer_;
+ FlatbufferDetachedBuffer<ServerStatistics> statistics_;
+
+ SctpServer server_;
+
+ // List of channels. The entries that aren't sent from this node are left
+ // null.
+ std::vector<std::unique_ptr<ChannelState>> channels_;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_MESSAGE_BRIDGE_SERVER_LIB_H_
diff --git a/aos/network/message_bridge_test.cc b/aos/network/message_bridge_test.cc
new file mode 100644
index 0000000..383c1c4
--- /dev/null
+++ b/aos/network/message_bridge_test.cc
@@ -0,0 +1,154 @@
+#include "gtest/gtest.h"
+
+#include <chrono>
+#include <thread>
+
+#include "aos/events/ping_generated.h"
+#include "aos/events/pong_generated.h"
+#include "aos/network/message_bridge_client_lib.h"
+#include "aos/network/message_bridge_server_lib.h"
+
+DECLARE_string(override_hostname);
+DECLARE_string(application_name);
+
+namespace aos {
+namespace message_bridge {
+namespace testing {
+
+namespace chrono = std::chrono;
+
+// Test that we can send a ping message over sctp and receive it.
+TEST(MessageBridgeTest, PingPong) {
+ // This is rather annoying to set up. We need to start up a client and
+ // server, on the same node, but get them to think that they are on different
+ // nodes.
+ //
+ // We then get to wait until they are connected.
+ //
+ // After they are connected, we send a Ping message.
+ //
+ // On the other end, we receive a Pong message.
+ //
+ // But, we need the client to not post directly to "/test" like it would in a
+ // real system, otherwise we will re-send the ping message... So, use an
+ // application specific map to have the client post somewhere else.
+ //
+ // To top this all off, each of these needs to be done with a ShmEventLoop,
+ // which needs to run in a separate thread... And it is really hard to get
+ // everything started up reliably. So just be super generous on timeouts and
+ // hope for the best. We can be more generous in the future if we need to.
+ //
+ // We are faking the application names by passing in --application_name=foo
+ aos::FlatbufferDetachedBuffer<aos::Configuration> server_config =
+ aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_server_config.json");
+ aos::FlatbufferDetachedBuffer<aos::Configuration> client_config =
+ aos::configuration::ReadConfig(
+ "aos/network/message_bridge_test_client_config.json");
+
+ FLAGS_application_name = "pi1_message_bridge_server";
+ // Force ourselves to be "raspberrypi" and allocate everything.
+ FLAGS_override_hostname = "raspberrypi";
+ aos::ShmEventLoop server_event_loop(&server_config.message());
+ MessageBridgeServer message_bridge_server(&server_event_loop);
+
+ // And build the app which sends the pings.
+ FLAGS_application_name = "ping";
+ aos::ShmEventLoop ping_event_loop(&server_config.message());
+ aos::Sender<examples::Ping> ping_sender =
+ ping_event_loop.MakeSender<examples::Ping>("/test");
+
+ // Now do it for "raspberrypi2", the client.
+ FLAGS_application_name = "pi2_message_bridge_client";
+ FLAGS_override_hostname = "raspberrypi2";
+ aos::ShmEventLoop client_event_loop(&client_config.message());
+ MessageBridgeClient message_bridge_client(&client_event_loop);
+
+ // And build the app which sends the pongs.
+ FLAGS_application_name = "pong";
+ aos::ShmEventLoop pong_event_loop(&client_config.message());
+
+ // Count the pongs.
+ int pong_count = 0;
+ pong_event_loop.MakeWatcher(
+ "/test2", [&pong_count, &ping_event_loop](const examples::Ping &ping) {
+ ++pong_count;
+ LOG(INFO) << "Got ping back " << FlatbufferToJson(&ping);
+ if (pong_count >= 2) {
+ LOG(INFO) << "That's enough bailing early.";
+ // And Exit is async safe, so thread safe is easy.
+ ping_event_loop.Exit();
+ }
+ });
+
+ FLAGS_override_hostname = "";
+
+ // Start everything up. Pong is the only thing we don't know how to wait on,
+ // so start it first.
+ std::thread pong_thread([&pong_event_loop]() { pong_event_loop.Run(); });
+
+ std::thread server_thread(
+ [&server_event_loop]() { server_event_loop.Run(); });
+ std::thread client_thread(
+ [&client_event_loop]() { client_event_loop.Run(); });
+
+ // Wait until we are connected, then send.
+ int ping_count = 0;
+ ping_event_loop.MakeWatcher(
+ "/aos/pi1", [&ping_count, &client_event_loop,
+ &ping_sender](const ServerStatistics &stats) {
+ LOG(INFO) << FlatbufferToJson(&stats);
+
+ ASSERT_TRUE(stats.has_connections());
+ EXPECT_EQ(stats.connections()->size(), 1);
+
+ bool connected = false;
+ for (const ServerConnection *connection : *stats.connections()) {
+ if (connection->node()->name()->string_view() ==
+ client_event_loop.node()->name()->string_view()) {
+ if (connection->state() == State::CONNECTED) {
+ connected = true;
+ }
+ break;
+ }
+ }
+
+ if (connected) {
+ LOG(INFO) << "Connected! Sent ping.";
+ auto builder = ping_sender.MakeBuilder();
+ examples::Ping::Builder ping_builder =
+ builder.MakeBuilder<examples::Ping>();
+ ping_builder.add_value(ping_count + 971);
+ builder.Send(ping_builder.Finish());
+ ++ping_count;
+ }
+ });
+
+ // Time ourselves out after a while if Pong doesn't do it for us.
+ aos::TimerHandler *quit = ping_event_loop.AddTimer(
+ [&ping_event_loop]() { ping_event_loop.Exit(); });
+ ping_event_loop.OnRun([quit, &ping_event_loop]() {
+ quit->Setup(ping_event_loop.monotonic_now() + chrono::seconds(10));
+ });
+
+
+ // And go!
+ ping_event_loop.Run();
+
+ // Shut everyone else down
+ server_event_loop.Exit();
+ client_event_loop.Exit();
+ pong_event_loop.Exit();
+ server_thread.join();
+ client_thread.join();
+ pong_thread.join();
+
+ // Make sure we sent something.
+ EXPECT_GE(ping_count, 1);
+ // And got something back.
+ EXPECT_GE(pong_count, 1);
+}
+
+} // namespace testing
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/message_bridge_test_client.json b/aos/network/message_bridge_test_client.json
new file mode 100644
index 0000000..65d28d2
--- /dev/null
+++ b/aos/network/message_bridge_test_client.json
@@ -0,0 +1,17 @@
+{
+ "imports": [
+ "message_bridge_test_common.json"
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "localhost",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "raspberrypi2",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/network/message_bridge_test_common.json b/aos/network/message_bridge_test_common.json
new file mode 100644
index 0000000..d0085c2
--- /dev/null
+++ b/aos/network/message_bridge_test_common.json
@@ -0,0 +1,113 @@
+{
+ "channels": [
+ {
+ "name": "/aos/pi1",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.ServerStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi1",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi1",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.message_bridge.ClientStatistics",
+ "source_node": "pi2",
+ "frequency": 2
+ },
+ {
+ "name": "/aos/pi1",
+ "type": "aos.timing.Report",
+ "source_node": "pi1",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/aos/pi2",
+ "type": "aos.timing.Report",
+ "source_node": "pi2",
+ "frequency": 50,
+ "num_senders": 20,
+ "max_size": 2048
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Ping",
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1"
+ }
+ ]
+ },
+ {
+ "name": "/test2",
+ "type": "aos.examples.Ping",
+ "source_node": "pi2"
+ },
+ {
+ "name": "/test",
+ "type": "aos.examples.Pong",
+ "source_node": "pi2",
+ "logger": "LOCAL_AND_REMOTE_LOGGER",
+ "logger_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi1",
+ "priority": 1,
+ "timestamp_logger": "REMOTE_LOGGER",
+ "timestamp_logger_node": "pi1"
+ }
+ ]
+ }
+ ],
+ "applications": [
+ {
+ "name": "pi2_message_bridge_client",
+ "maps": [
+ {
+ "match": {
+ "name": "/test",
+ "type": "aos.examples.Ping"
+ },
+ "rename": {
+ "name": "/test2"
+ }
+ }
+ ]
+ }
+ ],
+ "maps": [
+ {
+ "match": {
+ "name": "/aos",
+ "source_node": "pi1"
+ },
+ "rename": {
+ "name": "/aos/pi1"
+ }
+ },
+ {
+ "match": {
+ "name": "/aos",
+ "source_node": "pi2"
+ },
+ "rename": {
+ "name": "/aos/pi2"
+ }
+ }
+ ]
+}
diff --git a/aos/network/message_bridge_test_server.json b/aos/network/message_bridge_test_server.json
new file mode 100644
index 0000000..eea92e7
--- /dev/null
+++ b/aos/network/message_bridge_test_server.json
@@ -0,0 +1,17 @@
+{
+ "imports": [
+ "message_bridge_test_common.json"
+ ],
+ "nodes": [
+ {
+ "name": "pi1",
+ "hostname": "raspberrypi",
+ "port": 9971
+ },
+ {
+ "name": "pi2",
+ "hostname": "localhost",
+ "port": 9971
+ }
+ ]
+}
diff --git a/aos/network/sctp_client.cc b/aos/network/sctp_client.cc
new file mode 100644
index 0000000..9e7f6c1
--- /dev/null
+++ b/aos/network/sctp_client.cc
@@ -0,0 +1,144 @@
+#include "aos/network/sctp_client.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netinet/sctp.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <string_view>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+SctpClient::SctpClient(std::string_view remote_host, int remote_port,
+ int streams, std::string_view local_host, int local_port)
+ : sockaddr_remote_(ResolveSocket(remote_host, remote_port)),
+ sockaddr_local_(ResolveSocket(local_host, local_port)),
+ fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
+ LOG(INFO) << "socket(" << Family(sockaddr_local_)
+ << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+ PCHECK(fd_ != -1);
+
+ {
+ // Allow the kernel to deliver messages from different streams in any order.
+ int full_interleaving = 2;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+ &full_interleaving, sizeof(full_interleaving)) == 0);
+ }
+
+ {
+ struct sctp_initmsg initmsg;
+ memset(&initmsg, 0, sizeof(struct sctp_initmsg));
+ initmsg.sinit_num_ostreams = streams;
+ initmsg.sinit_max_instreams = streams;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
+ sizeof(struct sctp_initmsg)) == 0);
+ }
+
+ {
+ int on = 1;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+ 0);
+ }
+ {
+ // Servers send promptly. Clients don't.
+ // TODO(austin): Revisit this assumption when we have time sync.
+ int on = 0;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+ }
+
+ {
+ // TODO(austin): This is the old style registration... But, the sctp
+ // stack out in the wild for linux is old and primitive.
+ struct sctp_event_subscribe subscribe;
+ memset(&subscribe, 0, sizeof(subscribe));
+ subscribe.sctp_data_io_event = 1;
+ subscribe.sctp_association_event = 1;
+ PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+ sizeof(subscribe)) == 0);
+ }
+
+ PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+ sockaddr_local_.ss_family == AF_INET6
+ ? sizeof(struct sockaddr_in6)
+ : sizeof(struct sockaddr_in)) == 0);
+ VLOG(1) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+}
+
+aos::unique_c_ptr<Message> SctpClient::Read() {
+ return ReadSctpMessage(fd_, max_size_);
+}
+
+bool SctpClient::Send(int stream, std::string_view data, int time_to_live) {
+ struct iovec iov;
+ iov.iov_base = const_cast<char *>(data.data());
+ iov.iov_len = data.size();
+
+ struct msghdr outmsg;
+ // Target to send to.
+ outmsg.msg_name = &sockaddr_remote_;
+ outmsg.msg_namelen = sizeof(struct sockaddr_storage);
+ VLOG(1) << "Sending to " << Address(sockaddr_remote_);
+
+ // Data to send.
+ outmsg.msg_iov = &iov;
+ outmsg.msg_iovlen = 1;
+
+ // Build up the sndinfo message.
+ char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+ outmsg.msg_control = outcmsg;
+ outmsg.msg_controllen = sizeof(outcmsg);
+ outmsg.msg_flags = 0;
+
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+ cmsg->cmsg_level = IPPROTO_SCTP;
+ cmsg->cmsg_type = SCTP_SNDRCV;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+ outmsg.msg_controllen = cmsg->cmsg_len;
+ struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+ sinfo->sinfo_ppid = rand();
+ sinfo->sinfo_stream = stream;
+ sinfo->sinfo_context = 19;
+ sinfo->sinfo_flags = 0;
+ sinfo->sinfo_timetolive = time_to_live;
+
+ // And send.
+ const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (size == -1) {
+ if (errno != EPIPE && errno != EAGAIN) {
+ PCHECK(size == static_cast<ssize_t>(data.size()));
+ } else {
+ return false;
+ }
+ } else {
+ CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+ }
+
+ VLOG(1) << "Sent " << data.size();
+ return true;
+}
+
+void SctpClient::LogSctpStatus(sctp_assoc_t assoc_id) {
+ message_bridge::LogSctpStatus(fd(), assoc_id);
+}
+
+void SctpClient::SetPriorityScheduler(sctp_assoc_t assoc_id) {
+ struct sctp_assoc_value scheduler;
+ memset(&scheduler, 0, sizeof(scheduler));
+ scheduler.assoc_id = assoc_id;
+ scheduler.assoc_value = SCTP_SS_PRIO;
+ if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER, &scheduler,
+ sizeof(scheduler)) != 0) {
+ PLOG(WARNING) << "Failed to set scheduler";
+ }
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/sctp_client.h b/aos/network/sctp_client.h
new file mode 100644
index 0000000..926e59b
--- /dev/null
+++ b/aos/network/sctp_client.h
@@ -0,0 +1,57 @@
+#ifndef AOS_NETWORK_SCTP_CLIENT_H_
+#define AOS_NETWORK_SCTP_CLIENT_H_
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string_view>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Class to encapsulate everything needed to be a SCTP client.
+class SctpClient {
+ public:
+ SctpClient(std::string_view remote_host, int remote_port, int streams,
+ std::string_view local_host = "0.0.0.0", int local_port = 9971);
+
+ ~SctpClient() {
+ LOG(INFO) << "close(" << fd_ << ")";
+ PCHECK(close(fd_) == 0);
+ }
+
+ // Receives the next packet from the remote.
+ aos::unique_c_ptr<Message> Read();
+
+ // Sends a block of data on a stream with a TTL.
+ bool Send(int stream, std::string_view data, int time_to_live);
+
+ int fd() { return fd_; }
+
+ // Enables the priority scheduler. This is a SCTP feature which lets us
+ // configure the priority per stream so that higher priority packets don't get
+ // backed up behind lower priority packets in the networking queues.
+ void SetPriorityScheduler(sctp_assoc_t assoc_id);
+
+ // Remote to send to.
+ struct sockaddr_storage sockaddr_remote() const {
+ return sockaddr_remote_;
+ }
+
+ void LogSctpStatus(sctp_assoc_t assoc_id);
+
+ private:
+ struct sockaddr_storage sockaddr_remote_;
+ struct sockaddr_storage sockaddr_local_;
+ int fd_;
+
+ size_t max_size_ = 1000;
+};
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_SCTP_CLIENT_H_
diff --git a/aos/network/sctp_lib.cc b/aos/network/sctp_lib.cc
new file mode 100644
index 0000000..1ed816b
--- /dev/null
+++ b/aos/network/sctp_lib.cc
@@ -0,0 +1,229 @@
+#include "aos/network/sctp_lib.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/sctp.h>
+
+#include <string_view>
+
+DEFINE_string(interface, "", "ipv6 interface");
+
+namespace aos {
+namespace message_bridge {
+
+namespace {
+const char *sac_state_tbl[] = {"COMMUNICATION_UP", "COMMUNICATION_LOST",
+ "RESTART", "SHUTDOWN_COMPLETE",
+ "CANT_START_ASSOCICATION"};
+
+typedef union {
+ struct sctp_initmsg init;
+ struct sctp_sndrcvinfo sndrcvinfo;
+} _sctp_cmsg_data_t;
+
+} // namespace
+
+struct sockaddr_storage ResolveSocket(std::string_view host, int port) {
+ struct sockaddr_storage result;
+ struct addrinfo *addrinfo_result;
+ struct sockaddr_in *t_addr = (struct sockaddr_in *)&result;
+ struct sockaddr_in6 *t_addr6 = (struct sockaddr_in6 *)&result;
+
+ PCHECK(getaddrinfo(std::string(host).c_str(), 0, NULL, &addrinfo_result) ==
+ 0);
+
+ switch (addrinfo_result->ai_family) {
+ case AF_INET:
+ memcpy(t_addr, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
+ t_addr->sin_family = addrinfo_result->ai_family;
+ t_addr->sin_port = htons(port);
+
+ break;
+ case AF_INET6:
+ memcpy(t_addr6, addrinfo_result->ai_addr, addrinfo_result->ai_addrlen);
+ t_addr6->sin6_family = addrinfo_result->ai_family;
+ t_addr6->sin6_port = htons(port);
+
+ if (FLAGS_interface.size() > 0) {
+ t_addr6->sin6_scope_id = if_nametoindex(FLAGS_interface.c_str());
+ }
+
+ break;
+ }
+
+ // Now print it back out nicely.
+ char host_string[NI_MAXHOST];
+ char service_string[NI_MAXSERV];
+
+ int error = getnameinfo((struct sockaddr *)&result,
+ addrinfo_result->ai_addrlen, host_string, NI_MAXHOST,
+ service_string, NI_MAXSERV, NI_NUMERICHOST);
+
+ if (error) {
+ LOG(ERROR) << "Reverse lookup failed ... " << gai_strerror(error);
+ }
+
+ LOG(INFO) << "remote:addr=" << host_string << ", port=" << service_string
+ << ", family=" << addrinfo_result->ai_family;
+
+ freeaddrinfo(addrinfo_result);
+
+ return result;
+}
+
+std::string_view Family(const struct sockaddr_storage &sockaddr) {
+ if (sockaddr.ss_family == AF_INET) {
+ return "AF_INET";
+ } else if (sockaddr.ss_family == AF_INET6) {
+ return "AF_INET6";
+ } else {
+ return "unknown";
+ }
+}
+std::string Address(const struct sockaddr_storage &sockaddr) {
+ char addrbuf[INET6_ADDRSTRLEN];
+ if (sockaddr.ss_family == AF_INET) {
+ const struct sockaddr_in *sin = (const struct sockaddr_in *)&sockaddr;
+ return std::string(
+ inet_ntop(AF_INET, &sin->sin_addr, addrbuf, INET6_ADDRSTRLEN));
+ } else {
+ const struct sockaddr_in6 *sin6 = (const struct sockaddr_in6 *)&sockaddr;
+ return std::string(
+ inet_ntop(AF_INET6, &sin6->sin6_addr, addrbuf, INET6_ADDRSTRLEN));
+ }
+}
+
+void PrintNotification(const Message *msg) {
+ const union sctp_notification *snp =
+ (const union sctp_notification *)msg->data();
+
+ LOG(INFO) << "Notification:";
+
+ switch (snp->sn_header.sn_type) {
+ case SCTP_ASSOC_CHANGE: {
+ const struct sctp_assoc_change *sac = &snp->sn_assoc_change;
+ LOG(INFO) << "SCTP_ASSOC_CHANGE(" << sac_state_tbl[sac->sac_state] << ")";
+ VLOG(1) << " (assoc_change: state=" << sac->sac_state
+ << ", error=" << sac->sac_error
+ << ", instr=" << sac->sac_inbound_streams
+ << " outstr=" << sac->sac_outbound_streams
+ << ", assoc=" << sac->sac_assoc_id << ")";
+ } break;
+ case SCTP_PEER_ADDR_CHANGE: {
+ const struct sctp_paddr_change *spc = &snp->sn_paddr_change;
+ LOG(INFO) << " SlCTP_PEER_ADDR_CHANGE";
+ VLOG(1) << "\t\t(peer_addr_change: " << Address(spc->spc_aaddr)
+ << " state=" << spc->spc_state << ", error=" << spc->spc_error
+ << ")";
+ } break;
+ case SCTP_SEND_FAILED: {
+ const struct sctp_send_failed *ssf = &snp->sn_send_failed;
+ LOG(INFO) << " SCTP_SEND_FAILED";
+ VLOG(1) << "\t\t(sendfailed: len=" << ssf->ssf_length
+ << " err=" << ssf->ssf_error << ")";
+ } break;
+ case SCTP_REMOTE_ERROR: {
+ const struct sctp_remote_error *sre = &snp->sn_remote_error;
+ LOG(INFO) << " SCTP_REMOTE_ERROR";
+ VLOG(1) << "\t\t(remote_error: err=" << ntohs(sre->sre_error) << ")";
+ } break;
+ case SCTP_SHUTDOWN_EVENT: {
+ LOG(INFO) << " SCTP_SHUTDOWN_EVENT";
+ } break;
+ default:
+ LOG(INFO) << " Unknown type: " << snp->sn_header.sn_type;
+ break;
+ }
+}
+
+std::string GetHostname() {
+ char buf[256];
+ buf[sizeof(buf) - 1] = '\0';
+ PCHECK(gethostname(buf, sizeof(buf) - 1) == 0);
+ return buf;
+}
+
+std::string Message::PeerAddress() const { return Address(sin); }
+
+void LogSctpStatus(int fd, sctp_assoc_t assoc_id) {
+ struct sctp_status status;
+ memset(&status, 0, sizeof(status));
+ status.sstat_assoc_id = assoc_id;
+
+ socklen_t size = sizeof(status);
+ PCHECK(getsockopt(fd, SOL_SCTP, SCTP_STATUS,
+ reinterpret_cast<void *>(&status), &size) == 0);
+
+ LOG(INFO) << "sctp_status) sstat_assoc_id:" << status.sstat_assoc_id
+ << " sstat_state:" << status.sstat_state
+ << " sstat_rwnd:" << status.sstat_rwnd
+ << " sstat_unackdata:" << status.sstat_unackdata
+ << " sstat_penddata:" << status.sstat_penddata
+ << " sstat_instrms:" << status.sstat_instrms
+ << " sstat_outstrms:" << status.sstat_outstrms
+ << " sstat_fragmentation_point:" << status.sstat_fragmentation_point
+ << " sstat_primary.spinfo_srtt:" << status.sstat_primary.spinfo_srtt
+ << " sstat_primary.spinfo_rto:" << status.sstat_primary.spinfo_rto;
+}
+
+aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size) {
+ char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
+ struct iovec iov;
+ struct msghdr inmessage;
+
+ memset(&inmessage, 0, sizeof(struct msghdr));
+
+ aos::unique_c_ptr<Message> result(
+ reinterpret_cast<Message *>(malloc(sizeof(Message) + max_size)));
+
+ iov.iov_len = max_size;
+ iov.iov_base = result->mutable_data();
+
+ inmessage.msg_iov = &iov;
+ inmessage.msg_iovlen = 1;
+
+ inmessage.msg_control = incmsg;
+ inmessage.msg_controllen = sizeof(incmsg);
+
+ inmessage.msg_namelen = sizeof(struct sockaddr_storage);
+ inmessage.msg_name = &result->sin;
+
+ ssize_t size;
+ PCHECK((size = recvmsg(fd, &inmessage, 0)) > 0);
+
+ result->size = size;
+
+ if ((MSG_NOTIFICATION & inmessage.msg_flags)) {
+ result->message_type = Message::kNotification;
+ } else {
+ result->message_type = Message::kMessage;
+ }
+
+ for (struct cmsghdr *scmsg = CMSG_FIRSTHDR(&inmessage); scmsg != NULL;
+ scmsg = CMSG_NXTHDR(&inmessage, scmsg)) {
+ switch (scmsg->cmsg_type) {
+ case SCTP_RCVINFO: {
+ struct sctp_rcvinfo *data = (struct sctp_rcvinfo *)CMSG_DATA(scmsg);
+ result->header.rcvinfo = *data;
+ } break;
+ default:
+ LOG(INFO) << "\tUnknown type: " << scmsg->cmsg_type;
+ break;
+ }
+ }
+
+ return result;
+}
+
+void Message::LogRcvInfo() const {
+ LOG(INFO) << "\tSNDRCV (stream=" << header.rcvinfo.rcv_sid
+ << " ssn=" << header.rcvinfo.rcv_ssn
+ << " tsn=" << header.rcvinfo.rcv_tsn << " flags=0x" << std::hex
+ << header.rcvinfo.rcv_flags << std::dec
+ << " ppid=" << header.rcvinfo.rcv_ppid
+ << " cumtsn=" << header.rcvinfo.rcv_cumtsn << ")";
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/sctp_lib.h b/aos/network/sctp_lib.h
new file mode 100644
index 0000000..0f90f87
--- /dev/null
+++ b/aos/network/sctp_lib.h
@@ -0,0 +1,78 @@
+#ifndef AOS_NETWORK_SCTP_LIB_H_
+#define AOS_NETWORK_SCTP_LIB_H_
+
+#include <arpa/inet.h>
+#include <netinet/sctp.h>
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "aos/unique_malloc_ptr.h"
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+// Resolves a socket and returns the address. This can be either an ipv4 or
+// ipv6 address.
+struct sockaddr_storage ResolveSocket(std::string_view host, int port);
+
+// Returns a formatted version of the address.
+std::string Address(const struct sockaddr_storage &sockaddr);
+// Returns a formatted version of the address family.
+std::string_view Family(const struct sockaddr_storage &sockaddr);
+
+// Message received.
+// This message is malloced bigger than needed and the extra space after it is
+// the data.
+struct Message {
+ // Struct to let us force data to be well aligned.
+ struct OveralignedChar {
+ uint8_t data alignas(32);
+ };
+
+ // Headers.
+ struct {
+ struct sctp_rcvinfo rcvinfo;
+ } header;
+
+ // Address of the sender.
+ struct sockaddr_storage sin;
+
+ // Data type. Is it a block of data, or is it a struct sctp_notification?
+ enum MessageType { kMessage, kNotification } message_type;
+
+ size_t size = 0u;
+ uint8_t *mutable_data() {
+ return reinterpret_cast<uint8_t *>(&actual_data[0].data);
+ }
+ const uint8_t *data() const {
+ return reinterpret_cast<const uint8_t *>(&actual_data[0].data);
+ }
+
+ // Returns a human readable peer IP address.
+ std::string PeerAddress() const;
+
+ // Prints out the RcvInfo structure.
+ void LogRcvInfo() const;
+
+ // The start of the data.
+ OveralignedChar actual_data[];
+};
+
+void PrintNotification(const Message *msg);
+
+std::string GetHostname();
+
+// Gets and logs the contents of the sctp_status message.
+void LogSctpStatus(int fd, sctp_assoc_t assoc_id);
+
+// Read and allocate a message.
+aos::unique_c_ptr<Message> ReadSctpMessage(int fd, int max_size);
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_SCTP_LIB_H_
diff --git a/aos/network/sctp_server.cc b/aos/network/sctp_server.cc
new file mode 100644
index 0000000..70d5b28
--- /dev/null
+++ b/aos/network/sctp_server.cc
@@ -0,0 +1,143 @@
+#include "aos/network/sctp_server.h"
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <memory>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+SctpServer::SctpServer(std::string_view local_host, int local_port)
+ : sockaddr_local_(ResolveSocket(local_host, local_port)),
+ fd_(socket(sockaddr_local_.ss_family, SOCK_SEQPACKET, IPPROTO_SCTP)) {
+ LOG(INFO) << "socket(" << Family(sockaddr_local_)
+ << ", SOCK_SEQPACKET, IPPROTOSCTP) = " << fd_;
+ PCHECK(fd_ != -1);
+
+ {
+ struct sctp_event_subscribe subscribe;
+ memset(&subscribe, 0, sizeof(subscribe));
+ subscribe.sctp_data_io_event = 1;
+ subscribe.sctp_association_event = 1;
+ subscribe.sctp_send_failure_event = 1;
+ subscribe.sctp_partial_delivery_event = 1;
+
+ PCHECK(setsockopt(fd_, SOL_SCTP, SCTP_EVENTS, (char *)&subscribe,
+ sizeof(subscribe)) == 0);
+ }
+ {
+ // Enable recvinfo when a packet arrives.
+ int on = 1;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_RECVRCVINFO, &on, sizeof(int)) ==
+ 0);
+ }
+ {
+ // Allow one packet on the wire to have multiple source packets.
+ int full_interleaving = 2;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_FRAGMENT_INTERLEAVE,
+ &full_interleaving, sizeof(full_interleaving)) == 0);
+ }
+ {
+ // Turn off the NAGLE algorithm.
+ int on = 1;
+ PCHECK(setsockopt(fd_, IPPROTO_SCTP, SCTP_NODELAY, &on, sizeof(int)) == 0);
+ }
+
+ // And go!
+ PCHECK(bind(fd_, (struct sockaddr *)&sockaddr_local_,
+ sockaddr_local_.ss_family == AF_INET6
+ ? sizeof(struct sockaddr_in6)
+ : sizeof(struct sockaddr_in)) == 0);
+ LOG(INFO) << "bind(" << fd_ << ", " << Address(sockaddr_local_) << ")";
+
+ PCHECK(listen(fd_, 100) == 0);
+
+ PCHECK(setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &max_size_,
+ sizeof(max_size_)) == 0);
+}
+
+aos::unique_c_ptr<Message> SctpServer::Read() {
+ return ReadSctpMessage(fd_, max_size_);
+}
+
+void SctpServer::Send(std::string_view data, sctp_assoc_t snd_assoc_id,
+ int stream, int timetolive) {
+ struct iovec iov;
+ iov.iov_base = const_cast<char *>(data.data());
+ iov.iov_len = data.size();
+
+ // Use the assoc_id for the destination instead of the msg_name.
+ struct msghdr outmsg;
+ outmsg.msg_namelen = 0;
+
+ // Data to send.
+ outmsg.msg_iov = &iov;
+ outmsg.msg_iovlen = 1;
+
+ // Build up the sndinfo message.
+ char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
+ outmsg.msg_control = outcmsg;
+ outmsg.msg_controllen = CMSG_SPACE(sizeof(struct sctp_sndrcvinfo));
+ outmsg.msg_flags = 0;
+
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(&outmsg);
+ cmsg->cmsg_level = IPPROTO_SCTP;
+ cmsg->cmsg_type = SCTP_SNDRCV;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
+
+ struct sctp_sndrcvinfo *sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
+ memset(sinfo, 0, sizeof(struct sctp_sndrcvinfo));
+ sinfo->sinfo_ppid = ++ppid_;
+ sinfo->sinfo_stream = stream;
+ sinfo->sinfo_flags = 0;
+ sinfo->sinfo_assoc_id = snd_assoc_id;
+ sinfo->sinfo_timetolive = timetolive;
+
+ // And send.
+ const ssize_t size = sendmsg(fd_, &outmsg, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (size == -1) {
+ if (errno != EPIPE) {
+ PCHECK(size == static_cast<ssize_t>(data.size()));
+ }
+ } else {
+ CHECK_EQ(static_cast<ssize_t>(data.size()), size);
+ }
+}
+
+void SctpServer::SetPriorityScheduler(sctp_assoc_t assoc_id) {
+ struct sctp_assoc_value scheduler;
+ memset(&scheduler, 0, sizeof(scheduler));
+ scheduler.assoc_id = assoc_id;
+ scheduler.assoc_value = SCTP_SS_PRIO;
+ if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER, &scheduler,
+ sizeof(scheduler)) != 0) {
+ PLOG(WARNING) << "Failed to set scheduler";
+ }
+}
+
+void SctpServer::SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
+ uint16_t priority) {
+ struct sctp_stream_value sctp_priority;
+ memset(&sctp_priority, 0, sizeof(sctp_priority));
+ sctp_priority.assoc_id = assoc_id;
+ sctp_priority.stream_id = stream_id;
+ sctp_priority.stream_value = priority;
+ if (setsockopt(fd(), IPPROTO_SCTP, SCTP_STREAM_SCHEDULER_VALUE,
+ &sctp_priority, sizeof(sctp_priority)) != 0) {
+ PLOG(WARNING) << "Failed to set scheduler";
+ }
+}
+
+} // namespace message_bridge
+} // namespace aos
diff --git a/aos/network/sctp_server.h b/aos/network/sctp_server.h
new file mode 100644
index 0000000..a3086d9
--- /dev/null
+++ b/aos/network/sctp_server.h
@@ -0,0 +1,63 @@
+#ifndef AOS_NETWORK_SCTP_SERVER_H_
+#define AOS_NETWORK_SCTP_SERVER_H_
+
+#include <arpa/inet.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/sctp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <memory>
+#include <sys/socket.h>
+
+#include "aos/network/sctp_lib.h"
+#include "aos/unique_malloc_ptr.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace message_bridge {
+
+class SctpServer {
+ public:
+ SctpServer(std::string_view local_host = "0.0.0.0", int local_port = 9971);
+
+ ~SctpServer() {
+ LOG(INFO) << "close(" << fd_ << ")";
+ PCHECK(close(fd_) == 0);
+ }
+
+ // Receives the next packet from the remote.
+ aos::unique_c_ptr<Message> Read();
+
+ // Sends a block of data to a client on a stream with a TTL.
+ void Send(std::string_view data, sctp_assoc_t snd_assoc_id, int stream,
+ int timetolive);
+
+ int fd() { return fd_; }
+
+ // Enables the priority scheduler. This is a SCTP feature which lets us
+ // configure the priority per stream so that higher priority packets don't get
+ // backed up behind lower priority packets in the networking queues.
+ void SetPriorityScheduler(sctp_assoc_t assoc_id);
+
+ // Sets the priority of a specific stream.
+ void SetStreamPriority(sctp_assoc_t assoc_id, int stream_id,
+ uint16_t priority);
+
+ private:
+ struct sockaddr_storage sockaddr_local_;
+ int fd_;
+
+ // TODO(austin): Configure this.
+ size_t max_size_ = 1000;
+
+ int ppid_ = 1;
+};
+
+
+} // namespace message_bridge
+} // namespace aos
+
+#endif // AOS_NETWORK_SCTP_SERVER_H_
diff --git a/aos/testdata/config1_multinode.json b/aos/testdata/config1_multinode.json
index 32bce5c..f12d927 100644
--- a/aos/testdata/config1_multinode.json
+++ b/aos/testdata/config1_multinode.json
@@ -4,12 +4,22 @@
"name": "/foo",
"type": ".aos.bar",
"max_size": 5,
- "source_node": "pi2"
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1"
+ }
+ ]
},
{
"name": "/foo2",
"type": ".aos.bar",
- "source_node": "pi1"
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2"
+ }
+ ]
}
],
"applications": [
diff --git a/aos/testdata/expected_multinode.json b/aos/testdata/expected_multinode.json
index 46d052e..0946007 100644
--- a/aos/testdata/expected_multinode.json
+++ b/aos/testdata/expected_multinode.json
@@ -4,12 +4,22 @@
"name": "/foo",
"type": ".aos.bar",
"max_size": 5,
- "source_node": "pi2"
+ "source_node": "pi2",
+ "destination_nodes": [
+ {
+ "name": "pi1"
+ }
+ ]
},
{
"name": "/foo2",
"type": ".aos.bar",
- "source_node": "pi1"
+ "source_node": "pi1",
+ "destination_nodes": [
+ {
+ "name": "pi2"
+ }
+ ]
},
{
"name": "/foo3",
diff --git a/compilers/linaro_linux_gcc.BUILD b/compilers/linaro_linux_gcc.BUILD
index db2e9bb..aea261d 100644
--- a/compilers/linaro_linux_gcc.BUILD
+++ b/compilers/linaro_linux_gcc.BUILD
@@ -63,7 +63,10 @@
"libexec/**",
"lib/gcc/arm-linux-gnueabihf/**",
"include/**",
- ]),
+ ], exclude=["arm-linux-gnueabihf/libc/usr/include/linux/sctp.h"]) +
+ [
+ "@org_frc971//third_party/linux:sctp",
+ ],
)
filegroup(
diff --git a/third_party/linux/BUILD b/third_party/linux/BUILD
new file mode 100644
index 0000000..007b913
--- /dev/null
+++ b/third_party/linux/BUILD
@@ -0,0 +1,10 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+filegroup(
+ name = "sctp",
+ srcs = [
+ "sctp.h",
+ ],
+)
diff --git a/third_party/linux/sctp.h b/third_party/linux/sctp.h
new file mode 100644
index 0000000..bd82046
--- /dev/null
+++ b/third_party/linux/sctp.h
@@ -0,0 +1,1105 @@
+/* SPDX-License-Identifier: GPL-2.0+ WITH Linux-syscall-note */
+/* SCTP kernel implementation
+ * (C) Copyright IBM Corp. 2001, 2004
+ * Copyright (c) 1999-2000 Cisco, Inc.
+ * Copyright (c) 1999-2001 Motorola, Inc.
+ * Copyright (c) 2002 Intel Corp.
+ *
+ * This file is part of the SCTP kernel implementation
+ *
+ * This header represents the structures and constants needed to support
+ * the SCTP Extension to the Sockets API.
+ *
+ * This SCTP implementation is free software;
+ * you can redistribute it and/or modify it under the terms of
+ * the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This SCTP implementation is distributed in the hope that it
+ * will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * ************************
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU CC; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Please send any bug reports or fixes you make to the
+ * email address(es):
+ * lksctp developers <linux-sctp@vger.kernel.org>
+ *
+ * Or submit a bug report through the following website:
+ * http://www.sf.net/projects/lksctp
+ *
+ * Written or modified by:
+ * La Monte H.P. Yarroll <piggy@acm.org>
+ * R. Stewart <randall@sctp.chicago.il.us>
+ * K. Morneau <kmorneau@cisco.com>
+ * Q. Xie <qxie1@email.mot.com>
+ * Karl Knutson <karl@athena.chicago.il.us>
+ * Jon Grimm <jgrimm@us.ibm.com>
+ * Daisy Chang <daisyc@us.ibm.com>
+ * Ryan Layer <rmlayer@us.ibm.com>
+ * Ardelle Fan <ardelle.fan@intel.com>
+ * Sridhar Samudrala <sri@us.ibm.com>
+ * Inaky Perez-Gonzalez <inaky.gonzalez@intel.com>
+ * Vlad Yasevich <vladislav.yasevich@hp.com>
+ *
+ * Any bugs reported given to us we will try to fix... any fixes shared will
+ * be incorporated into the next SCTP release.
+ */
+
+#ifndef _SCTP_H
+#define _SCTP_H
+
+#include <linux/types.h>
+#include <linux/socket.h>
+
+typedef __s32 sctp_assoc_t;
+
+/* The following symbols come from the Sockets API Extensions for
+ * SCTP <draft-ietf-tsvwg-sctpsocket-07.txt>.
+ */
+#define SCTP_RTOINFO 0
+#define SCTP_ASSOCINFO 1
+#define SCTP_INITMSG 2
+#define SCTP_NODELAY 3 /* Get/set nodelay option. */
+#define SCTP_AUTOCLOSE 4
+#define SCTP_SET_PEER_PRIMARY_ADDR 5
+#define SCTP_PRIMARY_ADDR 6
+#define SCTP_ADAPTATION_LAYER 7
+#define SCTP_DISABLE_FRAGMENTS 8
+#define SCTP_PEER_ADDR_PARAMS 9
+#define SCTP_DEFAULT_SEND_PARAM 10
+#define SCTP_EVENTS 11
+#define SCTP_I_WANT_MAPPED_V4_ADDR 12 /* Turn on/off mapped v4 addresses */
+#define SCTP_MAXSEG 13 /* Get/set maximum fragment. */
+#define SCTP_STATUS 14
+#define SCTP_GET_PEER_ADDR_INFO 15
+#define SCTP_DELAYED_ACK_TIME 16
+#define SCTP_DELAYED_ACK SCTP_DELAYED_ACK_TIME
+#define SCTP_DELAYED_SACK SCTP_DELAYED_ACK_TIME
+#define SCTP_CONTEXT 17
+#define SCTP_FRAGMENT_INTERLEAVE 18
+#define SCTP_PARTIAL_DELIVERY_POINT 19 /* Set/Get partial delivery point */
+#define SCTP_MAX_BURST 20 /* Set/Get max burst */
+#define SCTP_AUTH_CHUNK 21 /* Set only: add a chunk type to authenticate */
+#define SCTP_HMAC_IDENT 22
+#define SCTP_AUTH_KEY 23
+#define SCTP_AUTH_ACTIVE_KEY 24
+#define SCTP_AUTH_DELETE_KEY 25
+#define SCTP_PEER_AUTH_CHUNKS 26 /* Read only */
+#define SCTP_LOCAL_AUTH_CHUNKS 27 /* Read only */
+#define SCTP_GET_ASSOC_NUMBER 28 /* Read only */
+#define SCTP_GET_ASSOC_ID_LIST 29 /* Read only */
+#define SCTP_AUTO_ASCONF 30
+#define SCTP_PEER_ADDR_THLDS 31
+#define SCTP_RECVRCVINFO 32
+#define SCTP_RECVNXTINFO 33
+#define SCTP_DEFAULT_SNDINFO 34
+
+/* Internal Socket Options. Some of the sctp library functions are
+ * implemented using these socket options.
+ */
+#define SCTP_SOCKOPT_BINDX_ADD 100 /* BINDX requests for adding addrs */
+#define SCTP_SOCKOPT_BINDX_REM 101 /* BINDX requests for removing addrs. */
+#define SCTP_SOCKOPT_PEELOFF 102 /* peel off association. */
+/* Options 104-106 are deprecated and removed. Do not use this space */
+#define SCTP_SOCKOPT_CONNECTX_OLD 107 /* CONNECTX old requests. */
+#define SCTP_GET_PEER_ADDRS 108 /* Get all peer address. */
+#define SCTP_GET_LOCAL_ADDRS 109 /* Get all local address. */
+#define SCTP_SOCKOPT_CONNECTX 110 /* CONNECTX requests. */
+#define SCTP_SOCKOPT_CONNECTX3 111 /* CONNECTX requests (updated) */
+#define SCTP_GET_ASSOC_STATS 112 /* Read only */
+#define SCTP_PR_SUPPORTED 113
+#define SCTP_DEFAULT_PRINFO 114
+#define SCTP_PR_ASSOC_STATUS 115
+#define SCTP_PR_STREAM_STATUS 116
+#define SCTP_RECONFIG_SUPPORTED 117
+#define SCTP_ENABLE_STREAM_RESET 118
+#define SCTP_RESET_STREAMS 119
+#define SCTP_RESET_ASSOC 120
+#define SCTP_ADD_STREAMS 121
+#define SCTP_SOCKOPT_PEELOFF_FLAGS 122
+#define SCTP_STREAM_SCHEDULER 123
+#define SCTP_STREAM_SCHEDULER_VALUE 124
+
+/* PR-SCTP policies */
+#define SCTP_PR_SCTP_NONE 0x0000
+#define SCTP_PR_SCTP_TTL 0x0010
+#define SCTP_PR_SCTP_RTX 0x0020
+#define SCTP_PR_SCTP_PRIO 0x0030
+#define SCTP_PR_SCTP_MAX SCTP_PR_SCTP_PRIO
+#define SCTP_PR_SCTP_MASK 0x0030
+
+#define __SCTP_PR_INDEX(x) ((x >> 4) - 1)
+#define SCTP_PR_INDEX(x) __SCTP_PR_INDEX(SCTP_PR_SCTP_ ## x)
+
+#define SCTP_PR_POLICY(x) ((x) & SCTP_PR_SCTP_MASK)
+#define SCTP_PR_SET_POLICY(flags, x) \
+ do { \
+ flags &= ~SCTP_PR_SCTP_MASK; \
+ flags |= x; \
+ } while (0)
+
+#define SCTP_PR_TTL_ENABLED(x) (SCTP_PR_POLICY(x) == SCTP_PR_SCTP_TTL)
+#define SCTP_PR_RTX_ENABLED(x) (SCTP_PR_POLICY(x) == SCTP_PR_SCTP_RTX)
+#define SCTP_PR_PRIO_ENABLED(x) (SCTP_PR_POLICY(x) == SCTP_PR_SCTP_PRIO)
+
+/* For enable stream reset */
+#define SCTP_ENABLE_RESET_STREAM_REQ 0x01
+#define SCTP_ENABLE_RESET_ASSOC_REQ 0x02
+#define SCTP_ENABLE_CHANGE_ASSOC_REQ 0x04
+#define SCTP_ENABLE_STRRESET_MASK 0x07
+
+#define SCTP_STREAM_RESET_INCOMING 0x01
+#define SCTP_STREAM_RESET_OUTGOING 0x02
+
+/* These are bit fields for msghdr->msg_flags. See section 5.1. */
+/* On user space Linux, these live in <bits/socket.h> as an enum. */
+enum sctp_msg_flags {
+ MSG_NOTIFICATION = 0x8000,
+#define MSG_NOTIFICATION MSG_NOTIFICATION
+};
+
+/* 5.3.1 SCTP Initiation Structure (SCTP_INIT)
+ *
+ * This cmsghdr structure provides information for initializing new
+ * SCTP associations with sendmsg(). The SCTP_INITMSG socket option
+ * uses this same data structure. This structure is not used for
+ * recvmsg().
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ ----------------------
+ * IPPROTO_SCTP SCTP_INIT struct sctp_initmsg
+ */
+struct sctp_initmsg {
+ __u16 sinit_num_ostreams;
+ __u16 sinit_max_instreams;
+ __u16 sinit_max_attempts;
+ __u16 sinit_max_init_timeo;
+};
+
+/* 5.3.2 SCTP Header Information Structure (SCTP_SNDRCV)
+ *
+ * This cmsghdr structure specifies SCTP options for sendmsg() and
+ * describes SCTP header information about a received message through
+ * recvmsg().
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ ----------------------
+ * IPPROTO_SCTP SCTP_SNDRCV struct sctp_sndrcvinfo
+ */
+struct sctp_sndrcvinfo {
+ __u16 sinfo_stream;
+ __u16 sinfo_ssn;
+ __u16 sinfo_flags;
+ __u32 sinfo_ppid;
+ __u32 sinfo_context;
+ __u32 sinfo_timetolive;
+ __u32 sinfo_tsn;
+ __u32 sinfo_cumtsn;
+ sctp_assoc_t sinfo_assoc_id;
+};
+
+/* 5.3.4 SCTP Send Information Structure (SCTP_SNDINFO)
+ *
+ * This cmsghdr structure specifies SCTP options for sendmsg().
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ -------------------
+ * IPPROTO_SCTP SCTP_SNDINFO struct sctp_sndinfo
+ */
+struct sctp_sndinfo {
+ __u16 snd_sid;
+ __u16 snd_flags;
+ __u32 snd_ppid;
+ __u32 snd_context;
+ sctp_assoc_t snd_assoc_id;
+};
+
+/* 5.3.5 SCTP Receive Information Structure (SCTP_RCVINFO)
+ *
+ * This cmsghdr structure describes SCTP receive information
+ * about a received message through recvmsg().
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ -------------------
+ * IPPROTO_SCTP SCTP_RCVINFO struct sctp_rcvinfo
+ */
+struct sctp_rcvinfo {
+ __u16 rcv_sid;
+ __u16 rcv_ssn;
+ __u16 rcv_flags;
+ __u32 rcv_ppid;
+ __u32 rcv_tsn;
+ __u32 rcv_cumtsn;
+ __u32 rcv_context;
+ sctp_assoc_t rcv_assoc_id;
+};
+
+/* 5.3.6 SCTP Next Receive Information Structure (SCTP_NXTINFO)
+ *
+ * This cmsghdr structure describes SCTP receive information
+ * of the next message that will be delivered through recvmsg()
+ * if this information is already available when delivering
+ * the current message.
+ *
+ * cmsg_level cmsg_type cmsg_data[]
+ * ------------ ------------ -------------------
+ * IPPROTO_SCTP SCTP_NXTINFO struct sctp_nxtinfo
+ */
+struct sctp_nxtinfo {
+ __u16 nxt_sid;
+ __u16 nxt_flags;
+ __u32 nxt_ppid;
+ __u32 nxt_length;
+ sctp_assoc_t nxt_assoc_id;
+};
+
+/*
+ * sinfo_flags: 16 bits (unsigned integer)
+ *
+ * This field may contain any of the following flags and is composed of
+ * a bitwise OR of these values.
+ */
+enum sctp_sinfo_flags {
+ SCTP_UNORDERED = (1 << 0), /* Send/receive message unordered. */
+ SCTP_ADDR_OVER = (1 << 1), /* Override the primary destination. */
+ SCTP_ABORT = (1 << 2), /* Send an ABORT message to the peer. */
+ SCTP_SACK_IMMEDIATELY = (1 << 3), /* SACK should be sent without delay. */
+ SCTP_NOTIFICATION = MSG_NOTIFICATION, /* Next message is not user msg but notification. */
+ SCTP_EOF = MSG_FIN, /* Initiate graceful shutdown process. */
+};
+
+typedef union {
+ __u8 raw;
+ struct sctp_initmsg init;
+ struct sctp_sndrcvinfo sndrcv;
+} sctp_cmsg_data_t;
+
+/* These are cmsg_types. */
+typedef enum sctp_cmsg_type {
+ SCTP_INIT, /* 5.2.1 SCTP Initiation Structure */
+#define SCTP_INIT SCTP_INIT
+ SCTP_SNDRCV, /* 5.2.2 SCTP Header Information Structure */
+#define SCTP_SNDRCV SCTP_SNDRCV
+ SCTP_SNDINFO, /* 5.3.4 SCTP Send Information Structure */
+#define SCTP_SNDINFO SCTP_SNDINFO
+ SCTP_RCVINFO, /* 5.3.5 SCTP Receive Information Structure */
+#define SCTP_RCVINFO SCTP_RCVINFO
+ SCTP_NXTINFO, /* 5.3.6 SCTP Next Receive Information Structure */
+#define SCTP_NXTINFO SCTP_NXTINFO
+} sctp_cmsg_t;
+
+/*
+ * 5.3.1.1 SCTP_ASSOC_CHANGE
+ *
+ * Communication notifications inform the ULP that an SCTP association
+ * has either begun or ended. The identifier for a new association is
+ * provided by this notificaion. The notification information has the
+ * following format:
+ *
+ */
+struct sctp_assoc_change {
+ __u16 sac_type;
+ __u16 sac_flags;
+ __u32 sac_length;
+ __u16 sac_state;
+ __u16 sac_error;
+ __u16 sac_outbound_streams;
+ __u16 sac_inbound_streams;
+ sctp_assoc_t sac_assoc_id;
+ __u8 sac_info[0];
+};
+
+/*
+ * sac_state: 32 bits (signed integer)
+ *
+ * This field holds one of a number of values that communicate the
+ * event that happened to the association. They include:
+ *
+ * Note: The following state names deviate from the API draft as
+ * the names clash too easily with other kernel symbols.
+ */
+enum sctp_sac_state {
+ SCTP_COMM_UP,
+ SCTP_COMM_LOST,
+ SCTP_RESTART,
+ SCTP_SHUTDOWN_COMP,
+ SCTP_CANT_STR_ASSOC,
+};
+
+/*
+ * 5.3.1.2 SCTP_PEER_ADDR_CHANGE
+ *
+ * When a destination address on a multi-homed peer encounters a change
+ * an interface details event is sent. The information has the
+ * following structure:
+ */
+struct sctp_paddr_change {
+ __u16 spc_type;
+ __u16 spc_flags;
+ __u32 spc_length;
+ struct sockaddr_storage spc_aaddr;
+ int spc_state;
+ int spc_error;
+ sctp_assoc_t spc_assoc_id;
+} __attribute__((packed, aligned(4)));
+
+/*
+ * spc_state: 32 bits (signed integer)
+ *
+ * This field holds one of a number of values that communicate the
+ * event that happened to the address. They include:
+ */
+enum sctp_spc_state {
+ SCTP_ADDR_AVAILABLE,
+ SCTP_ADDR_UNREACHABLE,
+ SCTP_ADDR_REMOVED,
+ SCTP_ADDR_ADDED,
+ SCTP_ADDR_MADE_PRIM,
+ SCTP_ADDR_CONFIRMED,
+};
+
+
+/*
+ * 5.3.1.3 SCTP_REMOTE_ERROR
+ *
+ * A remote peer may send an Operational Error message to its peer.
+ * This message indicates a variety of error conditions on an
+ * association. The entire error TLV as it appears on the wire is
+ * included in a SCTP_REMOTE_ERROR event. Please refer to the SCTP
+ * specification [SCTP] and any extensions for a list of possible
+ * error formats. SCTP error TLVs have the format:
+ */
+struct sctp_remote_error {
+ __u16 sre_type;
+ __u16 sre_flags;
+ __u32 sre_length;
+ __be16 sre_error;
+ sctp_assoc_t sre_assoc_id;
+ __u8 sre_data[0];
+};
+
+
+/*
+ * 5.3.1.4 SCTP_SEND_FAILED
+ *
+ * If SCTP cannot deliver a message it may return the message as a
+ * notification.
+ */
+struct sctp_send_failed {
+ __u16 ssf_type;
+ __u16 ssf_flags;
+ __u32 ssf_length;
+ __u32 ssf_error;
+ struct sctp_sndrcvinfo ssf_info;
+ sctp_assoc_t ssf_assoc_id;
+ __u8 ssf_data[0];
+};
+
+/*
+ * ssf_flags: 16 bits (unsigned integer)
+ *
+ * The flag value will take one of the following values
+ *
+ * SCTP_DATA_UNSENT - Indicates that the data was never put on
+ * the wire.
+ *
+ * SCTP_DATA_SENT - Indicates that the data was put on the wire.
+ * Note that this does not necessarily mean that the
+ * data was (or was not) successfully delivered.
+ */
+enum sctp_ssf_flags {
+ SCTP_DATA_UNSENT,
+ SCTP_DATA_SENT,
+};
+
+/*
+ * 5.3.1.5 SCTP_SHUTDOWN_EVENT
+ *
+ * When a peer sends a SHUTDOWN, SCTP delivers this notification to
+ * inform the application that it should cease sending data.
+ */
+struct sctp_shutdown_event {
+ __u16 sse_type;
+ __u16 sse_flags;
+ __u32 sse_length;
+ sctp_assoc_t sse_assoc_id;
+};
+
+/*
+ * 5.3.1.6 SCTP_ADAPTATION_INDICATION
+ *
+ * When a peer sends a Adaptation Layer Indication parameter , SCTP
+ * delivers this notification to inform the application
+ * that of the peers requested adaptation layer.
+ */
+struct sctp_adaptation_event {
+ __u16 sai_type;
+ __u16 sai_flags;
+ __u32 sai_length;
+ __u32 sai_adaptation_ind;
+ sctp_assoc_t sai_assoc_id;
+};
+
+/*
+ * 5.3.1.7 SCTP_PARTIAL_DELIVERY_EVENT
+ *
+ * When a receiver is engaged in a partial delivery of a
+ * message this notification will be used to indicate
+ * various events.
+ */
+struct sctp_pdapi_event {
+ __u16 pdapi_type;
+ __u16 pdapi_flags;
+ __u32 pdapi_length;
+ __u32 pdapi_indication;
+ sctp_assoc_t pdapi_assoc_id;
+};
+
+enum { SCTP_PARTIAL_DELIVERY_ABORTED=0, };
+
+/*
+ * 5.3.1.8. SCTP_AUTHENTICATION_EVENT
+ *
+ * When a receiver is using authentication this message will provide
+ * notifications regarding new keys being made active as well as errors.
+ */
+struct sctp_authkey_event {
+ __u16 auth_type;
+ __u16 auth_flags;
+ __u32 auth_length;
+ __u16 auth_keynumber;
+ __u16 auth_altkeynumber;
+ __u32 auth_indication;
+ sctp_assoc_t auth_assoc_id;
+};
+
+enum { SCTP_AUTH_NEWKEY = 0, };
+
+/*
+ * 6.1.9. SCTP_SENDER_DRY_EVENT
+ *
+ * When the SCTP stack has no more user data to send or retransmit, this
+ * notification is given to the user. Also, at the time when a user app
+ * subscribes to this event, if there is no data to be sent or
+ * retransmit, the stack will immediately send up this notification.
+ */
+struct sctp_sender_dry_event {
+ __u16 sender_dry_type;
+ __u16 sender_dry_flags;
+ __u32 sender_dry_length;
+ sctp_assoc_t sender_dry_assoc_id;
+};
+
+#define SCTP_STREAM_RESET_INCOMING_SSN 0x0001
+#define SCTP_STREAM_RESET_OUTGOING_SSN 0x0002
+#define SCTP_STREAM_RESET_DENIED 0x0004
+#define SCTP_STREAM_RESET_FAILED 0x0008
+struct sctp_stream_reset_event {
+ __u16 strreset_type;
+ __u16 strreset_flags;
+ __u32 strreset_length;
+ sctp_assoc_t strreset_assoc_id;
+ __u16 strreset_stream_list[];
+};
+
+#define SCTP_ASSOC_RESET_DENIED 0x0004
+#define SCTP_ASSOC_RESET_FAILED 0x0008
+struct sctp_assoc_reset_event {
+ __u16 assocreset_type;
+ __u16 assocreset_flags;
+ __u32 assocreset_length;
+ sctp_assoc_t assocreset_assoc_id;
+ __u32 assocreset_local_tsn;
+ __u32 assocreset_remote_tsn;
+};
+
+#define SCTP_ASSOC_CHANGE_DENIED 0x0004
+#define SCTP_ASSOC_CHANGE_FAILED 0x0008
+#define SCTP_STREAM_CHANGE_DENIED SCTP_ASSOC_CHANGE_DENIED
+#define SCTP_STREAM_CHANGE_FAILED SCTP_ASSOC_CHANGE_FAILED
+struct sctp_stream_change_event {
+ __u16 strchange_type;
+ __u16 strchange_flags;
+ __u32 strchange_length;
+ sctp_assoc_t strchange_assoc_id;
+ __u16 strchange_instrms;
+ __u16 strchange_outstrms;
+};
+
+/*
+ * Described in Section 7.3
+ * Ancillary Data and Notification Interest Options
+ */
+struct sctp_event_subscribe {
+ __u8 sctp_data_io_event;
+ __u8 sctp_association_event;
+ __u8 sctp_address_event;
+ __u8 sctp_send_failure_event;
+ __u8 sctp_peer_error_event;
+ __u8 sctp_shutdown_event;
+ __u8 sctp_partial_delivery_event;
+ __u8 sctp_adaptation_layer_event;
+ __u8 sctp_authentication_event;
+ __u8 sctp_sender_dry_event;
+ __u8 sctp_stream_reset_event;
+ __u8 sctp_assoc_reset_event;
+ __u8 sctp_stream_change_event;
+};
+
+/*
+ * 5.3.1 SCTP Notification Structure
+ *
+ * The notification structure is defined as the union of all
+ * notification types.
+ *
+ */
+union sctp_notification {
+ struct {
+ __u16 sn_type; /* Notification type. */
+ __u16 sn_flags;
+ __u32 sn_length;
+ } sn_header;
+ struct sctp_assoc_change sn_assoc_change;
+ struct sctp_paddr_change sn_paddr_change;
+ struct sctp_remote_error sn_remote_error;
+ struct sctp_send_failed sn_send_failed;
+ struct sctp_shutdown_event sn_shutdown_event;
+ struct sctp_adaptation_event sn_adaptation_event;
+ struct sctp_pdapi_event sn_pdapi_event;
+ struct sctp_authkey_event sn_authkey_event;
+ struct sctp_sender_dry_event sn_sender_dry_event;
+ struct sctp_stream_reset_event sn_strreset_event;
+ struct sctp_assoc_reset_event sn_assocreset_event;
+ struct sctp_stream_change_event sn_strchange_event;
+};
+
+/* Section 5.3.1
+ * All standard values for sn_type flags are greater than 2^15.
+ * Values from 2^15 and down are reserved.
+ */
+
+enum sctp_sn_type {
+ SCTP_SN_TYPE_BASE = (1<<15),
+ SCTP_ASSOC_CHANGE,
+#define SCTP_ASSOC_CHANGE SCTP_ASSOC_CHANGE
+ SCTP_PEER_ADDR_CHANGE,
+#define SCTP_PEER_ADDR_CHANGE SCTP_PEER_ADDR_CHANGE
+ SCTP_SEND_FAILED,
+#define SCTP_SEND_FAILED SCTP_SEND_FAILED
+ SCTP_REMOTE_ERROR,
+#define SCTP_REMOTE_ERROR SCTP_REMOTE_ERROR
+ SCTP_SHUTDOWN_EVENT,
+#define SCTP_SHUTDOWN_EVENT SCTP_SHUTDOWN_EVENT
+ SCTP_PARTIAL_DELIVERY_EVENT,
+#define SCTP_PARTIAL_DELIVERY_EVENT SCTP_PARTIAL_DELIVERY_EVENT
+ SCTP_ADAPTATION_INDICATION,
+#define SCTP_ADAPTATION_INDICATION SCTP_ADAPTATION_INDICATION
+ SCTP_AUTHENTICATION_EVENT,
+#define SCTP_AUTHENTICATION_INDICATION SCTP_AUTHENTICATION_EVENT
+ SCTP_SENDER_DRY_EVENT,
+#define SCTP_SENDER_DRY_EVENT SCTP_SENDER_DRY_EVENT
+ SCTP_STREAM_RESET_EVENT,
+#define SCTP_STREAM_RESET_EVENT SCTP_STREAM_RESET_EVENT
+ SCTP_ASSOC_RESET_EVENT,
+#define SCTP_ASSOC_RESET_EVENT SCTP_ASSOC_RESET_EVENT
+ SCTP_STREAM_CHANGE_EVENT,
+#define SCTP_STREAM_CHANGE_EVENT SCTP_STREAM_CHANGE_EVENT
+};
+
+/* Notification error codes used to fill up the error fields in some
+ * notifications.
+ * SCTP_PEER_ADDRESS_CHAGE : spc_error
+ * SCTP_ASSOC_CHANGE : sac_error
+ * These names should be potentially included in the draft 04 of the SCTP
+ * sockets API specification.
+ */
+typedef enum sctp_sn_error {
+ SCTP_FAILED_THRESHOLD,
+ SCTP_RECEIVED_SACK,
+ SCTP_HEARTBEAT_SUCCESS,
+ SCTP_RESPONSE_TO_USER_REQ,
+ SCTP_INTERNAL_ERROR,
+ SCTP_SHUTDOWN_GUARD_EXPIRES,
+ SCTP_PEER_FAULTY,
+} sctp_sn_error_t;
+
+/*
+ * 7.1.1 Retransmission Timeout Parameters (SCTP_RTOINFO)
+ *
+ * The protocol parameters used to initialize and bound retransmission
+ * timeout (RTO) are tunable. See [SCTP] for more information on how
+ * these parameters are used in RTO calculation.
+ */
+struct sctp_rtoinfo {
+ sctp_assoc_t srto_assoc_id;
+ __u32 srto_initial;
+ __u32 srto_max;
+ __u32 srto_min;
+};
+
+/*
+ * 7.1.2 Association Parameters (SCTP_ASSOCINFO)
+ *
+ * This option is used to both examine and set various association and
+ * endpoint parameters.
+ */
+struct sctp_assocparams {
+ sctp_assoc_t sasoc_assoc_id;
+ __u16 sasoc_asocmaxrxt;
+ __u16 sasoc_number_peer_destinations;
+ __u32 sasoc_peer_rwnd;
+ __u32 sasoc_local_rwnd;
+ __u32 sasoc_cookie_life;
+};
+
+/*
+ * 7.1.9 Set Peer Primary Address (SCTP_SET_PEER_PRIMARY_ADDR)
+ *
+ * Requests that the peer mark the enclosed address as the association
+ * primary. The enclosed address must be one of the association's
+ * locally bound addresses. The following structure is used to make a
+ * set primary request:
+ */
+struct sctp_setpeerprim {
+ sctp_assoc_t sspp_assoc_id;
+ struct sockaddr_storage sspp_addr;
+} __attribute__((packed, aligned(4)));
+
+/*
+ * 7.1.10 Set Primary Address (SCTP_PRIMARY_ADDR)
+ *
+ * Requests that the local SCTP stack use the enclosed peer address as
+ * the association primary. The enclosed address must be one of the
+ * association peer's addresses. The following structure is used to
+ * make a set peer primary request:
+ */
+struct sctp_prim {
+ sctp_assoc_t ssp_assoc_id;
+ struct sockaddr_storage ssp_addr;
+} __attribute__((packed, aligned(4)));
+
+/* For backward compatibility use, define the old name too */
+#define sctp_setprim sctp_prim
+
+/*
+ * 7.1.11 Set Adaptation Layer Indicator (SCTP_ADAPTATION_LAYER)
+ *
+ * Requests that the local endpoint set the specified Adaptation Layer
+ * Indication parameter for all future INIT and INIT-ACK exchanges.
+ */
+struct sctp_setadaptation {
+ __u32 ssb_adaptation_ind;
+};
+
+/*
+ * 7.1.13 Peer Address Parameters (SCTP_PEER_ADDR_PARAMS)
+ *
+ * Applications can enable or disable heartbeats for any peer address
+ * of an association, modify an address's heartbeat interval, force a
+ * heartbeat to be sent immediately, and adjust the address's maximum
+ * number of retransmissions sent before an address is considered
+ * unreachable. The following structure is used to access and modify an
+ * address's parameters:
+ */
+enum sctp_spp_flags {
+ SPP_HB_ENABLE = 1<<0, /*Enable heartbeats*/
+ SPP_HB_DISABLE = 1<<1, /*Disable heartbeats*/
+ SPP_HB = SPP_HB_ENABLE | SPP_HB_DISABLE,
+ SPP_HB_DEMAND = 1<<2, /*Send heartbeat immediately*/
+ SPP_PMTUD_ENABLE = 1<<3, /*Enable PMTU discovery*/
+ SPP_PMTUD_DISABLE = 1<<4, /*Disable PMTU discovery*/
+ SPP_PMTUD = SPP_PMTUD_ENABLE | SPP_PMTUD_DISABLE,
+ SPP_SACKDELAY_ENABLE = 1<<5, /*Enable SACK*/
+ SPP_SACKDELAY_DISABLE = 1<<6, /*Disable SACK*/
+ SPP_SACKDELAY = SPP_SACKDELAY_ENABLE | SPP_SACKDELAY_DISABLE,
+ SPP_HB_TIME_IS_ZERO = 1<<7, /* Set HB delay to 0 */
+};
+
+struct sctp_paddrparams {
+ sctp_assoc_t spp_assoc_id;
+ struct sockaddr_storage spp_address;
+ __u32 spp_hbinterval;
+ __u16 spp_pathmaxrxt;
+ __u32 spp_pathmtu;
+ __u32 spp_sackdelay;
+ __u32 spp_flags;
+} __attribute__((packed, aligned(4)));
+
+/*
+ * 7.1.18. Add a chunk that must be authenticated (SCTP_AUTH_CHUNK)
+ *
+ * This set option adds a chunk type that the user is requesting to be
+ * received only in an authenticated way. Changes to the list of chunks
+ * will only effect future associations on the socket.
+ */
+struct sctp_authchunk {
+ __u8 sauth_chunk;
+};
+
+/*
+ * 7.1.19. Get or set the list of supported HMAC Identifiers (SCTP_HMAC_IDENT)
+ *
+ * This option gets or sets the list of HMAC algorithms that the local
+ * endpoint requires the peer to use.
+ */
+/* This here is only used by user space as is. It might not be a good idea
+ * to export/reveal the whole structure with reserved fields etc.
+ */
+enum {
+ SCTP_AUTH_HMAC_ID_SHA1 = 1,
+ SCTP_AUTH_HMAC_ID_SHA256 = 3,
+};
+
+struct sctp_hmacalgo {
+ __u32 shmac_num_idents;
+ __u16 shmac_idents[];
+};
+
+/* Sadly, user and kernel space have different names for
+ * this structure member, so this is to not break anything.
+ */
+#define shmac_number_of_idents shmac_num_idents
+
+/*
+ * 7.1.20. Set a shared key (SCTP_AUTH_KEY)
+ *
+ * This option will set a shared secret key which is used to build an
+ * association shared key.
+ */
+struct sctp_authkey {
+ sctp_assoc_t sca_assoc_id;
+ __u16 sca_keynumber;
+ __u16 sca_keylength;
+ __u8 sca_key[];
+};
+
+/*
+ * 7.1.21. Get or set the active shared key (SCTP_AUTH_ACTIVE_KEY)
+ *
+ * This option will get or set the active shared key to be used to build
+ * the association shared key.
+ */
+
+struct sctp_authkeyid {
+ sctp_assoc_t scact_assoc_id;
+ __u16 scact_keynumber;
+};
+
+
+/*
+ * 7.1.23. Get or set delayed ack timer (SCTP_DELAYED_SACK)
+ *
+ * This option will effect the way delayed acks are performed. This
+ * option allows you to get or set the delayed ack time, in
+ * milliseconds. It also allows changing the delayed ack frequency.
+ * Changing the frequency to 1 disables the delayed sack algorithm. If
+ * the assoc_id is 0, then this sets or gets the endpoints default
+ * values. If the assoc_id field is non-zero, then the set or get
+ * effects the specified association for the one to many model (the
+ * assoc_id field is ignored by the one to one model). Note that if
+ * sack_delay or sack_freq are 0 when setting this option, then the
+ * current values will remain unchanged.
+ */
+struct sctp_sack_info {
+ sctp_assoc_t sack_assoc_id;
+ uint32_t sack_delay;
+ uint32_t sack_freq;
+};
+
+struct sctp_assoc_value {
+ sctp_assoc_t assoc_id;
+ uint32_t assoc_value;
+};
+
+struct sctp_stream_value {
+ sctp_assoc_t assoc_id;
+ uint16_t stream_id;
+ uint16_t stream_value;
+};
+
+/*
+ * 7.2.2 Peer Address Information
+ *
+ * Applications can retrieve information about a specific peer address
+ * of an association, including its reachability state, congestion
+ * window, and retransmission timer values. This information is
+ * read-only. The following structure is used to access this
+ * information:
+ */
+struct sctp_paddrinfo {
+ sctp_assoc_t spinfo_assoc_id;
+ struct sockaddr_storage spinfo_address;
+ __s32 spinfo_state;
+ __u32 spinfo_cwnd;
+ __u32 spinfo_srtt;
+ __u32 spinfo_rto;
+ __u32 spinfo_mtu;
+} __attribute__((packed, aligned(4)));
+
+/* Peer addresses's state. */
+/* UNKNOWN: Peer address passed by the upper layer in sendmsg or connect[x]
+ * calls.
+ * UNCONFIRMED: Peer address received in INIT/INIT-ACK address parameters.
+ * Not yet confirmed by a heartbeat and not available for data
+ * transfers.
+ * ACTIVE : Peer address confirmed, active and available for data transfers.
+ * INACTIVE: Peer address inactive and not available for data transfers.
+ */
+enum sctp_spinfo_state {
+ SCTP_INACTIVE,
+ SCTP_PF,
+ SCTP_ACTIVE,
+ SCTP_UNCONFIRMED,
+ SCTP_UNKNOWN = 0xffff /* Value used for transport state unknown */
+};
+
+/*
+ * 7.2.1 Association Status (SCTP_STATUS)
+ *
+ * Applications can retrieve current status information about an
+ * association, including association state, peer receiver window size,
+ * number of unacked data chunks, and number of data chunks pending
+ * receipt. This information is read-only. The following structure is
+ * used to access this information:
+ */
+struct sctp_status {
+ sctp_assoc_t sstat_assoc_id;
+ __s32 sstat_state;
+ __u32 sstat_rwnd;
+ __u16 sstat_unackdata;
+ __u16 sstat_penddata;
+ __u16 sstat_instrms;
+ __u16 sstat_outstrms;
+ __u32 sstat_fragmentation_point;
+ struct sctp_paddrinfo sstat_primary;
+};
+
+/*
+ * 7.2.3. Get the list of chunks the peer requires to be authenticated
+ * (SCTP_PEER_AUTH_CHUNKS)
+ *
+ * This option gets a list of chunks for a specified association that
+ * the peer requires to be received authenticated only.
+ */
+struct sctp_authchunks {
+ sctp_assoc_t gauth_assoc_id;
+ __u32 gauth_number_of_chunks;
+ uint8_t gauth_chunks[];
+};
+
+/* The broken spelling has been released already in lksctp-tools header,
+ * so don't break anyone, now that it's fixed.
+ */
+#define guth_number_of_chunks gauth_number_of_chunks
+
+/* Association states. */
+enum sctp_sstat_state {
+ SCTP_EMPTY = 0,
+ SCTP_CLOSED = 1,
+ SCTP_COOKIE_WAIT = 2,
+ SCTP_COOKIE_ECHOED = 3,
+ SCTP_ESTABLISHED = 4,
+ SCTP_SHUTDOWN_PENDING = 5,
+ SCTP_SHUTDOWN_SENT = 6,
+ SCTP_SHUTDOWN_RECEIVED = 7,
+ SCTP_SHUTDOWN_ACK_SENT = 8,
+};
+
+/*
+ * 8.2.6. Get the Current Identifiers of Associations
+ * (SCTP_GET_ASSOC_ID_LIST)
+ *
+ * This option gets the current list of SCTP association identifiers of
+ * the SCTP associations handled by a one-to-many style socket.
+ */
+struct sctp_assoc_ids {
+ __u32 gaids_number_of_ids;
+ sctp_assoc_t gaids_assoc_id[];
+};
+
+/*
+ * 8.3, 8.5 get all peer/local addresses in an association.
+ * This parameter struct is used by SCTP_GET_PEER_ADDRS and
+ * SCTP_GET_LOCAL_ADDRS socket options used internally to implement
+ * sctp_getpaddrs() and sctp_getladdrs() API.
+ */
+struct sctp_getaddrs_old {
+ sctp_assoc_t assoc_id;
+ int addr_num;
+ struct sockaddr *addrs;
+};
+
+struct sctp_getaddrs {
+ sctp_assoc_t assoc_id; /*input*/
+ __u32 addr_num; /*output*/
+ __u8 addrs[0]; /*output, variable size*/
+};
+
+/* A socket user request obtained via SCTP_GET_ASSOC_STATS that retrieves
+ * association stats. All stats are counts except sas_maxrto and
+ * sas_obs_rto_ipaddr. maxrto is the max observed rto + transport since
+ * the last call. Will return 0 when RTO was not update since last call
+ */
+struct sctp_assoc_stats {
+ sctp_assoc_t sas_assoc_id; /* Input */
+ /* Transport of observed max RTO */
+ struct sockaddr_storage sas_obs_rto_ipaddr;
+ __u64 sas_maxrto; /* Maximum Observed RTO for period */
+ __u64 sas_isacks; /* SACKs received */
+ __u64 sas_osacks; /* SACKs sent */
+ __u64 sas_opackets; /* Packets sent */
+ __u64 sas_ipackets; /* Packets received */
+ __u64 sas_rtxchunks; /* Retransmitted Chunks */
+ __u64 sas_outofseqtsns;/* TSN received > next expected */
+ __u64 sas_idupchunks; /* Dups received (ordered+unordered) */
+ __u64 sas_gapcnt; /* Gap Acknowledgements Received */
+ __u64 sas_ouodchunks; /* Unordered data chunks sent */
+ __u64 sas_iuodchunks; /* Unordered data chunks received */
+ __u64 sas_oodchunks; /* Ordered data chunks sent */
+ __u64 sas_iodchunks; /* Ordered data chunks received */
+ __u64 sas_octrlchunks; /* Control chunks sent */
+ __u64 sas_ictrlchunks; /* Control chunks received */
+};
+
+/*
+ * 8.1 sctp_bindx()
+ *
+ * The flags parameter is formed from the bitwise OR of zero or more of the
+ * following currently defined flags:
+ */
+#define SCTP_BINDX_ADD_ADDR 0x01
+#define SCTP_BINDX_REM_ADDR 0x02
+
+/* This is the structure that is passed as an argument(optval) to
+ * getsockopt(SCTP_SOCKOPT_PEELOFF).
+ */
+typedef struct {
+ sctp_assoc_t associd;
+ int sd;
+} sctp_peeloff_arg_t;
+
+typedef struct {
+ sctp_peeloff_arg_t p_arg;
+ unsigned flags;
+} sctp_peeloff_flags_arg_t;
+
+/*
+ * Peer Address Thresholds socket option
+ */
+struct sctp_paddrthlds {
+ sctp_assoc_t spt_assoc_id;
+ struct sockaddr_storage spt_address;
+ __u16 spt_pathmaxrxt;
+ __u16 spt_pathpfthld;
+};
+
+/*
+ * Socket Option for Getting the Association/Stream-Specific PR-SCTP Status
+ */
+struct sctp_prstatus {
+ sctp_assoc_t sprstat_assoc_id;
+ __u16 sprstat_sid;
+ __u16 sprstat_policy;
+ __u64 sprstat_abandoned_unsent;
+ __u64 sprstat_abandoned_sent;
+};
+
+struct sctp_default_prinfo {
+ sctp_assoc_t pr_assoc_id;
+ __u32 pr_value;
+ __u16 pr_policy;
+};
+
+struct sctp_info {
+ __u32 sctpi_tag;
+ __u32 sctpi_state;
+ __u32 sctpi_rwnd;
+ __u16 sctpi_unackdata;
+ __u16 sctpi_penddata;
+ __u16 sctpi_instrms;
+ __u16 sctpi_outstrms;
+ __u32 sctpi_fragmentation_point;
+ __u32 sctpi_inqueue;
+ __u32 sctpi_outqueue;
+ __u32 sctpi_overall_error;
+ __u32 sctpi_max_burst;
+ __u32 sctpi_maxseg;
+ __u32 sctpi_peer_rwnd;
+ __u32 sctpi_peer_tag;
+ __u8 sctpi_peer_capable;
+ __u8 sctpi_peer_sack;
+ __u16 __reserved1;
+
+ /* assoc status info */
+ __u64 sctpi_isacks;
+ __u64 sctpi_osacks;
+ __u64 sctpi_opackets;
+ __u64 sctpi_ipackets;
+ __u64 sctpi_rtxchunks;
+ __u64 sctpi_outofseqtsns;
+ __u64 sctpi_idupchunks;
+ __u64 sctpi_gapcnt;
+ __u64 sctpi_ouodchunks;
+ __u64 sctpi_iuodchunks;
+ __u64 sctpi_oodchunks;
+ __u64 sctpi_iodchunks;
+ __u64 sctpi_octrlchunks;
+ __u64 sctpi_ictrlchunks;
+
+ /* primary transport info */
+ struct sockaddr_storage sctpi_p_address;
+ __s32 sctpi_p_state;
+ __u32 sctpi_p_cwnd;
+ __u32 sctpi_p_srtt;
+ __u32 sctpi_p_rto;
+ __u32 sctpi_p_hbinterval;
+ __u32 sctpi_p_pathmaxrxt;
+ __u32 sctpi_p_sackdelay;
+ __u32 sctpi_p_sackfreq;
+ __u32 sctpi_p_ssthresh;
+ __u32 sctpi_p_partial_bytes_acked;
+ __u32 sctpi_p_flight_size;
+ __u16 sctpi_p_error;
+ __u16 __reserved2;
+
+ /* sctp sock info */
+ __u32 sctpi_s_autoclose;
+ __u32 sctpi_s_adaptation_ind;
+ __u32 sctpi_s_pd_point;
+ __u8 sctpi_s_nodelay;
+ __u8 sctpi_s_disable_fragments;
+ __u8 sctpi_s_v4mapped;
+ __u8 sctpi_s_frag_interleave;
+ __u32 sctpi_s_type;
+ __u32 __reserved3;
+};
+
+struct sctp_reset_streams {
+ sctp_assoc_t srs_assoc_id;
+ uint16_t srs_flags;
+ uint16_t srs_number_streams; /* 0 == ALL */
+ uint16_t srs_stream_list[]; /* list if srs_num_streams is not 0 */
+};
+
+struct sctp_add_streams {
+ sctp_assoc_t sas_assoc_id;
+ uint16_t sas_instrms;
+ uint16_t sas_outstrms;
+};
+
+/* SCTP Stream schedulers */
+enum sctp_sched_type {
+ SCTP_SS_FCFS,
+ SCTP_SS_DEFAULT = SCTP_SS_FCFS,
+ SCTP_SS_PRIO,
+ SCTP_SS_RR,
+ SCTP_SS_MAX = SCTP_SS_RR
+};
+
+#endif /* _SCTP_H */
diff --git a/third_party/lksctp-tools/BUILD b/third_party/lksctp-tools/BUILD
new file mode 100644
index 0000000..b127b77
--- /dev/null
+++ b/third_party/lksctp-tools/BUILD
@@ -0,0 +1,35 @@
+licenses(["notice"])
+
+genrule(
+ name = "sctp_copy",
+ srcs = [
+ "src/include/netinet/sctp.h.in",
+ ],
+ outs = [
+ "src/include/netinet/sctp.h",
+ ],
+ cmd = "cp $< $@",
+)
+
+cc_library(
+ name = "sctp",
+ srcs = [
+ "src/lib/addrs.c",
+ "src/lib/bindx.c",
+ "src/lib/opt_info.c",
+ "src/lib/peeloff.c",
+ "src/lib/recvmsg.c",
+ "src/lib/sendmsg.c",
+ ],
+ hdrs = [
+ "src/include/netinet/sctp.h",
+ ],
+ copts = [
+ "-Wno-cast-align",
+ "-Wno-cast-qual",
+ ],
+ includes = [
+ "src/include",
+ ],
+ visibility = ["//visibility:public"],
+)
diff --git a/third_party/lksctp-tools/src/include/netinet/sctp.h.in b/third_party/lksctp-tools/src/include/netinet/sctp.h.in
index 2009f1c..c1d4d2c 100644
--- a/third_party/lksctp-tools/src/include/netinet/sctp.h.in
+++ b/third_party/lksctp-tools/src/include/netinet/sctp.h.in
@@ -60,15 +60,17 @@
#define HAVE_SCTP_ADDIP
#define HAVE_SCTP_CANSET_PRIMARY
-#undef HAVE_SCTP_STREAM_RESET_EVENT
-#undef HAVE_SCTP_STREAM_RECONFIG
-#undef HAVE_SCTP_PEELOFF_FLAGS
-#undef HAVE_SCTP_PDAPI_EVENT_PDAPI_STREAM
-#undef HAVE_SCTP_PDAPI_EVENT_PDAPI_SEQ
-#undef HAVE_SCTP_SENDV
-#undef HAVE_SCTP_AUTH_NO_AUTH
-#undef HAVE_SCTP_SPP_IPV6_FLOWLABEL
-#undef HAVE_SCTP_SPP_DSCP
+#define HAVE_SCTP_STREAM_RESET_EVENT 1
+/* #undef HAVE_SCTP_ASSOC_RESET_EVENT */
+/* #undef HAVE_SCTP_STREAM_CHANGE_EVENT */
+#define HAVE_SCTP_STREAM_RECONFIG 1
+/* #undef HAVE_SCTP_PEELOFF_FLAGS */
+#define HAVE_SCTP_PDAPI_EVENT_PDAPI_STREAM 1
+#define HAVE_SCTP_PDAPI_EVENT_PDAPI_SEQ 1
+/* #undef HAVE_SCTP_SENDV */
+/* #undef HAVE_SCTP_AUTH_NO_AUTH */
+/* #undef HAVE_SCTP_SPP_IPV6_FLOWLABEL */
+/* #undef HAVE_SCTP_SPP_DSCP */
int sctp_bindx(int sd, struct sockaddr *addrs, int addrcnt, int flags);
diff --git a/tools/cpp/CROSSTOOL b/tools/cpp/CROSSTOOL
index 39f29d8..20f47e3 100644
--- a/tools/cpp/CROSSTOOL
+++ b/tools/cpp/CROSSTOOL
@@ -932,6 +932,8 @@
compiler_flag: "external/linaro_linux_gcc_repo/include/c++/7.4.1"
compiler_flag: "-isystem"
compiler_flag: "external/linaro_linux_gcc_repo/arm-linux-gnueabihf/libc/usr/include"
+ compiler_flag: "-isystem"
+ compiler_flag: "external/org_frc971/third_party"
compiler_flag: "-D__STDC_FORMAT_MACROS"
compiler_flag: "-D__STDC_CONSTANT_MACROS"
compiler_flag: "-D__STDC_LIMIT_MACROS"