blob: 7f52611bc67c338fa62f8d73ffe1a4f12c7c7554 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
Philipp Schrader790cb542023-07-05 21:06:52 -07003#include "glog/logging.h"
4
Alex Perry5f474f22020-02-01 12:14:24 -08005#include "aos/flatbuffer_merge.h"
6#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08007#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08008#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08009#include "aos/seasocks/seasocks_logger.h"
James Kuszmaul48671362020-12-24 13:54:16 -080010#include "internal/Embedded.h"
Alex Perryb3b50792020-01-18 16:13:45 -080011
Austin Schuh52e5e3a2021-04-24 22:30:02 -070012extern "C" {
13#include <rawrtc.h>
14
15#define DEBUG_LEVEL 7
16#define DEBUG_MODULE "web-proxy"
17#include <re_dbg.h>
18struct list *tmrl_get(void);
19}
20
Austin Schuhbbc3df32021-10-29 23:06:39 -070021DEFINE_int32(proxy_port, 1180, "Port to use for the web proxy server.");
James Kuszmaula5822682021-12-23 18:39:28 -080022DEFINE_int32(pre_send_messages, 10000,
23 "Number of messages / queue to send to a client before waiting on "
24 "confirmation that the initial message was received. If set to "
25 "-1, will not throttle messages at all. This prevents a situation "
26 "where, when run on localhost, the large number of WebRTC packets "
27 "can overwhelm the browser and crash the webpage.");
James Kuszmaul1e95bed2021-01-09 21:02:49 -080028
Alex Perryb3b50792020-01-18 16:13:45 -080029namespace aos {
30namespace web_proxy {
James Kuszmaul7ad91522020-09-01 19:15:35 -070031WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul1a29c082022-02-03 14:02:47 -080032 aos::EventLoop *event_loop,
33 StoreHistory store_history,
34 int per_channel_buffer_size_bytes)
James Kuszmaul7ad91522020-09-01 19:15:35 -070035 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080036 config_(aos::CopyFlatBuffer(event_loop->configuration())),
37 event_loop_(event_loop) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070038 if (VLOG_IS_ON(2)) {
39 dbg_init(DBG_DEBUG, DBG_ALL);
40 }
41 CHECK_RAWRTC(rawrtc_init(true));
42
James Kuszmaul48671362020-12-24 13:54:16 -080043 // We need to reference findEmbeddedContent() to make the linker happy...
44 findEmbeddedContent("");
Austin Schuh52e5e3a2021-04-24 22:30:02 -070045 const aos::Node *self = event_loop_->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070046
Austin Schuh52e5e3a2021-04-24 22:30:02 -070047 subscribers_.reserve(event_loop_->configuration()->channels()->size());
48 for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
49 auto channel = event_loop_->configuration()->channels()->Get(i);
James Kuszmaul7ad91522020-09-01 19:15:35 -070050 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070051 auto fetcher = event_loop_->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080052 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
James Kuszmaul1a29c082022-02-03 14:02:47 -080053 std::move(fetcher), i, store_history,
54 per_channel_buffer_size_bytes < 0
55 ? -1
56 : per_channel_buffer_size_bytes / channel->max_size()));
Austin Schuh52e5e3a2021-04-24 22:30:02 -070057 } else {
58 subscribers_.emplace_back(nullptr);
James Kuszmaul7ad91522020-09-01 19:15:35 -070059 }
60 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -070061 TimerHandler *const timer = event_loop_->AddTimer([this]() {
James Kuszmaul7ad91522020-09-01 19:15:35 -070062 for (auto &subscriber : subscribers_) {
James Kuszmaul147b4c12022-07-13 20:35:27 -070063 if (subscriber) subscriber->RunIteration(recording_);
James Kuszmaul7ad91522020-09-01 19:15:35 -070064 }
65 });
66
Austin Schuh52e5e3a2021-04-24 22:30:02 -070067 event_loop_->OnRun([this, timer]() {
Philipp Schradera6712522023-07-05 20:25:11 -070068 timer->Schedule(event_loop_->monotonic_now(),
69 std::chrono::milliseconds(100));
James Kuszmaul7ad91522020-09-01 19:15:35 -070070 });
71}
Alex Perryb3b50792020-01-18 16:13:45 -080072
73void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070074 std::unique_ptr<ApplicationConnection> connection =
75 std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
76 config_, event_loop_);
77
78 connections_.insert({sock, std::move(connection)});
Alex Perryb3b50792020-01-18 16:13:45 -080079}
80
81void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
82 size_t size) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070083 const FlatbufferSpan<WebSocketMessage> message({data, size});
84 if (!message.Verify()) {
85 LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
86 return;
87 }
88 VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
89 switch (message.message().payload_type()) {
90 case Payload::WebSocketSdp: {
91 const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
92 if (offer->type() != SdpType::OFFER) {
93 LOG(WARNING) << "Got the wrong sdp type from client";
94 break;
95 }
96 const flatbuffers::String *sdp = offer->payload();
97 connections_[sock]->OnSdp(sdp->c_str());
98 break;
99 }
100 case Payload::WebSocketIce: {
101 const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
102 connections_[sock]->OnIce(ice);
103 break;
104 }
Brian Silverman225c5072021-11-17 19:56:31 -0800105 default: {
106 break;
107 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700108 }
Alex Perryb3b50792020-01-18 16:13:45 -0800109}
110
// Called by seasocks when the browser's websocket goes away; destroys the
// ApplicationConnection (and all its WebRTC state) for that socket.
void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
  connections_.erase(sock);
}
114
// Global epoll pointer.
// rawrtc's "re" event loop reaches us through plain C callbacks (ReFdListen /
// ReFdClose below) that carry no user-data pointer, so the EPoll instance has
// to be reachable through a global. It is set in WebProxy's constructor and
// cleared in its destructor; the CHECKs there enforce a single WebProxy at a
// time.
static aos::internal::EPoll *global_epoll = nullptr;
118static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
119 if (flags & 0x1) {
120 global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
121 }
122 if (flags & 0x2) {
123 global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
124 }
125 if (flags & 0x4) {
126 global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
127 }
128 return 0;
129}
130
// C callback handed to re: unregisters an fd previously registered via
// ReFdListen() from the AOS EPoll.
static void ReFdClose(int fd) {
  CHECK(global_epoll != nullptr);
  global_epoll->DeleteFd(fd);
}
135
// Convenience constructor for non-shm event loops (e.g. simulation): uses the
// internally-owned EPoll instance, which the main constructor then drains via
// a simulated timer.
WebProxy::WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : WebProxy(event_loop, &internal_epoll_, store_history,
               per_channel_buffer_size_bytes) {}
