blob: c762b634e22cb9cdab4bc630d94c9672146c6446 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08007#include "aos/seasocks/seasocks_logger.h"
Alex Perryb3b50792020-01-18 16:13:45 -08008#include "api/create_peerconnection_factory.h"
9#include "glog/logging.h"
James Kuszmaul48671362020-12-24 13:54:16 -080010#include "internal/Embedded.h"
Alex Perryb3b50792020-01-18 16:13:45 -080011
James Kuszmaul1e95bed2021-01-09 21:02:49 -080012DEFINE_int32(proxy_port, 8080, "Port to use for the web proxy server.");
13
Alex Perryb3b50792020-01-18 16:13:45 -080014namespace aos {
15namespace web_proxy {
16
17namespace {
18// Based on webrtc examples. In our controlled environment we expect setting sdp
19// to always succeed, and we can't do anything about a failure, so just ignore
20// everything.
21class DummySetSessionDescriptionObserver
22 : public webrtc::SetSessionDescriptionObserver {
23 public:
24 static DummySetSessionDescriptionObserver *Create() {
25 return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
26 }
27 virtual void OnSuccess() {}
James Kuszmaul48671362020-12-24 13:54:16 -080028 virtual void OnFailure(webrtc::RTCError /*error*/) {}
Alex Perryb3b50792020-01-18 16:13:45 -080029};
30
31} // namespace
32
James Kuszmaul7ad91522020-09-01 19:15:35 -070033WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul71a81932020-12-15 21:08:01 -080034 aos::EventLoop *event_loop, int buffer_size)
James Kuszmaul7ad91522020-09-01 19:15:35 -070035 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080036 config_(aos::CopyFlatBuffer(event_loop->configuration())),
37 event_loop_(event_loop) {
James Kuszmaul48671362020-12-24 13:54:16 -080038 // We need to reference findEmbeddedContent() to make the linker happy...
39 findEmbeddedContent("");
James Kuszmaul71a81932020-12-15 21:08:01 -080040 const aos::Node *self = event_loop->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070041
42 for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
43 auto channel = event_loop->configuration()->channels()->Get(i);
44 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
45 auto fetcher = event_loop->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080046 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
47 std::move(fetcher), i, buffer_size));
James Kuszmaul7ad91522020-09-01 19:15:35 -070048 }
49 }
James Kuszmaul7ad91522020-09-01 19:15:35 -070050 TimerHandler *const timer = event_loop->AddTimer([this]() {
51 for (auto &subscriber : subscribers_) {
52 subscriber->RunIteration();
53 }
54 });
55
56 event_loop->OnRun([timer, event_loop]() {
57 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
58 });
59}
Alex Perryb3b50792020-01-18 16:13:45 -080060
61void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
James Kuszmaul71a81932020-12-15 21:08:01 -080062 std::unique_ptr<Connection> conn = std::make_unique<Connection>(
63 sock, server_, subscribers_, config_, event_loop_);
Alex Perryb3b50792020-01-18 16:13:45 -080064 connections_.insert({sock, std::move(conn)});
65}
66
67void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
68 size_t size) {
69 connections_[sock]->HandleWebSocketData(data, size);
70}
71
72void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
73 connections_.erase(sock);
74}
75
James Kuszmaul71a81932020-12-15 21:08:01 -080076WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
77 : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
78
79WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
80 : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
81
82WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
83 int buffer_size)
84 : epoll_(epoll),
85 server_(std::make_shared<aos::seasocks::SeasocksLogger>(
86 ::seasocks::Logger::Level::Info)),
87 websocket_handler_(
88 new WebsocketHandler(&server_, event_loop, buffer_size)) {
89 server_.addWebSocketHandler("/ws", websocket_handler_);
James Kuszmaul1e95bed2021-01-09 21:02:49 -080090 CHECK(server_.startListening(FLAGS_proxy_port));
James Kuszmaul71a81932020-12-15 21:08:01 -080091
92 epoll->OnReadable(server_.fd(), [this]() {
93 CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
94 });
95
96 if (&internal_epoll_ == epoll) {
97 TimerHandler *const timer = event_loop->AddTimer([this]() {
98 // Run the epoll poller until there are no more events (if we are being
99 // backed by a shm event loop, there won't be anything registered to
100 // internal_epoll_ and this will just return false).
101 // We just deal with clearing all the epoll events using a simulated
102 // timer. This does mean that we will spin rather than actually sleeping
103 // in any coherent manner, which will be particularly noticeable when past
104 // the end of processing other events.
105 while (internal_epoll_.Poll(false)) {
106 continue;
107 }
108 });
109
110 event_loop->OnRun([timer, event_loop]() {
111 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
112 });
113 }
114}
115
116WebProxy::~WebProxy() {
117 epoll_->DeleteFd(server_.fd());
118 server_.terminate();
119 CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
120}
121
Alex Perry5f474f22020-02-01 12:14:24 -0800122void Subscriber::RunIteration() {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800123 {
124 // Manage updating the channels_ map given the pending_* members from the
125 // *Listeners() methods.
126 // We handle all the removals first so that we correctly handle the
127 // situation where the user calls RemoveListener() and then AddListener()
128 // between calls to RunIteration(). The reverse order (adding and then
129 // removing before an update) is handled directly in RemoveListener() where
130 // we remove things from the pending_channels_ map directly.
131 MutexLocker lock(&mutex_);
132 for (const auto &channel : pending_removal_) {
133 channels_.erase(channel);
134 }
135 pending_removal_.clear();
136 for (const auto &channel : pending_channels_) {
137 channels_.insert(channel);
138 }
139 pending_channels_.clear();
140 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800141 if (channels_.empty() && buffer_size_ == 0) {
Alex Perry5f474f22020-02-01 12:14:24 -0800142 return;
143 }
144
James Kuszmaul71a81932020-12-15 21:08:01 -0800145 while (fetcher_->FetchNext()) {
146 // If we aren't building up a buffer, short-circuit the FetchNext().
147 if (buffer_size_ == 0) {
148 fetcher_->Fetch();
149 }
150 Message message;
151 message.index = fetcher_->context().queue_index;
152 VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
153 << "packets";
154 for (int packet_index = 0;
155 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
156 flatbuffers::Offset<MessageHeader> message_offset =
157 PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
158 fbb_.Finish(message_offset);
Alex Perry5f474f22020-02-01 12:14:24 -0800159
James Kuszmaul71a81932020-12-15 21:08:01 -0800160 const flatbuffers::DetachedBuffer buffer = fbb_.Release();
Alex Perry5f474f22020-02-01 12:14:24 -0800161
James Kuszmaul71a81932020-12-15 21:08:01 -0800162 message.data.emplace_back(
163 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
164 true /* binary array */);
165 }
166 message_buffer_.push_back(std::move(message));
167 }
168 for (auto &conn : channels_) {
169 rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
170 ChannelInformation *channel_data = &conn.second;
171 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
172 SkipToLastMessage(channel_data);
173 }
174 const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
175 while (buffer != nullptr) {
176 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800177 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800178 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800179 }
James Kuszmaul8d928d02020-12-25 17:47:49 -0800180 // Call Send() from the signalling thread. The Invoke() call blocks until
181 // the handler has been called, so we do not need to handle any
182 // synchronization on this end. The body of the handler should be kept as
183 // short as possible to avoid blocking the signalling thread continuously
184 // for any longer than necessary.
185 channel_data->signaling_thread->Invoke<void>(
186 RTC_FROM_HERE,
187 [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
James Kuszmaul71a81932020-12-15 21:08:01 -0800188 buffer = NextBuffer(channel_data);
189 }
190 }
191 if (buffer_size_ >= 0) {
192 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
193 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800194 }
195 }
196}
197
198bool Subscriber::Compare(const Channel *channel) const {
Alex Perry22824d72020-02-29 17:11:43 -0800199 return channel->name()->string_view() ==
200 fetcher_->channel()->name()->string_view() &&
201 channel->type()->string_view() ==
202 fetcher_->channel()->type()->string_view();
Alex Perry5f474f22020-02-01 12:14:24 -0800203}
204
James Kuszmaul71a81932020-12-15 21:08:01 -0800205void Subscriber::AddListener(
206 rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
James Kuszmaul8d928d02020-12-25 17:47:49 -0800207 TransferMethod transfer_method, rtc::Thread *signaling_thread) {
208 MutexLocker lock(&mutex_);
James Kuszmaul71a81932020-12-15 21:08:01 -0800209 ChannelInformation info;
210 info.transfer_method = transfer_method;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800211 info.signaling_thread = signaling_thread;
212 pending_channels_.emplace(channel, info);
James Kuszmaul71a81932020-12-15 21:08:01 -0800213}
214
215const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
216 CHECK_NOTNULL(channel);
217 if (message_buffer_.empty()) {
218 return nullptr;
219 }
220 const uint32_t earliest_index = message_buffer_.front().index;
221 const uint32_t latest_index = message_buffer_.back().index;
222 const bool fell_behind = channel->current_queue_index < earliest_index;
223 if (fell_behind) {
224 channel->current_queue_index = earliest_index;
225 channel->next_packet_number = 0;
226 return &message_buffer_.front().data.at(0);
227 }
228 if (channel->current_queue_index > latest_index) {
229 // We are still waiting on the next message to appear; return.
230 return nullptr;
231 }
232 CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
233 << "Inconsistent queue indices.";
234 const size_t packets_in_message =
235 message_buffer_[channel->current_queue_index - earliest_index]
236 .data.size();
237 CHECK_LT(0u, packets_in_message);
238 CHECK_LT(channel->next_packet_number, packets_in_message);
239
240 const webrtc::DataBuffer *data =
241 &message_buffer_[channel->current_queue_index - earliest_index].data.at(
242 channel->next_packet_number);
243
244 ++channel->next_packet_number;
245 if (channel->next_packet_number == packets_in_message) {
246 ++channel->current_queue_index;
247 channel->next_packet_number = 0;
248 }
249
250 return data;
251}
252
253void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
254 CHECK_NOTNULL(channel);
255 if (message_buffer_.empty() ||
256 channel->current_queue_index == message_buffer_.back().index) {
257 return;
258 }
259 channel->current_queue_index = message_buffer_.back().index;
260 channel->next_packet_number = 0;
261}
262
263void Subscriber::RemoveListener(
264 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800265 MutexLocker lock(&mutex_);
266 pending_channels_.erase(channel);
267 pending_removal_.push_back(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -0800268}
269
Alex Perry5f474f22020-02-01 12:14:24 -0800270Connection::Connection(
271 ::seasocks::WebSocket *sock, ::seasocks::Server *server,
272 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800273 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
274 const EventLoop *event_loop)
Alex Perry5f474f22020-02-01 12:14:24 -0800275 : sock_(sock),
276 server_(server),
277 subscribers_(subscribers),
James Kuszmaul71a81932020-12-15 21:08:01 -0800278 config_headers_(PackBuffer(config.span())),
279 event_loop_(event_loop) {}
Alex Perryb3b50792020-01-18 16:13:45 -0800280
James Kuszmaul71a81932020-12-15 21:08:01 -0800281// Function called for web socket data. Parses the flatbuffer and
282// handles it appropriately.
Alex Perryb3b50792020-01-18 16:13:45 -0800283void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
James Kuszmaul48671362020-12-24 13:54:16 -0800284 const FlatbufferSpan<WebSocketMessage> message({data, size});
285 if (!message.Verify()) {
286 LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
287 return;
288 }
289 switch (message.message().payload_type()) {
Alex Perryb3b50792020-01-18 16:13:45 -0800290 case Payload::WebSocketSdp: {
James Kuszmaul48671362020-12-24 13:54:16 -0800291 const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
Alex Perryb3b50792020-01-18 16:13:45 -0800292 if (offer->type() != SdpType::OFFER) {
293 LOG(WARNING) << "Got the wrong sdp type from client";
294 break;
295 }
296 const flatbuffers::String *sdp = offer->payload();
297 webrtc::SdpParseError error;
298 std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
299 CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
300 if (!desc) {
301 LOG(WARNING) << "Failed to parse sdp description: "
302 << error.description;
303 // TODO(alex): send a message back to browser for failure.
304 break;
305 }
306
James Kuszmaul71a81932020-12-15 21:08:01 -0800307 // We can only start creating the PeerConnection once we have
308 // something to give it, so we wait until we get an offer before
309 // starting.
Alex Perryb3b50792020-01-18 16:13:45 -0800310 webrtc::PeerConnectionInterface::RTCConfiguration config;
311 config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
312 config.enable_dtls_srtp = true;
James Kuszmaul54424d02020-12-26 18:09:20 -0800313 {
314 webrtc::PeerConnectionInterface::IceServer ice_server;
315 ice_server.urls.push_back("stun:stun.l.google.com:19302");
316 config.servers.push_back(ice_server);
317 }
Alex Perryb3b50792020-01-18 16:13:45 -0800318
319 std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
320 signaling_thread->SetName("signaling_thread", nullptr);
321 signaling_thread->Start();
322
James Kuszmaul8d928d02020-12-25 17:47:49 -0800323 signaling_thread_ = signaling_thread.get();
324
Alex Perryb3b50792020-01-18 16:13:45 -0800325 webrtc::PeerConnectionFactoryDependencies factory_deps;
326 factory_deps.signaling_thread = signaling_thread.release();
327 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
328 CreateModularPeerConnectionFactory(std::move(factory_deps));
James Kuszmaul54424d02020-12-26 18:09:20 -0800329 {
330 // Don't ignore *any* networks--by default, the loopback interface is
331 // ignored, which makes it impossible to use WebRTC on devices with no
332 // network.
333 webrtc::PeerConnectionFactoryInterface::Options options;
334 options.network_ignore_mask = 0;
335 factory->SetOptions(options);
336 }
Alex Perryb3b50792020-01-18 16:13:45 -0800337
338 peer_connection_ =
339 factory->CreatePeerConnection(config, nullptr, nullptr, this);
340
341 peer_connection_->SetRemoteDescription(
342 DummySetSessionDescriptionObserver::Create(), desc.release());
343
344 peer_connection_->CreateAnswer(
345 this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
346 break;
347 }
348 case Payload::WebSocketIce: {
James Kuszmaul48671362020-12-24 13:54:16 -0800349 const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
Alex Perryb3b50792020-01-18 16:13:45 -0800350 std::string candidate = ice->candidate()->str();
351 std::string sdpMid = ice->sdpMid()->str();
352 int sdpMLineIndex = ice->sdpMLineIndex();
353 webrtc::SdpParseError error;
354 webrtc::IceCandidateInterface *ice_candidate =
355 webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
356 if (!ice_candidate) {
357 LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
358 // TODO(alex): send a message back to browser for failure.
359 break;
360 }
361 peer_connection_->AddIceCandidate(ice_candidate);
362 break;
363 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800364 default: {
365 break;
366 }
Alex Perryb3b50792020-01-18 16:13:45 -0800367 }
368}
369
Alex Perry5f474f22020-02-01 12:14:24 -0800370void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
371 webrtc::DataBuffer data_buffer(
372 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
373 true /* binary array */);
Alex Perry3dfcb812020-03-04 19:32:17 -0800374 VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
Alex Perry5f474f22020-02-01 12:14:24 -0800375 data_channel_->Send(data_buffer);
376}
377
Alex Perryb3b50792020-01-18 16:13:45 -0800378void Connection::OnDataChannel(
379 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
380 data_channel_ = channel;
381 data_channel_->RegisterObserver(this);
382}
383
384void Connection::OnIceCandidate(
385 const webrtc::IceCandidateInterface *candidate) {
386 flatbuffers::FlatBufferBuilder fbb(512);
387 std::string ice_string;
388 candidate->ToString(&ice_string);
389
390 flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
391 fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
392 candidate->sdp_mline_index());
393 flatbuffers::Offset<WebSocketMessage> ice_message =
394 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
395 fbb.Finish(ice_message);
396
397 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
398}
399
400// This is the callback for creating an sdp. We have to manually assign it
401// locally and send it to the client.
402void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
403 peer_connection_->SetLocalDescription(
404 DummySetSessionDescriptionObserver::Create(), desc);
405 flatbuffers::FlatBufferBuilder fbb(512);
406 std::string answer_string;
407 desc->ToString(&answer_string);
408 flatbuffers::Offset<WebSocketSdp> sdp_fb =
409 CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
410 flatbuffers::Offset<WebSocketMessage> answer_message =
411 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
412 fbb.Finish(answer_message);
413
414 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
415}
416
Alex Perry5f474f22020-02-01 12:14:24 -0800417// Wait until the data channel is ready for data before sending the config.
418void Connection::OnStateChange() {
419 if (peer_connection_.get() != nullptr &&
420 data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800421 for (const auto &header : config_headers_) {
James Kuszmaul1ec74432020-07-30 20:26:45 -0700422 Send(header.buffer());
423 }
Alex Perry5f474f22020-02-01 12:14:24 -0800424 }
425}
426
427// Handle DataChannel messages. Subscribe to each listener that matches the
428// subscribe message
Alex Perryb3b50792020-01-18 16:13:45 -0800429void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800430 // Sanity check--we are relying on the Add/RemoveListener calls being made
431 // from the correct thread.
432 CHECK(signaling_thread_->IsCurrent());
James Kuszmaul71a81932020-12-15 21:08:01 -0800433 FlatbufferSpan<SubscriberRequest> message(
434 {buffer.data.data(), buffer.data.size()});
435 if (!message.Verify()) {
436 LOG(ERROR) << "Invalid flatbuffer received from browser client.";
437 return;
438 }
439 VLOG(2) << "Got a subscription message "
440 << aos::FlatbufferToJson(&message.message());
441 if (!message.message().has_channels_to_transfer()) {
442 LOG(ERROR) << "No channels requested for transfer.";
443 return;
444 }
Alex Perry5f474f22020-02-01 12:14:24 -0800445 for (auto &subscriber : subscribers_) {
446 bool found_match = false;
James Kuszmaul71a81932020-12-15 21:08:01 -0800447 for (auto channel_request : *message.message().channels_to_transfer()) {
448 const Channel *channel = channel_request->channel();
449 if (channel == nullptr) {
450 LOG(ERROR) << "Got unpopulated channel.";
451 continue;
452 }
453 const TransferMethod transfer_method = channel_request->method();
454 // Call GetChannel() before comparing the channel name/type to each
455 // subscriber. This allows us to resolve any node or application specific
456 // mappings.
457 const Channel *comparison_channel =
458 configuration::GetChannel(event_loop_->configuration(), channel,
459 event_loop_->name(), event_loop_->node());
460 if (comparison_channel == nullptr) {
461 LOG(ERROR) << "Channel not available: "
462 << configuration::StrippedChannelToString(channel);
463 continue;
464 }
465 if (subscriber->Compare(comparison_channel)) {
Alex Perry5f474f22020-02-01 12:14:24 -0800466 int index = subscriber->index();
467 auto it = channels_.find(index);
468 if (it == channels_.end()) {
469 auto pair = channels_.insert(
470 {index, peer_connection_->CreateDataChannel(
471 channel->name()->str() + "/" + channel->type()->str(),
472 nullptr)});
473 it = pair.first;
474 }
James Kuszmaul8d928d02020-12-25 17:47:49 -0800475 subscriber->AddListener(it->second, transfer_method, signaling_thread_);
Alex Perry5f474f22020-02-01 12:14:24 -0800476
477 VLOG(1) << "Subscribe to: " << channel->type()->str();
478 found_match = true;
479 break;
480 }
481 }
482 if (!found_match) {
483 int index = subscriber->index();
484 auto it = channels_.find(index);
485 subscriber->RemoveListener(it->second);
486 }
487 }
Alex Perryb3b50792020-01-18 16:13:45 -0800488}
489
490} // namespace web_proxy
491} // namespace aos