#include "aos/network/web_proxy.h"

#include "aos/flatbuffer_merge.h"
#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
#include "aos/seasocks/seasocks_logger.h"
#include "glog/logging.h"
#include "internal/Embedded.h"

extern "C" {
#include <rawrtc.h>

#define DEBUG_LEVEL 7
#define DEBUG_MODULE "web-proxy"
#include <re_dbg.h>
struct list *tmrl_get(void);
}

DEFINE_int32(proxy_port, 8080, "Port to use for the web proxy server.");

namespace aos {
namespace web_proxy {
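// WebsocketHandler serves the signalling websocket. One Subscriber is created
// for every channel in the configuration that is readable on this node, and a
// 100ms timer polls all of them for new data to hand to the connected peers.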
WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
                                   aos::EventLoop *event_loop, int buffer_size)
    : server_(server),
      config_(aos::CopyFlatBuffer(event_loop->configuration())),
      event_loop_(event_loop) {
  if (VLOG_IS_ON(2)) {
    dbg_init(DBG_DEBUG, DBG_ALL);
  }
  CHECK_RAWRTC(rawrtc_init(true));

  // We need to reference findEmbeddedContent() to make the linker happy...
  findEmbeddedContent("");
  const aos::Node *self = event_loop_->node();

  subscribers_.reserve(event_loop_->configuration()->channels()->size());
  for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
    auto channel = event_loop_->configuration()->channels()->Get(i);
    if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
      auto fetcher = event_loop_->MakeRawFetcher(channel);
      subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
          std::move(fetcher), i, buffer_size));
    } else {
      subscribers_.emplace_back(nullptr);
    }
  }
  TimerHandler *const timer = event_loop_->AddTimer([this]() {
    for (auto &subscriber : subscribers_) {
      if (subscriber) subscriber->RunIteration();
    }
  });

  event_loop_->OnRun([this, timer]() {
    timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
  });
}

void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
  std::unique_ptr<ApplicationConnection> connection =
      std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
                                              config_, event_loop_);

  connections_.insert({sock, std::move(connection)});
}

void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
                              size_t size) {
  const FlatbufferSpan<WebSocketMessage> message({data, size});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
    return;
  }
  VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
  switch (message.message().payload_type()) {
    case Payload::WebSocketSdp: {
      const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
      if (offer->type() != SdpType::OFFER) {
        LOG(WARNING) << "Got the wrong sdp type from client";
        break;
      }
      const flatbuffers::String *sdp = offer->payload();
      connections_[sock]->OnSdp(sdp->c_str());
      break;
    }
    case Payload::WebSocketIce: {
      const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
      connections_[sock]->OnIce(ice);
      break;
    }
    default: { break; }
  }
}

void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
  connections_.erase(sock);
}

// Global epoll pointer
static aos::internal::EPoll *global_epoll = nullptr;

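// rawrtc is built on the re library, which normally runs its own event loop.
// Instead of running that loop, the WebProxy constructor installs the two
// hooks below (via re_fd_set_listen_callback()/re_fd_set_close_callback()) so
// that every fd re wants to watch gets registered with the AOS EPoll instead.
// The 0x1/0x2/0x4 bits handed back to the handler correspond to re's
// FD_READ/FD_WRITE/FD_EXCEPT flag values.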
static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
  if (flags & 0x1) {
    global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
  }
  if (flags & 0x2) {
    global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
  }
  if (flags & 0x4) {
    global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
  }
  return 0;
}

static void ReFdClose(int fd) {
  CHECK(global_epoll != nullptr);
  global_epoll->DeleteFd(fd);
}

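// WebProxy is either backed by a ShmEventLoop (in which case we use its epoll
// instance directly) or by some other EventLoop, in which case nothing ever
// waits on internal_epoll_ and it is instead drained from a periodic timer
// below.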
WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
    : WebProxy(event_loop, &internal_epoll_, buffer_size) {}

WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
    : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}

WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
                   int buffer_size)
    : epoll_(epoll),
      server_(std::make_shared<aos::seasocks::SeasocksLogger>(
          ::seasocks::Logger::Level::Info)),
      websocket_handler_(
          new WebsocketHandler(&server_, event_loop, buffer_size)) {
  CHECK(!global_epoll);
  global_epoll = epoll;

  re_fd_set_listen_callback(&ReFdListen);
  re_fd_set_close_callback(&ReFdClose);

  epoll->BeforeWait([]() {
    const uint64_t to = tmr_next_timeout(tmrl_get());
    if (to != 0) {
      VLOG(1) << "Next timeout " << to;
    }
    // Note: this only works because we are spinning on it...
    // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
    // for handling tmr.
    tmr_poll(tmrl_get());
  });

  server_.addWebSocketHandler("/ws", websocket_handler_);
  CHECK(server_.startListening(FLAGS_proxy_port));

  epoll->OnReadable(server_.fd(), [this]() {
    CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
  });

  if (&internal_epoll_ == epoll) {
    TimerHandler *const timer = event_loop->AddTimer([this]() {
      // Run the epoll poller until there are no more events (if we are being
      // backed by a shm event loop, there won't be anything registered to
      // internal_epoll_ and this will just return false).
      // We just deal with clearing all the epoll events using a simulated
      // timer. This does mean that we will spin rather than actually sleeping
      // in any coherent manner, which will be particularly noticeable when past
      // the end of processing other events.
      while (internal_epoll_.Poll(false)) {
        continue;
      }
    });

    event_loop->OnRun([timer, event_loop]() {
      timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
    });
  }
}

WebProxy::~WebProxy() {
  epoll_->DeleteFd(server_.fd());
  server_.terminate();
  CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
  CHECK(global_epoll == epoll_);
  global_epoll = nullptr;
}

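// Polls the fetcher for this channel, packs each new message into one or more
// packets (see GetPacketCount()), appends them to message_buffer_, and then
// sends any packets that each registered data channel has not seen yet.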
void Subscriber::RunIteration() {
  if (channels_.empty() && buffer_size_ == 0) {
    return;
  }

  while (fetcher_->FetchNext()) {
    // If we aren't building up a buffer, short-circuit the FetchNext().
    if (buffer_size_ == 0) {
      fetcher_->Fetch();
    }
    Message message;
    message.index = fetcher_->context().queue_index;
    VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
            << " packets";
    for (int packet_index = 0;
         packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
      // Pack directly into the mbuffer. This is admittedly a bit painful.
      const size_t packet_size =
          PackedMessageSize(fetcher_->context(), packet_index);
      struct mbuf *mbuffer = mbuf_alloc(packet_size);

      {
        // Wrap a pre-allocated builder around the mbuffer.
        PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
        flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
        flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
            &fbb, fetcher_->context(), channel_index_, packet_index);
        fbb.Finish(message_offset);

        // Now, the flatbuffer is built from the back to the front. So any
        // extra memory will be at the front. Setup the end and start pointers
        // on the mbuf.
        mbuf_set_end(mbuffer, packet_size);
        mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
      }

      message.data.emplace_back(
          std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
    }
    message_buffer_.push_back(std::move(message));
  }
  for (auto &conn : channels_) {
    std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
    ChannelInformation *channel_data = &conn.second;
    if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
      SkipToLastMessage(channel_data);
    }
    std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
    while (buffer) {
      // TODO(austin): This is a nop so we just buffer forever. Fix this when
      // we care.
      if (rtc_channel->buffered_amount() > 14000000) {
        VLOG(1) << "skipping a send because buffered amount is too high";
        break;
      }

      rtc_channel->Send(buffer.get());
      buffer = NextBuffer(channel_data);
    }
  }
  if (buffer_size_ >= 0) {
    while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
      message_buffer_.pop_front();
    }
  }
}

void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
                             TransferMethod transfer_method) {
  VLOG(1) << "Adding listener for " << data_channel.get();
  ChannelInformation info;
  info.transfer_method = transfer_method;

  channels_.emplace(data_channel, info);
}

void Subscriber::RemoveListener(
    std::shared_ptr<ScopedDataChannel> data_channel) {
  VLOG(1) << "Removing listener for " << data_channel.get();
  channels_.erase(data_channel);
}

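// Returns the next buffered packet that this listener has not yet been sent,
// or nullptr if it is fully caught up. A listener that fell behind the buffer
// (its queue index is older than anything still buffered) gets snapped
// forward to the oldest message we have.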
std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
    ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty()) {
    return nullptr;
  }
  const uint32_t earliest_index = message_buffer_.front().index;
  const uint32_t latest_index = message_buffer_.back().index;
  const bool fell_behind = channel->current_queue_index < earliest_index;
  if (fell_behind) {
    channel->current_queue_index = earliest_index;
    channel->next_packet_number = 0;
    return message_buffer_.front().data.at(0);
  }
  if (channel->current_queue_index > latest_index) {
    // We are still waiting on the next message to appear; return.
    return nullptr;
  }
  CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
      << "Inconsistent queue indices.";
  const size_t packets_in_message =
      message_buffer_[channel->current_queue_index - earliest_index]
          .data.size();
  CHECK_LT(0u, packets_in_message);
  CHECK_LT(channel->next_packet_number, packets_in_message);

  std::shared_ptr<struct mbuf> original_data =
      message_buffer_[channel->current_queue_index - earliest_index].data.at(
          channel->next_packet_number);

  ++channel->next_packet_number;
  if (channel->next_packet_number == packets_in_message) {
    ++channel->current_queue_index;
    channel->next_packet_number = 0;
  }

  // Trigger a copy of the mbuf without copying the data.
  return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
                                      mem_deref);
}

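// For TransferMethod::SUBSAMPLE listeners: jump straight to the most recent
// buffered message instead of replaying the backlog.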
void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty() ||
      channel->current_queue_index == message_buffer_.back().index) {
    return;
  }
  channel->current_queue_index = message_buffer_.back().index;
  channel->next_packet_number = 0;
}

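// One ApplicationConnection is created per connected websocket client. It
// owns the rawrtc peer connection for that client; the browser is always the
// offerer, so we only answer offers and never create one ourselves.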
ApplicationConnection::ApplicationConnection(
    ::seasocks::Server *server, ::seasocks::WebSocket *sock,
    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
    const EventLoop *event_loop)
    : server_(server),
      sock_(sock),
      subscribers_(subscribers),
      config_headers_(PackBuffer(config.span())),
      event_loop_(event_loop) {
  connection_.set_on_negotiation_needed([]() {
    VLOG(1) << "Negotiation needed, not offering so not creating offer.";
  });

  connection_.set_on_local_candidate(
      [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
             char const *const url) { LocalCandidate(candidate, url); });

  connection_.set_on_data_channel(
      [this](std::shared_ptr<ScopedDataChannel> channel) {
        OnDataChannel(channel);
      });

  connection_.Open();
}

ApplicationConnection::~ApplicationConnection() {
  for (auto &it : channels_) {
    it.second.data_channel->Close();
    it.second.data_channel = nullptr;
  }

  // Eh, we are done, tell the channel to shut down. If we didn't, it would
  // just hang around until the connection closes, which is rather shortly
  // after.
  if (channel_) {
    channel_->Close();
  }
}

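// Handles an SDP offer forwarded from the browser over the websocket: apply
// it as the remote description, create and set our answer, and ship the
// answer back over the websocket.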
void ApplicationConnection::OnSdp(const char *sdp) {
  struct rawrtc_peer_connection_description *remote_description = NULL;

  auto error = rawrtc_peer_connection_description_create(
      &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
  if (error) {
    LOG(WARNING) << "Cannot parse remote description: "
                 << rawrtc_code_to_str(error);
    return;
  }

  CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
      connection_.connection(), remote_description));

  struct rawrtc_peer_connection_description *local_description;
  CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
                                                    connection_.connection()));
  CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
      connection_.connection(), local_description));

  enum rawrtc_sdp_type type;
  char *local_sdp = nullptr;
  // Get SDP type & the SDP itself
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
      &type, local_description));
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
                                                          local_description));

  flatbuffers::FlatBufferBuilder fbb;
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
  flatbuffers::Offset<WebSocketMessage> answer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());

  VLOG(1) << aos::FlatbufferToJson(
      flatbuffers::GetTemporaryPointer(fbb, answer_message));
  fbb.Finish(answer_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
  mem_deref(local_sdp);
}

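// Applies a remote ICE candidate that the browser sent over the websocket.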
void ApplicationConnection::OnIce(const WebSocketIce *ice) {
  if (!ice->has_candidate()) {
    return;
  }
  uint8_t sdpMLineIndex = ice->sdpMLineIndex();

  struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
  CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
      &ice_candidate, ice->candidate()->c_str(), ice->sdpMid()->c_str(),
      &sdpMLineIndex, nullptr));

  rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
                                           ice_candidate);

  mem_deref(ice_candidate);
}

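// Forwards a locally-gathered ICE candidate to the browser over the websocket
// so that it can complete connectivity checks.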
void ApplicationConnection::LocalCandidate(
    struct rawrtc_peer_connection_ice_candidate *const candidate,
    char const *const url) {
  struct rawrtc_ice_candidate *ortc_candidate = nullptr;
  if (candidate) {
    CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
        &ortc_candidate, candidate));

    flatbuffers::FlatBufferBuilder fbb;
    char *sdpp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
    char *midp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));

    uint8_t media_line_index;
    enum rawrtc_code error =
        rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
            &media_line_index, candidate);

    flatbuffers::Offset<flatbuffers::String> sdpp_offset =
        fbb.CreateString(sdpp);
    flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
        fbb.CreateString(midp);

    WebSocketIce::Builder web_socket_ice_builder(fbb);

    web_socket_ice_builder.add_candidate(sdpp_offset);
    web_socket_ice_builder.add_sdpMid(sdp_mid_offset);

    if (error == RAWRTC_CODE_SUCCESS) {
      web_socket_ice_builder.add_sdpMLineIndex(media_line_index);
    }
    flatbuffers::Offset<WebSocketIce> ice_offset =
        web_socket_ice_builder.Finish();

    flatbuffers::Offset<WebSocketMessage> ice_message =
        CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
    VLOG(1) << url << ": "
            << aos::FlatbufferToJson(
                   flatbuffers::GetTemporaryPointer(fbb, ice_message));
    fbb.Finish(ice_message);

    server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

    mem_deref(sdpp);
    mem_deref(midp);
  }
}

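// Called when the browser opens a data channel. The "signalling" channel
// carries SubscriberRequests from the client and is used to stream the
// configuration to it on open; any other client-initiated channel is closed
// immediately (the per-channel data channels are opened from our side in
// HandleSignallingData()).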
void ApplicationConnection::OnDataChannel(
    std::shared_ptr<ScopedDataChannel> channel) {
  if (channel->label() == std::string_view("signalling")) {
    CHECK(!channel_);
    channel_ = channel;

    channel_->set_on_message(
        [this](struct mbuf *const buffer,
               const enum rawrtc_data_channel_message_flag flags) {
          HandleSignallingData(buffer, flags);
        });

    channel_->set_on_open([this]() {
      for (const auto &header : config_headers_) {
        channel_->Send(header.buffer());
      }
    });

    channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });

    // Register an on_close callback which does nothing but keeps channel alive
    // until it is done. This keeps the memory around until rawrtc can finish
    // calling the close callback.
    channel_->set_on_close([channel]() {});
  } else {
    // We don't serve any other client-initiated channels; register the same
    // keep-alive on_close callback on that channel (not the signalling
    // channel) and close it.
    channel->set_on_close([channel]() {});
    channel->Close();
  }
}

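// Processes a SubscriberRequest received over the signalling channel and
// reconciles the set of open per-channel data channels with it.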
void ApplicationConnection::HandleSignallingData(
    struct mbuf *const
        buffer,  // nullable (in case partial delivery has been requested)
    const enum rawrtc_data_channel_message_flag /*flags*/) {
  FlatbufferSpan<SubscriberRequest> message(
      {mbuf_buf(buffer), mbuf_get_left(buffer)});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid flatbuffer received from browser client.";
    return;
  }
  VLOG(1) << "Got a subscription message "
          << aos::FlatbufferToJson(&message.message());
  if (!message.message().has_channels_to_transfer()) {
    LOG(ERROR) << "No channels requested for transfer.";
    return;
  }

  // The client each time sends a full list of everything it wants to be
  // subscribed to. It is our responsibility to remove channels which aren't
  // in that list and add ones which need to be.
  //
  // Start by clearing a tracking bit on each channel. This takes O(number of
  // open channels), which should be small.
  //
  // Then open any new channels. For any we visit which are already open,
  // don't update those.
  //
  // Finally, iterate over the channel list and purge anything which we didn't
  // touch.
  for (auto &it : channels_) {
    it.second.requested = false;
  }
  for (auto channel_request : *message.message().channels_to_transfer()) {
    const Channel *channel = channel_request->channel();
    if (channel == nullptr) {
      LOG(ERROR) << "Got unpopulated channel.";
      continue;
    }
    const TransferMethod transfer_method = channel_request->method();
    // Call GetChannel() before comparing the channel name/type to each
    // subscriber. This allows us to resolve any node or application
    // specific mappings.
    const Channel *comparison_channel =
        configuration::GetChannel(event_loop_->configuration(), channel,
                                  event_loop_->name(), event_loop_->node());
    if (comparison_channel == nullptr) {
      LOG(ERROR) << "Channel not available: "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }

    size_t channel_index = configuration::ChannelIndex(
        event_loop_->configuration(), comparison_channel);

    auto it = channels_.find(channel_index);
    if (it == channels_.end()) {
      std::shared_ptr<ScopedDataChannel> data_channel =
          std::make_shared<ScopedDataChannel>();

      std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;

      data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
                                 channel_index]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we started.";
        // Weak ptr inside the subscriber so we don't have a circular
        // reference. AddListener will close it.
        subscribers_[channel_index]->AddListener(data_channel, transfer_method);
      });

      Subscriber *subscriber = subscribers_[channel_index].get();
      data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
        subscriber->RemoveListener(data_channel);
      });

      data_channel->Open(
          connection_.connection(),
          absl::StrCat(channel->name()->str(), "/", channel->type()->str()));

      auto pair = channels_.insert({channel_index, {data_channel, true}});
      it = pair.first;
    }

    it->second.requested = true;

    VLOG(1) << "Subscribe to: " << channel->type()->str();
  }

  for (auto &it : channels_) {
    if (!it.second.requested) {
      it.second.data_channel->Close();
      it.second.data_channel = nullptr;
    }
  }
}

}  // namespace web_proxy
}  // namespace aos