#include "aos/network/web_proxy.h"

#include "glog/logging.h"

#include "aos/flatbuffer_merge.h"
#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/network/web_proxy_utils.h"
#include "aos/seasocks/seasocks_logger.h"
#include "internal/Embedded.h"

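// rawrtc and the underlying re ("libre") library are C libraries, so their
// headers get wrapped in extern "C". DEBUG_MODULE/re_dbg hook into re's debug
// logging, and tmrl_get() (re's pending-timer list) appears not to be exposed
// in a public header, so it is declared by hand.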
extern "C" {
#include <rawrtc.h>

#define DEBUG_LEVEL 7
#define DEBUG_MODULE "web-proxy"
#include <re_dbg.h>
struct list *tmrl_get(void);
}

DEFINE_int32(proxy_port, 1180, "Port to use for the web proxy server.");
DEFINE_int32(pre_send_messages, 10000,
             "Number of messages / queue to send to a client before waiting on "
             "confirmation that the initial message was received. If set to "
             "-1, will not throttle messages at all. This prevents a situation "
             "where, when run on localhost, the large number of WebRTC packets "
             "can overwhelm the browser and crash the webpage.");
// Note: sometimes WebRTC appears to buffer up and stop sending message acks
// back from the client page. It is not clear *why* WebRTC does this, but since
// the only reason we use those acks is to stop ourselves from overloading the
// client webpage, this setting lets us fall back to a simple time-based rate
// limit when we stop receiving acks.
DEFINE_double(max_buffer_pause_sec, 0.1,
              "If we have not received any acks in this amount of time, we "
              "resume sending messages anyway.");

namespace aos::web_proxy {

WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
                                   aos::EventLoop *event_loop,
                                   StoreHistory store_history,
                                   int per_channel_buffer_size_bytes)
    : server_(server),
      config_(aos::CopyFlatBuffer(event_loop->configuration())),
      event_loop_(event_loop) {
  if (VLOG_IS_ON(2)) {
    dbg_init(DBG_DEBUG, DBG_ALL);
  }
  CHECK_RAWRTC(rawrtc_init(true));

  // We need to reference findEmbeddedContent() to make the linker happy...
  findEmbeddedContent("");
  const aos::Node *self = event_loop_->node();

  subscribers_.reserve(event_loop_->configuration()->channels()->size());
  for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
    auto channel = event_loop_->configuration()->channels()->Get(i);
    if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
      auto fetcher = event_loop_->MakeRawFetcher(channel);
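      // A negative buffer budget disables trimming entirely; otherwise,
      // convert the per-channel byte budget into a message count using this
      // channel's maximum message size.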
      subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
          std::move(fetcher), i, store_history,
          per_channel_buffer_size_bytes < 0
              ? -1
              : per_channel_buffer_size_bytes / channel->max_size()));
    } else {
      subscribers_.emplace_back(nullptr);
    }
  }
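  // Drive all the subscribers off a single 100 ms timer; each tick fetches
  // any new messages (while recording_ is set) and flushes them out to the
  // connected clients.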
  TimerHandler *const timer = event_loop_->AddTimer([this]() {
    for (auto &subscriber : subscribers_) {
      if (subscriber) subscriber->RunIteration(recording_);
    }
  });

  event_loop_->OnRun([this, timer]() {
    timer->Schedule(event_loop_->monotonic_now(),
                    std::chrono::milliseconds(100));
  });
}

void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
  std::unique_ptr<ApplicationConnection> connection =
      std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
                                              config_, event_loop_);

  connections_.insert({sock, std::move(connection)});
}

void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
                              size_t size) {
  const FlatbufferSpan<WebSocketMessage> message({data, size});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
    return;
  }
  VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
  switch (message.message().payload_type()) {
    case Payload::WebSocketSdp: {
      const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
      if (offer->type() != SdpType::OFFER) {
        LOG(WARNING) << "Got the wrong sdp type from client";
        break;
      }
      const flatbuffers::String *sdp = offer->payload();
      connections_[sock]->OnSdp(sdp->c_str());
      break;
    }
    case Payload::WebSocketIce: {
      const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
      connections_[sock]->OnIce(ice);
      break;
    }
    default: {
      break;
    }
  }
}

void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
  connections_.erase(sock);
}

// Global epoll pointer
static aos::internal::EPoll *global_epoll = nullptr;

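// Adapter that re (libre) uses to register its file descriptors with our
// EPoll. The flag bits correspond to re's FD_READ (0x1), FD_WRITE (0x2), and
// FD_EXCEPT (0x4).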
static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
  if (flags & 0x1) {
    global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
  }
  if (flags & 0x2) {
    global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
  }
  if (flags & 0x4) {
    global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
  }
  return 0;
}

static void ReFdClose(int fd) {
  CHECK(global_epoll != nullptr);
  global_epoll->DeleteFd(fd);
}

WebProxy::WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : WebProxy(event_loop, &internal_epoll_, store_history,
               per_channel_buffer_size_bytes) {}

WebProxy::WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : WebProxy(event_loop, event_loop->epoll(), store_history,
               per_channel_buffer_size_bytes) {}

WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
                   StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : epoll_(epoll),
      server_(std::make_shared<aos::seasocks::SeasocksLogger>(
          ::seasocks::Logger::Level::Info)),
      websocket_handler_(new WebsocketHandler(
          &server_, event_loop, store_history, per_channel_buffer_size_bytes)) {
  CHECK(!global_epoll);
  global_epoll = epoll;

  re_fd_set_listen_callback(&ReFdListen);
  re_fd_set_close_callback(&ReFdClose);

  epoll->BeforeWait([]() {
    const uint64_t to = tmr_next_timeout(tmrl_get());
    if (to != 0) {
      VLOG(3) << "Next timeout " << to;
    }
    // Note: this only works because we are spinning on it...
    // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
    // for handling tmr.
    tmr_poll(tmrl_get());
  });

  server_.addWebSocketHandler("/ws", websocket_handler_);
  CHECK(server_.startListening(FLAGS_proxy_port));

  epoll->OnReadable(server_.fd(), [this]() {
    CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
  });

  if (&internal_epoll_ == epoll) {
    TimerHandler *const timer = event_loop->AddTimer([this]() {
      // Run the epoll poller until there are no more events (if we are being
      // backed by a shm event loop, there won't be anything registered to
      // internal_epoll_ and this will just return false). We clear all the
      // epoll events from a simulated timer, which means we spin rather than
      // sleeping in any coherent manner; this is particularly noticeable once
      // we are past the end of processing other events.
      while (internal_epoll_.Poll(false)) {
        continue;
      }
    });

    event_loop->OnRun([timer, event_loop]() {
      timer->Schedule(event_loop->monotonic_now(),
                      std::chrono::milliseconds(10));
    });
  }
}
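
// A minimal usage sketch (not part of this file): hosting the proxy from a
// ShmEventLoop-based main. The config path and buffer size are illustrative,
// and StoreHistory is assumed to be the enum declared in web_proxy.h.
//
//   aos::FlatbufferDetachedBuffer<aos::Configuration> config =
//       aos::configuration::ReadConfig("aos_config.json");
//   aos::ShmEventLoop event_loop(&config.message());
//   aos::web_proxy::WebProxy web_proxy(
//       &event_loop, aos::web_proxy::StoreHistory::kNo,
//       /*per_channel_buffer_size_bytes=*/1024 * 1024);
//   event_loop.Run();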

WebProxy::~WebProxy() {
  epoll_->DeleteFd(server_.fd());
  server_.terminate();
  CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
  CHECK(global_epoll == epoll_);
  global_epoll = nullptr;
}

void WebProxy::StopRecording() { websocket_handler_->StopRecording(); }

void Subscriber::RunIteration(bool fetch_new) {
  if (fetch_new) {
    if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
      fetcher_->Fetch();
      message_buffer_.clear();
      return;
    }

    while (fetcher_->FetchNext()) {
      // If we aren't building up a buffer, short-circuit the FetchNext().
      if (buffer_size_ == 0) {
        fetcher_->Fetch();
      }
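      // A single AOS message can exceed what fits in one WebRTC data-channel
      // packet, so it is split into GetPacketCount() flatbuffer-wrapped
      // fragments, each packed into its own mbuf.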
      Message message;
      message.index = fetcher_->context().queue_index;
      VLOG(2) << "Packing a message with "
              << GetPacketCount(fetcher_->context()) << " packets";
      for (int packet_index = 0;
           packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
        // Pack directly into the mbuffer. This is admittedly a bit painful.
        const size_t packet_size =
            PackedMessageSize(fetcher_->context(), packet_index);
        struct mbuf *mbuffer = mbuf_alloc(packet_size);

        {
          // Wrap a pre-allocated builder around the mbuffer.
          PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
          flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
          flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
              &fbb, fetcher_->context(), channel_index_, packet_index);
          fbb.Finish(message_offset);

          // Now, the flatbuffer is built from the back to the front. So any
          // extra memory will be at the front. Set up the end and start
          // pointers on the mbuf.
          mbuf_set_end(mbuffer, packet_size);
          mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
        }

        message.data.emplace_back(
            std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
      }
      message_buffer_.push_back(std::move(message));
      // If we aren't keeping a buffer, then we should only do one iteration of
      // the while loop--otherwise, if additional messages arrive between the
      // first FetchNext() and the second iteration then we can end up behaving
      // poorly (since we do a Fetch() when buffer_size_ == 0).
      if (buffer_size_ == 0) {
        break;
      }
    }
  }
  for (auto &conn : channels_) {
    std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first.lock();
    CHECK(rtc_channel) << "data_channel was destroyed too early.";
    ChannelInformation *channel_data = &conn.second;
    if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
      SkipToLastMessage(channel_data);
    }
    std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
    while (buffer) {
      // TODO(austin): This is a nop so we just buffer forever. Fix this when
      // we care.
      if (rtc_channel->buffered_amount() > 14000000) {
        VLOG(1) << "skipping a send because buffered amount is too high";
        break;
      }

      rtc_channel->Send(buffer.get());
      buffer = NextBuffer(channel_data);
    }
  }
  if (buffer_size_ >= 0) {
    while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
      message_buffer_.pop_front();
    }
  }
}

void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
                             TransferMethod transfer_method) {
  ChannelInformation info;
  info.transfer_method = transfer_method;

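  // Only a weak_ptr to the data channel is stored, so the Subscriber does not
  // keep it alive; RemoveListener() erases the entry once the channel closes.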
  channels_.emplace_back(std::make_pair(data_channel, info));

  data_channel->set_on_message(
      [this, index = channels_.size() - 1](
          struct mbuf *const buffer,
          const enum rawrtc_data_channel_message_flag /*flags*/) {
        FlatbufferSpan<ChannelState> message(
            {mbuf_buf(buffer), mbuf_get_left(buffer)});
        if (!message.Verify()) {
          LOG(ERROR) << "Invalid flatbuffer received from browser client.";
          return;
        }

        channels_[index].second.reported_queue_index =
            message.message().queue_index();
        channels_[index].second.reported_packet_index =
            message.message().packet_index();
        // Note: uses the actual clock rather than the event-loop clock so
        // that this behaves sanely under simulation.
        channels_[index].second.last_report = aos::monotonic_clock::now();
      });
}

void Subscriber::RemoveListener(
    std::shared_ptr<ScopedDataChannel> data_channel) {
  channels_.erase(
      std::remove_if(
          channels_.begin(), channels_.end(),
          [data_channel](const std::pair<std::weak_ptr<ScopedDataChannel>,
                                         ChannelInformation> &channel) {
            return channel.first.lock().get() == data_channel.get();
          }),
      channels_.end());
}

std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
    ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty()) {
    return nullptr;
  }
  const uint32_t earliest_index = message_buffer_.front().index;
  const uint32_t latest_index = message_buffer_.back().index;
  const bool fell_behind = channel->current_queue_index < earliest_index;
  if (fell_behind) {
    channel->current_queue_index = earliest_index;
    channel->next_packet_number = 0;
    return message_buffer_.front().data.at(0);
  }
  // TODO(james): Handle queue index wrapping when >2^32 messages are sent on a
  // channel.
  if (channel->current_queue_index > latest_index) {
    // We are still waiting on the next message to appear; return.
    return nullptr;
  }
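  // Flow control: the client acks progress by reporting the latest
  // (queue_index, packet_index) it has processed. We let ourselves run at most
  // pre_send_messages ahead of that ack; if no ack has arrived within
  // max_buffer_pause_sec, we assume the acks got stuck and bump the reported
  // index ourselves so that sending can continue.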
  if (FLAGS_pre_send_messages > 0) {
    // Note: uses the actual clock rather than the event-loop clock so that
    // this behaves sanely under simulation.
    const aos::monotonic_clock::time_point now = aos::monotonic_clock::now();
    if (channel->last_report.has_value() &&
        channel->last_report.value() +
                std::chrono::duration_cast<std::chrono::nanoseconds>(
                    std::chrono::duration<double>(FLAGS_max_buffer_pause_sec)) <
            now) {
      // Increment the number of messages that we will send over to the client
      // webpage.
      channel->reported_queue_index += FLAGS_pre_send_messages / 10;
      channel->reported_packet_index = 0;
      channel->last_report = now;
    }
    // Don't buffer up an excessive number of messages to the client.
    // This currently ignores the packet index (and really, any concept of
    // message size), but the main goal is just to avoid locking up the client
    // browser, not to be ultra precise about anything. It's also not clear
    // that message *size* is necessarily even the determining factor in
    // causing issues.
    if (channel->reported_queue_index + FLAGS_pre_send_messages <
        channel->current_queue_index) {
      return nullptr;
    }
  }
  CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
      << "Inconsistent queue indices.";
  const size_t packets_in_message =
      message_buffer_[channel->current_queue_index - earliest_index]
          .data.size();
  CHECK_LT(0u, packets_in_message);
  CHECK_LT(channel->next_packet_number, packets_in_message);

  std::shared_ptr<struct mbuf> original_data =
      message_buffer_[channel->current_queue_index - earliest_index].data.at(
          channel->next_packet_number);

  ++channel->next_packet_number;
  if (channel->next_packet_number == packets_in_message) {
    ++channel->current_queue_index;
    channel->next_packet_number = 0;
  }

  // Trigger a copy of the mbuf without copying the data.
  return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
                                      mem_deref);
}

void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
  CHECK_NOTNULL(channel);
  if (message_buffer_.empty() ||
      channel->current_queue_index == message_buffer_.back().index) {
    return;
  }
  channel->current_queue_index = message_buffer_.back().index;
  channel->reported_queue_index = message_buffer_.back().index;
  channel->last_report.reset();
  channel->next_packet_number = 0;
}

ApplicationConnection::ApplicationConnection(
    ::seasocks::Server *server, ::seasocks::WebSocket *sock,
    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
    const EventLoop *event_loop)
    : server_(server),
      sock_(sock),
      subscribers_(subscribers),
      config_headers_(PackBuffer(config.span())),
      event_loop_(event_loop) {
  connection_.set_on_negotiation_needed([]() {
    VLOG(1) << "Negotiation needed, not offering so not creating offer.";
  });

  connection_.set_on_local_candidate(
      [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
             char const *const url) { LocalCandidate(candidate, url); });

  connection_.set_on_data_channel(
      [this](std::shared_ptr<ScopedDataChannel> channel) {
        OnDataChannel(channel);
      });

  connection_.Open();
}

ApplicationConnection::~ApplicationConnection() {
  for (auto &it : channels_) {
    it.second.data_channel->Close();
    it.second.data_channel = nullptr;
  }

  // Eh, we are done, tell the channel to shut down. If we didn't, it would
  // just hang around until the connection closes, which is rather shortly
  // after.
  if (channel_) {
    channel_->Close();
  }
}

void ApplicationConnection::OnSdp(const char *sdp) {
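  // We are always the answering side: parse the browser's offer, install it
  // as the remote description, generate an answer, and send the answer SDP
  // back over the websocket.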
  struct rawrtc_peer_connection_description *remote_description = nullptr;

  auto error = rawrtc_peer_connection_description_create(
      &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
  if (error) {
    LOG(WARNING) << "Cannot parse remote description: "
                 << rawrtc_code_to_str(error);
    return;
  }

  CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
      connection_.connection(), remote_description));

  struct rawrtc_peer_connection_description *local_description;
  CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
                                                    connection_.connection()));
  CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
      connection_.connection(), local_description));

  enum rawrtc_sdp_type type;
  char *local_sdp = nullptr;
  // Get SDP type & the SDP itself
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
      &type, local_description));
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
                                                          local_description));

  flatbuffers::FlatBufferBuilder fbb;
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
  flatbuffers::Offset<WebSocketMessage> answer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());

  VLOG(1) << aos::FlatbufferToJson(
      flatbuffers::GetTemporaryPointer(fbb, answer_message));
  fbb.Finish(answer_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
  mem_deref(local_sdp);
}

void ApplicationConnection::OnIce(const WebSocketIce *ice) {
  if (!ice->has_candidate()) {
    return;
  }
  uint8_t sdp_m_line_index = ice->sdp_m_line_index();

  struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
  CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
      &ice_candidate, ice->candidate()->c_str(), ice->sdp_mid()->c_str(),
      &sdp_m_line_index, nullptr));

  rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
                                           ice_candidate);

  mem_deref(ice_candidate);
}

void ApplicationConnection::LocalCandidate(
    struct rawrtc_peer_connection_ice_candidate *const candidate,
    char const *const url) {
  struct rawrtc_ice_candidate *ortc_candidate = nullptr;
  if (candidate) {
    CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
        &ortc_candidate, candidate));

    flatbuffers::FlatBufferBuilder fbb;
    char *sdpp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
    char *midp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));

    uint8_t media_line_index;
    enum rawrtc_code error =
        rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
            &media_line_index, candidate);

    flatbuffers::Offset<flatbuffers::String> sdpp_offset =
        fbb.CreateString(sdpp);
    flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
        fbb.CreateString(midp);

    WebSocketIce::Builder web_socket_ice_builder(fbb);

    web_socket_ice_builder.add_candidate(sdpp_offset);
    web_socket_ice_builder.add_sdp_mid(sdp_mid_offset);

    if (error == RAWRTC_CODE_SUCCESS) {
      web_socket_ice_builder.add_sdp_m_line_index(media_line_index);
    }
    flatbuffers::Offset<WebSocketIce> ice_offset =
        web_socket_ice_builder.Finish();

    flatbuffers::Offset<WebSocketMessage> ice_message =
        CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
    VLOG(1) << url << ": "
            << aos::FlatbufferToJson(
                   flatbuffers::GetTemporaryPointer(fbb, ice_message));
    fbb.Finish(ice_message);

    server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

    mem_deref(sdpp);
    mem_deref(midp);
  }
}

void ApplicationConnection::OnDataChannel(
    std::shared_ptr<ScopedDataChannel> channel) {
  if (channel->label() == std::string_view("signalling")) {
    CHECK(!channel_);
    channel_ = channel;

    channel_->set_on_message(
        [this](struct mbuf *const buffer,
               const enum rawrtc_data_channel_message_flag flags) {
          HandleSignallingData(buffer, flags);
        });

    channel_->set_on_open([this]() {
      for (const auto &header : config_headers_) {
        channel_->Send(header.buffer());
      }
    });

    channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });

    // Register an on_close callback which does nothing but keeps channel alive
    // until it is done. This keeps the memory around until rawrtc can finish
    // calling the close callback.
    channel_->set_on_close([channel]() {});
  } else {
    // Unrecognized channel: use the same keep-alive trick on the rejected
    // channel itself, then close it.
    channel->set_on_close([channel]() {});
    channel->Close();
  }
}

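// Messages on the "signalling" data channel are SubscriberRequest flatbuffers
// listing every channel the browser wants; we diff that list against the
// currently-open data channels below.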
void ApplicationConnection::HandleSignallingData(
    struct mbuf *const
        buffer,  // nullable (in case partial delivery has been requested)
    const enum rawrtc_data_channel_message_flag /*flags*/) {
  FlatbufferSpan<SubscriberRequest> message(
      {mbuf_buf(buffer), mbuf_get_left(buffer)});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid flatbuffer received from browser client.";
    return;
  }
  VLOG(1) << "Got a subscription message "
          << aos::FlatbufferToJson(&message.message());
  if (!message.message().has_channels_to_transfer()) {
    LOG(ERROR) << "No channels requested for transfer.";
    return;
  }

  // Each time, the client sends a full list of everything it wants to be
  // subscribed to. It is our responsibility to remove channels which aren't
  // in that list and add ones which need to be.
  //
  // Start by clearing a tracking bit on each channel. This takes O(number of
  // open channels), which should be small.
  //
  // Then open any new channels. For any we visit which are already open,
  // don't update those.
  //
  // Finally, iterate over the channel list and purge anything which we didn't
  // touch.
  for (auto &it : channels_) {
    it.second.requested = false;
  }
  for (auto channel_request : *message.message().channels_to_transfer()) {
    const Channel *channel = channel_request->channel();
    if (channel == nullptr) {
      LOG(ERROR) << "Got unpopulated channel.";
      continue;
    }
    const TransferMethod transfer_method = channel_request->method();
    // Call GetChannel() before comparing the channel name/type to each
    // subscriber. This allows us to resolve any node or application
    // specific mappings.
    const Channel *comparison_channel =
        configuration::GetChannel(event_loop_->configuration(), channel,
                                  event_loop_->name(), event_loop_->node());
    if (comparison_channel == nullptr) {
      LOG(ERROR) << "Channel does not exist: "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }
    if (!configuration::ChannelIsReadableOnNode(comparison_channel,
                                                event_loop_->node())) {
      LOG(ERROR) << "Channel not available on node "
                 << event_loop_->node()->name()->string_view() << ": "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }

    size_t channel_index = configuration::ChannelIndex(
        event_loop_->configuration(), comparison_channel);

    auto it = channels_.find(channel_index);
    if (it == channels_.end()) {
      std::shared_ptr<ScopedDataChannel> data_channel =
          ScopedDataChannel::MakeDataChannel();

      std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;

      data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
                                 channel_index]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we started.";
        // The subscriber only holds a weak_ptr, so we don't create a circular
        // reference; the on_close callback below cleans the listener up.
        subscribers_[channel_index]->AddListener(data_channel, transfer_method);
      });

      Subscriber *subscriber = subscribers_[channel_index].get();
      data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
        subscriber->RemoveListener(data_channel);
      });

      data_channel->Open(
          connection_.connection(),
          absl::StrCat(channel->name()->str(), "/", channel->type()->str()));

      auto pair = channels_.insert({channel_index, {data_channel, true}});
      it = pair.first;
    }

    it->second.requested = true;

    VLOG(1) << "Subscribe to: " << channel->type()->str();
  }

  for (auto &it : channels_) {
    if (!it.second.requested) {
      it.second.data_channel->Close();
      it.second.data_channel = nullptr;
    }
  }
}

}  // namespace aos::web_proxy