Switch web_proxy over to rawrtc
In theory, nothing else should change... This will make it possible to
run the web proxy and web plotter in applications which use absl among
other things.
Future work should involve reducing a copy (we copy an extra time when
buffering), and making web_proxy not spin at 100% CPU when it is done by
reshuffling how the event loop works a bit better.
Change-Id: Iab6b6b278c5e830bb71e107824b364dfe7cad46d
diff --git a/WORKSPACE b/WORKSPACE
index 1b0c85c..6118a37 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -657,27 +657,6 @@
)
http_archive(
- name = "webrtc_x64",
- build_file = "@//debian:webrtc.BUILD",
- sha256 = "bd212b2a112a043d08d27f49027091788fa01c7c2ac5f072d096c17d9dbd976f",
- url = "https://www.frc971.org/Build-Dependencies/webrtc-30326-1a68679-linux-x64.tar.gz",
-)
-
-http_archive(
- name = "webrtc_arm",
- build_file = "@//debian:webrtc.BUILD",
- sha256 = "c34badaf313877cd03a0dfd6b71de024d806a7652550a7f1cd7dea523a7c813d",
- url = "https://www.frc971.org/Build-Dependencies/webrtc-30326-1a68679-linux-arm.tar.gz",
-)
-
-http_archive(
- name = "webrtc_rio",
- build_file = "@//debian:webrtc.BUILD",
- sha256 = "d86d3b030099b35ae5ea31c807fb4d0b0352598e79f1ea84877e5504e185faa8",
- url = "https://www.frc971.org/Build-Dependencies/webrtc-30376-4c4735b-linux-rio.tar.gz",
-)
-
-http_archive(
name = "build_bazel_rules_nodejs",
sha256 = "0d9660cf0894f1fe1e9840818553e0080fbce0851169812d77a70bdb9981c946",
urls = ["https://www.frc971.org/Build-Dependencies/rules_nodejs-0.37.0.tar.gz"],
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 42e115b..56adbe1 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -412,8 +412,14 @@
cc_library(
name = "web_proxy",
- srcs = ["web_proxy.cc"],
- hdrs = ["web_proxy.h"],
+ srcs = [
+ "rawrtc.cc",
+ "web_proxy.cc",
+ ],
+ hdrs = [
+ "rawrtc.h",
+ "web_proxy.h",
+ ],
copts = [
"-DWEBRTC_POSIX",
],
@@ -426,9 +432,9 @@
"//aos/events:shm_event_loop",
"//aos/mutex",
"//aos/seasocks:seasocks_logger",
- "//third_party:webrtc",
"//third_party/seasocks",
"@com_github_google_glog//:glog",
+ "@com_github_rawrtc_rawrtc//:rawrtc",
],
)
diff --git a/aos/network/rawrtc.cc b/aos/network/rawrtc.cc
new file mode 100644
index 0000000..a92b408
--- /dev/null
+++ b/aos/network/rawrtc.cc
@@ -0,0 +1,283 @@
+#include "aos/network/rawrtc.h"
+
+extern "C" {
+#include <rawrtc.h>
+
+#include "external/com_github_rawrtc_rawrtc_common/include/rawrtcc/utils.h"
+}
+
+#include <functional>
+#include <string>
+
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace web_proxy {
+namespace {
+enum {
+ TRANSPORT_BUFFER_LENGTH = 1048576, // 1 MiB
+};
+}
+
+ScopedDataChannel::ScopedDataChannel() {}
+
+void ScopedDataChannel::Open(struct rawrtc_peer_connection *connection,
+ const std::string &label) {
+ label_ = label;
+ VLOG(1) << "(" << this << ") Opening " << label_;
+ struct rawrtc_data_channel_parameters *channel_parameters;
+ // Create data channel parameters
+ // TODO(austin): TYPE?
+ CHECK_RAWRTC(rawrtc_data_channel_parameters_create(
+ &channel_parameters, label.c_str(),
+ RAWRTC_DATA_CHANNEL_TYPE_RELIABLE_ORDERED, 0, NULL, false, 0));
+
+ // Create data channel
+ CHECK_RAWRTC(rawrtc_peer_connection_create_data_channel(
+ &data_channel_, connection, channel_parameters,
+ StaticDataChannelOpenHandler, StaticBufferedAmountLowHandler,
+ StaticDataChannelErrorHandler, StaticDataChannelCloseHandler,
+ StaticDataChannelMessageHandler, this));
+
+ // Un-reference data channel parameters
+ mem_deref(channel_parameters);
+}
+
+void ScopedDataChannel::Open(struct rawrtc_data_channel *const channel) {
+ struct rawrtc_data_channel_parameters *parameters;
+ enum rawrtc_code const ignore[] = {RAWRTC_CODE_NO_VALUE};
+ char *label = NULL;
+
+ // Get data channel label and protocol
+ CHECK_RAWRTC(rawrtc_data_channel_get_parameters(¶meters, channel));
+ CHECK_RAWRTC_IGNORE(
+ rawrtc_data_channel_parameters_get_label(&label, parameters), ignore);
+ if (label) {
+ label_ = label;
+ mem_deref(label);
+ }
+ mem_deref(parameters);
+
+ VLOG(1) << "(" << this << ") New data channel instance: " << label_;
+
+ mem_ref(channel);
+ data_channel_ = channel;
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_arg(data_channel_, this));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_open_handler(
+ data_channel_, StaticDataChannelOpenHandler));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_buffered_amount_low_handler(
+ data_channel_, StaticBufferedAmountLowHandler));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_error_handler(
+ data_channel_, StaticDataChannelErrorHandler));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_close_handler(
+ data_channel_, StaticDataChannelCloseHandler));
+
+ CHECK_RAWRTC(rawrtc_data_channel_set_message_handler(
+ data_channel_, StaticDataChannelMessageHandler));
+}
+
+ScopedDataChannel::~ScopedDataChannel() {
+ CHECK(opened_);
+ CHECK(closed_);
+ CHECK(data_channel_ == nullptr)
+ << ": Destroying open data channel " << this << ".";
+}
+
+void ScopedDataChannel::StaticDataChannelOpenHandler(void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ CHECK(!client->opened_);
+ CHECK(!client->closed_);
+ if (client->on_open_) client->on_open_();
+ client->opened_ = true;
+}
+
+void ScopedDataChannel::StaticBufferedAmountLowHandler(void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ if (client->on_buffered_amount_low_) client->on_buffered_amount_low_();
+}
+
+void ScopedDataChannel::StaticDataChannelErrorHandler(void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ if (client->on_error_) client->on_error_();
+}
+
+void ScopedDataChannel::StaticDataChannelCloseHandler(void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ CHECK(client->opened_);
+ CHECK(!client->closed_);
+ // Close() assumes that this method will do the final cleanup. The destructor
+ // CHECKs that.
+ client->closed_ = true;
+ struct rawrtc_data_channel *data_channel = client->data_channel_;
+ client->data_channel_ = nullptr;
+ if (client->on_close_) {
+ // Take the function so we can call it without referencing client.
+ // This could destroy the client when the function is deleted by releasing
+ // any shared_ptrs.
+ std::function<void()> on_close = std::move(client->on_close_);
+ on_close();
+ }
+ mem_deref(data_channel);
+}
+
+void ScopedDataChannel::StaticDataChannelMessageHandler(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ enum rawrtc_data_channel_message_flag const flags, void *const arg) {
+ ScopedDataChannel *const client = reinterpret_cast<ScopedDataChannel *>(arg);
+ if (client->on_message_) client->on_message_(buffer, flags);
+}
+
+void ScopedDataChannel::Close() {
+ CHECK(opened_);
+ CHECK(!closed_);
+ CHECK_RAWRTC(rawrtc_data_channel_close(data_channel_));
+}
+
+void ScopedDataChannel::Send(const ::flatbuffers::DetachedBuffer &buffer) {
+ struct mbuf *mbuffer = mbuf_alloc(buffer.size());
+ mbuf_write_mem(mbuffer, buffer.data(), buffer.size());
+ mbuf_set_pos(mbuffer, 0);
+
+ Send(mbuffer);
+
+ mem_deref(mbuffer);
+}
+
+void ScopedDataChannel::Send(struct mbuf *buffer) {
+ // TODO(austin): Checking isn't right, handle errors more gracefully.
+ CHECK_RAWRTC(
+ rawrtc_data_channel_send(CHECK_NOTNULL(data_channel_), buffer, true));
+}
+
+uint64_t ScopedDataChannel::buffered_amount() {
+ return 0;
+
+ // TODO(austin): Not implemented yet...
+ uint64_t result;
+ CHECK_RAWRTC(rawrtc_data_channel_get_buffered_amount(
+ &result, CHECK_NOTNULL(data_channel_)));
+ return result;
+}
+
+RawRTCConnection::RawRTCConnection() {}
+
+void RawRTCConnection::Open() {
+ const char *const stun_google_com_urls[] = {"stun:stun.l.google.com:19302",
+ "stun:stun1.l.google.com:19302"};
+
+ struct rawrtc_peer_connection_configuration *configuration = nullptr;
+
+ CHECK_RAWRTC(rawrtc_peer_connection_configuration_create(
+ &configuration, RAWRTC_ICE_GATHER_POLICY_ALL));
+
+ // Add ICE servers to configuration
+ CHECK_RAWRTC(rawrtc_peer_connection_configuration_add_ice_server(
+ configuration, stun_google_com_urls, ARRAY_SIZE(stun_google_com_urls),
+ NULL, NULL, RAWRTC_ICE_CREDENTIAL_TYPE_NONE));
+
+ // Set the SCTP transport's buffer length
+ CHECK_RAWRTC(rawrtc_peer_connection_configuration_set_sctp_buffer_length(
+ configuration, TRANSPORT_BUFFER_LENGTH, TRANSPORT_BUFFER_LENGTH));
+
+ // Create peer connection
+ CHECK_RAWRTC(rawrtc_peer_connection_create(
+ &connection_, configuration, StaticNegotiationNeededHandler,
+ StaticLocalCandidateHandler,
+ StaticPeerConnectionLocalCandidateErrorHandler,
+ StaticSignalingStateChangeHandler, StaticIceTransportStateChangeHandler,
+ StaticIceGathererStateChangeHandler, StaticConnectionStateChangeHandler,
+ StaticDataChannelHandler, this));
+
+ mem_deref(configuration);
+}
+
+RawRTCConnection::~RawRTCConnection() {
+ CHECK_RAWRTC(rawrtc_peer_connection_close(connection_));
+ mem_deref(connection_);
+ connection_ = nullptr;
+}
+
+void RawRTCConnection::StaticNegotiationNeededHandler(void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ if (client->on_negotiation_needed_) client->on_negotiation_needed_();
+}
+
+void RawRTCConnection::StaticLocalCandidateHandler(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url, // read-only
+ void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ if (client->on_local_candidate_) client->on_local_candidate_(candidate, url);
+}
+
+void RawRTCConnection::StaticPeerConnectionLocalCandidateErrorHandler(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url, uint16_t const error_code,
+ char const *const error_text, void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ LOG(ERROR) << "(" << client << ") ICE candidate error, URL: " << url
+ << ", reason: " << error_text;
+ if (client->on_peer_connection_local_candidate_error_)
+ client->on_peer_connection_local_candidate_error_(candidate, url,
+ error_code, error_text);
+}
+
+void RawRTCConnection::StaticSignalingStateChangeHandler(
+ const enum rawrtc_signaling_state state, void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ VLOG(1) << "(" << client << ") Signaling state change: "
+ << rawrtc_signaling_state_to_name(state);
+ if (client->on_signaling_state_change_)
+ client->on_signaling_state_change_(state);
+}
+
+void RawRTCConnection::StaticIceTransportStateChangeHandler(
+ const enum rawrtc_ice_transport_state state, void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ VLOG(1) << "(" << client << ") ICE transport state: "
+ << rawrtc_ice_transport_state_to_name(state);
+ if (client->on_ice_transport_state_change_)
+ client->on_ice_transport_state_change_(state);
+}
+
+void RawRTCConnection::StaticIceGathererStateChangeHandler(
+ const enum rawrtc_ice_gatherer_state state, void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ VLOG(1) << "(" << client << ") ICE gatherer state: "
+ << rawrtc_ice_gatherer_state_to_name(state);
+ if (client->on_ice_gatherer_state_change_)
+ client->on_ice_gatherer_state_change_(state);
+}
+
+void RawRTCConnection::StaticConnectionStateChangeHandler(
+ const enum rawrtc_peer_connection_state state, // read-only
+ void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ VLOG(1) << "(" << client << ") Peer connection state change: "
+ << rawrtc_peer_connection_state_to_name(state);
+ if (client->on_connection_state_change_)
+ client->on_connection_state_change_(state);
+}
+
+void RawRTCConnection::StaticDataChannelHandler(
+ struct rawrtc_data_channel
+ *const channel, // read-only, MUST be referenced when used
+ void *const arg) {
+ RawRTCConnection *const client = reinterpret_cast<RawRTCConnection *>(arg);
+ if (client->on_data_channel_) {
+ std::shared_ptr<ScopedDataChannel> new_channel =
+ std::make_shared<ScopedDataChannel>();
+ new_channel->Open(channel);
+ client->on_data_channel_(std::move(new_channel));
+ }
+}
+
+} // namespace web_proxy
+} // namespace aos
diff --git a/aos/network/rawrtc.h b/aos/network/rawrtc.h
new file mode 100644
index 0000000..3f37435
--- /dev/null
+++ b/aos/network/rawrtc.h
@@ -0,0 +1,223 @@
+#ifndef AOS_NETWORK_RAWRTC_H_
+#define AOS_NETWORK_RAWRTC_H_
+
+#include <functional>
+#include <string>
+
+extern "C" {
+#include <rawrtc.h>
+
+#include "external/com_github_rawrtc_rawrtc_common/include/rawrtcc/utils.h"
+}
+
+#include "flatbuffers/flatbuffers.h"
+#include "glog/logging.h"
+
+namespace aos {
+namespace web_proxy {
+
+// TODO(austin): This doesn't allow streaming data in.
+#define CHECK_RAWRTC(x) \
+ [&]() { \
+ enum rawrtc_code r = x; \
+ return CHECK(r == RAWRTC_CODE_SUCCESS) \
+ << " actual " << rawrtc_code_to_str(r); \
+ }()
+
+#define CHECK_RAWRTC_IGNORE(x, i) \
+ [&]() { \
+ enum rawrtc_code r = x; \
+ for (auto w : i) { \
+ if (w == r) return; \
+ } \
+ return CHECK(r == RAWRTC_CODE_SUCCESS); \
+ }()
+
+// Wrapper around a RawRTC data channel to manage it's lifetime and provide C++
+// callbacks for all the callbacks.
+//
+// There are 3 phases of the object's lifetime.
+// 1) Initialization. Callbacks can be set here.
+// 2) Open. Calling Open transitions the channel to be open and triggers the
+// on_open callback to be called.
+// 3) Close. This must be called before destroying the channel and calls the
+// on_close callback and shuts down the channel.
+class ScopedDataChannel {
+ public:
+ ScopedDataChannel();
+ ScopedDataChannel(const ScopedDataChannel &) = delete;
+ ScopedDataChannel &operator=(const ScopedDataChannel &) = delete;
+
+ ~ScopedDataChannel();
+
+ // Setters for all the callbacks. These may be called whenever.
+
+ // Registers a callback to be called when the channel is opened. This only
+ // gets called once during or after Open is called.
+ void set_on_open(std::function<void()> &&fn) { on_open_ = std::move(fn); }
+
+ // Registers a callback to be called when the channel is closed. This only
+ // gets called once during or after Close is called.
+ void set_on_close(std::function<void()> &&fn) { on_close_ = std::move(fn); }
+
+ void set_on_buffered_amount_low(std::function<void()> &&fn) {
+ on_buffered_amount_low_ = std::move(fn);
+ }
+ void set_on_error(std::function<void()> &&fn) { on_error_ = std::move(fn); }
+ void set_on_message(
+ std::function<void(struct mbuf *const,
+ enum rawrtc_data_channel_message_flag const)> &&fn) {
+ on_message_ = std::move(fn);
+ }
+
+ // Opens the channel on the provided connection with the provided label. This
+ // is separate so we can optionally register callbacks before opening.
+ void Open(struct rawrtc_peer_connection *connection,
+ const std::string &label);
+ // Takes over an already open channel.
+ void Open(struct rawrtc_data_channel *channel);
+
+ // Closes the channel. It must be open first.
+ void Close();
+
+ // Sends a buffer.
+ void Send(const ::flatbuffers::DetachedBuffer &buffer);
+ void Send(struct mbuf *buffer);
+
+ std::string_view label() const { return label_; }
+
+ // Returns the amount of data buffered.
+ uint64_t buffered_amount();
+
+ private:
+ // Trampolines from C -> C++.
+ static void StaticDataChannelOpenHandler(void *const arg);
+ static void StaticBufferedAmountLowHandler(void *const arg);
+ static void StaticDataChannelErrorHandler(void *const arg);
+ static void StaticDataChannelCloseHandler(void *const arg);
+ static void StaticDataChannelMessageHandler(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ enum rawrtc_data_channel_message_flag const flags, void *const arg);
+
+ // Our channel and the label for it.
+ std::string label_;
+ struct rawrtc_data_channel *data_channel_ = nullptr;
+
+ bool opened_ = false;
+ bool closed_ = false;
+
+ std::function<void()> on_open_;
+ std::function<void()> on_buffered_amount_low_;
+ std::function<void()> on_error_;
+ std::function<void()> on_close_;
+ std::function<void(struct mbuf *const,
+ enum rawrtc_data_channel_message_flag const)>
+ on_message_;
+
+ // Self referential pointer to keep ourselves in scope until close() gets
+ // called.
+ std::shared_ptr<ScopedDataChannel> self_;
+};
+
+// Wraper around a RawRTC connection to both manage it's lifetime and provide
+// std::function interfaces for the callbacks.
+class RawRTCConnection {
+ public:
+ RawRTCConnection();
+
+ virtual ~RawRTCConnection();
+
+ void set_on_negotiation_needed(std::function<void()> &&fn) {
+ on_negotiation_needed_ = std::move(fn);
+ }
+ void set_on_local_candidate(
+ std::function<void(struct rawrtc_peer_connection_ice_candidate *,
+ char const *)> &&fn) {
+ on_local_candidate_ = std::move(fn);
+ }
+ // Sets the handler for a peer connection local candidate error. Arguments
+ // are the candidate, URL, error_code and error_text.
+ void set_on_peer_connection_local_candidate_error(
+ std::function<void(struct rawrtc_peer_connection_ice_candidate *,
+ char const *, uint16_t, char const *)> &&fn) {
+ on_peer_connection_local_candidate_error_ = std::move(fn);
+ }
+ void set_on_signaling_state_change(
+ std::function<void(enum rawrtc_signaling_state const)> &&fn) {
+ on_signaling_state_change_ = std::move(fn);
+ }
+ void set_on_ice_transport_state_change(
+ std::function<void(const enum rawrtc_ice_transport_state)> &&fn) {
+ on_ice_transport_state_change_ = std::move(fn);
+ }
+ void set_on_ice_gatherer_state_change(
+ std::function<void(const enum rawrtc_ice_gatherer_state)> &&fn) {
+ on_ice_gatherer_state_change_ = std::move(fn);
+ }
+ void set_on_connection_state_change(
+ std::function<void(const enum rawrtc_peer_connection_state)> &&fn) {
+ on_connection_state_change_ = std::move(fn);
+ }
+
+ // TODO(austin): Really, this should be a ScopedDataChannel object.
+ void set_on_data_channel(
+ std::function<void(std::shared_ptr<ScopedDataChannel>)> &&fn) {
+ on_data_channel_ = std::move(fn);
+ }
+
+ // Opens the connection. This lets us register callbacks before starting it.
+ void Open();
+
+ // Returns the connection if Open has been called.
+ struct rawrtc_peer_connection *connection() {
+ return connection_;
+ }
+
+ private:
+ // Trampolines from C -> C++.
+ static void StaticNegotiationNeededHandler(void *const arg);
+ static void StaticLocalCandidateHandler(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url, void *const arg);
+ static void StaticPeerConnectionLocalCandidateErrorHandler(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url, uint16_t const error_code,
+ char const *const error_text, void *const arg);
+ static void StaticSignalingStateChangeHandler(
+ const enum rawrtc_signaling_state state, void *const arg);
+ static void StaticIceTransportStateChangeHandler(
+ const enum rawrtc_ice_transport_state state, void *const arg);
+ static void StaticIceGathererStateChangeHandler(
+ const enum rawrtc_ice_gatherer_state state, void *const arg);
+ static void StaticConnectionStateChangeHandler(
+ const enum rawrtc_peer_connection_state state, void *const arg);
+ static void StaticDataChannelHandler(
+ struct rawrtc_data_channel *const channel, void *const arg);
+
+ // The connection.
+ struct rawrtc_peer_connection *connection_ = nullptr;
+
+ // Callbacks
+ std::function<void()> on_negotiation_needed_;
+ std::function<void(struct rawrtc_peer_connection_ice_candidate *,
+ char const *)>
+ on_local_candidate_;
+ std::function<void(struct rawrtc_peer_connection_ice_candidate *,
+ char const *, uint16_t, char const *)>
+ on_peer_connection_local_candidate_error_;
+ std::function<void(enum rawrtc_signaling_state const)>
+ on_signaling_state_change_;
+ std::function<void(const enum rawrtc_ice_transport_state)>
+ on_ice_transport_state_change_;
+ std::function<void(const enum rawrtc_ice_gatherer_state)>
+ on_ice_gatherer_state_change_;
+ std::function<void(const enum rawrtc_peer_connection_state)>
+ on_connection_state_change_;
+ std::function<void(std::shared_ptr<ScopedDataChannel>)> on_data_channel_;
+};
+
+} // namespace web_proxy
+} // namespace aos
+
+#endif // AOS_NETWORK_RAWRTC_H_
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index c762b63..6147e46 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -5,74 +5,119 @@
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
#include "aos/seasocks/seasocks_logger.h"
-#include "api/create_peerconnection_factory.h"
#include "glog/logging.h"
#include "internal/Embedded.h"
+extern "C" {
+#include <rawrtc.h>
+
+#define DEBUG_LEVEL 7
+#define DEBUG_MODULE "web-proxy"
+#include <re_dbg.h>
+struct list *tmrl_get(void);
+}
+
DEFINE_int32(proxy_port, 8080, "Port to use for the web proxy server.");
namespace aos {
namespace web_proxy {
-
-namespace {
-// Based on webrtc examples. In our controlled environment we expect setting sdp
-// to always succeed, and we can't do anything about a failure, so just ignore
-// everything.
-class DummySetSessionDescriptionObserver
- : public webrtc::SetSessionDescriptionObserver {
- public:
- static DummySetSessionDescriptionObserver *Create() {
- return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
- }
- virtual void OnSuccess() {}
- virtual void OnFailure(webrtc::RTCError /*error*/) {}
-};
-
-} // namespace
-
WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
aos::EventLoop *event_loop, int buffer_size)
: server_(server),
config_(aos::CopyFlatBuffer(event_loop->configuration())),
event_loop_(event_loop) {
+ if (VLOG_IS_ON(2)) {
+ dbg_init(DBG_DEBUG, DBG_ALL);
+ }
+ CHECK_RAWRTC(rawrtc_init(true));
+
// We need to reference findEmbeddedContent() to make the linker happy...
findEmbeddedContent("");
- const aos::Node *self = event_loop->node();
+ const aos::Node *self = event_loop_->node();
- for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
- auto channel = event_loop->configuration()->channels()->Get(i);
+ subscribers_.reserve(event_loop_->configuration()->channels()->size());
+ for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
+ auto channel = event_loop_->configuration()->channels()->Get(i);
if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
- auto fetcher = event_loop->MakeRawFetcher(channel);
+ auto fetcher = event_loop_->MakeRawFetcher(channel);
subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
std::move(fetcher), i, buffer_size));
+ } else {
+ subscribers_.emplace_back(nullptr);
}
}
- TimerHandler *const timer = event_loop->AddTimer([this]() {
+ TimerHandler *const timer = event_loop_->AddTimer([this]() {
for (auto &subscriber : subscribers_) {
- subscriber->RunIteration();
+ if (subscriber) subscriber->RunIteration();
}
});
- event_loop->OnRun([timer, event_loop]() {
- timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
+ event_loop_->OnRun([this, timer]() {
+ timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
});
}
void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
- std::unique_ptr<Connection> conn = std::make_unique<Connection>(
- sock, server_, subscribers_, config_, event_loop_);
- connections_.insert({sock, std::move(conn)});
+ std::unique_ptr<ApplicationConnection> connection =
+ std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
+ config_, event_loop_);
+
+ connections_.insert({sock, std::move(connection)});
}
void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
size_t size) {
- connections_[sock]->HandleWebSocketData(data, size);
+ const FlatbufferSpan<WebSocketMessage> message({data, size});
+ if (!message.Verify()) {
+ LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
+ return;
+ }
+ VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
+ switch (message.message().payload_type()) {
+ case Payload::WebSocketSdp: {
+ const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
+ if (offer->type() != SdpType::OFFER) {
+ LOG(WARNING) << "Got the wrong sdp type from client";
+ break;
+ }
+ const flatbuffers::String *sdp = offer->payload();
+ connections_[sock]->OnSdp(sdp->c_str());
+ break;
+ }
+ case Payload::WebSocketIce: {
+ const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
+ connections_[sock]->OnIce(ice);
+ break;
+ }
+ default: { break; }
+ }
}
void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
connections_.erase(sock);
}
+// Global epoll pointer
+static aos::internal::EPoll *global_epoll = nullptr;
+
+static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
+ if (flags & 0x1) {
+ global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
+ }
+ if (flags & 0x2) {
+ global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
+ }
+ if (flags & 0x4) {
+ global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
+ }
+ return 0;
+}
+
+static void ReFdClose(int fd) {
+ CHECK(global_epoll != nullptr);
+ global_epoll->DeleteFd(fd);
+}
+
WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
: WebProxy(event_loop, &internal_epoll_, buffer_size) {}
@@ -86,6 +131,23 @@
::seasocks::Logger::Level::Info)),
websocket_handler_(
new WebsocketHandler(&server_, event_loop, buffer_size)) {
+ CHECK(!global_epoll);
+ global_epoll = epoll;
+
+ re_fd_set_listen_callback(&ReFdListen);
+ re_fd_set_close_callback(&ReFdClose);
+
+ epoll->BeforeWait([]() {
+ const uint64_t to = tmr_next_timeout(tmrl_get());
+ if (to != 0) {
+ VLOG(1) << "Next timeout " << to;
+ }
+ // Note: this only works because we are spinning on it...
+ // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
+ // for handling tmr.
+ tmr_poll(tmrl_get());
+ });
+
server_.addWebSocketHandler("/ws", websocket_handler_);
CHECK(server_.startListening(FLAGS_proxy_port));
@@ -117,27 +179,11 @@
epoll_->DeleteFd(server_.fd());
server_.terminate();
CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
+ CHECK(global_epoll == epoll_);
+ global_epoll = nullptr;
}
void Subscriber::RunIteration() {
- {
- // Manage updating the channels_ map given the pending_* members from the
- // *Listeners() methods.
- // We handle all the removals first so that we correctly handle the
- // situation where the user calls RemoveListener() and then AddListener()
- // between calls to RunIteration(). The reverse order (adding and then
- // removing before an update) is handled directly in RemoveListener() where
- // we remove things from the pending_channels_ map directly.
- MutexLocker lock(&mutex_);
- for (const auto &channel : pending_removal_) {
- channels_.erase(channel);
- }
- pending_removal_.clear();
- for (const auto &channel : pending_channels_) {
- channels_.insert(channel);
- }
- pending_channels_.clear();
- }
if (channels_.empty() && buffer_size_ == 0) {
return;
}
@@ -153,38 +199,39 @@
<< "packets";
for (int packet_index = 0;
packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
+ flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<MessageHeader> message_offset =
- PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
- fbb_.Finish(message_offset);
+ PackMessage(&fbb, fetcher_->context(), channel_index_, packet_index);
+ fbb.Finish(message_offset);
- const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+ const flatbuffers::DetachedBuffer buffer = fbb.Release();
+
+
+ struct mbuf *mbuffer = mbuf_alloc(buffer.size());
+ mbuf_write_mem(mbuffer, buffer.data(), buffer.size());
+ mbuf_set_pos(mbuffer, 0);
message.data.emplace_back(
- rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
- true /* binary array */);
+ std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
}
message_buffer_.push_back(std::move(message));
}
for (auto &conn : channels_) {
- rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
+ std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
ChannelInformation *channel_data = &conn.second;
if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
SkipToLastMessage(channel_data);
}
- const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
- while (buffer != nullptr) {
+ std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
+ while (buffer) {
+ // TODO(austin): This is a nop so we just buffer forever. Fix this when
+ // we care.
if (rtc_channel->buffered_amount() > 14000000) {
VLOG(1) << "skipping a send because buffered amount is too high";
break;
}
- // Call Send() from the signalling thread. The Invoke() call blocks until
- // the handler has been called, so we do not need to handle any
- // synchronization on this end. The body of the handler should be kept as
- // short as possible to avoid blocking the signalling thread continuously
- // for any longer than necessary.
- channel_data->signaling_thread->Invoke<void>(
- RTC_FROM_HERE,
- [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
+
+ rtc_channel->Send(buffer.get());
buffer = NextBuffer(channel_data);
}
}
@@ -195,24 +242,23 @@
}
}
-bool Subscriber::Compare(const Channel *channel) const {
- return channel->name()->string_view() ==
- fetcher_->channel()->name()->string_view() &&
- channel->type()->string_view() ==
- fetcher_->channel()->type()->string_view();
-}
-
-void Subscriber::AddListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
- TransferMethod transfer_method, rtc::Thread *signaling_thread) {
- MutexLocker lock(&mutex_);
+void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
+ TransferMethod transfer_method) {
+ VLOG(1) << "Adding listener for " << data_channel.get();
ChannelInformation info;
info.transfer_method = transfer_method;
- info.signaling_thread = signaling_thread;
- pending_channels_.emplace(channel, info);
+
+ channels_.emplace(data_channel, info);
}
-const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
+void Subscriber::RemoveListener(
+ std::shared_ptr<ScopedDataChannel> data_channel) {
+ VLOG(1) << "Removing listener for " << data_channel.get();
+ channels_.erase(data_channel);
+}
+
+std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
+ ChannelInformation *channel) {
CHECK_NOTNULL(channel);
if (message_buffer_.empty()) {
return nullptr;
@@ -223,7 +269,7 @@
if (fell_behind) {
channel->current_queue_index = earliest_index;
channel->next_packet_number = 0;
- return &message_buffer_.front().data.at(0);
+ return message_buffer_.front().data.at(0);
}
if (channel->current_queue_index > latest_index) {
// We are still waiting on the next message to appear; return.
@@ -237,8 +283,8 @@
CHECK_LT(0u, packets_in_message);
CHECK_LT(channel->next_packet_number, packets_in_message);
- const webrtc::DataBuffer *data =
- &message_buffer_[channel->current_queue_index - earliest_index].data.at(
+ std::shared_ptr<struct mbuf> original_data =
+ message_buffer_[channel->current_queue_index - earliest_index].data.at(
channel->next_packet_number);
++channel->next_packet_number;
@@ -247,7 +293,9 @@
channel->next_packet_number = 0;
}
- return data;
+ // Trigger a copy of the mbuf without copying the data.
+ return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
+ mem_deref);
}
void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
@@ -260,229 +308,282 @@
channel->next_packet_number = 0;
}
-void Subscriber::RemoveListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- MutexLocker lock(&mutex_);
- pending_channels_.erase(channel);
- pending_removal_.push_back(channel);
-}
-
-Connection::Connection(
- ::seasocks::WebSocket *sock, ::seasocks::Server *server,
+ApplicationConnection::ApplicationConnection(
+ ::seasocks::Server *server, ::seasocks::WebSocket *sock,
const std::vector<std::unique_ptr<Subscriber>> &subscribers,
const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
const EventLoop *event_loop)
- : sock_(sock),
- server_(server),
+ : server_(server),
+ sock_(sock),
subscribers_(subscribers),
config_headers_(PackBuffer(config.span())),
- event_loop_(event_loop) {}
+ event_loop_(event_loop) {
+ connection_.set_on_negotiation_needed([]() {
+ VLOG(1) << "Negotiation needed, not offering so not creating offer.";
+ });
-// Function called for web socket data. Parses the flatbuffer and
-// handles it appropriately.
-void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
- const FlatbufferSpan<WebSocketMessage> message({data, size});
- if (!message.Verify()) {
- LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
+ connection_.set_on_local_candidate(
+ [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url) { LocalCandidate(candidate, url); });
+
+ connection_.set_on_data_channel(
+ [this](std::shared_ptr<ScopedDataChannel> channel) {
+ OnDataChannel(channel);
+ });
+
+ connection_.Open();
+}
+
+ApplicationConnection::~ApplicationConnection() {
+ for (auto &it : channels_) {
+ it.second.data_channel->Close();
+ it.second.data_channel = nullptr;
+ }
+
+ // Eh, we are done, tell the channel to shut down. If we didn't, it would
+ // just hang around until the connection closes, which is rather shortly
+ // after.
+ if (channel_) {
+ channel_->Close();
+ }
+}
+
+void ApplicationConnection::OnSdp(const char *sdp) {
+ struct rawrtc_peer_connection_description *remote_description = NULL;
+
+ auto error = rawrtc_peer_connection_description_create(
+ &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
+ if (error) {
+ LOG(WARNING) << "Cannot parse remote description: "
+ << rawrtc_code_to_str(error);
return;
}
- switch (message.message().payload_type()) {
- case Payload::WebSocketSdp: {
- const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
- if (offer->type() != SdpType::OFFER) {
- LOG(WARNING) << "Got the wrong sdp type from client";
- break;
- }
- const flatbuffers::String *sdp = offer->payload();
- webrtc::SdpParseError error;
- std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
- CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
- if (!desc) {
- LOG(WARNING) << "Failed to parse sdp description: "
- << error.description;
- // TODO(alex): send a message back to browser for failure.
- break;
- }
- // We can only start creating the PeerConnection once we have
- // something to give it, so we wait until we get an offer before
- // starting.
- webrtc::PeerConnectionInterface::RTCConfiguration config;
- config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
- config.enable_dtls_srtp = true;
- {
- webrtc::PeerConnectionInterface::IceServer ice_server;
- ice_server.urls.push_back("stun:stun.l.google.com:19302");
- config.servers.push_back(ice_server);
- }
+ CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
+ connection_.connection(), remote_description));
- std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
- signaling_thread->SetName("signaling_thread", nullptr);
- signaling_thread->Start();
+ struct rawrtc_peer_connection_description *local_description;
+ CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
+ connection_.connection()));
+ CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
+ connection_.connection(), local_description));
- signaling_thread_ = signaling_thread.get();
+ enum rawrtc_sdp_type type;
+ char *local_sdp = nullptr;
+ // Get SDP type & the SDP itself
+ CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
+ &type, local_description));
+ CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
+ local_description));
- webrtc::PeerConnectionFactoryDependencies factory_deps;
- factory_deps.signaling_thread = signaling_thread.release();
- rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
- CreateModularPeerConnectionFactory(std::move(factory_deps));
- {
- // Don't ignore *any* networks--by default, the loopback interface is
- // ignored, which makes it impossible to use WebRTC on devices with no
- // network.
- webrtc::PeerConnectionFactoryInterface::Options options;
- options.network_ignore_mask = 0;
- factory->SetOptions(options);
- }
-
- peer_connection_ =
- factory->CreatePeerConnection(config, nullptr, nullptr, this);
-
- peer_connection_->SetRemoteDescription(
- DummySetSessionDescriptionObserver::Create(), desc.release());
-
- peer_connection_->CreateAnswer(
- this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
- break;
- }
- case Payload::WebSocketIce: {
- const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
- std::string candidate = ice->candidate()->str();
- std::string sdpMid = ice->sdpMid()->str();
- int sdpMLineIndex = ice->sdpMLineIndex();
- webrtc::SdpParseError error;
- webrtc::IceCandidateInterface *ice_candidate =
- webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
- if (!ice_candidate) {
- LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
- // TODO(alex): send a message back to browser for failure.
- break;
- }
- peer_connection_->AddIceCandidate(ice_candidate);
- break;
- }
- default: {
- break;
- }
- }
-}
-
-void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
- webrtc::DataBuffer data_buffer(
- rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
- true /* binary array */);
- VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
- data_channel_->Send(data_buffer);
-}
-
-void Connection::OnDataChannel(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- data_channel_ = channel;
- data_channel_->RegisterObserver(this);
-}
-
-void Connection::OnIceCandidate(
- const webrtc::IceCandidateInterface *candidate) {
- flatbuffers::FlatBufferBuilder fbb(512);
- std::string ice_string;
- candidate->ToString(&ice_string);
-
- flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
- fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
- candidate->sdp_mline_index());
- flatbuffers::Offset<WebSocketMessage> ice_message =
- CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
- fbb.Finish(ice_message);
-
- server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
-}
-
-// This is the callback for creating an sdp. We have to manually assign it
-// locally and send it to the client.
-void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
- peer_connection_->SetLocalDescription(
- DummySetSessionDescriptionObserver::Create(), desc);
- flatbuffers::FlatBufferBuilder fbb(512);
- std::string answer_string;
- desc->ToString(&answer_string);
+ flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<WebSocketSdp> sdp_fb =
- CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
+ CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
flatbuffers::Offset<WebSocketMessage> answer_message =
CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
+
+ VLOG(1) << aos::FlatbufferToJson(
+ flatbuffers::GetTemporaryPointer(fbb, answer_message));
fbb.Finish(answer_message);
server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
+ mem_deref(local_sdp);
}
-// Wait until the data channel is ready for data before sending the config.
-void Connection::OnStateChange() {
- if (peer_connection_.get() != nullptr &&
- data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
- for (const auto &header : config_headers_) {
- Send(header.buffer());
+void ApplicationConnection::OnIce(const WebSocketIce *ice) {
+ if (!ice->has_candidate()) {
+ return;
+ }
+ uint8_t sdpMLineIndex = ice->sdpMLineIndex();
+
+ struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
+ CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
+ &ice_candidate, ice->candidate()->c_str(), ice->sdpMid()->c_str(),
+ &sdpMLineIndex, nullptr));
+
+ rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
+ ice_candidate);
+
+ mem_deref(ice_candidate);
+}
+
+void ApplicationConnection::LocalCandidate(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url) {
+ struct rawrtc_ice_candidate *ortc_candidate = nullptr;
+ if (candidate) {
+ CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
+ &ortc_candidate, candidate));
+
+ flatbuffers::FlatBufferBuilder fbb;
+ char *sdpp = nullptr;
+ CHECK_RAWRTC(
+ rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
+ char *midp = nullptr;
+ CHECK_RAWRTC(
+ rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));
+
+ uint8_t media_line_index;
+ enum rawrtc_code error =
+ rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
+ &media_line_index, candidate);
+
+ flatbuffers::Offset<flatbuffers::String> sdpp_offset =
+ fbb.CreateString(sdpp);
+ flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
+ fbb.CreateString(midp);
+
+ WebSocketIce::Builder web_socket_ice_builder(fbb);
+
+ web_socket_ice_builder.add_candidate(sdpp_offset);
+ web_socket_ice_builder.add_sdpMid(sdp_mid_offset);
+
+ if (error == RAWRTC_CODE_SUCCESS) {
+ web_socket_ice_builder.add_sdpMLineIndex(media_line_index);
}
+ flatbuffers::Offset<WebSocketIce> ice_offset =
+ web_socket_ice_builder.Finish();
+
+ flatbuffers::Offset<WebSocketMessage> ice_message =
+ CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
+ VLOG(1) << url << ": "
+ << aos::FlatbufferToJson(
+ flatbuffers::GetTemporaryPointer(fbb, ice_message));
+ fbb.Finish(ice_message);
+
+ server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
+
+ mem_deref(sdpp);
+ mem_deref(midp);
}
}
-// Handle DataChannel messages. Subscribe to each listener that matches the
-// subscribe message
-void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
- // Sanity check--we are relying on the Add/RemoveListener calls being made
- // from the correct thread.
- CHECK(signaling_thread_->IsCurrent());
+void ApplicationConnection::OnDataChannel(
+ std::shared_ptr<ScopedDataChannel> channel) {
+ if (channel->label() == std::string_view("signalling")) {
+ CHECK(!channel_);
+ channel_ = channel;
+
+ channel_->set_on_message(
+ [this](struct mbuf *const buffer,
+ const enum rawrtc_data_channel_message_flag flags) {
+ HandleSignallingData(buffer, flags);
+ });
+
+ channel_->set_on_open([this]() {
+ for (const auto &header : config_headers_) {
+ channel_->Send(header.buffer());
+ }
+ });
+
+ channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
+
+ // Register an on_close callback which does nothing but keeps channel alive
+ // until it is done. This keeps the memory around until rawrtc can finish
+ // calling the close callback.
+ channel_->set_on_close([channel]() {});
+ } else {
+ channel_->set_on_close([channel]() {});
+ channel->Close();
+ }
+}
+
+void ApplicationConnection::HandleSignallingData(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ const enum rawrtc_data_channel_message_flag /*flags*/) {
FlatbufferSpan<SubscriberRequest> message(
- {buffer.data.data(), buffer.data.size()});
+ {mbuf_buf(buffer), mbuf_get_left(buffer)});
if (!message.Verify()) {
LOG(ERROR) << "Invalid flatbuffer received from browser client.";
return;
}
- VLOG(2) << "Got a subscription message "
+ VLOG(1) << "Got a subscription message "
<< aos::FlatbufferToJson(&message.message());
if (!message.message().has_channels_to_transfer()) {
LOG(ERROR) << "No channels requested for transfer.";
return;
}
- for (auto &subscriber : subscribers_) {
- bool found_match = false;
- for (auto channel_request : *message.message().channels_to_transfer()) {
- const Channel *channel = channel_request->channel();
- if (channel == nullptr) {
- LOG(ERROR) << "Got unpopulated channel.";
- continue;
- }
- const TransferMethod transfer_method = channel_request->method();
- // Call GetChannel() before comparing the channel name/type to each
- // subscriber. This allows us to resolve any node or application specific
- // mappings.
- const Channel *comparison_channel =
- configuration::GetChannel(event_loop_->configuration(), channel,
- event_loop_->name(), event_loop_->node());
- if (comparison_channel == nullptr) {
- LOG(ERROR) << "Channel not available: "
- << configuration::StrippedChannelToString(channel);
- continue;
- }
- if (subscriber->Compare(comparison_channel)) {
- int index = subscriber->index();
- auto it = channels_.find(index);
- if (it == channels_.end()) {
- auto pair = channels_.insert(
- {index, peer_connection_->CreateDataChannel(
- channel->name()->str() + "/" + channel->type()->str(),
- nullptr)});
- it = pair.first;
- }
- subscriber->AddListener(it->second, transfer_method, signaling_thread_);
- VLOG(1) << "Subscribe to: " << channel->type()->str();
- found_match = true;
- break;
- }
+ // The client each time sends a full list of everything it wants to be
+ // subscribed to. It is our responsibility to remove channels which aren't
+ // in that list and add ones which need to be.
+ //
+ // Start by clearing a tracking bit on each channel. This takes O(number of
+ // open channels), which should be small.
+ //
+ // Then open any new channels. For any we visit which are already open,
+ // don't update those.
+ //
+ // Finally, iterate over the channel list and purge anything which we didn't
+ // touch.
+ for (auto &it : channels_) {
+ it.second.requested = false;
+ }
+ for (auto channel_request : *message.message().channels_to_transfer()) {
+ const Channel *channel = channel_request->channel();
+ if (channel == nullptr) {
+ LOG(ERROR) << "Got unpopulated channel.";
+ continue;
}
- if (!found_match) {
- int index = subscriber->index();
- auto it = channels_.find(index);
- subscriber->RemoveListener(it->second);
+ const TransferMethod transfer_method = channel_request->method();
+ // Call GetChannel() before comparing the channel name/type to each
+ // subscriber. This allows us to resolve any node or application
+ // specific mappings.
+ const Channel *comparison_channel =
+ configuration::GetChannel(event_loop_->configuration(), channel,
+ event_loop_->name(), event_loop_->node());
+ if (comparison_channel == nullptr) {
+ LOG(ERROR) << "Channel not available: "
+ << configuration::StrippedChannelToString(channel);
+ continue;
+ }
+
+ size_t channel_index = configuration::ChannelIndex(
+ event_loop_->configuration(), comparison_channel);
+
+ auto it = channels_.find(channel_index);
+ if (it == channels_.end()) {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ std::make_shared<ScopedDataChannel>();
+
+ std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;
+
+ data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
+ channel_index]() {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ data_channel_weak_ptr.lock();
+ CHECK(data_channel) << ": Subscriber got destroyed before we started.";
+ // Weak ptr inside the subscriber so we don't have a circular
+ // reference. AddListener will close it.
+ subscribers_[channel_index]->AddListener(data_channel, transfer_method);
+ });
+
+ Subscriber *subscriber = subscribers_[channel_index].get();
+ data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ data_channel_weak_ptr.lock();
+ CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
+ subscriber->RemoveListener(data_channel);
+ });
+
+ data_channel->Open(
+ connection_.connection(),
+ absl::StrCat(channel->name()->str(), "/", channel->type()->str()));
+
+ auto pair = channels_.insert({channel_index, {data_channel, true}});
+ it = pair.first;
+ }
+
+ it->second.requested = true;
+
+ VLOG(1) << "Subscribe to: " << channel->type()->str();
+ }
+
+ for (auto &it : channels_) {
+ if (!it.second.requested) {
+ it.second.data_channel->Close();
+ it.second.data_channel = nullptr;
}
}
}
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index e7bf4e0..4ad0630 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -1,11 +1,15 @@
#ifndef AOS_NETWORK_WEB_PROXY_H_
#define AOS_NETWORK_WEB_PROXY_H_
+
+#include <deque>
#include <map>
#include <set>
+
#include "aos/events/event_loop.h"
#include "aos/events/shm_event_loop.h"
#include "aos/mutex/mutex.h"
#include "aos/network/connect_generated.h"
+#include "aos/network/rawrtc.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/seasocks/seasocks_logger.h"
#include "flatbuffers/flatbuffers.h"
@@ -13,13 +17,12 @@
#include "seasocks/StringUtil.h"
#include "seasocks/WebSocket.h"
-#include "api/peer_connection_interface.h"
-
namespace aos {
namespace web_proxy {
class Connection;
class Subscriber;
+class ApplicationConnection;
// Basic class that handles receiving new websocket connections. Creates a new
// Connection to manage the rest of the negotiation and data passing. When the
@@ -34,12 +37,14 @@
void onDisconnect(::seasocks::WebSocket *sock) override;
private:
- std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
::seasocks::Server *server_;
std::vector<std::unique_ptr<Subscriber>> subscribers_;
const aos::FlatbufferDetachedBuffer<aos::Configuration> config_;
- const EventLoop *const event_loop_;
+ std::map<::seasocks::WebSocket *, std::unique_ptr<ApplicationConnection>>
+ connections_;
+
+ EventLoop *const event_loop_;
};
// Wrapper class that manages the seasocks server and WebsocketHandler.
@@ -92,135 +97,86 @@
public:
Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index,
int buffer_size)
- : fbb_(1024),
- fetcher_(std::move(fetcher)),
+ : fetcher_(std::move(fetcher)),
channel_index_(channel_index),
buffer_size_(buffer_size) {}
void RunIteration();
- void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
- TransferMethod transfer_method,
- rtc::Thread *signaling_thread);
+ void AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
+ TransferMethod transfer_method);
- void RemoveListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
-
- // Check if the Channel passed matches the channel this fetchs.
- bool Compare(const Channel *channel) const;
-
- int index() const { return channel_index_; }
+ void RemoveListener(std::shared_ptr<ScopedDataChannel> data_channel);
private:
struct ChannelInformation {
TransferMethod transfer_method;
uint32_t current_queue_index = 0;
size_t next_packet_number = 0;
- // Thread to use for making calls to the DataChannelInterface.
- rtc::Thread *signaling_thread;
};
struct Message {
uint32_t index = 0xffffffff;
- std::vector<webrtc::DataBuffer> data;
+ std::vector<std::shared_ptr<struct mbuf>> data;
};
- const webrtc::DataBuffer *NextBuffer(ChannelInformation *channel);
+ std::shared_ptr<struct mbuf> NextBuffer(ChannelInformation *channel);
void SkipToLastMessage(ChannelInformation *channel);
- flatbuffers::FlatBufferBuilder fbb_;
std::unique_ptr<RawFetcher> fetcher_;
int channel_index_;
int buffer_size_;
std::deque<Message> message_buffer_;
- std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
- channels_;
- // In order to enable the Connection class to add/remove listeners
- // asyncrhonously, queue up all the newly added listeners in pending_*
- // members. Access to these members is controlled by mutex_.
- std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
- pending_channels_;
- std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>>
- pending_removal_;
-
- aos::Mutex mutex_;
+ std::map<std::shared_ptr<ScopedDataChannel>, ChannelInformation> channels_;
};
-// Represents a single connection to a browser for the entire lifetime of the
-// connection.
-class Connection : public webrtc::PeerConnectionObserver,
- public webrtc::CreateSessionDescriptionObserver,
- public webrtc::DataChannelObserver {
+// Class to manage a WebRTC connection to a browser.
+class ApplicationConnection {
public:
- Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
- const std::vector<std::unique_ptr<Subscriber>> &subscribers,
- const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
- const EventLoop *event_loop);
+ ApplicationConnection(
+ ::seasocks::Server *server, ::seasocks::WebSocket *sock,
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
+ const EventLoop *event_loop);
- ~Connection() {
- // DataChannel may call OnStateChange after this is destroyed, so make sure
- // it doesn't.
- if (data_channel_) {
- data_channel_->UnregisterObserver();
- }
- }
+ ~ApplicationConnection();
- void HandleWebSocketData(const uint8_t *data, size_t size);
-
- void Send(const flatbuffers::DetachedBuffer &buffer) const;
-
- // PeerConnectionObserver implementation
- void OnSignalingChange(
- webrtc::PeerConnectionInterface::SignalingState) override {}
- void OnAddStream(rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
- void OnRemoveStream(
- rtc::scoped_refptr<webrtc::MediaStreamInterface>) override {}
- void OnDataChannel(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) override;
- void OnRenegotiationNeeded() override {}
- void OnIceConnectionChange(
- webrtc::PeerConnectionInterface::IceConnectionState) override {}
- void OnIceGatheringChange(
- webrtc::PeerConnectionInterface::IceGatheringState) override {}
- void OnIceCandidate(const webrtc::IceCandidateInterface *candidate) override;
- void OnIceCandidateError(const std::string &host_candidate,
- const std::string &url, int error_code,
- const std::string &error_text) override {
- LOG(ERROR) << "ICE Candidate Error on " << host_candidate << " for " << url
- << " with error " << error_code << ": " << error_text;
- }
- void OnIceConnectionReceivingChange(bool) override {}
-
- // CreateSessionDescriptionObserver implementation
- void OnSuccess(webrtc::SessionDescriptionInterface *desc) override;
- void OnFailure(webrtc::RTCError /*error*/) override {}
- // CreateSessionDescriptionObserver is a refcounted object
- void AddRef() const override {}
- // We handle ownership with a unique_ptr so don't worry about actually
- // refcounting. We will delete when we are done.
- rtc::RefCountReleaseStatus Release() const override {
- return rtc::RefCountReleaseStatus::kOtherRefsRemained;
- }
-
- // DataChannelObserver implementation
- void OnStateChange() override;
- void OnMessage(const webrtc::DataBuffer &buffer) override;
- void OnBufferedAmountChange(uint64_t /*sent_data_size*/) override {}
+ // Handles a SDP sent through the negotiation channel.
+ void OnSdp(const char *sdp);
+ // Handles a ICE candidate sent through the negotiation channel.
+ void OnIce(const WebSocketIce *ice);
private:
- ::seasocks::WebSocket *sock_;
- ::seasocks::Server *server_;
- // The signaling thread is the thread on which most/all of the work we do with
- // WebRTC will happen--it is both where the handlers we register should be
- // called and where we should be calling Send() from.
- rtc::Thread *signaling_thread_;
- const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
- const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
- std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
+ void LocalCandidate(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url);
- rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
- rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
+ // Handles a signaling channel being made.
+ void OnDataChannel(std::shared_ptr<ScopedDataChannel> channel);
+
+ // Handles data coming in on the signaling channel requesting subscription.
+ void HandleSignallingData(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ const enum rawrtc_data_channel_message_flag /*flags*/);
+
+ RawRTCConnection connection_;
+
+ ::seasocks::Server *server_;
+ ::seasocks::WebSocket *sock_;
+
+ struct ChannelState {
+ std::shared_ptr<ScopedDataChannel> data_channel;
+ bool requested = true;
+ };
+
+ std::map<int, ChannelState> channels_;
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
+
+ const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
const EventLoop *const event_loop_;
+
+ std::shared_ptr<ScopedDataChannel> channel_;
};
} // namespace web_proxy
diff --git a/aos/network/web_proxy_utils.h b/aos/network/web_proxy_utils.h
index 09ad333..cfed19d 100644
--- a/aos/network/web_proxy_utils.h
+++ b/aos/network/web_proxy_utils.h
@@ -8,11 +8,8 @@
int GetPacketCount(const Context &context);
-/*
- * Packs a message embedded in context into a MessageHeader on fbb. Handles
- * multipart messages by use of the packet_index.
- * TODO(alex): make this an iterator that returns each packet sequentially
- */
+// Packs a message embedded in context into a MessageHeader on fbb. Handles
+// multipart messages by use of the packet_index.
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, int packet_index);
diff --git a/third_party/BUILD b/third_party/BUILD
index 4ecad0d..040d3f7 100644
--- a/third_party/BUILD
+++ b/third_party/BUILD
@@ -91,18 +91,6 @@
)
cc_library(
- name = "webrtc",
- target_compatible_with = ["@platforms//os:linux"],
- visibility = ["//visibility:public"],
- deps = cpu_select({
- "amd64": ["@webrtc_x64//:webrtc"],
- "armhf": ["@webrtc_arm//:webrtc"],
- "cortex-m": ["@webrtc_arm//:webrtc"],
- "roborio": ["@webrtc_rio//:webrtc"],
- }),
-)
-
-cc_library(
name = "lzma",
visibility = ["//visibility:public"],
deps = select({
diff --git a/third_party/rawrtc/rawrtc/BUILD b/third_party/rawrtc/rawrtc/BUILD
index d65d0af..dc278f7 100644
--- a/third_party/rawrtc/rawrtc/BUILD
+++ b/third_party/rawrtc/rawrtc/BUILD
@@ -28,6 +28,9 @@
"-DHAVE_STDBOOL_H",
"-DHAVE_INTTYPES_H",
],
+ defines = [
+ "HAVE_STDBOOL_H",
+ ],
includes = ["include/"],
local_defines = [
"RAWRTC_VERSION=\\\"0.5.1\\\"",
@@ -39,17 +42,36 @@
],
)
+cc_library(
+ name = "tools_helper",
+ srcs = [
+ "tools/helper/common.c",
+ "tools/helper/handler.c",
+ "tools/helper/parameters.c",
+ "tools/helper/utils.c",
+ ],
+ hdrs = [
+ "tools/helper/common.h",
+ "tools/helper/handler.h",
+ "tools/helper/parameters.h",
+ "tools/helper/utils.h",
+ ],
+ copts = [
+ "-Wno-missing-braces",
+ "-Wno-incompatible-pointer-types-discards-qualifiers",
+ ] + compiler_select({
+ "clang": [],
+ "gcc": [
+ "-Wno-discarded-qualifiers",
+ ],
+ }),
+ visibility = ["//visibility:public"],
+ deps = [":rawrtc"],
+)
+
cc_binary(
name = "peer-connection",
srcs = [
- "tools/helper/common.c",
- "tools/helper/common.h",
- "tools/helper/handler.c",
- "tools/helper/handler.h",
- "tools/helper/parameters.c",
- "tools/helper/parameters.h",
- "tools/helper/utils.c",
- "tools/helper/utils.h",
"tools/peer-connection.c",
],
copts = [
@@ -64,5 +86,8 @@
],
}),
includes = ["tools"],
- deps = [":rawrtc"],
+ deps = [
+ ":rawrtc",
+ ":tools_helper",
+ ],
)
diff --git a/third_party/rawrtc/re/include/re_main.h b/third_party/rawrtc/re/include/re_main.h
index cc9ac4b..c4e6293 100644
--- a/third_party/rawrtc/re/include/re_main.h
+++ b/third_party/rawrtc/re/include/re_main.h
@@ -29,8 +29,13 @@
*
* @param sig Signal number
*/
+typedef int(re_fd_listen_h)(int fd, int flags, fd_h *fh, void *arg);
+typedef void(re_fd_close_h)(int fd);
+
typedef void (re_signal_h)(int sig);
+void re_fd_set_listen_callback(re_fd_listen_h *listenh);
+void re_fd_set_close_callback(re_fd_close_h *closeh);
int fd_listen(int fd, int flags, fd_h *fh, void *arg);
void fd_close(int fd);
diff --git a/third_party/rawrtc/re/src/main/main.c b/third_party/rawrtc/re/src/main/main.c
index 0243b4b..de3e68b 100644
--- a/third_party/rawrtc/re/src/main/main.c
+++ b/third_party/rawrtc/re/src/main/main.c
@@ -222,6 +222,25 @@
#endif
+static re_fd_listen_h *global_fd_listen_h = NULL;
+static re_fd_close_h *global_fd_close_h = NULL;
+
+void re_fd_set_listen_callback(re_fd_listen_h *listenh) {
+ struct re *re = re_get();
+
+ re_lock(re);
+ global_fd_listen_h = listenh;
+ re_unlock(re);
+}
+void re_fd_set_close_callback(re_fd_close_h *closeh) {
+ struct re *re = re_get();
+
+ re_lock(re);
+ global_fd_close_h = closeh;
+ re_unlock(re);
+}
+
+
#if MAIN_DEBUG
/**
* Call the application event handler
@@ -566,7 +585,11 @@
struct re *re = re_get();
int err = 0;
- DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags);
+ if (global_fd_listen_h != NULL) {
+ return (*global_fd_listen_h)(fd, flags, fh, arg);
+ }
+
+ DEBUG_INFO("fd_listen: fd=%d flags=0x%02x\n", fd, flags);
if (fd < 0) {
DEBUG_WARNING("fd_listen: corrupt fd %d\n", fd);
@@ -642,6 +665,11 @@
*/
void fd_close(int fd)
{
+ if (global_fd_close_h != NULL) {
+ (*global_fd_close_h)(fd);
+ return;
+ }
+
(void)fd_listen(fd, 0, NULL, NULL);
}