blob: 6147e46870ed64de4cd4e33faf6601e635b23d99 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08007#include "aos/seasocks/seasocks_logger.h"
Alex Perryb3b50792020-01-18 16:13:45 -08008#include "glog/logging.h"
James Kuszmaul48671362020-12-24 13:54:16 -08009#include "internal/Embedded.h"
Alex Perryb3b50792020-01-18 16:13:45 -080010
Austin Schuh52e5e3a2021-04-24 22:30:02 -070011extern "C" {
12#include <rawrtc.h>
13
14#define DEBUG_LEVEL 7
15#define DEBUG_MODULE "web-proxy"
16#include <re_dbg.h>
17struct list *tmrl_get(void);
18}
19
James Kuszmaul1e95bed2021-01-09 21:02:49 -080020DEFINE_int32(proxy_port, 8080, "Port to use for the web proxy server.");
21
Alex Perryb3b50792020-01-18 16:13:45 -080022namespace aos {
23namespace web_proxy {
James Kuszmaul7ad91522020-09-01 19:15:35 -070024WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul71a81932020-12-15 21:08:01 -080025 aos::EventLoop *event_loop, int buffer_size)
James Kuszmaul7ad91522020-09-01 19:15:35 -070026 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080027 config_(aos::CopyFlatBuffer(event_loop->configuration())),
28 event_loop_(event_loop) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070029 if (VLOG_IS_ON(2)) {
30 dbg_init(DBG_DEBUG, DBG_ALL);
31 }
32 CHECK_RAWRTC(rawrtc_init(true));
33
James Kuszmaul48671362020-12-24 13:54:16 -080034 // We need to reference findEmbeddedContent() to make the linker happy...
35 findEmbeddedContent("");
Austin Schuh52e5e3a2021-04-24 22:30:02 -070036 const aos::Node *self = event_loop_->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070037
Austin Schuh52e5e3a2021-04-24 22:30:02 -070038 subscribers_.reserve(event_loop_->configuration()->channels()->size());
39 for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
40 auto channel = event_loop_->configuration()->channels()->Get(i);
James Kuszmaul7ad91522020-09-01 19:15:35 -070041 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070042 auto fetcher = event_loop_->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080043 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
44 std::move(fetcher), i, buffer_size));
Austin Schuh52e5e3a2021-04-24 22:30:02 -070045 } else {
46 subscribers_.emplace_back(nullptr);
James Kuszmaul7ad91522020-09-01 19:15:35 -070047 }
48 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -070049 TimerHandler *const timer = event_loop_->AddTimer([this]() {
James Kuszmaul7ad91522020-09-01 19:15:35 -070050 for (auto &subscriber : subscribers_) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070051 if (subscriber) subscriber->RunIteration();
James Kuszmaul7ad91522020-09-01 19:15:35 -070052 }
53 });
54
Austin Schuh52e5e3a2021-04-24 22:30:02 -070055 event_loop_->OnRun([this, timer]() {
56 timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
James Kuszmaul7ad91522020-09-01 19:15:35 -070057 });
58}
Alex Perryb3b50792020-01-18 16:13:45 -080059
60void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070061 std::unique_ptr<ApplicationConnection> connection =
62 std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
63 config_, event_loop_);
64
65 connections_.insert({sock, std::move(connection)});
Alex Perryb3b50792020-01-18 16:13:45 -080066}
67
68void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
69 size_t size) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070070 const FlatbufferSpan<WebSocketMessage> message({data, size});
71 if (!message.Verify()) {
72 LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
73 return;
74 }
75 VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
76 switch (message.message().payload_type()) {
77 case Payload::WebSocketSdp: {
78 const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
79 if (offer->type() != SdpType::OFFER) {
80 LOG(WARNING) << "Got the wrong sdp type from client";
81 break;
82 }
83 const flatbuffers::String *sdp = offer->payload();
84 connections_[sock]->OnSdp(sdp->c_str());
85 break;
86 }
87 case Payload::WebSocketIce: {
88 const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
89 connections_[sock]->OnIce(ice);
90 break;
91 }
92 default: { break; }
93 }
Alex Perryb3b50792020-01-18 16:13:45 -080094}
95
96void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
97 connections_.erase(sock);
98}
99
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700100// Global epoll pointer
101static aos::internal::EPoll *global_epoll = nullptr;
102
103static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
104 if (flags & 0x1) {
105 global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
106 }
107 if (flags & 0x2) {
108 global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
109 }
110 if (flags & 0x4) {
111 global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
112 }
113 return 0;
114}
115
116static void ReFdClose(int fd) {
117 CHECK(global_epoll != nullptr);
118 global_epoll->DeleteFd(fd);
119}
120
James Kuszmaul71a81932020-12-15 21:08:01 -0800121WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
122 : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
123
124WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
125 : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
126
127WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
128 int buffer_size)
129 : epoll_(epoll),
130 server_(std::make_shared<aos::seasocks::SeasocksLogger>(
131 ::seasocks::Logger::Level::Info)),
132 websocket_handler_(
133 new WebsocketHandler(&server_, event_loop, buffer_size)) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700134 CHECK(!global_epoll);
135 global_epoll = epoll;
136
137 re_fd_set_listen_callback(&ReFdListen);
138 re_fd_set_close_callback(&ReFdClose);
139
140 epoll->BeforeWait([]() {
141 const uint64_t to = tmr_next_timeout(tmrl_get());
142 if (to != 0) {
143 VLOG(1) << "Next timeout " << to;
144 }
145 // Note: this only works because we are spinning on it...
146 // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
147 // for handling tmr.
148 tmr_poll(tmrl_get());
149 });
150
James Kuszmaul71a81932020-12-15 21:08:01 -0800151 server_.addWebSocketHandler("/ws", websocket_handler_);
James Kuszmaul1e95bed2021-01-09 21:02:49 -0800152 CHECK(server_.startListening(FLAGS_proxy_port));
James Kuszmaul71a81932020-12-15 21:08:01 -0800153
154 epoll->OnReadable(server_.fd(), [this]() {
155 CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
156 });
157
158 if (&internal_epoll_ == epoll) {
159 TimerHandler *const timer = event_loop->AddTimer([this]() {
160 // Run the epoll poller until there are no more events (if we are being
161 // backed by a shm event loop, there won't be anything registered to
162 // internal_epoll_ and this will just return false).
163 // We just deal with clearing all the epoll events using a simulated
164 // timer. This does mean that we will spin rather than actually sleeping
165 // in any coherent manner, which will be particularly noticeable when past
166 // the end of processing other events.
167 while (internal_epoll_.Poll(false)) {
168 continue;
169 }
170 });
171
172 event_loop->OnRun([timer, event_loop]() {
173 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
174 });
175 }
176}
177
178WebProxy::~WebProxy() {
179 epoll_->DeleteFd(server_.fd());
180 server_.terminate();
181 CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700182 CHECK(global_epoll == epoll_);
183 global_epoll = nullptr;
James Kuszmaul71a81932020-12-15 21:08:01 -0800184}
185
Alex Perry5f474f22020-02-01 12:14:24 -0800186void Subscriber::RunIteration() {
James Kuszmaul71a81932020-12-15 21:08:01 -0800187 if (channels_.empty() && buffer_size_ == 0) {
Alex Perry5f474f22020-02-01 12:14:24 -0800188 return;
189 }
190
James Kuszmaul71a81932020-12-15 21:08:01 -0800191 while (fetcher_->FetchNext()) {
192 // If we aren't building up a buffer, short-circuit the FetchNext().
193 if (buffer_size_ == 0) {
194 fetcher_->Fetch();
195 }
196 Message message;
197 message.index = fetcher_->context().queue_index;
198 VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
199 << "packets";
200 for (int packet_index = 0;
201 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700202 flatbuffers::FlatBufferBuilder fbb;
James Kuszmaul71a81932020-12-15 21:08:01 -0800203 flatbuffers::Offset<MessageHeader> message_offset =
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700204 PackMessage(&fbb, fetcher_->context(), channel_index_, packet_index);
205 fbb.Finish(message_offset);
Alex Perry5f474f22020-02-01 12:14:24 -0800206
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700207 const flatbuffers::DetachedBuffer buffer = fbb.Release();
208
209
210 struct mbuf *mbuffer = mbuf_alloc(buffer.size());
211 mbuf_write_mem(mbuffer, buffer.data(), buffer.size());
212 mbuf_set_pos(mbuffer, 0);
Alex Perry5f474f22020-02-01 12:14:24 -0800213
James Kuszmaul71a81932020-12-15 21:08:01 -0800214 message.data.emplace_back(
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700215 std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
James Kuszmaul71a81932020-12-15 21:08:01 -0800216 }
217 message_buffer_.push_back(std::move(message));
218 }
219 for (auto &conn : channels_) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700220 std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
James Kuszmaul71a81932020-12-15 21:08:01 -0800221 ChannelInformation *channel_data = &conn.second;
222 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
223 SkipToLastMessage(channel_data);
224 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700225 std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
226 while (buffer) {
227 // TODO(austin): This is a nop so we just buffer forever. Fix this when
228 // we care.
James Kuszmaul71a81932020-12-15 21:08:01 -0800229 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800230 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800231 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800232 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700233
234 rtc_channel->Send(buffer.get());
James Kuszmaul71a81932020-12-15 21:08:01 -0800235 buffer = NextBuffer(channel_data);
236 }
237 }
238 if (buffer_size_ >= 0) {
239 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
240 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800241 }
242 }
243}
244
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700245void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
246 TransferMethod transfer_method) {
247 VLOG(1) << "Adding listener for " << data_channel.get();
James Kuszmaul71a81932020-12-15 21:08:01 -0800248 ChannelInformation info;
249 info.transfer_method = transfer_method;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700250
251 channels_.emplace(data_channel, info);
James Kuszmaul71a81932020-12-15 21:08:01 -0800252}
253
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700254void Subscriber::RemoveListener(
255 std::shared_ptr<ScopedDataChannel> data_channel) {
256 VLOG(1) << "Removing listener for " << data_channel.get();
257 channels_.erase(data_channel);
258}
259
260std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
261 ChannelInformation *channel) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800262 CHECK_NOTNULL(channel);
263 if (message_buffer_.empty()) {
264 return nullptr;
265 }
266 const uint32_t earliest_index = message_buffer_.front().index;
267 const uint32_t latest_index = message_buffer_.back().index;
268 const bool fell_behind = channel->current_queue_index < earliest_index;
269 if (fell_behind) {
270 channel->current_queue_index = earliest_index;
271 channel->next_packet_number = 0;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700272 return message_buffer_.front().data.at(0);
James Kuszmaul71a81932020-12-15 21:08:01 -0800273 }
274 if (channel->current_queue_index > latest_index) {
275 // We are still waiting on the next message to appear; return.
276 return nullptr;
277 }
278 CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
279 << "Inconsistent queue indices.";
280 const size_t packets_in_message =
281 message_buffer_[channel->current_queue_index - earliest_index]
282 .data.size();
283 CHECK_LT(0u, packets_in_message);
284 CHECK_LT(channel->next_packet_number, packets_in_message);
285
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700286 std::shared_ptr<struct mbuf> original_data =
287 message_buffer_[channel->current_queue_index - earliest_index].data.at(
James Kuszmaul71a81932020-12-15 21:08:01 -0800288 channel->next_packet_number);
289
290 ++channel->next_packet_number;
291 if (channel->next_packet_number == packets_in_message) {
292 ++channel->current_queue_index;
293 channel->next_packet_number = 0;
294 }
295
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700296 // Trigger a copy of the mbuf without copying the data.
297 return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
298 mem_deref);
James Kuszmaul71a81932020-12-15 21:08:01 -0800299}
300
301void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
302 CHECK_NOTNULL(channel);
303 if (message_buffer_.empty() ||
304 channel->current_queue_index == message_buffer_.back().index) {
305 return;
306 }
307 channel->current_queue_index = message_buffer_.back().index;
308 channel->next_packet_number = 0;
309}
310
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700311ApplicationConnection::ApplicationConnection(
312 ::seasocks::Server *server, ::seasocks::WebSocket *sock,
Alex Perry5f474f22020-02-01 12:14:24 -0800313 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800314 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
315 const EventLoop *event_loop)
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700316 : server_(server),
317 sock_(sock),
Alex Perry5f474f22020-02-01 12:14:24 -0800318 subscribers_(subscribers),
James Kuszmaul71a81932020-12-15 21:08:01 -0800319 config_headers_(PackBuffer(config.span())),
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700320 event_loop_(event_loop) {
321 connection_.set_on_negotiation_needed([]() {
322 VLOG(1) << "Negotiation needed, not offering so not creating offer.";
323 });
Alex Perryb3b50792020-01-18 16:13:45 -0800324
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700325 connection_.set_on_local_candidate(
326 [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
327 char const *const url) { LocalCandidate(candidate, url); });
328
329 connection_.set_on_data_channel(
330 [this](std::shared_ptr<ScopedDataChannel> channel) {
331 OnDataChannel(channel);
332 });
333
334 connection_.Open();
335}
336
337ApplicationConnection::~ApplicationConnection() {
338 for (auto &it : channels_) {
339 it.second.data_channel->Close();
340 it.second.data_channel = nullptr;
341 }
342
343 // Eh, we are done, tell the channel to shut down. If we didn't, it would
344 // just hang around until the connection closes, which is rather shortly
345 // after.
346 if (channel_) {
347 channel_->Close();
348 }
349}
350
351void ApplicationConnection::OnSdp(const char *sdp) {
352 struct rawrtc_peer_connection_description *remote_description = NULL;
353
354 auto error = rawrtc_peer_connection_description_create(
355 &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
356 if (error) {
357 LOG(WARNING) << "Cannot parse remote description: "
358 << rawrtc_code_to_str(error);
James Kuszmaul48671362020-12-24 13:54:16 -0800359 return;
360 }
Alex Perryb3b50792020-01-18 16:13:45 -0800361
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700362 CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
363 connection_.connection(), remote_description));
Alex Perryb3b50792020-01-18 16:13:45 -0800364
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700365 struct rawrtc_peer_connection_description *local_description;
366 CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
367 connection_.connection()));
368 CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
369 connection_.connection(), local_description));
Alex Perryb3b50792020-01-18 16:13:45 -0800370
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700371 enum rawrtc_sdp_type type;
372 char *local_sdp = nullptr;
373 // Get SDP type & the SDP itself
374 CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
375 &type, local_description));
376 CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
377 local_description));
James Kuszmaul8d928d02020-12-25 17:47:49 -0800378
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700379 flatbuffers::FlatBufferBuilder fbb;
Alex Perryb3b50792020-01-18 16:13:45 -0800380 flatbuffers::Offset<WebSocketSdp> sdp_fb =
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700381 CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
Alex Perryb3b50792020-01-18 16:13:45 -0800382 flatbuffers::Offset<WebSocketMessage> answer_message =
383 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700384
385 VLOG(1) << aos::FlatbufferToJson(
386 flatbuffers::GetTemporaryPointer(fbb, answer_message));
Alex Perryb3b50792020-01-18 16:13:45 -0800387 fbb.Finish(answer_message);
388
389 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700390 mem_deref(local_sdp);
Alex Perryb3b50792020-01-18 16:13:45 -0800391}
392
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700393void ApplicationConnection::OnIce(const WebSocketIce *ice) {
394 if (!ice->has_candidate()) {
395 return;
396 }
397 uint8_t sdpMLineIndex = ice->sdpMLineIndex();
398
399 struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
400 CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
401 &ice_candidate, ice->candidate()->c_str(), ice->sdpMid()->c_str(),
402 &sdpMLineIndex, nullptr));
403
404 rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
405 ice_candidate);
406
407 mem_deref(ice_candidate);
408}
409
410void ApplicationConnection::LocalCandidate(
411 struct rawrtc_peer_connection_ice_candidate *const candidate,
412 char const *const url) {
413 struct rawrtc_ice_candidate *ortc_candidate = nullptr;
414 if (candidate) {
415 CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
416 &ortc_candidate, candidate));
417
418 flatbuffers::FlatBufferBuilder fbb;
419 char *sdpp = nullptr;
420 CHECK_RAWRTC(
421 rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
422 char *midp = nullptr;
423 CHECK_RAWRTC(
424 rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));
425
426 uint8_t media_line_index;
427 enum rawrtc_code error =
428 rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
429 &media_line_index, candidate);
430
431 flatbuffers::Offset<flatbuffers::String> sdpp_offset =
432 fbb.CreateString(sdpp);
433 flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
434 fbb.CreateString(midp);
435
436 WebSocketIce::Builder web_socket_ice_builder(fbb);
437
438 web_socket_ice_builder.add_candidate(sdpp_offset);
439 web_socket_ice_builder.add_sdpMid(sdp_mid_offset);
440
441 if (error == RAWRTC_CODE_SUCCESS) {
442 web_socket_ice_builder.add_sdpMLineIndex(media_line_index);
James Kuszmaul1ec74432020-07-30 20:26:45 -0700443 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700444 flatbuffers::Offset<WebSocketIce> ice_offset =
445 web_socket_ice_builder.Finish();
446
447 flatbuffers::Offset<WebSocketMessage> ice_message =
448 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
449 VLOG(1) << url << ": "
450 << aos::FlatbufferToJson(
451 flatbuffers::GetTemporaryPointer(fbb, ice_message));
452 fbb.Finish(ice_message);
453
454 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
455
456 mem_deref(sdpp);
457 mem_deref(midp);
Alex Perry5f474f22020-02-01 12:14:24 -0800458 }
459}
460
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700461void ApplicationConnection::OnDataChannel(
462 std::shared_ptr<ScopedDataChannel> channel) {
463 if (channel->label() == std::string_view("signalling")) {
464 CHECK(!channel_);
465 channel_ = channel;
466
467 channel_->set_on_message(
468 [this](struct mbuf *const buffer,
469 const enum rawrtc_data_channel_message_flag flags) {
470 HandleSignallingData(buffer, flags);
471 });
472
473 channel_->set_on_open([this]() {
474 for (const auto &header : config_headers_) {
475 channel_->Send(header.buffer());
476 }
477 });
478
479 channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
480
481 // Register an on_close callback which does nothing but keeps channel alive
482 // until it is done. This keeps the memory around until rawrtc can finish
483 // calling the close callback.
484 channel_->set_on_close([channel]() {});
485 } else {
486 channel_->set_on_close([channel]() {});
487 channel->Close();
488 }
489}
490
491void ApplicationConnection::HandleSignallingData(
492 struct mbuf *const
493 buffer, // nullable (in case partial delivery has been requested)
494 const enum rawrtc_data_channel_message_flag /*flags*/) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800495 FlatbufferSpan<SubscriberRequest> message(
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700496 {mbuf_buf(buffer), mbuf_get_left(buffer)});
James Kuszmaul71a81932020-12-15 21:08:01 -0800497 if (!message.Verify()) {
498 LOG(ERROR) << "Invalid flatbuffer received from browser client.";
499 return;
500 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700501 VLOG(1) << "Got a subscription message "
James Kuszmaul71a81932020-12-15 21:08:01 -0800502 << aos::FlatbufferToJson(&message.message());
503 if (!message.message().has_channels_to_transfer()) {
504 LOG(ERROR) << "No channels requested for transfer.";
505 return;
506 }
Alex Perry5f474f22020-02-01 12:14:24 -0800507
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700508 // The client each time sends a full list of everything it wants to be
509 // subscribed to. It is our responsibility to remove channels which aren't
510 // in that list and add ones which need to be.
511 //
512 // Start by clearing a tracking bit on each channel. This takes O(number of
513 // open channels), which should be small.
514 //
515 // Then open any new channels. For any we visit which are already open,
516 // don't update those.
517 //
518 // Finally, iterate over the channel list and purge anything which we didn't
519 // touch.
520 for (auto &it : channels_) {
521 it.second.requested = false;
522 }
523 for (auto channel_request : *message.message().channels_to_transfer()) {
524 const Channel *channel = channel_request->channel();
525 if (channel == nullptr) {
526 LOG(ERROR) << "Got unpopulated channel.";
527 continue;
Alex Perry5f474f22020-02-01 12:14:24 -0800528 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700529 const TransferMethod transfer_method = channel_request->method();
530 // Call GetChannel() before comparing the channel name/type to each
531 // subscriber. This allows us to resolve any node or application
532 // specific mappings.
533 const Channel *comparison_channel =
534 configuration::GetChannel(event_loop_->configuration(), channel,
535 event_loop_->name(), event_loop_->node());
536 if (comparison_channel == nullptr) {
537 LOG(ERROR) << "Channel not available: "
538 << configuration::StrippedChannelToString(channel);
539 continue;
540 }
541
542 size_t channel_index = configuration::ChannelIndex(
543 event_loop_->configuration(), comparison_channel);
544
545 auto it = channels_.find(channel_index);
546 if (it == channels_.end()) {
547 std::shared_ptr<ScopedDataChannel> data_channel =
548 std::make_shared<ScopedDataChannel>();
549
550 std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;
551
552 data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
553 channel_index]() {
554 std::shared_ptr<ScopedDataChannel> data_channel =
555 data_channel_weak_ptr.lock();
556 CHECK(data_channel) << ": Subscriber got destroyed before we started.";
557 // Weak ptr inside the subscriber so we don't have a circular
558 // reference. AddListener will close it.
559 subscribers_[channel_index]->AddListener(data_channel, transfer_method);
560 });
561
562 Subscriber *subscriber = subscribers_[channel_index].get();
563 data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
564 std::shared_ptr<ScopedDataChannel> data_channel =
565 data_channel_weak_ptr.lock();
566 CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
567 subscriber->RemoveListener(data_channel);
568 });
569
570 data_channel->Open(
571 connection_.connection(),
572 absl::StrCat(channel->name()->str(), "/", channel->type()->str()));
573
574 auto pair = channels_.insert({channel_index, {data_channel, true}});
575 it = pair.first;
576 }
577
578 it->second.requested = true;
579
580 VLOG(1) << "Subscribe to: " << channel->type()->str();
581 }
582
583 for (auto &it : channels_) {
584 if (!it.second.requested) {
585 it.second.data_channel->Close();
586 it.second.data_channel = nullptr;
Alex Perry5f474f22020-02-01 12:14:24 -0800587 }
588 }
Alex Perryb3b50792020-01-18 16:13:45 -0800589}
590
591} // namespace web_proxy
592} // namespace aos