blob: d384f72c3a2eca54a656b6d280e0e99b32a3cbeb [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08007#include "aos/seasocks/seasocks_logger.h"
Alex Perryb3b50792020-01-18 16:13:45 -08008#include "glog/logging.h"
James Kuszmaul48671362020-12-24 13:54:16 -08009#include "internal/Embedded.h"
Alex Perryb3b50792020-01-18 16:13:45 -080010
// rawrtc and re are C libraries; pull their headers in with C linkage.
extern "C" {
#include <rawrtc.h>

// Configure re's per-translation-unit debug logging (consumed by re_dbg.h).
#define DEBUG_LEVEL 7
#define DEBUG_MODULE "web-proxy"
#include <re_dbg.h>
// re-internal accessor for the global timer list; declared here so we can
// drive re's timers manually from the epoll loop below.
struct list *tmrl_get(void);
}

DEFINE_int32(proxy_port, 1180, "Port to use for the web proxy server.");
DEFINE_int32(pre_send_messages, 10000,
             "Number of messages / queue to send to a client before waiting on "
             "confirmation that the initial message was received. If set to "
             "-1, will not throttle messages at all. This prevents a situation "
             "where, when run on localhost, the large number of WebRTC packets "
             "can overwhelm the browser and crash the webpage.");
James Kuszmaul1e95bed2021-01-09 21:02:49 -080027
Alex Perryb3b50792020-01-18 16:13:45 -080028namespace aos {
29namespace web_proxy {
James Kuszmaul7ad91522020-09-01 19:15:35 -070030WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul1a29c082022-02-03 14:02:47 -080031 aos::EventLoop *event_loop,
32 StoreHistory store_history,
33 int per_channel_buffer_size_bytes)
James Kuszmaul7ad91522020-09-01 19:15:35 -070034 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080035 config_(aos::CopyFlatBuffer(event_loop->configuration())),
36 event_loop_(event_loop) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070037 if (VLOG_IS_ON(2)) {
38 dbg_init(DBG_DEBUG, DBG_ALL);
39 }
40 CHECK_RAWRTC(rawrtc_init(true));
41
James Kuszmaul48671362020-12-24 13:54:16 -080042 // We need to reference findEmbeddedContent() to make the linker happy...
43 findEmbeddedContent("");
Austin Schuh52e5e3a2021-04-24 22:30:02 -070044 const aos::Node *self = event_loop_->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070045
Austin Schuh52e5e3a2021-04-24 22:30:02 -070046 subscribers_.reserve(event_loop_->configuration()->channels()->size());
47 for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
48 auto channel = event_loop_->configuration()->channels()->Get(i);
James Kuszmaul7ad91522020-09-01 19:15:35 -070049 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070050 auto fetcher = event_loop_->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080051 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
James Kuszmaul1a29c082022-02-03 14:02:47 -080052 std::move(fetcher), i, store_history,
53 per_channel_buffer_size_bytes < 0
54 ? -1
55 : per_channel_buffer_size_bytes / channel->max_size()));
Austin Schuh52e5e3a2021-04-24 22:30:02 -070056 } else {
57 subscribers_.emplace_back(nullptr);
James Kuszmaul7ad91522020-09-01 19:15:35 -070058 }
59 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -070060 TimerHandler *const timer = event_loop_->AddTimer([this]() {
James Kuszmaul7ad91522020-09-01 19:15:35 -070061 for (auto &subscriber : subscribers_) {
James Kuszmaul147b4c12022-07-13 20:35:27 -070062 if (subscriber) subscriber->RunIteration(recording_);
James Kuszmaul7ad91522020-09-01 19:15:35 -070063 }
64 });
65
Austin Schuh52e5e3a2021-04-24 22:30:02 -070066 event_loop_->OnRun([this, timer]() {
67 timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
James Kuszmaul7ad91522020-09-01 19:15:35 -070068 });
69}
Alex Perryb3b50792020-01-18 16:13:45 -080070
71void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070072 std::unique_ptr<ApplicationConnection> connection =
73 std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
74 config_, event_loop_);
75
76 connections_.insert({sock, std::move(connection)});
Alex Perryb3b50792020-01-18 16:13:45 -080077}
78
79void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
80 size_t size) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070081 const FlatbufferSpan<WebSocketMessage> message({data, size});
82 if (!message.Verify()) {
83 LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
84 return;
85 }
86 VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
87 switch (message.message().payload_type()) {
88 case Payload::WebSocketSdp: {
89 const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
90 if (offer->type() != SdpType::OFFER) {
91 LOG(WARNING) << "Got the wrong sdp type from client";
92 break;
93 }
94 const flatbuffers::String *sdp = offer->payload();
95 connections_[sock]->OnSdp(sdp->c_str());
96 break;
97 }
98 case Payload::WebSocketIce: {
99 const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
100 connections_[sock]->OnIce(ice);
101 break;
102 }
Brian Silverman225c5072021-11-17 19:56:31 -0800103 default: {
104 break;
105 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700106 }
Alex Perryb3b50792020-01-18 16:13:45 -0800107}
108
109void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
110 connections_.erase(sock);
111}
112
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700113// Global epoll pointer
114static aos::internal::EPoll *global_epoll = nullptr;
115
116static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
117 if (flags & 0x1) {
118 global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
119 }
120 if (flags & 0x2) {
121 global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
122 }
123 if (flags & 0x4) {
124 global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
125 }
126 return 0;
127}
128
// Close callback registered with re: removes the fd from the global EPoll
// when re is done watching it.
static void ReFdClose(int fd) {
  CHECK(global_epoll != nullptr);
  global_epoll->DeleteFd(fd);
}
133
// Constructs a WebProxy backed by its own internal_epoll_ instance; the main
// constructor detects this case and drains it from a simulated timer.
WebProxy::WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : WebProxy(event_loop, &internal_epoll_, store_history,
               per_channel_buffer_size_bytes) {}
