blob: 380697366164fa3735d1faf1d1200aac67f74492 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08007#include "aos/seasocks/seasocks_logger.h"
Alex Perryb3b50792020-01-18 16:13:45 -08008#include "glog/logging.h"
James Kuszmaul48671362020-12-24 13:54:16 -08009#include "internal/Embedded.h"
Alex Perryb3b50792020-01-18 16:13:45 -080010
Austin Schuh52e5e3a2021-04-24 22:30:02 -070011extern "C" {
12#include <rawrtc.h>
13
14#define DEBUG_LEVEL 7
15#define DEBUG_MODULE "web-proxy"
16#include <re_dbg.h>
17struct list *tmrl_get(void);
18}
19
// TCP port on which the embedded seasocks HTTP/websocket server listens.
DEFINE_int32(proxy_port, 1180, "Port to use for the web proxy server.");
James Kuszmaul1e95bed2021-01-09 21:02:49 -080021
Alex Perryb3b50792020-01-18 16:13:45 -080022namespace aos {
23namespace web_proxy {
James Kuszmaul7ad91522020-09-01 19:15:35 -070024WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul71a81932020-12-15 21:08:01 -080025 aos::EventLoop *event_loop, int buffer_size)
James Kuszmaul7ad91522020-09-01 19:15:35 -070026 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080027 config_(aos::CopyFlatBuffer(event_loop->configuration())),
28 event_loop_(event_loop) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070029 if (VLOG_IS_ON(2)) {
30 dbg_init(DBG_DEBUG, DBG_ALL);
31 }
32 CHECK_RAWRTC(rawrtc_init(true));
33
James Kuszmaul48671362020-12-24 13:54:16 -080034 // We need to reference findEmbeddedContent() to make the linker happy...
35 findEmbeddedContent("");
Austin Schuh52e5e3a2021-04-24 22:30:02 -070036 const aos::Node *self = event_loop_->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070037
Austin Schuh52e5e3a2021-04-24 22:30:02 -070038 subscribers_.reserve(event_loop_->configuration()->channels()->size());
39 for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
40 auto channel = event_loop_->configuration()->channels()->Get(i);
James Kuszmaul7ad91522020-09-01 19:15:35 -070041 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070042 auto fetcher = event_loop_->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080043 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
44 std::move(fetcher), i, buffer_size));
Austin Schuh52e5e3a2021-04-24 22:30:02 -070045 } else {
46 subscribers_.emplace_back(nullptr);
James Kuszmaul7ad91522020-09-01 19:15:35 -070047 }
48 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -070049 TimerHandler *const timer = event_loop_->AddTimer([this]() {
James Kuszmaul7ad91522020-09-01 19:15:35 -070050 for (auto &subscriber : subscribers_) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070051 if (subscriber) subscriber->RunIteration();
James Kuszmaul7ad91522020-09-01 19:15:35 -070052 }
53 });
54
Austin Schuh52e5e3a2021-04-24 22:30:02 -070055 event_loop_->OnRun([this, timer]() {
56 timer->Setup(event_loop_->monotonic_now(), std::chrono::milliseconds(100));
James Kuszmaul7ad91522020-09-01 19:15:35 -070057 });
58}
Alex Perryb3b50792020-01-18 16:13:45 -080059
60void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070061 std::unique_ptr<ApplicationConnection> connection =
62 std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
63 config_, event_loop_);
64
65 connections_.insert({sock, std::move(connection)});
Alex Perryb3b50792020-01-18 16:13:45 -080066}
67
68void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
69 size_t size) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070070 const FlatbufferSpan<WebSocketMessage> message({data, size});
71 if (!message.Verify()) {
72 LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
73 return;
74 }
75 VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
76 switch (message.message().payload_type()) {
77 case Payload::WebSocketSdp: {
78 const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
79 if (offer->type() != SdpType::OFFER) {
80 LOG(WARNING) << "Got the wrong sdp type from client";
81 break;
82 }
83 const flatbuffers::String *sdp = offer->payload();
84 connections_[sock]->OnSdp(sdp->c_str());
85 break;
86 }
87 case Payload::WebSocketIce: {
88 const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
89 connections_[sock]->OnIce(ice);
90 break;
91 }
92 default: { break; }
93 }
Alex Perryb3b50792020-01-18 16:13:45 -080094}
95
96void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
97 connections_.erase(sock);
98}
99
// Global epoll pointer.  The re library delivers fd/timer callbacks through
// plain function pointers with no user-data slot, so ReFdListen/ReFdClose
// below need a global to locate the EPoll instance.  Only one WebProxy may
// exist at a time (enforced by CHECK(!global_epoll) in the constructor and
// the reset in the destructor).
static aos::internal::EPoll *global_epoll = nullptr;
102
103static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
104 if (flags & 0x1) {
105 global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
106 }
107 if (flags & 0x2) {
108 global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
109 }
110 if (flags & 0x4) {
111 global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
112 }
113 return 0;
114}
115
// Adapter installed via re_fd_set_close_callback(): when re stops watching
// an fd, stop watching it on the aos EPoll as well.
static void ReFdClose(int fd) {
  CHECK(global_epoll != nullptr);
  global_epoll->DeleteFd(fd);
}
120
// Standalone variant: no shm event loop available, so drive our own
// internal_epoll_ from a simulated timer (see the main constructor below).
WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
    : WebProxy(event_loop, &internal_epoll_, buffer_size) {}

