blob: 972b1d7093eee769cc513faa5cf023790cc2e257 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08007#include "aos/seasocks/seasocks_logger.h"
Alex Perryb3b50792020-01-18 16:13:45 -08008#include "glog/logging.h"
James Kuszmaul48671362020-12-24 13:54:16 -08009#include "internal/Embedded.h"
Alex Perryb3b50792020-01-18 16:13:45 -080010
Austin Schuh52e5e3a2021-04-24 22:30:02 -070011extern "C" {
12#include <rawrtc.h>
13
14#define DEBUG_LEVEL 7
15#define DEBUG_MODULE "web-proxy"
16#include <re_dbg.h>
17struct list *tmrl_get(void);
18}
19
James Kuszmaul1e95bed2021-01-09 21:02:49 -080020DEFINE_int32(proxy_port, 8080, "Port to use for the web proxy server.");
21
Alex Perryb3b50792020-01-18 16:13:45 -080022namespace aos {
23namespace web_proxy {
James Kuszmaul7ad91522020-09-01 19:15:35 -070024WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul71a81932020-12-15 21:08:01 -080025 aos::EventLoop *event_loop, int buffer_size)
James Kuszmaul7ad91522020-09-01 19:15:35 -070026 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080027 config_(aos::CopyFlatBuffer(event_loop->configuration())),
28 event_loop_(event_loop) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070029 if (VLOG_IS_ON(2)) {
30 dbg_init(DBG_DEBUG, DBG_ALL);
31 }
32 CHECK_RAWRTC(rawrtc_init(true));
33
James Kuszmaul48671362020-12-24 13:54:16 -080034 // We need to reference findEmbeddedContent() to make the linker happy...
35 findEmbeddedContent("");
Austin Schuh52e5e3a2021-04-24 22:30:02 -070036 const aos::Node *self = event_loop_->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070037
Austin Schuh52e5e3a2021-04-24 22:30:02 -070038 subscribers_.reserve(event_loop_->configuration()->channels()->size());
39 for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
40 auto channel = event_loop_->configuration()->channels()->Get(i);
James Kuszmaul7ad91522020-09-01 19:15:35 -070041 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070042 auto fetcher = event_loop_->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080043 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
44 std::move(fetcher), i, buffer_size));
Austin Schuh52e5e3a2021-04-24 22:30:02 -070045 } else {
46 subscribers_.emplace_back(nullptr);
James Kuszmaul7ad91522020-09-01 19:15:35 -070047 }
48 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -070049 TimerHandler *const timer = event_loop_->AddTimer([this]() {
James Kuszmaul7ad91522020-09-01 19:15:35 -070050 for (auto &subscriber : subscribers_) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070051 if (subscriber) subscriber->RunIteration();
James Kuszmaul7ad91522020-09-01 19:15:35 -070052 }
53 });
54
Austin Schuh52e5e3a2021-04-24 22:30:02 -070055 event_loop_->OnRun([this, timer]() {
56 timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
James Kuszmaul7ad91522020-09-01 19:15:35 -070057 });
58}
Alex Perryb3b50792020-01-18 16:13:45 -080059
60void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070061 std::unique_ptr<ApplicationConnection> connection =
62 std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
63 config_, event_loop_);
64
65 connections_.insert({sock, std::move(connection)});
Alex Perryb3b50792020-01-18 16:13:45 -080066}
67
68void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
69 size_t size) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070070 const FlatbufferSpan<WebSocketMessage> message({data, size});
71 if (!message.Verify()) {
72 LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
73 return;
74 }
75 VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
76 switch (message.message().payload_type()) {
77 case Payload::WebSocketSdp: {
78 const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
79 if (offer->type() != SdpType::OFFER) {
80 LOG(WARNING) << "Got the wrong sdp type from client";
81 break;
82 }
83 const flatbuffers::String *sdp = offer->payload();
84 connections_[sock]->OnSdp(sdp->c_str());
85 break;
86 }
87 case Payload::WebSocketIce: {
88 const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
89 connections_[sock]->OnIce(ice);
90 break;
91 }
92 default: { break; }
93 }
Alex Perryb3b50792020-01-18 16:13:45 -080094}
95
96void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
97 connections_.erase(sock);
98}
99
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700100// Global epoll pointer
101static aos::internal::EPoll *global_epoll = nullptr;
102
103static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
104 if (flags & 0x1) {
105 global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
106 }
107 if (flags & 0x2) {
108 global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
109 }
110 if (flags & 0x4) {
111 global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
112 }
113 return 0;
114}
115
116static void ReFdClose(int fd) {
117 CHECK(global_epoll != nullptr);
118 global_epoll->DeleteFd(fd);
119}
120
James Kuszmaul71a81932020-12-15 21:08:01 -0800121WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
122 : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
123
124WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
125 : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
126
127WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
128 int buffer_size)
129 : epoll_(epoll),
130 server_(std::make_shared<aos::seasocks::SeasocksLogger>(
131 ::seasocks::Logger::Level::Info)),
132 websocket_handler_(
133 new WebsocketHandler(&server_, event_loop, buffer_size)) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700134 CHECK(!global_epoll);
135 global_epoll = epoll;
136
137 re_fd_set_listen_callback(&ReFdListen);
138 re_fd_set_close_callback(&ReFdClose);
139
140 epoll->BeforeWait([]() {
141 const uint64_t to = tmr_next_timeout(tmrl_get());
142 if (to != 0) {
143 VLOG(1) << "Next timeout " << to;
144 }
145 // Note: this only works because we are spinning on it...
146 // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
147 // for handling tmr.
148 tmr_poll(tmrl_get());
149 });
150
James Kuszmaul71a81932020-12-15 21:08:01 -0800151 server_.addWebSocketHandler("/ws", websocket_handler_);
James Kuszmaul1e95bed2021-01-09 21:02:49 -0800152 CHECK(server_.startListening(FLAGS_proxy_port));
James Kuszmaul71a81932020-12-15 21:08:01 -0800153
154 epoll->OnReadable(server_.fd(), [this]() {
155 CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
156 });
157
158 if (&internal_epoll_ == epoll) {
159 TimerHandler *const timer = event_loop->AddTimer([this]() {
160 // Run the epoll poller until there are no more events (if we are being
161 // backed by a shm event loop, there won't be anything registered to
162 // internal_epoll_ and this will just return false).
163 // We just deal with clearing all the epoll events using a simulated
164 // timer. This does mean that we will spin rather than actually sleeping
165 // in any coherent manner, which will be particularly noticeable when past
166 // the end of processing other events.
167 while (internal_epoll_.Poll(false)) {
168 continue;
169 }
170 });
171
172 event_loop->OnRun([timer, event_loop]() {
173 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
174 });
175 }
176}
177
178WebProxy::~WebProxy() {
179 epoll_->DeleteFd(server_.fd());
180 server_.terminate();
181 CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700182 CHECK(global_epoll == epoll_);
183 global_epoll = nullptr;
James Kuszmaul71a81932020-12-15 21:08:01 -0800184}
185
Alex Perry5f474f22020-02-01 12:14:24 -0800186void Subscriber::RunIteration() {
James Kuszmaul71a81932020-12-15 21:08:01 -0800187 if (channels_.empty() && buffer_size_ == 0) {
Alex Perry5f474f22020-02-01 12:14:24 -0800188 return;
189 }
190
James Kuszmaul71a81932020-12-15 21:08:01 -0800191 while (fetcher_->FetchNext()) {
192 // If we aren't building up a buffer, short-circuit the FetchNext().
193 if (buffer_size_ == 0) {
194 fetcher_->Fetch();
195 }
196 Message message;
197 message.index = fetcher_->context().queue_index;
198 VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
199 << "packets";
200 for (int packet_index = 0;
201 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
Austin Schuhd16ef442021-04-25 14:44:42 -0700202 // Pack directly into the mbuffer. This is admittedly a bit painful.
203 const size_t packet_size =
204 PackedMessageSize(fetcher_->context(), packet_index);
205 struct mbuf *mbuffer = mbuf_alloc(packet_size);
Alex Perry5f474f22020-02-01 12:14:24 -0800206
Austin Schuhd16ef442021-04-25 14:44:42 -0700207 {
208 // Wrap a pre-allocated builder around the mbuffer.
209 PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
210 flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
211 flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
212 &fbb, fetcher_->context(), channel_index_, packet_index);
213 fbb.Finish(message_offset);
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700214
Austin Schuhd16ef442021-04-25 14:44:42 -0700215 // Now, the flatbuffer is built from the back to the front. So any
216 // extra memory will be at the front. Setup the end and start pointers
217 // on the mbuf.
218 mbuf_set_end(mbuffer, packet_size);
219 mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
220 }
Alex Perry5f474f22020-02-01 12:14:24 -0800221
James Kuszmaul71a81932020-12-15 21:08:01 -0800222 message.data.emplace_back(
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700223 std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
James Kuszmaul71a81932020-12-15 21:08:01 -0800224 }
225 message_buffer_.push_back(std::move(message));
226 }
227 for (auto &conn : channels_) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700228 std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
James Kuszmaul71a81932020-12-15 21:08:01 -0800229 ChannelInformation *channel_data = &conn.second;
230 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
231 SkipToLastMessage(channel_data);
232 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700233 std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
234 while (buffer) {
235 // TODO(austin): This is a nop so we just buffer forever. Fix this when
236 // we care.
James Kuszmaul71a81932020-12-15 21:08:01 -0800237 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800238 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800239 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800240 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700241
242 rtc_channel->Send(buffer.get());
James Kuszmaul71a81932020-12-15 21:08:01 -0800243 buffer = NextBuffer(channel_data);
244 }
245 }
246 if (buffer_size_ >= 0) {
247 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
248 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800249 }
250 }
251}
252
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700253void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
254 TransferMethod transfer_method) {
255 VLOG(1) << "Adding listener for " << data_channel.get();
James Kuszmaul71a81932020-12-15 21:08:01 -0800256 ChannelInformation info;
257 info.transfer_method = transfer_method;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700258
James Kuszmaul5e6aa252021-08-28 22:19:29 -0700259 // If we aren't keeping a buffer and there are no existing listeners, call
260 // Fetch() to avoid falling behind on future calls to FetchNext().
261 if (channels_.empty() && buffer_size_ == 0) {
262 fetcher_->Fetch();
263 }
264
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700265 channels_.emplace(data_channel, info);
James Kuszmaul71a81932020-12-15 21:08:01 -0800266}
267
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700268void Subscriber::RemoveListener(
269 std::shared_ptr<ScopedDataChannel> data_channel) {
270 VLOG(1) << "Removing listener for " << data_channel.get();
271 channels_.erase(data_channel);
272}
273
274std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
275 ChannelInformation *channel) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800276 CHECK_NOTNULL(channel);
277 if (message_buffer_.empty()) {
278 return nullptr;
279 }
280 const uint32_t earliest_index = message_buffer_.front().index;
281 const uint32_t latest_index = message_buffer_.back().index;
282 const bool fell_behind = channel->current_queue_index < earliest_index;
283 if (fell_behind) {
284 channel->current_queue_index = earliest_index;
285 channel->next_packet_number = 0;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700286 return message_buffer_.front().data.at(0);
James Kuszmaul71a81932020-12-15 21:08:01 -0800287 }
288 if (channel->current_queue_index > latest_index) {
289 // We are still waiting on the next message to appear; return.
290 return nullptr;
291 }
292 CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
293 << "Inconsistent queue indices.";
294 const size_t packets_in_message =
295 message_buffer_[channel->current_queue_index - earliest_index]
296 .data.size();
297 CHECK_LT(0u, packets_in_message);
298 CHECK_LT(channel->next_packet_number, packets_in_message);
299
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700300 std::shared_ptr<struct mbuf> original_data =
301 message_buffer_[channel->current_queue_index - earliest_index].data.at(
James Kuszmaul71a81932020-12-15 21:08:01 -0800302 channel->next_packet_number);
303
304 ++channel->next_packet_number;
305 if (channel->next_packet_number == packets_in_message) {
306 ++channel->current_queue_index;
307 channel->next_packet_number = 0;
308 }
309
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700310 // Trigger a copy of the mbuf without copying the data.
311 return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
312 mem_deref);
James Kuszmaul71a81932020-12-15 21:08:01 -0800313}
314
315void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
316 CHECK_NOTNULL(channel);
317 if (message_buffer_.empty() ||
318 channel->current_queue_index == message_buffer_.back().index) {
319 return;
320 }
321 channel->current_queue_index = message_buffer_.back().index;
322 channel->next_packet_number = 0;
323}
324
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700325ApplicationConnection::ApplicationConnection(
326 ::seasocks::Server *server, ::seasocks::WebSocket *sock,
Alex Perry5f474f22020-02-01 12:14:24 -0800327 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800328 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
329 const EventLoop *event_loop)
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700330 : server_(server),
331 sock_(sock),
Alex Perry5f474f22020-02-01 12:14:24 -0800332 subscribers_(subscribers),
James Kuszmaul71a81932020-12-15 21:08:01 -0800333 config_headers_(PackBuffer(config.span())),
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700334 event_loop_(event_loop) {
335 connection_.set_on_negotiation_needed([]() {
336 VLOG(1) << "Negotiation needed, not offering so not creating offer.";
337 });
Alex Perryb3b50792020-01-18 16:13:45 -0800338
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700339 connection_.set_on_local_candidate(
340 [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
341 char const *const url) { LocalCandidate(candidate, url); });
342
343 connection_.set_on_data_channel(
344 [this](std::shared_ptr<ScopedDataChannel> channel) {
345 OnDataChannel(channel);
346 });
347
348 connection_.Open();
349}
350
351ApplicationConnection::~ApplicationConnection() {
352 for (auto &it : channels_) {
353 it.second.data_channel->Close();
354 it.second.data_channel = nullptr;
355 }
356
357 // Eh, we are done, tell the channel to shut down. If we didn't, it would
358 // just hang around until the connection closes, which is rather shortly
359 // after.
360 if (channel_) {
361 channel_->Close();
362 }
363}
364
365void ApplicationConnection::OnSdp(const char *sdp) {
366 struct rawrtc_peer_connection_description *remote_description = NULL;
367
368 auto error = rawrtc_peer_connection_description_create(
369 &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
370 if (error) {
371 LOG(WARNING) << "Cannot parse remote description: "
372 << rawrtc_code_to_str(error);
James Kuszmaul48671362020-12-24 13:54:16 -0800373 return;
374 }
Alex Perryb3b50792020-01-18 16:13:45 -0800375
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700376 CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
377 connection_.connection(), remote_description));
Alex Perryb3b50792020-01-18 16:13:45 -0800378
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700379 struct rawrtc_peer_connection_description *local_description;
380 CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
381 connection_.connection()));
382 CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
383 connection_.connection(), local_description));
Alex Perryb3b50792020-01-18 16:13:45 -0800384
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700385 enum rawrtc_sdp_type type;
386 char *local_sdp = nullptr;
387 // Get SDP type & the SDP itself
388 CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
389 &type, local_description));
390 CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
391 local_description));
James Kuszmaul8d928d02020-12-25 17:47:49 -0800392
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700393 flatbuffers::FlatBufferBuilder fbb;
Alex Perryb3b50792020-01-18 16:13:45 -0800394 flatbuffers::Offset<WebSocketSdp> sdp_fb =
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700395 CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
Alex Perryb3b50792020-01-18 16:13:45 -0800396 flatbuffers::Offset<WebSocketMessage> answer_message =
397 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700398
399 VLOG(1) << aos::FlatbufferToJson(
400 flatbuffers::GetTemporaryPointer(fbb, answer_message));
Alex Perryb3b50792020-01-18 16:13:45 -0800401 fbb.Finish(answer_message);
402
403 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700404 mem_deref(local_sdp);
Alex Perryb3b50792020-01-18 16:13:45 -0800405}
406
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700407void ApplicationConnection::OnIce(const WebSocketIce *ice) {
408 if (!ice->has_candidate()) {
409 return;
410 }
411 uint8_t sdpMLineIndex = ice->sdpMLineIndex();
412
413 struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
414 CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
415 &ice_candidate, ice->candidate()->c_str(), ice->sdpMid()->c_str(),
416 &sdpMLineIndex, nullptr));
417
418 rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
419 ice_candidate);
420
421 mem_deref(ice_candidate);
422}
423
424void ApplicationConnection::LocalCandidate(
425 struct rawrtc_peer_connection_ice_candidate *const candidate,
426 char const *const url) {
427 struct rawrtc_ice_candidate *ortc_candidate = nullptr;
428 if (candidate) {
429 CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
430 &ortc_candidate, candidate));
431
432 flatbuffers::FlatBufferBuilder fbb;
433 char *sdpp = nullptr;
434 CHECK_RAWRTC(
435 rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
436 char *midp = nullptr;
437 CHECK_RAWRTC(
438 rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));
439
440 uint8_t media_line_index;
441 enum rawrtc_code error =
442 rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
443 &media_line_index, candidate);
444
445 flatbuffers::Offset<flatbuffers::String> sdpp_offset =
446 fbb.CreateString(sdpp);
447 flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
448 fbb.CreateString(midp);
449
450 WebSocketIce::Builder web_socket_ice_builder(fbb);
451
452 web_socket_ice_builder.add_candidate(sdpp_offset);
453 web_socket_ice_builder.add_sdpMid(sdp_mid_offset);
454
455 if (error == RAWRTC_CODE_SUCCESS) {
456 web_socket_ice_builder.add_sdpMLineIndex(media_line_index);
James Kuszmaul1ec74432020-07-30 20:26:45 -0700457 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700458 flatbuffers::Offset<WebSocketIce> ice_offset =
459 web_socket_ice_builder.Finish();
460
461 flatbuffers::Offset<WebSocketMessage> ice_message =
462 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
463 VLOG(1) << url << ": "
464 << aos::FlatbufferToJson(
465 flatbuffers::GetTemporaryPointer(fbb, ice_message));
466 fbb.Finish(ice_message);
467
468 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
469
470 mem_deref(sdpp);
471 mem_deref(midp);
Alex Perry5f474f22020-02-01 12:14:24 -0800472 }
473}
474
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700475void ApplicationConnection::OnDataChannel(
476 std::shared_ptr<ScopedDataChannel> channel) {
477 if (channel->label() == std::string_view("signalling")) {
478 CHECK(!channel_);
479 channel_ = channel;
480
481 channel_->set_on_message(
482 [this](struct mbuf *const buffer,
483 const enum rawrtc_data_channel_message_flag flags) {
484 HandleSignallingData(buffer, flags);
485 });
486
487 channel_->set_on_open([this]() {
488 for (const auto &header : config_headers_) {
489 channel_->Send(header.buffer());
490 }
491 });
492
493 channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
494
495 // Register an on_close callback which does nothing but keeps channel alive
496 // until it is done. This keeps the memory around until rawrtc can finish
497 // calling the close callback.
498 channel_->set_on_close([channel]() {});
499 } else {
500 channel_->set_on_close([channel]() {});
501 channel->Close();
502 }
503}
504
505void ApplicationConnection::HandleSignallingData(
506 struct mbuf *const
507 buffer, // nullable (in case partial delivery has been requested)
508 const enum rawrtc_data_channel_message_flag /*flags*/) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800509 FlatbufferSpan<SubscriberRequest> message(
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700510 {mbuf_buf(buffer), mbuf_get_left(buffer)});
James Kuszmaul71a81932020-12-15 21:08:01 -0800511 if (!message.Verify()) {
512 LOG(ERROR) << "Invalid flatbuffer received from browser client.";
513 return;
514 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700515 VLOG(1) << "Got a subscription message "
James Kuszmaul71a81932020-12-15 21:08:01 -0800516 << aos::FlatbufferToJson(&message.message());
517 if (!message.message().has_channels_to_transfer()) {
518 LOG(ERROR) << "No channels requested for transfer.";
519 return;
520 }
Alex Perry5f474f22020-02-01 12:14:24 -0800521
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700522 // The client each time sends a full list of everything it wants to be
523 // subscribed to. It is our responsibility to remove channels which aren't
524 // in that list and add ones which need to be.
525 //
526 // Start by clearing a tracking bit on each channel. This takes O(number of
527 // open channels), which should be small.
528 //
529 // Then open any new channels. For any we visit which are already open,
530 // don't update those.
531 //
532 // Finally, iterate over the channel list and purge anything which we didn't
533 // touch.
534 for (auto &it : channels_) {
535 it.second.requested = false;
536 }
537 for (auto channel_request : *message.message().channels_to_transfer()) {
538 const Channel *channel = channel_request->channel();
539 if (channel == nullptr) {
540 LOG(ERROR) << "Got unpopulated channel.";
541 continue;
Alex Perry5f474f22020-02-01 12:14:24 -0800542 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700543 const TransferMethod transfer_method = channel_request->method();
544 // Call GetChannel() before comparing the channel name/type to each
545 // subscriber. This allows us to resolve any node or application
546 // specific mappings.
547 const Channel *comparison_channel =
548 configuration::GetChannel(event_loop_->configuration(), channel,
549 event_loop_->name(), event_loop_->node());
550 if (comparison_channel == nullptr) {
James Kuszmaul5e6aa252021-08-28 22:19:29 -0700551 LOG(ERROR) << "Channel does not exist: "
552 << configuration::StrippedChannelToString(channel);
553 continue;
554 }
555 if (!configuration::ChannelIsReadableOnNode(comparison_channel,
556 event_loop_->node())) {
557 LOG(ERROR) << "Channel not available on node "
558 << event_loop_->node()->name()->string_view() << ": "
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700559 << configuration::StrippedChannelToString(channel);
560 continue;
561 }
562
563 size_t channel_index = configuration::ChannelIndex(
564 event_loop_->configuration(), comparison_channel);
565
566 auto it = channels_.find(channel_index);
567 if (it == channels_.end()) {
568 std::shared_ptr<ScopedDataChannel> data_channel =
569 std::make_shared<ScopedDataChannel>();
570
571 std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;
572
573 data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
574 channel_index]() {
575 std::shared_ptr<ScopedDataChannel> data_channel =
576 data_channel_weak_ptr.lock();
577 CHECK(data_channel) << ": Subscriber got destroyed before we started.";
578 // Weak ptr inside the subscriber so we don't have a circular
579 // reference. AddListener will close it.
580 subscribers_[channel_index]->AddListener(data_channel, transfer_method);
581 });
582
583 Subscriber *subscriber = subscribers_[channel_index].get();
584 data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
585 std::shared_ptr<ScopedDataChannel> data_channel =
586 data_channel_weak_ptr.lock();
587 CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
588 subscriber->RemoveListener(data_channel);
589 });
590
591 data_channel->Open(
592 connection_.connection(),
593 absl::StrCat(channel->name()->str(), "/", channel->type()->str()));
594
595 auto pair = channels_.insert({channel_index, {data_channel, true}});
596 it = pair.first;
597 }
598
599 it->second.requested = true;
600
601 VLOG(1) << "Subscribe to: " << channel->type()->str();
602 }
603
604 for (auto &it : channels_) {
605 if (!it.second.requested) {
606 it.second.data_channel->Close();
607 it.second.data_channel = nullptr;
Alex Perry5f474f22020-02-01 12:14:24 -0800608 }
609 }
Alex Perryb3b50792020-01-18 16:13:45 -0800610}
611
612} // namespace web_proxy
613} // namespace aos