#include "aos/network/web_proxy.h"

#include "aos/flatbuffer_merge.h"
#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
#include "aos/seasocks/seasocks_logger.h"
#include "glog/logging.h"
#include "internal/Embedded.h"

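// rawrtc and the underlying re library are plain C, so pull their headers in
// with C linkage. DEBUG_LEVEL and DEBUG_MODULE configure re's debug logging
// for this translation unit, and tmrl_get() (re's timer-list accessor) is
// declared by hand because the included headers do not expose it.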
extern "C" {
#include <rawrtc.h>

#define DEBUG_LEVEL 7
#define DEBUG_MODULE "web-proxy"
#include <re_dbg.h>
struct list *tmrl_get(void);
}

DEFINE_int32(proxy_port, 1180, "Port to use for the web proxy server.");

namespace aos {
namespace web_proxy {

WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
                                   aos::EventLoop *event_loop, int buffer_size)
    : server_(server),
      config_(aos::CopyFlatBuffer(event_loop->configuration())),
      event_loop_(event_loop) {
  if (VLOG_IS_ON(2)) {
    dbg_init(DBG_DEBUG, DBG_ALL);
  }
  CHECK_RAWRTC(rawrtc_init(true));

  // We need to reference findEmbeddedContent() to make the linker happy...
  findEmbeddedContent("");
  const aos::Node *self = event_loop_->node();

  subscribers_.reserve(event_loop_->configuration()->channels()->size());
  for (size_t i = 0; i < event_loop_->configuration()->channels()->size();
       ++i) {
    auto channel = event_loop_->configuration()->channels()->Get(i);
    if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
      auto fetcher = event_loop_->MakeRawFetcher(channel);
      subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
          std::move(fetcher), i, buffer_size));
    } else {
      subscribers_.emplace_back(nullptr);
    }
  }
  TimerHandler *const timer = event_loop_->AddTimer([this]() {
    for (auto &subscriber : subscribers_) {
      if (subscriber) subscriber->RunIteration();
    }
  });

  event_loop_->OnRun([this, timer]() {
    timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
  });
}

void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
  std::unique_ptr<ApplicationConnection> connection =
      std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
                                              config_, event_loop_);

  connections_.insert({sock, std::move(connection)});
}

void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
                              size_t size) {
  const FlatbufferSpan<WebSocketMessage> message({data, size});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
    return;
  }
  VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
  switch (message.message().payload_type()) {
    case Payload::WebSocketSdp: {
      const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
      if (offer->type() != SdpType::OFFER) {
        LOG(WARNING) << "Got the wrong SDP type from the client.";
        break;
      }
      const flatbuffers::String *sdp = offer->payload();
      connections_[sock]->OnSdp(sdp->c_str());
      break;
    }
    case Payload::WebSocketIce: {
      const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
      connections_[sock]->OnIce(ice);
      break;
    }
    default: {
      break;
    }
  }
}

void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
  connections_.erase(sock);
}

// Global epoll instance that re's file-descriptor callbacks get routed
// through.
static aos::internal::EPoll *global_epoll = nullptr;

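// Bridges re's file-descriptor watching API onto the AOS EPoll implementation
// above. The flag bits appear to correspond to re's FD_READ (0x1), FD_WRITE
// (0x2), and FD_EXCEPT (0x4) event flags; each registered handler is invoked
// with the matching bit when its event fires.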
static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
  if (flags & 0x1) {
    global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
  }
  if (flags & 0x2) {
    global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
  }
  if (flags & 0x4) {
    global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
  }
  return 0;
}

static void ReFdClose(int fd) {
  CHECK(global_epoll != nullptr);
  global_epoll->DeleteFd(fd);
}

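// WebProxy can run against a generic EventLoop, in which case it drains its
// own internal_epoll_ instance from a simulated timer (see the constructor
// body below), or against a ShmEventLoop, in which case it shares the event
// loop's own epoll instance.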
WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
    : WebProxy(event_loop, &internal_epoll_, buffer_size) {}

WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
    : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}

WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
                   int buffer_size)
    : epoll_(epoll),
      server_(std::make_shared<aos::seasocks::SeasocksLogger>(
          ::seasocks::Logger::Level::Info)),
      websocket_handler_(
          new WebsocketHandler(&server_, event_loop, buffer_size)) {
  CHECK(!global_epoll);
  global_epoll = epoll;

  re_fd_set_listen_callback(&ReFdListen);
  re_fd_set_close_callback(&ReFdClose);

  epoll->BeforeWait([]() {
    const uint64_t to = tmr_next_timeout(tmrl_get());
    if (to != 0) {
      VLOG(3) << "Next timeout " << to;
    }
    // Note: this only works because we are spinning on it...
    // TODO(austin): If we choose to actually sleep, use a timerfd reserved
    // just for handling tmr.
    tmr_poll(tmrl_get());
  });

  server_.addWebSocketHandler("/ws", websocket_handler_);
  CHECK(server_.startListening(FLAGS_proxy_port));

  epoll->OnReadable(server_.fd(), [this]() {
    CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
  });

  if (&internal_epoll_ == epoll) {
    TimerHandler *const timer = event_loop->AddTimer([this]() {
      // Run the epoll poller until there are no more events (if we are being
      // backed by a shm event loop, there won't be anything registered to
      // internal_epoll_ and this will just return false). We clear all the
      // epoll events from a simulated timer, which means that we spin rather
      // than sleeping in any coherent manner; this is particularly noticeable
      // once we are past the end of processing other events.
      while (internal_epoll_.Poll(false)) {
        continue;
      }
    });

    event_loop->OnRun([timer, event_loop]() {
      timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
    });
  }
}

WebProxy::~WebProxy() {
  epoll_->DeleteFd(server_.fd());
  server_.terminate();
  CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
  CHECK(global_epoll == epoll_);
  global_epoll = nullptr;
}

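// Polls the fetcher for new messages, packs each message into one or more
// flatbuffer packets backed by mbufs, forwards those packets to every
// listening data channel (subsampling where requested), and finally trims
// message_buffer_ back down to buffer_size_.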
void Subscriber::RunIteration() {
  if (channels_.empty() && buffer_size_ == 0) {
    return;
  }

  while (fetcher_->FetchNext()) {
    // If we aren't building up a buffer, skip straight to the latest message
    // rather than working through the backlog one FetchNext() at a time.
    if (buffer_size_ == 0) {
      fetcher_->Fetch();
    }
    Message message;
    message.index = fetcher_->context().queue_index;
    VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
            << " packets";
    for (int packet_index = 0;
         packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
      // Pack directly into the mbuffer. This is admittedly a bit painful.
      const size_t packet_size =
          PackedMessageSize(fetcher_->context(), packet_index);
      struct mbuf *mbuffer = mbuf_alloc(packet_size);

      {
        // Wrap a pre-allocated builder around the mbuffer.
        PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
        flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
        flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
            &fbb, fetcher_->context(), channel_index_, packet_index);
        fbb.Finish(message_offset);

        // The flatbuffer is built from the back to the front, so any extra
        // memory ends up at the front. Set up the end and start pointers on
        // the mbuf accordingly.
        mbuf_set_end(mbuffer, packet_size);
        mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
      }

      message.data.emplace_back(
          std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
    }
    message_buffer_.push_back(std::move(message));
    // If we aren't keeping a buffer, then we should only do one iteration of
    // the while loop; otherwise, if additional messages arrive between the
    // first FetchNext() and the second iteration, we can end up behaving
    // poorly (since we do a Fetch() when buffer_size_ == 0).
    if (buffer_size_ == 0) {
      break;
    }
  }
  for (auto &conn : channels_) {
    std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
    ChannelInformation *channel_data = &conn.second;
    if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
      SkipToLastMessage(channel_data);
    }
    std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
    while (buffer) {
      // TODO(austin): This is a nop so we just buffer forever. Fix this when
      // we care.
      if (rtc_channel->buffered_amount() > 14000000) {
        VLOG(1) << "skipping a send because buffered amount is too high";
        break;
      }

      rtc_channel->Send(buffer.get());
      buffer = NextBuffer(channel_data);
    }
  }
  if (buffer_size_ >= 0) {
    while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
      message_buffer_.pop_front();
    }
  }
}

void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
                             TransferMethod transfer_method) {
  VLOG(1) << "Adding listener for " << data_channel.get();
  ChannelInformation info;
  info.transfer_method = transfer_method;

  // If we aren't keeping a buffer and there are no existing listeners, call
  // Fetch() to avoid falling behind on future calls to FetchNext().
  if (channels_.empty() && buffer_size_ == 0) {
    fetcher_->Fetch();
  }

  channels_.emplace(data_channel, info);
}

void Subscriber::RemoveListener(
    std::shared_ptr<ScopedDataChannel> data_channel) {
  VLOG(1) << "Removing listener for " << data_channel.get();
  channels_.erase(data_channel);
}

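// Returns the next packet to send to the given listener, or nullptr if the
// listener is fully caught up. A listener which has fallen behind the start
// of message_buffer_ gets snapped forward to the earliest buffered message.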
std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
    ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty()) {
    return nullptr;
  }
  const uint32_t earliest_index = message_buffer_.front().index;
  const uint32_t latest_index = message_buffer_.back().index;
  const bool fell_behind = channel->current_queue_index < earliest_index;
  if (fell_behind) {
    channel->current_queue_index = earliest_index;
    channel->next_packet_number = 0;
    return message_buffer_.front().data.at(0);
  }
  if (channel->current_queue_index > latest_index) {
    // We are still waiting on the next message to appear; return.
    return nullptr;
  }
  CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
      << "Inconsistent queue indices.";
  const size_t packets_in_message =
      message_buffer_[channel->current_queue_index - earliest_index]
          .data.size();
  CHECK_LT(0u, packets_in_message);
  CHECK_LT(channel->next_packet_number, packets_in_message);

  std::shared_ptr<struct mbuf> original_data =
      message_buffer_[channel->current_queue_index - earliest_index].data.at(
          channel->next_packet_number);

  ++channel->next_packet_number;
  if (channel->next_packet_number == packets_in_message) {
    ++channel->current_queue_index;
    channel->next_packet_number = 0;
  }

  // Copy the mbuf struct itself without copying the underlying data.
  return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
                                      mem_deref);
}

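// Used for TransferMethod::SUBSAMPLE: drops anything still queued for this
// listener and jumps straight to the most recent buffered message.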
void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty() ||
      channel->current_queue_index == message_buffer_.back().index) {
    return;
  }
  channel->current_queue_index = message_buffer_.back().index;
  channel->next_packet_number = 0;
}

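// One ApplicationConnection exists per connected websocket. The browser is
// always the offering peer; we only ever answer, so on_negotiation_needed is
// a no-op and the connection simply waits for an offer to arrive via OnSdp().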
ApplicationConnection::ApplicationConnection(
    ::seasocks::Server *server, ::seasocks::WebSocket *sock,
    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
    const EventLoop *event_loop)
    : server_(server),
      sock_(sock),
      subscribers_(subscribers),
      config_headers_(PackBuffer(config.span())),
      event_loop_(event_loop) {
  connection_.set_on_negotiation_needed([]() {
    VLOG(1) << "Negotiation needed, not offering so not creating offer.";
  });

  connection_.set_on_local_candidate(
      [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
             char const *const url) { LocalCandidate(candidate, url); });

  connection_.set_on_data_channel(
      [this](std::shared_ptr<ScopedDataChannel> channel) {
        OnDataChannel(channel);
      });

  connection_.Open();
}

ApplicationConnection::~ApplicationConnection() {
  for (auto &it : channels_) {
    it.second.data_channel->Close();
    it.second.data_channel = nullptr;
  }

  // We are done, so tell the signalling channel to shut down. If we didn't,
  // it would just hang around until the connection closes, which happens
  // rather shortly after this anyways.
  if (channel_) {
    channel_->Close();
  }
}

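// Handles an SDP offer relayed from the browser over the websocket: parse it,
// apply it as the remote description, generate and apply our answer, and send
// the answer back over the websocket as a WebSocketSdp flatbuffer.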
void ApplicationConnection::OnSdp(const char *sdp) {
  struct rawrtc_peer_connection_description *remote_description = nullptr;

  auto error = rawrtc_peer_connection_description_create(
      &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
  if (error) {
    LOG(WARNING) << "Cannot parse remote description: "
                 << rawrtc_code_to_str(error);
    return;
  }

  CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
      connection_.connection(), remote_description));

  struct rawrtc_peer_connection_description *local_description;
  CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
                                                    connection_.connection()));
  CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
      connection_.connection(), local_description));

  enum rawrtc_sdp_type type;
  char *local_sdp = nullptr;
  // Get the SDP type and the SDP itself.
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
      &type, local_description));
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
                                                          local_description));

  flatbuffers::FlatBufferBuilder fbb;
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
  flatbuffers::Offset<WebSocketMessage> answer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());

  VLOG(1) << aos::FlatbufferToJson(
      flatbuffers::GetTemporaryPointer(fbb, answer_message));
  fbb.Finish(answer_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
  mem_deref(local_sdp);
}

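// Applies a remote ICE candidate which the browser relayed over the
// websocket.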
void ApplicationConnection::OnIce(const WebSocketIce *ice) {
  if (!ice->has_candidate()) {
    return;
  }
  uint8_t sdp_m_line_index = ice->sdp_m_line_index();

  struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
  CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
      &ice_candidate, ice->candidate()->c_str(), ice->sdp_mid()->c_str(),
      &sdp_m_line_index, nullptr));

  rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
                                           ice_candidate);

  mem_deref(ice_candidate);
}

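// Called by rawrtc for each locally gathered ICE candidate; packs the
// candidate into a WebSocketIce flatbuffer and relays it to the browser over
// the websocket.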
void ApplicationConnection::LocalCandidate(
    struct rawrtc_peer_connection_ice_candidate *const candidate,
    char const *const url) {
  struct rawrtc_ice_candidate *ortc_candidate = nullptr;
  if (candidate) {
    CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
        &ortc_candidate, candidate));

    flatbuffers::FlatBufferBuilder fbb;
    char *sdpp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
    char *midp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));

    uint8_t media_line_index;
    enum rawrtc_code error =
        rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
            &media_line_index, candidate);

    flatbuffers::Offset<flatbuffers::String> sdpp_offset =
        fbb.CreateString(sdpp);
    flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
        fbb.CreateString(midp);

    WebSocketIce::Builder web_socket_ice_builder(fbb);

    web_socket_ice_builder.add_candidate(sdpp_offset);
    web_socket_ice_builder.add_sdp_mid(sdp_mid_offset);

    if (error == RAWRTC_CODE_SUCCESS) {
      web_socket_ice_builder.add_sdp_m_line_index(media_line_index);
    }
    flatbuffers::Offset<WebSocketIce> ice_offset =
        web_socket_ice_builder.Finish();

    flatbuffers::Offset<WebSocketMessage> ice_message =
        CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
    VLOG(1) << url << ": "
            << aos::FlatbufferToJson(
                   flatbuffers::GetTemporaryPointer(fbb, ice_message));
    fbb.Finish(ice_message);

    server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

    mem_deref(sdpp);
    mem_deref(midp);
  }
}

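// The browser opens a single data channel labeled "signalling" which carries
// SubscriberRequest messages; the config is sent on it as soon as it opens.
// Any channel with a different label is unexpected and gets closed
// immediately.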
void ApplicationConnection::OnDataChannel(
    std::shared_ptr<ScopedDataChannel> channel) {
  if (channel->label() == std::string_view("signalling")) {
    CHECK(!channel_);
    channel_ = channel;

    channel_->set_on_message(
        [this](struct mbuf *const buffer,
               const enum rawrtc_data_channel_message_flag flags) {
          HandleSignallingData(buffer, flags);
        });

    channel_->set_on_open([this]() {
      for (const auto &header : config_headers_) {
        channel_->Send(header.buffer());
      }
    });

    channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });

    // Register an on_close callback which does nothing but keeps channel
    // alive until it is done. This keeps the memory around until rawrtc can
    // finish calling the close callback.
    channel_->set_on_close([channel]() {});
  } else {
    // Same trick for unexpected channels: keep the channel alive via its own
    // close callback while we shut it down.
    channel->set_on_close([channel]() {});
    channel->Close();
  }
}

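// A rough sketch of a SubscriberRequest in JSON form, based on the accessors
// used below (the channel name and type here are made-up examples):
//   {
//     "channels_to_transfer": [
//       {
//         "channel": {"name": "/camera", "type": "vision.ImageResult"},
//         "method": "SUBSAMPLE"
//       }
//     ]
//   }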
void ApplicationConnection::HandleSignallingData(
    struct mbuf *const
        buffer,  // nullable (in case partial delivery has been requested)
    const enum rawrtc_data_channel_message_flag /*flags*/) {
  FlatbufferSpan<SubscriberRequest> message(
      {mbuf_buf(buffer), mbuf_get_left(buffer)});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid flatbuffer received from browser client.";
    return;
  }
  VLOG(1) << "Got a subscription message "
          << aos::FlatbufferToJson(&message.message());
  if (!message.message().has_channels_to_transfer()) {
    LOG(ERROR) << "No channels requested for transfer.";
    return;
  }

  // Each time, the client sends the full list of channels it wants to be
  // subscribed to. It is our responsibility to remove channels which aren't
  // in that list and add ones which need to be.
  //
  // Start by clearing a tracking bit on each channel. This takes O(number of
  // open channels), which should be small.
  //
  // Then open any new channels, leaving any we visit that are already open
  // untouched.
  //
  // Finally, iterate over the channel list and purge anything which we didn't
  // touch.
  for (auto &it : channels_) {
    it.second.requested = false;
  }
  for (auto channel_request : *message.message().channels_to_transfer()) {
    const Channel *channel = channel_request->channel();
    if (channel == nullptr) {
      LOG(ERROR) << "Got unpopulated channel.";
      continue;
    }
    const TransferMethod transfer_method = channel_request->method();
    // Call GetChannel() before comparing the channel name/type to each
    // subscriber. This allows us to resolve any node or application specific
    // mappings.
    const Channel *comparison_channel =
        configuration::GetChannel(event_loop_->configuration(), channel,
                                  event_loop_->name(), event_loop_->node());
    if (comparison_channel == nullptr) {
      LOG(ERROR) << "Channel does not exist: "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }
    if (!configuration::ChannelIsReadableOnNode(comparison_channel,
                                                event_loop_->node())) {
      LOG(ERROR) << "Channel not available on node "
                 << event_loop_->node()->name()->string_view() << ": "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }

    const size_t channel_index = configuration::ChannelIndex(
        event_loop_->configuration(), comparison_channel);

    auto it = channels_.find(channel_index);
    if (it == channels_.end()) {
      std::shared_ptr<ScopedDataChannel> data_channel =
          std::make_shared<ScopedDataChannel>();

      std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;

      data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
                                 channel_index]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we started.";
        // Keep a weak pointer inside the subscriber so we don't have a
        // circular reference. AddListener() will close it.
        subscribers_[channel_index]->AddListener(data_channel,
                                                 transfer_method);
      });

      Subscriber *subscriber = subscribers_[channel_index].get();
      data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
        subscriber->RemoveListener(data_channel);
      });

      data_channel->Open(
          connection_.connection(),
          absl::StrCat(channel->name()->str(), "/", channel->type()->str()));

      auto pair = channels_.insert({channel_index, {data_channel, true}});
      it = pair.first;
    }

    it->second.requested = true;

    VLOG(1) << "Subscribe to: " << channel->type()->str();
  }

  for (auto &it : channels_) {
    if (!it.second.requested) {
      it.second.data_channel->Close();
      it.second.data_channel = nullptr;
    }
  }
}

}  // namespace web_proxy
}  // namespace aos