#include "aos/network/web_proxy.h"

#include "aos/flatbuffer_merge.h"
#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
#include "aos/seasocks/seasocks_logger.h"
#include "glog/logging.h"
#include "internal/Embedded.h"
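
// This file implements the server side of the AOS web proxy: it forwards AOS
// channel data to browsers over WebRTC data channels, using a seasocks
// websocket for SDP/ICE signalling and the re/rawrtc libraries (pulled in
// below) for the WebRTC stack itself.
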
extern "C" {
#include <rawrtc.h>

#define DEBUG_LEVEL 7
#define DEBUG_MODULE "web-proxy"
#include <re_dbg.h>
struct list *tmrl_get(void);
}

DEFINE_int32(proxy_port, 1180, "Port to use for the web proxy server.");
DEFINE_int32(pre_send_messages, 10000,
             "Number of messages / queue to send to a client before waiting on "
             "confirmation that the initial message was received. If set to "
             "-1, will not throttle messages at all. This prevents a situation "
             "where, when run on localhost, the large number of WebRTC packets "
             "can overwhelm the browser and crash the webpage.");

namespace aos {
namespace web_proxy {
WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
                                   aos::EventLoop *event_loop, int buffer_size)
    : server_(server),
      config_(aos::CopyFlatBuffer(event_loop->configuration())),
      event_loop_(event_loop) {
  if (VLOG_IS_ON(2)) {
    dbg_init(DBG_DEBUG, DBG_ALL);
  }
  CHECK_RAWRTC(rawrtc_init(true));

  // We need to reference findEmbeddedContent() to make the linker happy...
  findEmbeddedContent("");
  const aos::Node *self = event_loop_->node();

  subscribers_.reserve(event_loop_->configuration()->channels()->size());
  for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
    auto channel = event_loop_->configuration()->channels()->Get(i);
    if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
      auto fetcher = event_loop_->MakeRawFetcher(channel);
      subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
          std::move(fetcher), i, buffer_size));
    } else {
      subscribers_.emplace_back(nullptr);
    }
  }
  TimerHandler *const timer = event_loop_->AddTimer([this]() {
    for (auto &subscriber : subscribers_) {
      if (subscriber) subscriber->RunIteration();
    }
  });

  event_loop_->OnRun([this, timer]() {
    timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
  });
}

void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
  std::unique_ptr<ApplicationConnection> connection =
      std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
                                              config_, event_loop_);

  connections_.insert({sock, std::move(connection)});
}

void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
                              size_t size) {
  const FlatbufferSpan<WebSocketMessage> message({data, size});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
    return;
  }
  VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
  switch (message.message().payload_type()) {
    case Payload::WebSocketSdp: {
      const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
      if (offer->type() != SdpType::OFFER) {
        LOG(WARNING) << "Got the wrong sdp type from client";
        break;
      }
      const flatbuffers::String *sdp = offer->payload();
      connections_[sock]->OnSdp(sdp->c_str());
      break;
    }
    case Payload::WebSocketIce: {
      const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
      connections_[sock]->OnIce(ice);
      break;
    }
    default: {
      break;
    }
  }
}

void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
  connections_.erase(sock);
}

// Global epoll pointer, used to bridge the re library's fd callbacks into the
// aos EPoll implementation.
static aos::internal::EPoll *global_epoll = nullptr;

// Listen callback registered with re; forwards re's fd-watch requests to the
// EPoll object. The flag bits are re's read (0x1), write (0x2), and error
// (0x4) events.
static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
  if (flags & 0x1) {
    global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
  }
  if (flags & 0x2) {
    global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
  }
  if (flags & 0x4) {
    global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
  }
  return 0;
}

static void ReFdClose(int fd) {
  CHECK(global_epoll != nullptr);
  global_epoll->DeleteFd(fd);
}

WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
    : WebProxy(event_loop, &internal_epoll_, buffer_size) {}

WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
    : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}

WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
                   int buffer_size)
    : epoll_(epoll),
      server_(std::make_shared<aos::seasocks::SeasocksLogger>(
          ::seasocks::Logger::Level::Info)),
      websocket_handler_(
          new WebsocketHandler(&server_, event_loop, buffer_size)) {
  CHECK(!global_epoll);
  global_epoll = epoll;

  re_fd_set_listen_callback(&ReFdListen);
  re_fd_set_close_callback(&ReFdClose);

  epoll->BeforeWait([]() {
    const uint64_t to = tmr_next_timeout(tmrl_get());
    if (to != 0) {
      VLOG(3) << "Next timeout " << to;
    }
    // Note: this only works because we are spinning on it...
    // TODO(austin): If we choose to actually sleep, use a timerfd reserved
    // just for handling tmr.
    tmr_poll(tmrl_get());
  });

  server_.addWebSocketHandler("/ws", websocket_handler_);
  CHECK(server_.startListening(FLAGS_proxy_port));

  epoll->OnReadable(server_.fd(), [this]() {
    CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
  });

  if (&internal_epoll_ == epoll) {
    TimerHandler *const timer = event_loop->AddTimer([this]() {
      // Run the epoll poller until there are no more events (if we are being
      // backed by a shm event loop, there won't be anything registered to
      // internal_epoll_ and this will just return false).
      // We just deal with clearing all the epoll events using a simulated
      // timer. This does mean that we will spin rather than sleeping in any
      // coherent manner, which will be particularly noticeable once we are
      // done processing other events.
      while (internal_epoll_.Poll(false)) {
        continue;
      }
    });

    event_loop->OnRun([timer, event_loop]() {
      timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
    });
  }
}
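
// A minimal usage sketch (illustrative only; the config path and buffer size
// here are assumptions, not something this file prescribes):
//
//   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
//       aos::configuration::ReadConfig("aos_config.json");
//   aos::ShmEventLoop event_loop(&config.message());
//   WebProxy web_proxy(&event_loop, /*buffer_size=*/100);
//   event_loop.Run();  // Serves browsers on FLAGS_proxy_port.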

WebProxy::~WebProxy() {
  epoll_->DeleteFd(server_.fd());
  server_.terminate();
  CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
  CHECK(global_epoll == epoll_);
  global_epoll = nullptr;
}

void Subscriber::RunIteration() {
  if (channels_.empty() && buffer_size_ == 0) {
    return;
  }

  while (fetcher_->FetchNext()) {
    // If we aren't building up a buffer, short-circuit the FetchNext().
    if (buffer_size_ == 0) {
      fetcher_->Fetch();
    }
    Message message;
    message.index = fetcher_->context().queue_index;
    VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
            << " packets";
    for (int packet_index = 0;
         packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
      // Pack directly into the mbuffer. This is admittedly a bit painful.
      const size_t packet_size =
          PackedMessageSize(fetcher_->context(), packet_index);
      struct mbuf *mbuffer = mbuf_alloc(packet_size);

      {
        // Wrap a pre-allocated builder around the mbuffer.
        PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
        flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
        flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
            &fbb, fetcher_->context(), channel_index_, packet_index);
        fbb.Finish(message_offset);

        // Now, the flatbuffer is built from the back to the front. So any
        // extra memory will be at the front. Set up the end and start
        // pointers on the mbuf.
        mbuf_set_end(mbuffer, packet_size);
        mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
      }
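      // To illustrate the pointer setup above: flatbuffers are built
      // back-to-front, so after Finish() the encoded bytes occupy the last
      // fbb.GetSize() bytes of the allocation:
      //
      //   mbuffer: [ unused slack | flatbuffer bytes ]
      //            ^buf           ^pos               ^end == packet_size
      //
      // Setting pos to packet_size - fbb.GetSize() makes the mbuf's readable
      // region exactly the finished flatbuffer.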

      message.data.emplace_back(
          std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
    }
    message_buffer_.push_back(std::move(message));
    // If we aren't keeping a buffer, then we should only do one iteration of
    // the while loop--otherwise, if additional messages arrive between the
    // first FetchNext() and the second iteration then we can end up behaving
    // poorly (since we do a Fetch() when buffer_size_ == 0).
    if (buffer_size_ == 0) {
      break;
    }
  }
  for (auto &conn : channels_) {
    std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first.lock();
    CHECK(rtc_channel) << "data_channel was destroyed too early.";
    ChannelInformation *channel_data = &conn.second;
    if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
      SkipToLastMessage(channel_data);
    }
    std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
    while (buffer) {
      // TODO(austin): This is a nop so we just buffer forever. Fix this when
      // we care.
      if (rtc_channel->buffered_amount() > 14000000) {
        VLOG(1) << "skipping a send because buffered amount is too high";
        break;
      }

      rtc_channel->Send(buffer.get());
      buffer = NextBuffer(channel_data);
    }
  }
  if (buffer_size_ >= 0) {
    while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
      message_buffer_.pop_front();
    }
  }
}

void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
                             TransferMethod transfer_method) {
  ChannelInformation info;
  info.transfer_method = transfer_method;

  // If we aren't keeping a buffer and there are no existing listeners, call
  // Fetch() to avoid falling behind on future calls to FetchNext().
  if (channels_.empty() && buffer_size_ == 0) {
    fetcher_->Fetch();
  }

  channels_.emplace_back(std::make_pair(data_channel, info));

  // The browser acks messages by sending ChannelState back on the data
  // channel; the indices recorded here feed the FLAGS_pre_send_messages
  // throttling in NextBuffer().
  data_channel->set_on_message(
      [this, index = channels_.size() - 1](
          struct mbuf *const buffer,
          const enum rawrtc_data_channel_message_flag /*flags*/) {
        FlatbufferSpan<ChannelState> message(
            {mbuf_buf(buffer), mbuf_get_left(buffer)});
        if (!message.Verify()) {
          LOG(ERROR) << "Invalid flatbuffer received from browser client.";
          return;
        }

        channels_[index].second.reported_queue_index =
            message.message().queue_index();
        channels_[index].second.reported_packet_index =
            message.message().packet_index();
      });
}

void Subscriber::RemoveListener(
    std::shared_ptr<ScopedDataChannel> data_channel) {
  channels_.erase(
      std::remove_if(
          channels_.begin(), channels_.end(),
          [data_channel](const std::pair<std::weak_ptr<ScopedDataChannel>,
                                         ChannelInformation> &channel) {
            return channel.first.lock().get() == data_channel.get();
          }),
      channels_.end());
}

std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
    ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty()) {
    return nullptr;
  }
  const uint32_t earliest_index = message_buffer_.front().index;
  const uint32_t latest_index = message_buffer_.back().index;
  const bool fell_behind = channel->current_queue_index < earliest_index;
  if (fell_behind) {
    channel->current_queue_index = earliest_index;
    channel->next_packet_number = 0;
    return message_buffer_.front().data.at(0);
  }
  // TODO(james): Handle queue index wrapping when >2^32 messages are sent on
  // a channel.
  if (channel->current_queue_index > latest_index) {
    // We are still waiting on the next message to appear; return.
    return nullptr;
  }
  if (FLAGS_pre_send_messages > 0) {
    // Don't buffer up an excessive number of messages to the client.
    // This currently ignores the packet index (and really, any concept of
    // message size), but the main goal is just to avoid locking up the
    // client browser, not to be ultra precise about anything. It's also not
    // clear that message *size* is necessarily even the determining factor
    // in causing issues.
    if (channel->reported_queue_index + FLAGS_pre_send_messages <
        channel->current_queue_index) {
      return nullptr;
    }
  }
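  // Worked example of the check above: with FLAGS_pre_send_messages = 10000,
  // if the client last acknowledged queue index 5 and we are about to send
  // queue index 15006, then 5 + 10000 < 15006 and we return nullptr until
  // the client's reported_queue_index catches up.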
  CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
      << "Inconsistent queue indices.";
  const size_t packets_in_message =
      message_buffer_[channel->current_queue_index - earliest_index]
          .data.size();
  CHECK_LT(0u, packets_in_message);
  CHECK_LT(channel->next_packet_number, packets_in_message);

  std::shared_ptr<struct mbuf> original_data =
      message_buffer_[channel->current_queue_index - earliest_index].data.at(
          channel->next_packet_number);

  ++channel->next_packet_number;
  if (channel->next_packet_number == packets_in_message) {
    ++channel->current_queue_index;
    channel->next_packet_number = 0;
  }

  // Trigger a copy of the mbuf without copying the data.
  return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
                                      mem_deref);
}

void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty() ||
      channel->current_queue_index == message_buffer_.back().index) {
    return;
  }
  channel->current_queue_index = message_buffer_.back().index;
  channel->next_packet_number = 0;
}

ApplicationConnection::ApplicationConnection(
    ::seasocks::Server *server, ::seasocks::WebSocket *sock,
    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
    const EventLoop *event_loop)
    : server_(server),
      sock_(sock),
      subscribers_(subscribers),
      config_headers_(PackBuffer(config.span())),
      event_loop_(event_loop) {
  connection_.set_on_negotiation_needed([]() {
    VLOG(1) << "Negotiation needed, not offering so not creating offer.";
  });

  connection_.set_on_local_candidate(
      [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
             char const *const url) { LocalCandidate(candidate, url); });

  connection_.set_on_data_channel(
      [this](std::shared_ptr<ScopedDataChannel> channel) {
        OnDataChannel(channel);
      });

  connection_.Open();
}

ApplicationConnection::~ApplicationConnection() {
  for (auto &it : channels_) {
    it.second.data_channel->Close();
    it.second.data_channel = nullptr;
  }

  // Eh, we are done, tell the channel to shut down. If we didn't, it would
  // just hang around until the connection closes, which is rather shortly
  // after.
  if (channel_) {
    channel_->Close();
  }
}
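
// Handles an SDP offer from the browser. This is the answering side of the
// standard WebRTC offer/answer exchange: set the browser's OFFER as the
// remote description, create an ANSWER, set it as the local description, and
// send the answer back over the signalling websocket.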
void ApplicationConnection::OnSdp(const char *sdp) {
  struct rawrtc_peer_connection_description *remote_description = nullptr;

  auto error = rawrtc_peer_connection_description_create(
      &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
  if (error) {
    LOG(WARNING) << "Cannot parse remote description: "
                 << rawrtc_code_to_str(error);
    return;
  }

  CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
      connection_.connection(), remote_description));

  struct rawrtc_peer_connection_description *local_description;
  CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
                                                    connection_.connection()));
  CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
      connection_.connection(), local_description));

  enum rawrtc_sdp_type type;
  char *local_sdp = nullptr;
  // Get the SDP type & the SDP itself.
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
      &type, local_description));
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
                                                          local_description));

  flatbuffers::FlatBufferBuilder fbb;
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
  flatbuffers::Offset<WebSocketMessage> answer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());

  VLOG(1) << aos::FlatbufferToJson(
      flatbuffers::GetTemporaryPointer(fbb, answer_message));
  fbb.Finish(answer_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
  mem_deref(local_sdp);
}

void ApplicationConnection::OnIce(const WebSocketIce *ice) {
  if (!ice->has_candidate()) {
    return;
  }
  uint8_t sdp_m_line_index = ice->sdp_m_line_index();

  struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
  CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
      &ice_candidate, ice->candidate()->c_str(), ice->sdp_mid()->c_str(),
      &sdp_m_line_index, nullptr));

  rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
                                           ice_candidate);

  mem_deref(ice_candidate);
}

void ApplicationConnection::LocalCandidate(
    struct rawrtc_peer_connection_ice_candidate *const candidate,
    char const *const url) {
  struct rawrtc_ice_candidate *ortc_candidate = nullptr;
  if (candidate) {
    CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
        &ortc_candidate, candidate));

    flatbuffers::FlatBufferBuilder fbb;
    char *sdpp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
    char *midp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));

    uint8_t media_line_index;
    enum rawrtc_code error =
        rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
            &media_line_index, candidate);

    flatbuffers::Offset<flatbuffers::String> sdpp_offset =
        fbb.CreateString(sdpp);
    flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
        fbb.CreateString(midp);

    WebSocketIce::Builder web_socket_ice_builder(fbb);

    web_socket_ice_builder.add_candidate(sdpp_offset);
    web_socket_ice_builder.add_sdp_mid(sdp_mid_offset);

    if (error == RAWRTC_CODE_SUCCESS) {
      web_socket_ice_builder.add_sdp_m_line_index(media_line_index);
    }
    flatbuffers::Offset<WebSocketIce> ice_offset =
        web_socket_ice_builder.Finish();

    flatbuffers::Offset<WebSocketMessage> ice_message =
        CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
    VLOG(1) << url << ": "
            << aos::FlatbufferToJson(
                   flatbuffers::GetTemporaryPointer(fbb, ice_message));
    fbb.Finish(ice_message);

    server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

    mem_deref(sdpp);
    mem_deref(midp);
  }
}

void ApplicationConnection::OnDataChannel(
    std::shared_ptr<ScopedDataChannel> channel) {
  if (channel->label() == std::string_view("signalling")) {
    CHECK(!channel_);
    channel_ = channel;

    channel_->set_on_message(
        [this](struct mbuf *const buffer,
               const enum rawrtc_data_channel_message_flag flags) {
          HandleSignallingData(buffer, flags);
        });

    channel_->set_on_open([this]() {
      for (const auto &header : config_headers_) {
        channel_->Send(header.buffer());
      }
    });

    channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });

    // Register an on_close callback which does nothing but keeps channel
    // alive until it is done. This keeps the memory around until rawrtc can
    // finish calling the close callback.
    channel_->set_on_close([channel]() {});
  } else {
    // Same keep-alive trick for a channel we aren't going to use: hold the
    // shared_ptr until rawrtc finishes closing it, then let it go.
    channel->set_on_close([channel]() {});
    channel->Close();
  }
}

void ApplicationConnection::HandleSignallingData(
    struct mbuf *const
        buffer,  // nullable (in case partial delivery has been requested)
    const enum rawrtc_data_channel_message_flag /*flags*/) {
  FlatbufferSpan<SubscriberRequest> message(
      {mbuf_buf(buffer), mbuf_get_left(buffer)});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid flatbuffer received from browser client.";
    return;
  }
  VLOG(1) << "Got a subscription message "
          << aos::FlatbufferToJson(&message.message());
  if (!message.message().has_channels_to_transfer()) {
    LOG(ERROR) << "No channels requested for transfer.";
    return;
  }

  // The client each time sends a full list of everything it wants to be
  // subscribed to. It is our responsibility to remove channels which aren't
  // in that list and add ones which need to be.
  //
  // Start by clearing a tracking bit on each channel. This takes O(number of
  // open channels), which should be small.
  //
  // Then open any new channels. For any we visit which are already open,
  // don't update those.
  //
  // Finally, iterate over the channel list and purge anything which we didn't
  // touch.
  for (auto &it : channels_) {
    it.second.requested = false;
  }
  for (auto channel_request : *message.message().channels_to_transfer()) {
    const Channel *channel = channel_request->channel();
    if (channel == nullptr) {
      LOG(ERROR) << "Got unpopulated channel.";
      continue;
    }
    const TransferMethod transfer_method = channel_request->method();
    // Call GetChannel() before comparing the channel name/type to each
    // subscriber. This allows us to resolve any node or application specific
    // mappings.
    const Channel *comparison_channel =
        configuration::GetChannel(event_loop_->configuration(), channel,
                                  event_loop_->name(), event_loop_->node());
    if (comparison_channel == nullptr) {
      LOG(ERROR) << "Channel does not exist: "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }
    if (!configuration::ChannelIsReadableOnNode(comparison_channel,
                                                event_loop_->node())) {
      LOG(ERROR) << "Channel not available on node "
                 << event_loop_->node()->name()->string_view() << ": "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }

    size_t channel_index = configuration::ChannelIndex(
        event_loop_->configuration(), comparison_channel);

    auto it = channels_.find(channel_index);
    if (it == channels_.end()) {
      std::shared_ptr<ScopedDataChannel> data_channel =
          ScopedDataChannel::MakeDataChannel();

      std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;

      data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
                                 channel_index]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we started.";
        // The subscriber holds a weak pointer to the data channel so we don't
        // have a circular reference; RemoveListener cleans it up on close.
        subscribers_[channel_index]->AddListener(data_channel,
                                                 transfer_method);
      });

      Subscriber *subscriber = subscribers_[channel_index].get();
      data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
        subscriber->RemoveListener(data_channel);
      });

      data_channel->Open(
          connection_.connection(),
          absl::StrCat(channel->name()->str(), "/", channel->type()->str()));

      auto pair = channels_.insert({channel_index, {data_channel, true}});
      it = pair.first;
    }

    it->second.requested = true;

    VLOG(1) << "Subscribe to: " << channel->type()->str();
  }

  for (auto &it : channels_) {
    if (!it.second.requested) {
      it.second.data_channel->Close();
      it.second.data_channel = nullptr;
    }
  }
}
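
// For reference, a SubscriberRequest from the browser might render as JSON
// like the sketch below (field names match the accessors above; the channel
// name and type are illustrative, not from this file):
//
//   {
//     "channels_to_transfer": [
//       {
//         "channel": { "name": "/camera", "type": "frc971.vision.Image" },
//         "method": "SUBSAMPLE"
//       }
//     ]
//   }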

}  // namespace web_proxy
}  // namespace aos