James Kuszmaul71a81932020-12-15 21:08:01 -0800140
// Convenience constructor for shm event loops: reuses the event loop's own
// epoll instance so no extra polling timer is needed.
WebProxy::WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : WebProxy(event_loop, event_loop->epoll(), store_history,
               per_channel_buffer_size_bytes) {}
James Kuszmaul71a81932020-12-15 21:08:01 -0800145
// Main constructor: wires rawrtc's re event loop into the given epoll,
// starts the seasocks HTTP/websocket server on FLAGS_proxy_port, and (when
// running on the internal epoll) installs a timer to drain pending epoll
// events.
WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
                   StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : epoll_(epoll),
      server_(std::make_shared<aos::seasocks::SeasocksLogger>(
          ::seasocks::Logger::Level::Info)),
      websocket_handler_(new WebsocketHandler(
          &server_, event_loop, store_history, per_channel_buffer_size_bytes)) {
  // Only one WebProxy may exist at a time: the re callbacks below depend on
  // the process-wide global_epoll.
  CHECK(!global_epoll);
  global_epoll = epoll;

  re_fd_set_listen_callback(&ReFdListen);
  re_fd_set_close_callback(&ReFdClose);

  // Service re's timer list before every epoll wait.
  epoll->BeforeWait([]() {
    const uint64_t to = tmr_next_timeout(tmrl_get());
    if (to != 0) {
      VLOG(3) << "Next timeout " << to;
    }
    // Note: this only works because we are spinning on it...
    // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
    // for handling tmr.
    tmr_poll(tmrl_get());
  });

  server_.addWebSocketHandler("/ws", websocket_handler_);
  CHECK(server_.startListening(FLAGS_proxy_port));

  // Pump seasocks whenever its listening fd has activity.
  epoll->OnReadable(server_.fd(), [this]() {
    CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
  });

  if (&internal_epoll_ == epoll) {
    TimerHandler *const timer = event_loop->AddTimer([this]() {
      // Run the epoll poller until there are no more events (if we are being
      // backed by a shm event loop, there won't be anything registered to
      // internal_epoll_ and this will just return false).
      // We just deal with clearing all the epoll events using a simulated
      // timer. This does mean that we will spin rather than actually sleeping
      // in any coherent manner, which will be particularly noticeable when past
      // the end of processing other events.
      while (internal_epoll_.Poll(false)) {
        continue;
      }
    });

    event_loop->OnRun([timer, event_loop]() {
      timer->Schedule(event_loop->monotonic_now(),
                      std::chrono::milliseconds(10));
    });
  }
}
198
WebProxy::~WebProxy() {
  // Stop watching the server fd before tearing the server down, then give
  // seasocks one final poll to finish terminating cleanly.
  epoll_->DeleteFd(server_.fd());
  server_.terminate();
  CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
  // Release the process-wide epoll slot so a future WebProxy can be created.
  CHECK(global_epoll == epoll_);
  global_epoll = nullptr;
}
206
// Forwards the stop-recording request to the websocket handler (which
// presumably stops fetching new messages for subscribers — see
// WebsocketHandler::StopRecording; confirm semantics there).
void WebProxy::StopRecording() { websocket_handler_->StopRecording(); }
Alex Perry5f474f22020-02-01 12:14:24 -0800208
James Kuszmaul147b4c12022-07-13 20:35:27 -0700209void Subscriber::RunIteration(bool fetch_new) {
210 if (fetch_new) {
211 if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800212 fetcher_->Fetch();
James Kuszmaul147b4c12022-07-13 20:35:27 -0700213 message_buffer_.clear();
214 return;
James Kuszmaul71a81932020-12-15 21:08:01 -0800215 }
Alex Perry5f474f22020-02-01 12:14:24 -0800216
James Kuszmaul147b4c12022-07-13 20:35:27 -0700217 while (fetcher_->FetchNext()) {
218 // If we aren't building up a buffer, short-circuit the FetchNext().
219 if (buffer_size_ == 0) {
220 fetcher_->Fetch();
Austin Schuhd16ef442021-04-25 14:44:42 -0700221 }
James Kuszmaul147b4c12022-07-13 20:35:27 -0700222 Message message;
223 message.index = fetcher_->context().queue_index;
224 VLOG(2) << "Packing a message with "
225 << GetPacketCount(fetcher_->context()) << "packets";
226 for (int packet_index = 0;
227 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
228 // Pack directly into the mbuffer. This is admittedly a bit painful.
229 const size_t packet_size =
230 PackedMessageSize(fetcher_->context(), packet_index);
231 struct mbuf *mbuffer = mbuf_alloc(packet_size);
Alex Perry5f474f22020-02-01 12:14:24 -0800232
James Kuszmaul147b4c12022-07-13 20:35:27 -0700233 {
234 // Wrap a pre-allocated builder around the mbuffer.
235 PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
236 flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
237 flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
238 &fbb, fetcher_->context(), channel_index_, packet_index);
239 fbb.Finish(message_offset);
240
241 // Now, the flatbuffer is built from the back to the front. So any
Philipp Schradera6712522023-07-05 20:25:11 -0700242 // extra memory will be at the front. Set up the end and start
James Kuszmaul147b4c12022-07-13 20:35:27 -0700243 // pointers on the mbuf.
244 mbuf_set_end(mbuffer, packet_size);
245 mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
246 }
247
248 message.data.emplace_back(
249 std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
250 }
251 message_buffer_.push_back(std::move(message));
252 // If we aren't keeping a buffer, then we should only do one iteration of
253 // the while loop--otherwise, if additional messages arrive between the
254 // first FetchNext() and the second iteration then we can end up behaving
255 // poorly (since we do a Fetch() when buffer_size_ == 0).
256 if (buffer_size_ == 0) {
257 break;
258 }
James Kuszmaul45139e62021-09-11 11:41:03 -0700259 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800260 }
261 for (auto &conn : channels_) {
James Kuszmaula5822682021-12-23 18:39:28 -0800262 std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first.lock();
263 CHECK(rtc_channel) << "data_channel was destroyed too early.";
James Kuszmaul71a81932020-12-15 21:08:01 -0800264 ChannelInformation *channel_data = &conn.second;
265 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
266 SkipToLastMessage(channel_data);
267 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700268 std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
269 while (buffer) {
270 // TODO(austin): This is a nop so we just buffer forever. Fix this when
271 // we care.
James Kuszmaul71a81932020-12-15 21:08:01 -0800272 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800273 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800274 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800275 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700276
277 rtc_channel->Send(buffer.get());
James Kuszmaul71a81932020-12-15 21:08:01 -0800278 buffer = NextBuffer(channel_data);
279 }
280 }
281 if (buffer_size_ >= 0) {
282 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
283 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800284 }
285 }
286}
287
// Registers a new WebRTC data channel as a listener on this channel and
// installs an on_message handler which records the client's acknowledgement
// (queue/packet index) used for flow control in NextBuffer().
void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
                             TransferMethod transfer_method) {
  ChannelInformation info;
  info.transfer_method = transfer_method;

  channels_.emplace_back(std::make_pair(data_channel, info));

  // NOTE(review): the captured index is the listener's position at insertion
  // time; RemoveListener() erases entries from the middle of channels_, which
  // would leave this index pointing at the wrong (or no) entry — confirm that
  // listeners are not removed while others remain subscribed.
  data_channel->set_on_message(
      [this, index = channels_.size() - 1](
          struct mbuf *const buffer,
          const enum rawrtc_data_channel_message_flag /*flags*/) {
        // Validate the acknowledgement flatbuffer from the (untrusted)
        // browser before reading it.
        FlatbufferSpan<ChannelState> message(
            {mbuf_buf(buffer), mbuf_get_left(buffer)});
        if (!message.Verify()) {
          LOG(ERROR) << "Invalid flatbuffer received from browser client.";
          return;
        }

        channels_[index].second.reported_queue_index =
            message.message().queue_index();
        channels_[index].second.reported_packet_index =
            message.message().packet_index();
      });
}
312
Austin Schuh60e77942022-05-16 17:48:24 -0700313void Subscriber::RemoveListener(
314 std::shared_ptr<ScopedDataChannel> data_channel) {
James Kuszmaula5822682021-12-23 18:39:28 -0800315 channels_.erase(
316 std::remove_if(
317 channels_.begin(), channels_.end(),
318 [data_channel](const std::pair<std::weak_ptr<ScopedDataChannel>,
319 ChannelInformation> &channel) {
320 return channel.first.lock().get() == data_channel.get();
321 }),
322 channels_.end());
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700323}
324
// Returns the next mbuf packet to send to the given listener, or nullptr if
// there is nothing to send right now (empty buffer, caught up, or throttled).
// Advances the listener's (queue index, packet number) cursor as packets are
// handed out. The returned mbuf is a refcounted alias of the buffered data —
// no payload copy is made.
std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
    ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty()) {
    return nullptr;
  }
  const uint32_t earliest_index = message_buffer_.front().index;
  const uint32_t latest_index = message_buffer_.back().index;
  // If the listener's cursor points before our oldest buffered message, it
  // fell behind; jump it forward to the oldest message we still have.
  const bool fell_behind = channel->current_queue_index < earliest_index;
  if (fell_behind) {
    channel->current_queue_index = earliest_index;
    channel->next_packet_number = 0;
    return message_buffer_.front().data.at(0);
  }
  // TODO(james): Handle queue index wrapping when >2^32 messages are sent on a
  // channel.
  if (channel->current_queue_index > latest_index) {
    // We are still waiting on the next message to appear; return.
    return nullptr;
  }
  if (FLAGS_pre_send_messages > 0) {
    // Don't buffer up an excessive number of messages to the client.
    // This currently ignores the packet index (and really, any concept of
    // message size), but the main goal is just to avoid locking up the client
    // browser, not to be ultra precise about anything. It's also not clear that
    // message *size* is necessarily even the determining factor in causing
    // issues.
    if (channel->reported_queue_index + FLAGS_pre_send_messages <
        channel->current_queue_index) {
      return nullptr;
    }
  }
  // message_buffer_ must hold a contiguous run of queue indices for the
  // index arithmetic below to be valid.
  CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
      << "Inconsistent queue indices.";
  const size_t packets_in_message =
      message_buffer_[channel->current_queue_index - earliest_index]
          .data.size();
  CHECK_LT(0u, packets_in_message);
  CHECK_LT(channel->next_packet_number, packets_in_message);

  std::shared_ptr<struct mbuf> original_data =
      message_buffer_[channel->current_queue_index - earliest_index].data.at(
          channel->next_packet_number);

  // Advance the cursor: next packet, or first packet of the next message.
  ++channel->next_packet_number;
  if (channel->next_packet_number == packets_in_message) {
    ++channel->current_queue_index;
    channel->next_packet_number = 0;
  }

  // Trigger a copy of the mbuf without copying the data.
  return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
                                      mem_deref);
}
379
380void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
381 CHECK_NOTNULL(channel);
382 if (message_buffer_.empty() ||
383 channel->current_queue_index == message_buffer_.back().index) {
384 return;
385 }
386 channel->current_queue_index = message_buffer_.back().index;
James Kuszmaul87200a42022-03-26 18:09:18 -0700387 channel->reported_queue_index = message_buffer_.back().index;
James Kuszmaul71a81932020-12-15 21:08:01 -0800388 channel->next_packet_number = 0;
389}
390
// Builds the WebRTC peer connection for one browser client and hooks up the
// rawrtc callbacks (local ICE candidates, incoming data channels) before
// opening it. SDP/ICE signalling itself happens over the websocket via
// OnSdp()/OnIce().
ApplicationConnection::ApplicationConnection(
    ::seasocks::Server *server, ::seasocks::WebSocket *sock,
    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
    const EventLoop *event_loop)
    : server_(server),
      sock_(sock),
      subscribers_(subscribers),
      // Pre-pack the configuration so it can be streamed to the client as
      // soon as the signalling channel opens (see OnDataChannel()).
      config_headers_(PackBuffer(config.span())),
      event_loop_(event_loop) {
  // The browser is the offerer; we never initiate negotiation ourselves.
  connection_.set_on_negotiation_needed([]() {
    VLOG(1) << "Negotiation needed, not offering so not creating offer.";
  });

  connection_.set_on_local_candidate(
      [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
             char const *const url) { LocalCandidate(candidate, url); });

  connection_.set_on_data_channel(
      [this](std::shared_ptr<ScopedDataChannel> channel) {
        OnDataChannel(channel);
      });

  connection_.Open();
}
416
417ApplicationConnection::~ApplicationConnection() {
418 for (auto &it : channels_) {
419 it.second.data_channel->Close();
420 it.second.data_channel = nullptr;
421 }
422
423 // Eh, we are done, tell the channel to shut down. If we didn't, it would
424 // just hang around until the connection closes, which is rather shortly
425 // after.
426 if (channel_) {
427 channel_->Close();
428 }
429}
430
// Handles an SDP offer from the browser: applies it as the remote
// description, generates and applies our answer, and ships the answer SDP
// back to the client over the websocket.
void ApplicationConnection::OnSdp(const char *sdp) {
  struct rawrtc_peer_connection_description *remote_description = NULL;

  auto error = rawrtc_peer_connection_description_create(
      &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
  if (error) {
    // Bad SDP from the client is not fatal; just ignore the offer.
    LOG(WARNING) << "Cannot parse remote description: "
                 << rawrtc_code_to_str(error);
    return;
  }

  CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
      connection_.connection(), remote_description));

  struct rawrtc_peer_connection_description *local_description;
  CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
                                                    connection_.connection()));
  CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
      connection_.connection(), local_description));

  enum rawrtc_sdp_type type;
  char *local_sdp = nullptr;
  // Get SDP type & the SDP itself
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
      &type, local_description));
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
                                                          local_description));

  // Wrap the answer SDP in a WebSocketMessage flatbuffer for the browser.
  flatbuffers::FlatBufferBuilder fbb;
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
  flatbuffers::Offset<WebSocketMessage> answer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());

  VLOG(1) << aos::FlatbufferToJson(
      flatbuffers::GetTemporaryPointer(fbb, answer_message));
  fbb.Finish(answer_message);

  // UpdateData marshals the send onto the seasocks thread.
  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
  // NOTE(review): only local_sdp is mem_deref'd here; the remote/local
  // description objects are not — presumably the peer connection takes
  // ownership of its reference, but confirm against the rawrtc docs.
  mem_deref(local_sdp);
}
472
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700473void ApplicationConnection::OnIce(const WebSocketIce *ice) {
474 if (!ice->has_candidate()) {
475 return;
476 }
Brian Silverman225c5072021-11-17 19:56:31 -0800477 uint8_t sdp_m_line_index = ice->sdp_m_line_index();
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700478
479 struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
480 CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
Brian Silverman225c5072021-11-17 19:56:31 -0800481 &ice_candidate, ice->candidate()->c_str(), ice->sdp_mid()->c_str(),
482 &sdp_m_line_index, nullptr));
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700483
484 rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
485 ice_candidate);
486
487 mem_deref(ice_candidate);
488}
489
// rawrtc callback for locally-gathered ICE candidates: packages the candidate
// as a WebSocketIce flatbuffer and sends it to the browser over the
// websocket. candidate is null at end-of-gathering, in which case nothing is
// sent.
void ApplicationConnection::LocalCandidate(
    struct rawrtc_peer_connection_ice_candidate *const candidate,
    char const *const url) {
  struct rawrtc_ice_candidate *ortc_candidate = nullptr;
  if (candidate) {
    CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
        &ortc_candidate, candidate));

    flatbuffers::FlatBufferBuilder fbb;
    char *sdpp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
    char *midp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));

    // The media line index may legitimately be absent; only add it to the
    // flatbuffer when the lookup succeeds (checked below).
    uint8_t media_line_index;
    enum rawrtc_code error =
        rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
            &media_line_index, candidate);

    flatbuffers::Offset<flatbuffers::String> sdpp_offset =
        fbb.CreateString(sdpp);
    flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
        fbb.CreateString(midp);

    WebSocketIce::Builder web_socket_ice_builder(fbb);

    web_socket_ice_builder.add_candidate(sdpp_offset);
    web_socket_ice_builder.add_sdp_mid(sdp_mid_offset);

    if (error == RAWRTC_CODE_SUCCESS) {
      web_socket_ice_builder.add_sdp_m_line_index(media_line_index);
    }
    flatbuffers::Offset<WebSocketIce> ice_offset =
        web_socket_ice_builder.Finish();

    flatbuffers::Offset<WebSocketMessage> ice_message =
        CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
    VLOG(1) << url << ": "
            << aos::FlatbufferToJson(
                   flatbuffers::GetTemporaryPointer(fbb, ice_message));
    fbb.Finish(ice_message);

    // UpdateData marshals the send onto the seasocks thread.
    server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

    // Release the strings rawrtc allocated for us.
    mem_deref(sdpp);
    mem_deref(midp);
  }
}
540
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700541void ApplicationConnection::OnDataChannel(
542 std::shared_ptr<ScopedDataChannel> channel) {
543 if (channel->label() == std::string_view("signalling")) {
544 CHECK(!channel_);
545 channel_ = channel;
546
547 channel_->set_on_message(
548 [this](struct mbuf *const buffer,
549 const enum rawrtc_data_channel_message_flag flags) {
550 HandleSignallingData(buffer, flags);
551 });
552
553 channel_->set_on_open([this]() {
554 for (const auto &header : config_headers_) {
555 channel_->Send(header.buffer());
556 }
557 });
558
559 channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
560
561 // Register an on_close callback which does nothing but keeps channel alive
562 // until it is done. This keeps the memory around until rawrtc can finish
563 // calling the close callback.
564 channel_->set_on_close([channel]() {});
565 } else {
566 channel_->set_on_close([channel]() {});
567 channel->Close();
568 }
569}
570
// Processes a SubscriberRequest from the browser's signalling channel and
// reconciles our set of per-channel data channels with the requested set:
// channels newly requested get a data channel opened and wired to the
// matching Subscriber; channels no longer requested get closed.
void ApplicationConnection::HandleSignallingData(
    struct mbuf *const
        buffer,  // nullable (in case partial delivery has been requested)
    const enum rawrtc_data_channel_message_flag /*flags*/) {
  // Validate the untrusted client flatbuffer before reading it.
  FlatbufferSpan<SubscriberRequest> message(
      {mbuf_buf(buffer), mbuf_get_left(buffer)});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid flatbuffer received from browser client.";
    return;
  }
  VLOG(1) << "Got a subscription message "
          << aos::FlatbufferToJson(&message.message());
  if (!message.message().has_channels_to_transfer()) {
    LOG(ERROR) << "No channels requested for transfer.";
    return;
  }

  // The client each time sends a full list of everything it wants to be
  // subscribed to. It is our responsibility to remove channels which aren't
  // in that list and add ones which need to be.
  //
  // Start by clearing a tracking bit on each channel. This takes O(number of
  // open channels), which should be small.
  //
  // Then open any new channels. For any we visit which are already open,
  // don't update those.
  //
  // Finally, iterate over the channel list and purge anything which we didn't
  // touch.
  for (auto &it : channels_) {
    it.second.requested = false;
  }
  for (auto channel_request : *message.message().channels_to_transfer()) {
    const Channel *channel = channel_request->channel();
    if (channel == nullptr) {
      LOG(ERROR) << "Got unpopulated channel.";
      continue;
    }
    const TransferMethod transfer_method = channel_request->method();
    // Call GetChannel() before comparing the channel name/type to each
    // subscriber. This allows us to resolve any node or application
    // specific mappings.
    const Channel *comparison_channel =
        configuration::GetChannel(event_loop_->configuration(), channel,
                                  event_loop_->name(), event_loop_->node());
    if (comparison_channel == nullptr) {
      LOG(ERROR) << "Channel does not exist: "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }
    if (!configuration::ChannelIsReadableOnNode(comparison_channel,
                                                event_loop_->node())) {
      LOG(ERROR) << "Channel not available on node "
                 << event_loop_->node()->name()->string_view() << ": "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }

    size_t channel_index = configuration::ChannelIndex(
        event_loop_->configuration(), comparison_channel);

    auto it = channels_.find(channel_index);
    if (it == channels_.end()) {
      // New subscription: create a data channel named "<name>/<type>" and
      // attach it to the Subscriber once it actually opens.
      std::shared_ptr<ScopedDataChannel> data_channel =
          ScopedDataChannel::MakeDataChannel();

      // Weak captures below avoid a reference cycle between the channel and
      // its own callbacks.
      std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;

      data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
                                 channel_index]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we started.";
        // Raw pointer inside the subscriber so we don't have a circular
        // reference. AddListener will close it.
        subscribers_[channel_index]->AddListener(data_channel, transfer_method);
      });

      Subscriber *subscriber = subscribers_[channel_index].get();
      data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
        subscriber->RemoveListener(data_channel);
      });

      data_channel->Open(
          connection_.connection(),
          absl::StrCat(channel->name()->str(), "/", channel->type()->str()));

      auto pair = channels_.insert({channel_index, {data_channel, true}});
      it = pair.first;
    }

    it->second.requested = true;

    VLOG(1) << "Subscribe to: " << channel->type()->str();
  }

  // Purge any previously-open channels that this request no longer asks for.
  for (auto &it : channels_) {
    if (!it.second.requested) {
      it.second.data_channel->Close();
      it.second.data_channel = nullptr;
    }
  }
}
677
678} // namespace web_proxy
679} // namespace aos