Switch web_proxy over to rawrtc
In theory, nothing else should change... This will make it possible to
run the web proxy and web plotter in applications which use absl among
other things.
Future work should involve reducing a copy (we copy an extra time when
buffering), and making web_proxy not spin at 100% CPU when it is done by
reshuffling how the event loop works a bit better.
Change-Id: Iab6b6b278c5e830bb71e107824b364dfe7cad46d
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index c762b63..6147e46 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -5,74 +5,119 @@
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
#include "aos/seasocks/seasocks_logger.h"
-#include "api/create_peerconnection_factory.h"
#include "glog/logging.h"
#include "internal/Embedded.h"
+extern "C" {
+#include <rawrtc.h>
+
+#define DEBUG_LEVEL 7
+#define DEBUG_MODULE "web-proxy"
+#include <re_dbg.h>
+struct list *tmrl_get(void);
+}
+
DEFINE_int32(proxy_port, 8080, "Port to use for the web proxy server.");
namespace aos {
namespace web_proxy {
-
-namespace {
-// Based on webrtc examples. In our controlled environment we expect setting sdp
-// to always succeed, and we can't do anything about a failure, so just ignore
-// everything.
-class DummySetSessionDescriptionObserver
- : public webrtc::SetSessionDescriptionObserver {
- public:
- static DummySetSessionDescriptionObserver *Create() {
- return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
- }
- virtual void OnSuccess() {}
- virtual void OnFailure(webrtc::RTCError /*error*/) {}
-};
-
-} // namespace
-
WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
aos::EventLoop *event_loop, int buffer_size)
: server_(server),
config_(aos::CopyFlatBuffer(event_loop->configuration())),
event_loop_(event_loop) {
+ if (VLOG_IS_ON(2)) {
+ dbg_init(DBG_DEBUG, DBG_ALL);
+ }
+ CHECK_RAWRTC(rawrtc_init(true));
+
// We need to reference findEmbeddedContent() to make the linker happy...
findEmbeddedContent("");
- const aos::Node *self = event_loop->node();
+ const aos::Node *self = event_loop_->node();
- for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
- auto channel = event_loop->configuration()->channels()->Get(i);
+ subscribers_.reserve(event_loop_->configuration()->channels()->size());
+ for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
+ auto channel = event_loop_->configuration()->channels()->Get(i);
if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
- auto fetcher = event_loop->MakeRawFetcher(channel);
+ auto fetcher = event_loop_->MakeRawFetcher(channel);
subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
std::move(fetcher), i, buffer_size));
+ } else {
+ subscribers_.emplace_back(nullptr);
}
}
- TimerHandler *const timer = event_loop->AddTimer([this]() {
+ TimerHandler *const timer = event_loop_->AddTimer([this]() {
for (auto &subscriber : subscribers_) {
- subscriber->RunIteration();
+ if (subscriber) subscriber->RunIteration();
}
});
- event_loop->OnRun([timer, event_loop]() {
- timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
+ event_loop_->OnRun([this, timer]() {
+ timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
});
}
void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
- std::unique_ptr<Connection> conn = std::make_unique<Connection>(
- sock, server_, subscribers_, config_, event_loop_);
- connections_.insert({sock, std::move(conn)});
+ std::unique_ptr<ApplicationConnection> connection =
+ std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
+ config_, event_loop_);
+
+ connections_.insert({sock, std::move(connection)});
}
void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
size_t size) {
- connections_[sock]->HandleWebSocketData(data, size);
+ const FlatbufferSpan<WebSocketMessage> message({data, size});
+ if (!message.Verify()) {
+ LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
+ return;
+ }
+ VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
+ switch (message.message().payload_type()) {
+ case Payload::WebSocketSdp: {
+ const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
+ if (offer->type() != SdpType::OFFER) {
+ LOG(WARNING) << "Got the wrong sdp type from client";
+ break;
+ }
+ const flatbuffers::String *sdp = offer->payload();
+ connections_[sock]->OnSdp(sdp->c_str());
+ break;
+ }
+ case Payload::WebSocketIce: {
+ const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
+ connections_[sock]->OnIce(ice);
+ break;
+ }
+ default: { break; }
+ }
}
void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
connections_.erase(sock);
}
+// Global epoll pointer
+static aos::internal::EPoll *global_epoll = nullptr;
+
+static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
+ if (flags & 0x1) {
+ global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
+ }
+ if (flags & 0x2) {
+ global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
+ }
+ if (flags & 0x4) {
+ global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
+ }
+ return 0;
+}
+
+static void ReFdClose(int fd) {
+ CHECK(global_epoll != nullptr);
+ global_epoll->DeleteFd(fd);
+}
+
WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
: WebProxy(event_loop, &internal_epoll_, buffer_size) {}
@@ -86,6 +131,23 @@
::seasocks::Logger::Level::Info)),
websocket_handler_(
new WebsocketHandler(&server_, event_loop, buffer_size)) {
+ CHECK(!global_epoll);
+ global_epoll = epoll;
+
+ re_fd_set_listen_callback(&ReFdListen);
+ re_fd_set_close_callback(&ReFdClose);
+
+ epoll->BeforeWait([]() {
+ const uint64_t to = tmr_next_timeout(tmrl_get());
+ if (to != 0) {
+ VLOG(1) << "Next timeout " << to;
+ }
+ // Note: this only works because we are spinning on it...
+ // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
+ // for handling tmr.
+ tmr_poll(tmrl_get());
+ });
+
server_.addWebSocketHandler("/ws", websocket_handler_);
CHECK(server_.startListening(FLAGS_proxy_port));
@@ -117,27 +179,11 @@
epoll_->DeleteFd(server_.fd());
server_.terminate();
CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
+ CHECK(global_epoll == epoll_);
+ global_epoll = nullptr;
}
void Subscriber::RunIteration() {
- {
- // Manage updating the channels_ map given the pending_* members from the
- // *Listeners() methods.
- // We handle all the removals first so that we correctly handle the
- // situation where the user calls RemoveListener() and then AddListener()
- // between calls to RunIteration(). The reverse order (adding and then
- // removing before an update) is handled directly in RemoveListener() where
- // we remove things from the pending_channels_ map directly.
- MutexLocker lock(&mutex_);
- for (const auto &channel : pending_removal_) {
- channels_.erase(channel);
- }
- pending_removal_.clear();
- for (const auto &channel : pending_channels_) {
- channels_.insert(channel);
- }
- pending_channels_.clear();
- }
if (channels_.empty() && buffer_size_ == 0) {
return;
}
@@ -153,38 +199,39 @@
<< "packets";
for (int packet_index = 0;
packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
+ flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<MessageHeader> message_offset =
- PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
- fbb_.Finish(message_offset);
+ PackMessage(&fbb, fetcher_->context(), channel_index_, packet_index);
+ fbb.Finish(message_offset);
- const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+ const flatbuffers::DetachedBuffer buffer = fbb.Release();
+
+
+ struct mbuf *mbuffer = mbuf_alloc(buffer.size());
+ mbuf_write_mem(mbuffer, buffer.data(), buffer.size());
+ mbuf_set_pos(mbuffer, 0);
message.data.emplace_back(
- rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
- true /* binary array */);
+ std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
}
message_buffer_.push_back(std::move(message));
}
for (auto &conn : channels_) {
- rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
+ std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
ChannelInformation *channel_data = &conn.second;
if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
SkipToLastMessage(channel_data);
}
- const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
- while (buffer != nullptr) {
+ std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
+ while (buffer) {
+ // TODO(austin): This is a nop so we just buffer forever. Fix this when
+ // we care.
if (rtc_channel->buffered_amount() > 14000000) {
VLOG(1) << "skipping a send because buffered amount is too high";
break;
}
- // Call Send() from the signalling thread. The Invoke() call blocks until
- // the handler has been called, so we do not need to handle any
- // synchronization on this end. The body of the handler should be kept as
- // short as possible to avoid blocking the signalling thread continuously
- // for any longer than necessary.
- channel_data->signaling_thread->Invoke<void>(
- RTC_FROM_HERE,
- [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
+
+ rtc_channel->Send(buffer.get());
buffer = NextBuffer(channel_data);
}
}
@@ -195,24 +242,23 @@
}
}
-bool Subscriber::Compare(const Channel *channel) const {
- return channel->name()->string_view() ==
- fetcher_->channel()->name()->string_view() &&
- channel->type()->string_view() ==
- fetcher_->channel()->type()->string_view();
-}
-
-void Subscriber::AddListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
- TransferMethod transfer_method, rtc::Thread *signaling_thread) {
- MutexLocker lock(&mutex_);
+void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
+ TransferMethod transfer_method) {
+ VLOG(1) << "Adding listener for " << data_channel.get();
ChannelInformation info;
info.transfer_method = transfer_method;
- info.signaling_thread = signaling_thread;
- pending_channels_.emplace(channel, info);
+
+ channels_.emplace(data_channel, info);
}
-const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
+void Subscriber::RemoveListener(
+ std::shared_ptr<ScopedDataChannel> data_channel) {
+ VLOG(1) << "Removing listener for " << data_channel.get();
+ channels_.erase(data_channel);
+}
+
+std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
+ ChannelInformation *channel) {
CHECK_NOTNULL(channel);
if (message_buffer_.empty()) {
return nullptr;
@@ -223,7 +269,7 @@
if (fell_behind) {
channel->current_queue_index = earliest_index;
channel->next_packet_number = 0;
- return &message_buffer_.front().data.at(0);
+ return message_buffer_.front().data.at(0);
}
if (channel->current_queue_index > latest_index) {
// We are still waiting on the next message to appear; return.
@@ -237,8 +283,8 @@
CHECK_LT(0u, packets_in_message);
CHECK_LT(channel->next_packet_number, packets_in_message);
- const webrtc::DataBuffer *data =
- &message_buffer_[channel->current_queue_index - earliest_index].data.at(
+ std::shared_ptr<struct mbuf> original_data =
+ message_buffer_[channel->current_queue_index - earliest_index].data.at(
channel->next_packet_number);
++channel->next_packet_number;
@@ -247,7 +293,9 @@
channel->next_packet_number = 0;
}
- return data;
+ // Trigger a copy of the mbuf without copying the data.
+ return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
+ mem_deref);
}
void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
@@ -260,229 +308,282 @@
channel->next_packet_number = 0;
}
-void Subscriber::RemoveListener(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- MutexLocker lock(&mutex_);
- pending_channels_.erase(channel);
- pending_removal_.push_back(channel);
-}
-
-Connection::Connection(
- ::seasocks::WebSocket *sock, ::seasocks::Server *server,
+ApplicationConnection::ApplicationConnection(
+ ::seasocks::Server *server, ::seasocks::WebSocket *sock,
const std::vector<std::unique_ptr<Subscriber>> &subscribers,
const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
const EventLoop *event_loop)
- : sock_(sock),
- server_(server),
+ : server_(server),
+ sock_(sock),
subscribers_(subscribers),
config_headers_(PackBuffer(config.span())),
- event_loop_(event_loop) {}
+ event_loop_(event_loop) {
+ connection_.set_on_negotiation_needed([]() {
+ VLOG(1) << "Negotiation needed, not offering so not creating offer.";
+ });
-// Function called for web socket data. Parses the flatbuffer and
-// handles it appropriately.
-void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
- const FlatbufferSpan<WebSocketMessage> message({data, size});
- if (!message.Verify()) {
- LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
+ connection_.set_on_local_candidate(
+ [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url) { LocalCandidate(candidate, url); });
+
+ connection_.set_on_data_channel(
+ [this](std::shared_ptr<ScopedDataChannel> channel) {
+ OnDataChannel(channel);
+ });
+
+ connection_.Open();
+}
+
+ApplicationConnection::~ApplicationConnection() {
+ for (auto &it : channels_) {
+ it.second.data_channel->Close();
+ it.second.data_channel = nullptr;
+ }
+
+ // Eh, we are done, tell the channel to shut down. If we didn't, it would
+ // just hang around until the connection closes, which is rather shortly
+ // after.
+ if (channel_) {
+ channel_->Close();
+ }
+}
+
+void ApplicationConnection::OnSdp(const char *sdp) {
+ struct rawrtc_peer_connection_description *remote_description = NULL;
+
+ auto error = rawrtc_peer_connection_description_create(
+ &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
+ if (error) {
+ LOG(WARNING) << "Cannot parse remote description: "
+ << rawrtc_code_to_str(error);
return;
}
- switch (message.message().payload_type()) {
- case Payload::WebSocketSdp: {
- const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
- if (offer->type() != SdpType::OFFER) {
- LOG(WARNING) << "Got the wrong sdp type from client";
- break;
- }
- const flatbuffers::String *sdp = offer->payload();
- webrtc::SdpParseError error;
- std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
- CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
- if (!desc) {
- LOG(WARNING) << "Failed to parse sdp description: "
- << error.description;
- // TODO(alex): send a message back to browser for failure.
- break;
- }
- // We can only start creating the PeerConnection once we have
- // something to give it, so we wait until we get an offer before
- // starting.
- webrtc::PeerConnectionInterface::RTCConfiguration config;
- config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
- config.enable_dtls_srtp = true;
- {
- webrtc::PeerConnectionInterface::IceServer ice_server;
- ice_server.urls.push_back("stun:stun.l.google.com:19302");
- config.servers.push_back(ice_server);
- }
+ CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
+ connection_.connection(), remote_description));
- std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
- signaling_thread->SetName("signaling_thread", nullptr);
- signaling_thread->Start();
+ struct rawrtc_peer_connection_description *local_description;
+ CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
+ connection_.connection()));
+ CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
+ connection_.connection(), local_description));
- signaling_thread_ = signaling_thread.get();
+ enum rawrtc_sdp_type type;
+ char *local_sdp = nullptr;
+ // Get SDP type & the SDP itself
+ CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
+ &type, local_description));
+ CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
+ local_description));
- webrtc::PeerConnectionFactoryDependencies factory_deps;
- factory_deps.signaling_thread = signaling_thread.release();
- rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
- CreateModularPeerConnectionFactory(std::move(factory_deps));
- {
- // Don't ignore *any* networks--by default, the loopback interface is
- // ignored, which makes it impossible to use WebRTC on devices with no
- // network.
- webrtc::PeerConnectionFactoryInterface::Options options;
- options.network_ignore_mask = 0;
- factory->SetOptions(options);
- }
-
- peer_connection_ =
- factory->CreatePeerConnection(config, nullptr, nullptr, this);
-
- peer_connection_->SetRemoteDescription(
- DummySetSessionDescriptionObserver::Create(), desc.release());
-
- peer_connection_->CreateAnswer(
- this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
- break;
- }
- case Payload::WebSocketIce: {
- const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
- std::string candidate = ice->candidate()->str();
- std::string sdpMid = ice->sdpMid()->str();
- int sdpMLineIndex = ice->sdpMLineIndex();
- webrtc::SdpParseError error;
- webrtc::IceCandidateInterface *ice_candidate =
- webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
- if (!ice_candidate) {
- LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
- // TODO(alex): send a message back to browser for failure.
- break;
- }
- peer_connection_->AddIceCandidate(ice_candidate);
- break;
- }
- default: {
- break;
- }
- }
-}
-
-void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
- webrtc::DataBuffer data_buffer(
- rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
- true /* binary array */);
- VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
- data_channel_->Send(data_buffer);
-}
-
-void Connection::OnDataChannel(
- rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- data_channel_ = channel;
- data_channel_->RegisterObserver(this);
-}
-
-void Connection::OnIceCandidate(
- const webrtc::IceCandidateInterface *candidate) {
- flatbuffers::FlatBufferBuilder fbb(512);
- std::string ice_string;
- candidate->ToString(&ice_string);
-
- flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
- fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
- candidate->sdp_mline_index());
- flatbuffers::Offset<WebSocketMessage> ice_message =
- CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
- fbb.Finish(ice_message);
-
- server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
-}
-
-// This is the callback for creating an sdp. We have to manually assign it
-// locally and send it to the client.
-void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
- peer_connection_->SetLocalDescription(
- DummySetSessionDescriptionObserver::Create(), desc);
- flatbuffers::FlatBufferBuilder fbb(512);
- std::string answer_string;
- desc->ToString(&answer_string);
+ flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<WebSocketSdp> sdp_fb =
- CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
+ CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
flatbuffers::Offset<WebSocketMessage> answer_message =
CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
+
+ VLOG(1) << aos::FlatbufferToJson(
+ flatbuffers::GetTemporaryPointer(fbb, answer_message));
fbb.Finish(answer_message);
server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
+ mem_deref(local_sdp);
}
-// Wait until the data channel is ready for data before sending the config.
-void Connection::OnStateChange() {
- if (peer_connection_.get() != nullptr &&
- data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
- for (const auto &header : config_headers_) {
- Send(header.buffer());
+void ApplicationConnection::OnIce(const WebSocketIce *ice) {
+ if (!ice->has_candidate()) {
+ return;
+ }
+ uint8_t sdpMLineIndex = ice->sdpMLineIndex();
+
+ struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
+ CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
+ &ice_candidate, ice->candidate()->c_str(), ice->sdpMid()->c_str(),
+ &sdpMLineIndex, nullptr));
+
+ rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
+ ice_candidate);
+
+ mem_deref(ice_candidate);
+}
+
+void ApplicationConnection::LocalCandidate(
+ struct rawrtc_peer_connection_ice_candidate *const candidate,
+ char const *const url) {
+ struct rawrtc_ice_candidate *ortc_candidate = nullptr;
+ if (candidate) {
+ CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
+ &ortc_candidate, candidate));
+
+ flatbuffers::FlatBufferBuilder fbb;
+ char *sdpp = nullptr;
+ CHECK_RAWRTC(
+ rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
+ char *midp = nullptr;
+ CHECK_RAWRTC(
+ rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));
+
+ uint8_t media_line_index;
+ enum rawrtc_code error =
+ rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
+ &media_line_index, candidate);
+
+ flatbuffers::Offset<flatbuffers::String> sdpp_offset =
+ fbb.CreateString(sdpp);
+ flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
+ fbb.CreateString(midp);
+
+ WebSocketIce::Builder web_socket_ice_builder(fbb);
+
+ web_socket_ice_builder.add_candidate(sdpp_offset);
+ web_socket_ice_builder.add_sdpMid(sdp_mid_offset);
+
+ if (error == RAWRTC_CODE_SUCCESS) {
+ web_socket_ice_builder.add_sdpMLineIndex(media_line_index);
}
+ flatbuffers::Offset<WebSocketIce> ice_offset =
+ web_socket_ice_builder.Finish();
+
+ flatbuffers::Offset<WebSocketMessage> ice_message =
+ CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
+ VLOG(1) << url << ": "
+ << aos::FlatbufferToJson(
+ flatbuffers::GetTemporaryPointer(fbb, ice_message));
+ fbb.Finish(ice_message);
+
+ server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
+
+ mem_deref(sdpp);
+ mem_deref(midp);
}
}
-// Handle DataChannel messages. Subscribe to each listener that matches the
-// subscribe message
-void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
- // Sanity check--we are relying on the Add/RemoveListener calls being made
- // from the correct thread.
- CHECK(signaling_thread_->IsCurrent());
+void ApplicationConnection::OnDataChannel(
+ std::shared_ptr<ScopedDataChannel> channel) {
+ if (channel->label() == std::string_view("signalling")) {
+ CHECK(!channel_);
+ channel_ = channel;
+
+ channel_->set_on_message(
+ [this](struct mbuf *const buffer,
+ const enum rawrtc_data_channel_message_flag flags) {
+ HandleSignallingData(buffer, flags);
+ });
+
+ channel_->set_on_open([this]() {
+ for (const auto &header : config_headers_) {
+ channel_->Send(header.buffer());
+ }
+ });
+
+ channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
+
+ // Register an on_close callback which does nothing but keeps channel alive
+ // until it is done. This keeps the memory around until rawrtc can finish
+ // calling the close callback.
+ channel_->set_on_close([channel]() {});
+ } else {
+ channel_->set_on_close([channel]() {});
+ channel->Close();
+ }
+}
+
+void ApplicationConnection::HandleSignallingData(
+ struct mbuf *const
+ buffer, // nullable (in case partial delivery has been requested)
+ const enum rawrtc_data_channel_message_flag /*flags*/) {
FlatbufferSpan<SubscriberRequest> message(
- {buffer.data.data(), buffer.data.size()});
+ {mbuf_buf(buffer), mbuf_get_left(buffer)});
if (!message.Verify()) {
LOG(ERROR) << "Invalid flatbuffer received from browser client.";
return;
}
- VLOG(2) << "Got a subscription message "
+ VLOG(1) << "Got a subscription message "
<< aos::FlatbufferToJson(&message.message());
if (!message.message().has_channels_to_transfer()) {
LOG(ERROR) << "No channels requested for transfer.";
return;
}
- for (auto &subscriber : subscribers_) {
- bool found_match = false;
- for (auto channel_request : *message.message().channels_to_transfer()) {
- const Channel *channel = channel_request->channel();
- if (channel == nullptr) {
- LOG(ERROR) << "Got unpopulated channel.";
- continue;
- }
- const TransferMethod transfer_method = channel_request->method();
- // Call GetChannel() before comparing the channel name/type to each
- // subscriber. This allows us to resolve any node or application specific
- // mappings.
- const Channel *comparison_channel =
- configuration::GetChannel(event_loop_->configuration(), channel,
- event_loop_->name(), event_loop_->node());
- if (comparison_channel == nullptr) {
- LOG(ERROR) << "Channel not available: "
- << configuration::StrippedChannelToString(channel);
- continue;
- }
- if (subscriber->Compare(comparison_channel)) {
- int index = subscriber->index();
- auto it = channels_.find(index);
- if (it == channels_.end()) {
- auto pair = channels_.insert(
- {index, peer_connection_->CreateDataChannel(
- channel->name()->str() + "/" + channel->type()->str(),
- nullptr)});
- it = pair.first;
- }
- subscriber->AddListener(it->second, transfer_method, signaling_thread_);
- VLOG(1) << "Subscribe to: " << channel->type()->str();
- found_match = true;
- break;
- }
+ // The client each time sends a full list of everything it wants to be
+ // subscribed to. It is our responsibility to remove channels which aren't
+ // in that list and add ones which need to be.
+ //
+ // Start by clearing a tracking bit on each channel. This takes O(number of
+ // open channels), which should be small.
+ //
+ // Then open any new channels. For any we visit which are already open,
+ // don't update those.
+ //
+ // Finally, iterate over the channel list and purge anything which we didn't
+ // touch.
+ for (auto &it : channels_) {
+ it.second.requested = false;
+ }
+ for (auto channel_request : *message.message().channels_to_transfer()) {
+ const Channel *channel = channel_request->channel();
+ if (channel == nullptr) {
+ LOG(ERROR) << "Got unpopulated channel.";
+ continue;
}
- if (!found_match) {
- int index = subscriber->index();
- auto it = channels_.find(index);
- subscriber->RemoveListener(it->second);
+ const TransferMethod transfer_method = channel_request->method();
+ // Call GetChannel() before comparing the channel name/type to each
+ // subscriber. This allows us to resolve any node or application
+ // specific mappings.
+ const Channel *comparison_channel =
+ configuration::GetChannel(event_loop_->configuration(), channel,
+ event_loop_->name(), event_loop_->node());
+ if (comparison_channel == nullptr) {
+ LOG(ERROR) << "Channel not available: "
+ << configuration::StrippedChannelToString(channel);
+ continue;
+ }
+
+ size_t channel_index = configuration::ChannelIndex(
+ event_loop_->configuration(), comparison_channel);
+
+ auto it = channels_.find(channel_index);
+ if (it == channels_.end()) {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ std::make_shared<ScopedDataChannel>();
+
+ std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;
+
+ data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
+ channel_index]() {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ data_channel_weak_ptr.lock();
+ CHECK(data_channel) << ": Subscriber got destroyed before we started.";
+ // Weak ptr inside the subscriber so we don't have a circular
+ // reference. AddListener will close it.
+ subscribers_[channel_index]->AddListener(data_channel, transfer_method);
+ });
+
+ Subscriber *subscriber = subscribers_[channel_index].get();
+ data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
+ std::shared_ptr<ScopedDataChannel> data_channel =
+ data_channel_weak_ptr.lock();
+ CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
+ subscriber->RemoveListener(data_channel);
+ });
+
+ data_channel->Open(
+ connection_.connection(),
+ absl::StrCat(channel->name()->str(), "/", channel->type()->str()));
+
+ auto pair = channels_.insert({channel_index, {data_channel, true}});
+ it = pair.first;
+ }
+
+ it->second.requested = true;
+
+ VLOG(1) << "Subscribe to: " << channel->type()->str();
+ }
+
+ for (auto &it : channels_) {
+ if (!it.second.requested) {
+ it.second.data_channel->Close();
+ it.second.data_channel = nullptr;
}
}
}