// ShmEventLoop variant: reuse the event loop's own epoll so we block
// properly instead of spinning.
WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
    : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
126
// Main constructor: wires seasocks and the re/rawrtc fd + timer machinery
// into the supplied epoll instance.
WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
                   int buffer_size)
    : epoll_(epoll),
      server_(std::make_shared<aos::seasocks::SeasocksLogger>(
          ::seasocks::Logger::Level::Info)),
      websocket_handler_(
          new WebsocketHandler(&server_, event_loop, buffer_size)) {
  // Only one WebProxy may be alive at a time: re's callbacks find us through
  // the global_epoll pointer.
  CHECK(!global_epoll);
  global_epoll = epoll;

  // Route re's fd watching through our epoll.
  re_fd_set_listen_callback(&ReFdListen);
  re_fd_set_close_callback(&ReFdClose);

  // Service re's timer list before each epoll wait.
  epoll->BeforeWait([]() {
    const uint64_t to = tmr_next_timeout(tmrl_get());
    if (to != 0) {
      VLOG(3) << "Next timeout " << to;
    }
    // Note: this only works because we are spinning on it...
    // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
    // for handling tmr.
    tmr_poll(tmrl_get());
  });

  server_.addWebSocketHandler("/ws", websocket_handler_);
  CHECK(server_.startListening(FLAGS_proxy_port));

  // Pump seasocks whenever its listening fd becomes readable.
  epoll->OnReadable(server_.fd(), [this]() {
    CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
  });

  if (&internal_epoll_ == epoll) {
    TimerHandler *const timer = event_loop->AddTimer([this]() {
      // Run the epoll poller until there are no more events (if we are being
      // backed by a shm event loop, there won't be anything registered to
      // internal_epoll_ and this will just return false).
      // We just deal with clearing all the epoll events using a simulated
      // timer.  This does mean that we will spin rather than actually sleeping
      // in any coherent manner, which will be particularly noticeable when past
      // the end of processing other events.
      while (internal_epoll_.Poll(false)) {
        continue;
      }
    });

    event_loop->OnRun([timer, event_loop]() {
      timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
    });
  }
}
177
WebProxy::~WebProxy() {
  // Teardown order matters: stop watching the server fd first, then shut
  // seasocks down and drain its final poll, and only then release the global
  // epoll pointer so re callbacks stay valid throughout.
  epoll_->DeleteFd(server_.fd());
  server_.terminate();
  CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
  CHECK(global_epoll == epoll_);
  global_epoll = nullptr;
}
185
Alex Perry5f474f22020-02-01 12:14:24 -0800186void Subscriber::RunIteration() {
James Kuszmaul71a81932020-12-15 21:08:01 -0800187 if (channels_.empty() && buffer_size_ == 0) {
Alex Perry5f474f22020-02-01 12:14:24 -0800188 return;
189 }
190
James Kuszmaul71a81932020-12-15 21:08:01 -0800191 while (fetcher_->FetchNext()) {
192 // If we aren't building up a buffer, short-circuit the FetchNext().
193 if (buffer_size_ == 0) {
194 fetcher_->Fetch();
195 }
196 Message message;
197 message.index = fetcher_->context().queue_index;
198 VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
199 << "packets";
200 for (int packet_index = 0;
201 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
Austin Schuhd16ef442021-04-25 14:44:42 -0700202 // Pack directly into the mbuffer. This is admittedly a bit painful.
203 const size_t packet_size =
204 PackedMessageSize(fetcher_->context(), packet_index);
205 struct mbuf *mbuffer = mbuf_alloc(packet_size);
Alex Perry5f474f22020-02-01 12:14:24 -0800206
Austin Schuhd16ef442021-04-25 14:44:42 -0700207 {
208 // Wrap a pre-allocated builder around the mbuffer.
209 PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
210 flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
211 flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
212 &fbb, fetcher_->context(), channel_index_, packet_index);
213 fbb.Finish(message_offset);
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700214
Austin Schuhd16ef442021-04-25 14:44:42 -0700215 // Now, the flatbuffer is built from the back to the front. So any
216 // extra memory will be at the front. Setup the end and start pointers
217 // on the mbuf.
218 mbuf_set_end(mbuffer, packet_size);
219 mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
220 }
Alex Perry5f474f22020-02-01 12:14:24 -0800221
James Kuszmaul71a81932020-12-15 21:08:01 -0800222 message.data.emplace_back(
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700223 std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
James Kuszmaul71a81932020-12-15 21:08:01 -0800224 }
225 message_buffer_.push_back(std::move(message));
James Kuszmaul45139e62021-09-11 11:41:03 -0700226 // If we aren't keeping a buffer, then we should only do one iteration of
227 // the while loop--otherwise, if additional messages arrive between the
228 // first FetchNext() and the second iteration then we can end up behaving
229 // poorly (since we do a Fetch() when buffer_size_ == 0).
230 if (buffer_size_ == 0) {
231 break;
232 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800233 }
234 for (auto &conn : channels_) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700235 std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
James Kuszmaul71a81932020-12-15 21:08:01 -0800236 ChannelInformation *channel_data = &conn.second;
237 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
238 SkipToLastMessage(channel_data);
239 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700240 std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
241 while (buffer) {
242 // TODO(austin): This is a nop so we just buffer forever. Fix this when
243 // we care.
James Kuszmaul71a81932020-12-15 21:08:01 -0800244 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800245 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800246 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800247 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700248
249 rtc_channel->Send(buffer.get());
James Kuszmaul71a81932020-12-15 21:08:01 -0800250 buffer = NextBuffer(channel_data);
251 }
252 }
253 if (buffer_size_ >= 0) {
254 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
255 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800256 }
257 }
258}
259
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700260void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
261 TransferMethod transfer_method) {
262 VLOG(1) << "Adding listener for " << data_channel.get();
James Kuszmaul71a81932020-12-15 21:08:01 -0800263 ChannelInformation info;
264 info.transfer_method = transfer_method;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700265
James Kuszmaul5e6aa252021-08-28 22:19:29 -0700266 // If we aren't keeping a buffer and there are no existing listeners, call
267 // Fetch() to avoid falling behind on future calls to FetchNext().
268 if (channels_.empty() && buffer_size_ == 0) {
269 fetcher_->Fetch();
270 }
271
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700272 channels_.emplace(data_channel, info);
James Kuszmaul71a81932020-12-15 21:08:01 -0800273}
274
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700275void Subscriber::RemoveListener(
276 std::shared_ptr<ScopedDataChannel> data_channel) {
277 VLOG(1) << "Removing listener for " << data_channel.get();
278 channels_.erase(data_channel);
279}
280
// Returns the next packet the given channel should send, advancing its
// (queue index, packet number) cursor, or nullptr once it is caught up.  The
// returned mbuf is a refcounted alias of the buffered packet -- no data copy.
std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
    ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty()) {
    return nullptr;
  }
  const uint32_t earliest_index = message_buffer_.front().index;
  const uint32_t latest_index = message_buffer_.back().index;
  const bool fell_behind = channel->current_queue_index < earliest_index;
  if (fell_behind) {
    // The cursor aged out of the buffer; jump forward to the oldest message
    // still held and restart at its first packet.
    channel->current_queue_index = earliest_index;
    channel->next_packet_number = 0;
    return message_buffer_.front().data.at(0);
  }
  if (channel->current_queue_index > latest_index) {
    // We are still waiting on the next message to appear; return.
    return nullptr;
  }
  // The buffer holds a contiguous run of queue indices, so the cursor can be
  // used as a direct offset from the front of the deque.
  CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
      << "Inconsistent queue indices.";
  const size_t packets_in_message =
      message_buffer_[channel->current_queue_index - earliest_index]
          .data.size();
  CHECK_LT(0u, packets_in_message);
  CHECK_LT(channel->next_packet_number, packets_in_message);

  std::shared_ptr<struct mbuf> original_data =
      message_buffer_[channel->current_queue_index - earliest_index].data.at(
          channel->next_packet_number);

  // Advance the cursor, rolling over to the next message once every packet
  // of the current one has been handed out.
  ++channel->next_packet_number;
  if (channel->next_packet_number == packets_in_message) {
    ++channel->current_queue_index;
    channel->next_packet_number = 0;
  }

  // Trigger a copy of the mbuf without copying the data.
  return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
                                      mem_deref);
}
321
322void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
323 CHECK_NOTNULL(channel);
324 if (message_buffer_.empty() ||
325 channel->current_queue_index == message_buffer_.back().index) {
326 return;
327 }
328 channel->current_queue_index = message_buffer_.back().index;
329 channel->next_packet_number = 0;
330}
331
// Sets up the WebRTC peer connection for one websocket client and registers
// the callbacks that drive the rest of this class.
ApplicationConnection::ApplicationConnection(
    ::seasocks::Server *server, ::seasocks::WebSocket *sock,
    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
    const EventLoop *event_loop)
    : server_(server),
      sock_(sock),
      subscribers_(subscribers),
      config_headers_(PackBuffer(config.span())),
      event_loop_(event_loop) {
  // The browser is the offering side; we only ever answer (see OnSdp()).
  connection_.set_on_negotiation_needed([]() {
    VLOG(1) << "Negotiation needed, not offering so not creating offer.";
  });

  // Forward our locally-gathered ICE candidates back to the browser.
  connection_.set_on_local_candidate(
      [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
             char const *const url) { LocalCandidate(candidate, url); });

  // Data channels are opened by the remote side; see OnDataChannel().
  connection_.set_on_data_channel(
      [this](std::shared_ptr<ScopedDataChannel> channel) {
        OnDataChannel(channel);
      });

  connection_.Open();
}
357
358ApplicationConnection::~ApplicationConnection() {
359 for (auto &it : channels_) {
360 it.second.data_channel->Close();
361 it.second.data_channel = nullptr;
362 }
363
364 // Eh, we are done, tell the channel to shut down. If we didn't, it would
365 // just hang around until the connection closes, which is rather shortly
366 // after.
367 if (channel_) {
368 channel_->Close();
369 }
370}
371
// Handles the browser's SDP offer: applies it as the remote description,
// creates and applies an answer, and sends the answer SDP back over the
// websocket (on the seasocks thread via server_->execute()).
void ApplicationConnection::OnSdp(const char *sdp) {
  struct rawrtc_peer_connection_description *remote_description = NULL;

  auto error = rawrtc_peer_connection_description_create(
      &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
  if (error) {
    // Bad input from the browser is survivable; just drop the offer.
    LOG(WARNING) << "Cannot parse remote description: "
                 << rawrtc_code_to_str(error);
    return;
  }

  CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
      connection_.connection(), remote_description));

  struct rawrtc_peer_connection_description *local_description;
  CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
                                                    connection_.connection()));
  CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
      connection_.connection(), local_description));

  enum rawrtc_sdp_type type;
  char *local_sdp = nullptr;
  // Get SDP type & the SDP itself
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
      &type, local_description));
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
                                                          local_description));

  // Package the answer as a WebSocketMessage flatbuffer.
  flatbuffers::FlatBufferBuilder fbb;
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
  flatbuffers::Offset<WebSocketMessage> answer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());

  VLOG(1) << aos::FlatbufferToJson(
      flatbuffers::GetTemporaryPointer(fbb, answer_message));
  fbb.Finish(answer_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
  // local_sdp was allocated by rawrtc; drop our reference.
  // NOTE(review): remote_description/local_description are not mem_deref'd
  // here -- presumably the connection takes ownership on set_*_description;
  // confirm against the rawrtc API docs.
  mem_deref(local_sdp);
}
413
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700414void ApplicationConnection::OnIce(const WebSocketIce *ice) {
415 if (!ice->has_candidate()) {
416 return;
417 }
418 uint8_t sdpMLineIndex = ice->sdpMLineIndex();
419
420 struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
421 CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
422 &ice_candidate, ice->candidate()->c_str(), ice->sdpMid()->c_str(),
423 &sdpMLineIndex, nullptr));
424
425 rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
426 ice_candidate);
427
428 mem_deref(ice_candidate);
429}
430
// Callback for locally-gathered ICE candidates: serializes the candidate into
// a WebSocketIce flatbuffer and sends it to the browser.  rawrtc signals
// end-of-candidates with a null candidate, which we ignore.
void ApplicationConnection::LocalCandidate(
    struct rawrtc_peer_connection_ice_candidate *const candidate,
    char const *const url) {
  struct rawrtc_ice_candidate *ortc_candidate = nullptr;
  if (candidate) {
    // NOTE(review): ortc_candidate is fetched but never read or released
    // below -- confirm whether this call is needed at all.
    CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
        &ortc_candidate, candidate));

    flatbuffers::FlatBufferBuilder fbb;
    char *sdpp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
    char *midp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));

    // The media line index is optional; only attach it on success.
    uint8_t media_line_index;
    enum rawrtc_code error =
        rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
            &media_line_index, candidate);

    flatbuffers::Offset<flatbuffers::String> sdpp_offset =
        fbb.CreateString(sdpp);
    flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
        fbb.CreateString(midp);

    WebSocketIce::Builder web_socket_ice_builder(fbb);

    web_socket_ice_builder.add_candidate(sdpp_offset);
    web_socket_ice_builder.add_sdpMid(sdp_mid_offset);

    if (error == RAWRTC_CODE_SUCCESS) {
      web_socket_ice_builder.add_sdpMLineIndex(media_line_index);
    }
    flatbuffers::Offset<WebSocketIce> ice_offset =
        web_socket_ice_builder.Finish();

    flatbuffers::Offset<WebSocketMessage> ice_message =
        CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
    VLOG(1) << url << ": "
            << aos::FlatbufferToJson(
                   flatbuffers::GetTemporaryPointer(fbb, ice_message));
    fbb.Finish(ice_message);

    // Hand the message to seasocks to send on its thread.
    server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

    // The SDP strings were allocated by rawrtc; release our references.
    mem_deref(sdpp);
    mem_deref(midp);
  }
}
481
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700482void ApplicationConnection::OnDataChannel(
483 std::shared_ptr<ScopedDataChannel> channel) {
484 if (channel->label() == std::string_view("signalling")) {
485 CHECK(!channel_);
486 channel_ = channel;
487
488 channel_->set_on_message(
489 [this](struct mbuf *const buffer,
490 const enum rawrtc_data_channel_message_flag flags) {
491 HandleSignallingData(buffer, flags);
492 });
493
494 channel_->set_on_open([this]() {
495 for (const auto &header : config_headers_) {
496 channel_->Send(header.buffer());
497 }
498 });
499
500 channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
501
502 // Register an on_close callback which does nothing but keeps channel alive
503 // until it is done. This keeps the memory around until rawrtc can finish
504 // calling the close callback.
505 channel_->set_on_close([channel]() {});
506 } else {
507 channel_->set_on_close([channel]() {});
508 channel->Close();
509 }
510}
511
// Processes a SubscriberRequest from the browser's signalling data channel,
// reconciling the set of open per-channel data channels against the full
// subscription list the client sent.
void ApplicationConnection::HandleSignallingData(
    struct mbuf *const
        buffer,  // nullable (in case partial delivery has been requested)
    const enum rawrtc_data_channel_message_flag /*flags*/) {
  FlatbufferSpan<SubscriberRequest> message(
      {mbuf_buf(buffer), mbuf_get_left(buffer)});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid flatbuffer received from browser client.";
    return;
  }
  VLOG(1) << "Got a subscription message "
          << aos::FlatbufferToJson(&message.message());
  if (!message.message().has_channels_to_transfer()) {
    LOG(ERROR) << "No channels requested for transfer.";
    return;
  }

  // The client each time sends a full list of everything it wants to be
  // subscribed to.  It is our responsibility to remove channels which aren't
  // in that list and add ones which need to be.
  //
  // Start by clearing a tracking bit on each channel.  This takes O(number of
  // open channels), which should be small.
  //
  // Then open any new channels.  For any we visit which are already open,
  // don't update those.
  //
  // Finally, iterate over the channel list and purge anything which we didn't
  // touch.
  for (auto &it : channels_) {
    it.second.requested = false;
  }
  for (auto channel_request : *message.message().channels_to_transfer()) {
    const Channel *channel = channel_request->channel();
    if (channel == nullptr) {
      LOG(ERROR) << "Got unpopulated channel.";
      continue;
    }
    const TransferMethod transfer_method = channel_request->method();
    // Call GetChannel() before comparing the channel name/type to each
    // subscriber.  This allows us to resolve any node or application
    // specific mappings.
    const Channel *comparison_channel =
        configuration::GetChannel(event_loop_->configuration(), channel,
                                  event_loop_->name(), event_loop_->node());
    if (comparison_channel == nullptr) {
      LOG(ERROR) << "Channel does not exist: "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }
    if (!configuration::ChannelIsReadableOnNode(comparison_channel,
                                                event_loop_->node())) {
      LOG(ERROR) << "Channel not available on node "
                 << event_loop_->node()->name()->string_view() << ": "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }

    size_t channel_index = configuration::ChannelIndex(
        event_loop_->configuration(), comparison_channel);

    auto it = channels_.find(channel_index);
    if (it == channels_.end()) {
      std::shared_ptr<ScopedDataChannel> data_channel =
          std::make_shared<ScopedDataChannel>();

      // The callbacks hold weak pointers so the data channel is not kept
      // alive by its own callbacks (no reference cycle).
      std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;

      data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
                                 channel_index]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we started.";
        // Weak ptr inside the subscriber so we don't have a circular
        // reference.  AddListener will close it.
        subscribers_[channel_index]->AddListener(data_channel, transfer_method);
      });

      Subscriber *subscriber = subscribers_[channel_index].get();
      data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
        subscriber->RemoveListener(data_channel);
      });

      // Channel name on the wire is "<name>/<type>".
      data_channel->Open(
          connection_.connection(),
          absl::StrCat(channel->name()->str(), "/", channel->type()->str()));

      auto pair = channels_.insert({channel_index, {data_channel, true}});
      it = pair.first;
    }

    it->second.requested = true;

    VLOG(1) << "Subscribe to: " << channel->type()->str();
  }

  // Purge anything the client no longer asked for.
  // NOTE(review): purged entries remain in channels_ with a null
  // data_channel; a later re-subscribe to the same channel_index would find
  // the stale entry and not reopen a data channel.  Confirm whether clients
  // ever re-subscribe after unsubscribing.
  for (auto &it : channels_) {
    if (!it.second.requested) {
      it.second.data_channel->Close();
      it.second.data_channel = nullptr;
    }
  }
}
618
619} // namespace web_proxy
620} // namespace aos