James Kuszmaul71a81932020-12-15 21:08:01 -0800138
// Constructs a WebProxy sharing the ShmEventLoop's own epoll instance, so no
// extra polling timer is needed.
WebProxy::WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : WebProxy(event_loop, event_loop->epoll(), store_history,
               per_channel_buffer_size_bytes) {}
James Kuszmaul71a81932020-12-15 21:08:01 -0800143
// Main constructor: wires seasocks and the re/rawrtc event machinery into the
// provided epoll instance.  Only one WebProxy may exist at a time because re's
// callbacks go through the file-level global_epoll pointer.
WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
                   StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : epoll_(epoll),
      server_(std::make_shared<aos::seasocks::SeasocksLogger>(
          ::seasocks::Logger::Level::Info)),
      websocket_handler_(new WebsocketHandler(
          &server_, event_loop, store_history, per_channel_buffer_size_bytes)) {
  CHECK(!global_epoll);
  global_epoll = epoll;

  // Route re's fd watching through our epoll instance.
  re_fd_set_listen_callback(&ReFdListen);
  re_fd_set_close_callback(&ReFdClose);

  // Pump re's timer list before each epoll wait.
  epoll->BeforeWait([]() {
    const uint64_t to = tmr_next_timeout(tmrl_get());
    if (to != 0) {
      VLOG(3) << "Next timeout " << to;
    }
    // Note: this only works because we are spinning on it...
    // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
    // for handling tmr.
    tmr_poll(tmrl_get());
  });

  server_.addWebSocketHandler("/ws", websocket_handler_);
  CHECK(server_.startListening(FLAGS_proxy_port));

  // Let seasocks service its listening socket whenever it becomes readable.
  epoll->OnReadable(server_.fd(), [this]() {
    CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
  });

  // If we own the epoll instance (EventLoop constructor path), nothing else
  // will drive it -- drain it from a 10ms simulated timer instead.
  if (&internal_epoll_ == epoll) {
    TimerHandler *const timer = event_loop->AddTimer([this]() {
      // Run the epoll poller until there are no more events (if we are being
      // backed by a shm event loop, there won't be anything registered to
      // internal_epoll_ and this will just return false).
      // We just deal with clearing all the epoll events using a simulated
      // timer. This does mean that we will spin rather than actually sleeping
      // in any coherent manner, which will be particularly noticeable when past
      // the end of processing other events.
      while (internal_epoll_.Poll(false)) {
        continue;
      }
    });

    event_loop->OnRun([timer, event_loop]() {
      timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
    });
  }
}
195
// Tears down in reverse order of construction: stop watching the listening
// socket, terminate seasocks (final poll must report Terminated), then release
// the global epoll pointer so a future WebProxy can be constructed.
WebProxy::~WebProxy() {
  epoll_->DeleteFd(server_.fd());
  server_.terminate();
  CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
  CHECK(global_epoll == epoll_);
  global_epoll = nullptr;
}
203
// Delegates to the websocket handler; see WebsocketHandler::StopRecording().
void WebProxy::StopRecording() { websocket_handler_->StopRecording(); }
Alex Perry5f474f22020-02-01 12:14:24 -0800205
// One polling iteration for this channel: optionally fetch newly arrived
// messages, pre-pack them into mbufs, forward pending buffers to every
// connected WebRTC data channel, and trim the history buffer.
// fetch_new is false when recording has been stopped; in that case only the
// send/trim phases run.
void Subscriber::RunIteration(bool fetch_new) {
  if (fetch_new) {
    // With no listeners and no history to keep, just advance the fetcher to
    // the latest message and drop anything we had buffered.
    if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
      fetcher_->Fetch();
      message_buffer_.clear();
      return;
    }

    while (fetcher_->FetchNext()) {
      // If we aren't building up a buffer, short-circuit the FetchNext().
      if (buffer_size_ == 0) {
        fetcher_->Fetch();
      }
      Message message;
      message.index = fetcher_->context().queue_index;
      VLOG(2) << "Packing a message with "
              << GetPacketCount(fetcher_->context()) << "packets";
      // Split the message into WebRTC-sized packets, packing each one
      // directly into an mbuf that can later be handed to rawrtc.
      for (int packet_index = 0;
           packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
        // Pack directly into the mbuffer. This is admittedly a bit painful.
        const size_t packet_size =
            PackedMessageSize(fetcher_->context(), packet_index);
        struct mbuf *mbuffer = mbuf_alloc(packet_size);

        {
          // Wrap a pre-allocated builder around the mbuffer.
          PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
          flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
          flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
              &fbb, fetcher_->context(), channel_index_, packet_index);
          fbb.Finish(message_offset);

          // Now, the flatbuffer is built from the back to the front. So any
          // extra memory will be at the front. Setup the end and start
          // pointers on the mbuf.
          mbuf_set_end(mbuffer, packet_size);
          mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
        }

        // mem_deref is re's refcounted free; the shared_ptr shares ownership
        // with any mbuf references rawrtc takes.
        message.data.emplace_back(
            std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
      }
      message_buffer_.push_back(std::move(message));
      // If we aren't keeping a buffer, then we should only do one iteration of
      // the while loop--otherwise, if additional messages arrive between the
      // first FetchNext() and the second iteration then we can end up behaving
      // poorly (since we do a Fetch() when buffer_size_ == 0).
      if (buffer_size_ == 0) {
        break;
      }
    }
  }
  // Send phase: push pending buffers out to each listener.
  for (auto &conn : channels_) {
    std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first.lock();
    CHECK(rtc_channel) << "data_channel was destroyed too early.";
    ChannelInformation *channel_data = &conn.second;
    if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
      SkipToLastMessage(channel_data);
    }
    std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
    while (buffer) {
      // TODO(austin): This is a nop so we just buffer forever. Fix this when
      // we care.
      if (rtc_channel->buffered_amount() > 14000000) {
        VLOG(1) << "skipping a send because buffered amount is too high";
        break;
      }

      rtc_channel->Send(buffer.get());
      buffer = NextBuffer(channel_data);
    }
  }
  // Trim history to the configured message budget (buffer_size_ < 0 means
  // unlimited).
  if (buffer_size_ >= 0) {
    while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
      message_buffer_.pop_front();
    }
  }
}
284
// Registers a WebRTC data channel as a listener on this channel, and hooks up
// its on_message callback so client acknowledgements (ChannelState) update the
// flow-control bookkeeping used by NextBuffer().
void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
                             TransferMethod transfer_method) {
  ChannelInformation info;
  info.transfer_method = transfer_method;

  // Only a weak_ptr is stored so the Subscriber does not keep the channel
  // alive; RunIteration CHECKs that listeners are removed before destruction.
  channels_.emplace_back(std::make_pair(data_channel, info));

  // NOTE(review): the callback captures this entry's index at registration
  // time.  RemoveListener() erases entries, which shifts later indices --
  // confirm that a stale index cannot be used after an earlier listener is
  // removed.
  data_channel->set_on_message(
      [this, index = channels_.size() - 1](
          struct mbuf *const buffer,
          const enum rawrtc_data_channel_message_flag /*flags*/) {
        FlatbufferSpan<ChannelState> message(
            {mbuf_buf(buffer), mbuf_get_left(buffer)});
        if (!message.Verify()) {
          LOG(ERROR) << "Invalid flatbuffer received from browser client.";
          return;
        }

        // Record how far the client claims to have gotten; NextBuffer() uses
        // this to throttle via --pre_send_messages.
        channels_[index].second.reported_queue_index =
            message.message().queue_index();
        channels_[index].second.reported_packet_index =
            message.message().packet_index();
      });
}
309
Austin Schuh60e77942022-05-16 17:48:24 -0700310void Subscriber::RemoveListener(
311 std::shared_ptr<ScopedDataChannel> data_channel) {
James Kuszmaula5822682021-12-23 18:39:28 -0800312 channels_.erase(
313 std::remove_if(
314 channels_.begin(), channels_.end(),
315 [data_channel](const std::pair<std::weak_ptr<ScopedDataChannel>,
316 ChannelInformation> &channel) {
317 return channel.first.lock().get() == data_channel.get();
318 }),
319 channels_.end());
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700320}
321
// Returns the next packet to send to this listener, or nullptr if there is
// nothing to send right now.  Advances the listener's (queue_index,
// packet_number) cursor through message_buffer_, which holds a contiguous run
// of queue indices [front().index, back().index].
std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
    ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty()) {
    return nullptr;
  }
  const uint32_t earliest_index = message_buffer_.front().index;
  const uint32_t latest_index = message_buffer_.back().index;
  // If the listener's cursor points before the start of our history, it fell
  // behind; snap it to the oldest message we still have.
  const bool fell_behind = channel->current_queue_index < earliest_index;
  if (fell_behind) {
    channel->current_queue_index = earliest_index;
    channel->next_packet_number = 0;
    return message_buffer_.front().data.at(0);
  }
  // TODO(james): Handle queue index wrapping when >2^32 messages are sent on a
  // channel.
  if (channel->current_queue_index > latest_index) {
    // We are still waiting on the next message to appear; return.
    return nullptr;
  }
  if (FLAGS_pre_send_messages > 0) {
    // Don't buffer up an excessive number of messages to the client.
    // This currently ignores the packet index (and really, any concept of
    // message size), but the main goal is just to avoid locking up the client
    // browser, not to be ultra precise about anything. It's also not clear that
    // message *size* is necessarily even the determining factor in causing
    // issues.
    if (channel->reported_queue_index + FLAGS_pre_send_messages <
        channel->current_queue_index) {
      return nullptr;
    }
  }
  // History must be a gap-free run of queue indices for the index arithmetic
  // below to be valid.
  CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
      << "Inconsistent queue indices.";
  const size_t packets_in_message =
      message_buffer_[channel->current_queue_index - earliest_index]
          .data.size();
  CHECK_LT(0u, packets_in_message);
  CHECK_LT(channel->next_packet_number, packets_in_message);

  std::shared_ptr<struct mbuf> original_data =
      message_buffer_[channel->current_queue_index - earliest_index].data.at(
          channel->next_packet_number);

  // Advance the cursor: next packet, or first packet of the next message.
  ++channel->next_packet_number;
  if (channel->next_packet_number == packets_in_message) {
    ++channel->current_queue_index;
    channel->next_packet_number = 0;
  }

  // Trigger a copy of the mbuf without copying the data.
  return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
                                      mem_deref);
}
376
377void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
378 CHECK_NOTNULL(channel);
379 if (message_buffer_.empty() ||
380 channel->current_queue_index == message_buffer_.back().index) {
381 return;
382 }
383 channel->current_queue_index = message_buffer_.back().index;
James Kuszmaul87200a42022-03-26 18:09:18 -0700384 channel->reported_queue_index = message_buffer_.back().index;
James Kuszmaul71a81932020-12-15 21:08:01 -0800385 channel->next_packet_number = 0;
386}
387
// Per-browser-client state: owns the rawrtc peer connection and relays
// signalling (SDP/ICE) between the websocket and WebRTC.  The browser is the
// offerer; we only answer.
ApplicationConnection::ApplicationConnection(
    ::seasocks::Server *server, ::seasocks::WebSocket *sock,
    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
    const EventLoop *event_loop)
    : server_(server),
      sock_(sock),
      subscribers_(subscribers),
      // Pre-pack the configuration into packets to send on channel open.
      config_headers_(PackBuffer(config.span())),
      event_loop_(event_loop) {
  connection_.set_on_negotiation_needed([]() {
    VLOG(1) << "Negotiation needed, not offering so not creating offer.";
  });

  // Forward locally gathered ICE candidates to the browser via the websocket.
  connection_.set_on_local_candidate(
      [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
             char const *const url) { LocalCandidate(candidate, url); });

  // The browser opens the data channels; we react to them here.
  connection_.set_on_data_channel(
      [this](std::shared_ptr<ScopedDataChannel> channel) {
        OnDataChannel(channel);
      });

  connection_.Open();
}
413
// Closes every per-subscription data channel and the signalling channel.
// Closing here (rather than waiting for the peer connection teardown) lets
// rawrtc run its close callbacks promptly.
ApplicationConnection::~ApplicationConnection() {
  for (auto &it : channels_) {
    it.second.data_channel->Close();
    it.second.data_channel = nullptr;
  }

  // Eh, we are done, tell the channel to shut down. If we didn't, it would
  // just hang around until the connection closes, which is rather shortly
  // after.
  if (channel_) {
    channel_->Close();
  }
}
427
// Handles the browser's SDP offer: applies it as the remote description,
// creates and applies our answer, and ships the answer SDP back over the
// websocket (via server_->execute so it runs on the seasocks thread).
void ApplicationConnection::OnSdp(const char *sdp) {
  struct rawrtc_peer_connection_description *remote_description = NULL;

  auto error = rawrtc_peer_connection_description_create(
      &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
  if (error) {
    LOG(WARNING) << "Cannot parse remote description: "
                 << rawrtc_code_to_str(error);
    return;
  }

  // NOTE(review): neither description is mem_deref'd here -- presumably
  // set_remote/local_description take over the references; confirm against
  // rawrtc's ownership rules.
  CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
      connection_.connection(), remote_description));

  struct rawrtc_peer_connection_description *local_description;
  CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
                                                    connection_.connection()));
  CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
      connection_.connection(), local_description));

  enum rawrtc_sdp_type type;
  char *local_sdp = nullptr;
  // Get SDP type & the SDP itself
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
      &type, local_description));
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
                                                          local_description));

  // Wrap the answer in a WebSocketMessage flatbuffer for the browser.
  flatbuffers::FlatBufferBuilder fbb;
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
  flatbuffers::Offset<WebSocketMessage> answer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());

  VLOG(1) << aos::FlatbufferToJson(
      flatbuffers::GetTemporaryPointer(fbb, answer_message));
  fbb.Finish(answer_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
  // local_sdp was allocated by rawrtc; release our reference.
  mem_deref(local_sdp);
}
469
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700470void ApplicationConnection::OnIce(const WebSocketIce *ice) {
471 if (!ice->has_candidate()) {
472 return;
473 }
Brian Silverman225c5072021-11-17 19:56:31 -0800474 uint8_t sdp_m_line_index = ice->sdp_m_line_index();
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700475
476 struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
477 CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
Brian Silverman225c5072021-11-17 19:56:31 -0800478 &ice_candidate, ice->candidate()->c_str(), ice->sdp_mid()->c_str(),
479 &sdp_m_line_index, nullptr));
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700480
481 rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
482 ice_candidate);
483
484 mem_deref(ice_candidate);
485}
486
// rawrtc callback for a locally gathered ICE candidate; serializes it into a
// WebSocketIce flatbuffer and forwards it to the browser over the websocket.
// A null candidate marks end-of-gathering and is ignored.
void ApplicationConnection::LocalCandidate(
    struct rawrtc_peer_connection_ice_candidate *const candidate,
    char const *const url) {
  struct rawrtc_ice_candidate *ortc_candidate = nullptr;
  if (candidate) {
    CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
        &ortc_candidate, candidate));

    flatbuffers::FlatBufferBuilder fbb;
    char *sdpp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
    char *midp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));

    // The media line index may legitimately be absent; only include it on
    // success (checked below).
    uint8_t media_line_index;
    enum rawrtc_code error =
        rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
            &media_line_index, candidate);

    flatbuffers::Offset<flatbuffers::String> sdpp_offset =
        fbb.CreateString(sdpp);
    flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
        fbb.CreateString(midp);

    WebSocketIce::Builder web_socket_ice_builder(fbb);

    web_socket_ice_builder.add_candidate(sdpp_offset);
    web_socket_ice_builder.add_sdp_mid(sdp_mid_offset);

    if (error == RAWRTC_CODE_SUCCESS) {
      web_socket_ice_builder.add_sdp_m_line_index(media_line_index);
    }
    flatbuffers::Offset<WebSocketIce> ice_offset =
        web_socket_ice_builder.Finish();

    flatbuffers::Offset<WebSocketMessage> ice_message =
        CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
    VLOG(1) << url << ": "
            << aos::FlatbufferToJson(
                   flatbuffers::GetTemporaryPointer(fbb, ice_message));
    fbb.Finish(ice_message);

    // Hop to the seasocks thread to do the actual websocket send.
    server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

    // Strings were allocated by rawrtc; release our references.
    mem_deref(sdpp);
    mem_deref(midp);
  }
}
537
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700538void ApplicationConnection::OnDataChannel(
539 std::shared_ptr<ScopedDataChannel> channel) {
540 if (channel->label() == std::string_view("signalling")) {
541 CHECK(!channel_);
542 channel_ = channel;
543
544 channel_->set_on_message(
545 [this](struct mbuf *const buffer,
546 const enum rawrtc_data_channel_message_flag flags) {
547 HandleSignallingData(buffer, flags);
548 });
549
550 channel_->set_on_open([this]() {
551 for (const auto &header : config_headers_) {
552 channel_->Send(header.buffer());
553 }
554 });
555
556 channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
557
558 // Register an on_close callback which does nothing but keeps channel alive
559 // until it is done. This keeps the memory around until rawrtc can finish
560 // calling the close callback.
561 channel_->set_on_close([channel]() {});
562 } else {
563 channel_->set_on_close([channel]() {});
564 channel->Close();
565 }
566}
567
// Processes a SubscriberRequest from the browser's signalling channel.  The
// request is the client's complete desired subscription set; this reconciles
// our open data channels against it (mark-and-sweep over channels_).
void ApplicationConnection::HandleSignallingData(
    struct mbuf *const
        buffer,  // nullable (in case partial delivery has been requested)
    const enum rawrtc_data_channel_message_flag /*flags*/) {
  FlatbufferSpan<SubscriberRequest> message(
      {mbuf_buf(buffer), mbuf_get_left(buffer)});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid flatbuffer received from browser client.";
    return;
  }
  VLOG(1) << "Got a subscription message "
          << aos::FlatbufferToJson(&message.message());
  if (!message.message().has_channels_to_transfer()) {
    LOG(ERROR) << "No channels requested for transfer.";
    return;
  }

  // The client each time sends a full list of everything it wants to be
  // subscribed to. It is our responsibility to remove channels which aren't
  // in that list and add ones which need to be.
  //
  // Start by clearing a tracking bit on each channel. This takes O(number of
  // open channels), which should be small.
  //
  // Then open any new channels. For any we visit which are already open,
  // don't update those.
  //
  // Finally, iterate over the channel list and purge anything which we didn't
  // touch.
  for (auto &it : channels_) {
    it.second.requested = false;
  }
  for (auto channel_request : *message.message().channels_to_transfer()) {
    const Channel *channel = channel_request->channel();
    if (channel == nullptr) {
      LOG(ERROR) << "Got unpopulated channel.";
      continue;
    }
    const TransferMethod transfer_method = channel_request->method();
    // Call GetChannel() before comparing the channel name/type to each
    // subscriber. This allows us to resolve any node or application
    // specific mappings.
    const Channel *comparison_channel =
        configuration::GetChannel(event_loop_->configuration(), channel,
                                  event_loop_->name(), event_loop_->node());
    if (comparison_channel == nullptr) {
      LOG(ERROR) << "Channel does not exist: "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }
    if (!configuration::ChannelIsReadableOnNode(comparison_channel,
                                                event_loop_->node())) {
      LOG(ERROR) << "Channel not available on node "
                 << event_loop_->node()->name()->string_view() << ": "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }

    // Index into subscribers_, which was built in channel order.
    size_t channel_index = configuration::ChannelIndex(
        event_loop_->configuration(), comparison_channel);

    auto it = channels_.find(channel_index);
    if (it == channels_.end()) {
      // Not already open: create a data channel named "<name>/<type>" and
      // attach it to the matching Subscriber once it opens.
      std::shared_ptr<ScopedDataChannel> data_channel =
          ScopedDataChannel::MakeDataChannel();

      // Callbacks hold only weak_ptrs so the map entry controls lifetime.
      std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;

      data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
                                 channel_index]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we started.";
        // Raw pointer inside the subscriber so we don't have a circular
        // reference. AddListener will close it.
        subscribers_[channel_index]->AddListener(data_channel, transfer_method);
      });

      Subscriber *subscriber = subscribers_[channel_index].get();
      data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
        subscriber->RemoveListener(data_channel);
      });

      data_channel->Open(
          connection_.connection(),
          absl::StrCat(channel->name()->str(), "/", channel->type()->str()));

      auto pair = channels_.insert({channel_index, {data_channel, true}});
      it = pair.first;
    }

    it->second.requested = true;

    VLOG(1) << "Subscribe to: " << channel->type()->str();
  }

  // Sweep: close any channel the client no longer wants.
  for (auto &it : channels_) {
    if (!it.second.requested) {
      it.second.data_channel->Close();
      it.second.data_channel = nullptr;
    }
  }
}
674
675} // namespace web_proxy
676} // namespace aos