blob: 05bb7faa33ae44246c1fd43192aca2b8816399dd [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
Austin Schuh99f7c6a2024-06-25 22:07:44 -07003#include "absl/flags/flag.h"
4#include "absl/log/check.h"
5#include "absl/log/log.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07006
Alex Perry5f474f22020-02-01 12:14:24 -08007#include "aos/flatbuffer_merge.h"
8#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08009#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -080010#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -080011#include "aos/seasocks/seasocks_logger.h"
James Kuszmaul48671362020-12-24 13:54:16 -080012#include "internal/Embedded.h"
Alex Perryb3b50792020-01-18 16:13:45 -080013
Austin Schuh52e5e3a2021-04-24 22:30:02 -070014extern "C" {
15#include <rawrtc.h>
16
17#define DEBUG_LEVEL 7
18#define DEBUG_MODULE "web-proxy"
19#include <re_dbg.h>
20struct list *tmrl_get(void);
21}
22
Austin Schuh99f7c6a2024-06-25 22:07:44 -070023ABSL_FLAG(int32_t, proxy_port, 1180, "Port to use for the web proxy server.");
24ABSL_FLAG(int32_t, pre_send_messages, 10000,
25 "Number of messages / queue to send to a client before waiting on "
26 "confirmation that the initial message was received. If set to "
27 "-1, will not throttle messages at all. This prevents a situation "
28 "where, when run on localhost, the large number of WebRTC packets "
29 "can overwhelm the browser and crash the webpage.");
James Kuszmaule524ed02023-12-09 13:21:03 -080030// Note: sometimes it appears that WebRTC buffer up and stop sending message
31// ack's back from the client page. It is not clear *why* WebRTC is doing this,
32// but since the only reason we use those ack's is to stop ourselves from
33// overloading the client webpage, this setting lets us fall back to just a
34// time-based rate-limit when we stop receiving acks.
Austin Schuh99f7c6a2024-06-25 22:07:44 -070035ABSL_FLAG(double, max_buffer_pause_sec, 0.1,
36 "If we have not received any ack's in this amount of time, we "
37 "start to continue sending messages.");
James Kuszmaul1e95bed2021-01-09 21:02:49 -080038
Stephan Pleinesf63bde82024-01-13 15:59:33 -080039namespace aos::web_proxy {
James Kuszmaul7ad91522020-09-01 19:15:35 -070040WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul1a29c082022-02-03 14:02:47 -080041 aos::EventLoop *event_loop,
42 StoreHistory store_history,
43 int per_channel_buffer_size_bytes)
James Kuszmaul7ad91522020-09-01 19:15:35 -070044 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080045 config_(aos::CopyFlatBuffer(event_loop->configuration())),
46 event_loop_(event_loop) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070047 if (VLOG_IS_ON(2)) {
48 dbg_init(DBG_DEBUG, DBG_ALL);
49 }
50 CHECK_RAWRTC(rawrtc_init(true));
51
James Kuszmaul48671362020-12-24 13:54:16 -080052 // We need to reference findEmbeddedContent() to make the linker happy...
53 findEmbeddedContent("");
Austin Schuh52e5e3a2021-04-24 22:30:02 -070054 const aos::Node *self = event_loop_->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070055
Austin Schuh52e5e3a2021-04-24 22:30:02 -070056 subscribers_.reserve(event_loop_->configuration()->channels()->size());
57 for (uint i = 0; i < event_loop_->configuration()->channels()->size(); ++i) {
58 auto channel = event_loop_->configuration()->channels()->Get(i);
James Kuszmaul7ad91522020-09-01 19:15:35 -070059 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070060 auto fetcher = event_loop_->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080061 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
James Kuszmaul1a29c082022-02-03 14:02:47 -080062 std::move(fetcher), i, store_history,
63 per_channel_buffer_size_bytes < 0
64 ? -1
65 : per_channel_buffer_size_bytes / channel->max_size()));
Austin Schuh52e5e3a2021-04-24 22:30:02 -070066 } else {
67 subscribers_.emplace_back(nullptr);
James Kuszmaul7ad91522020-09-01 19:15:35 -070068 }
69 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -070070 TimerHandler *const timer = event_loop_->AddTimer([this]() {
James Kuszmaul7ad91522020-09-01 19:15:35 -070071 for (auto &subscriber : subscribers_) {
James Kuszmaul147b4c12022-07-13 20:35:27 -070072 if (subscriber) subscriber->RunIteration(recording_);
James Kuszmaul7ad91522020-09-01 19:15:35 -070073 }
74 });
75
Austin Schuh52e5e3a2021-04-24 22:30:02 -070076 event_loop_->OnRun([this, timer]() {
Philipp Schradera6712522023-07-05 20:25:11 -070077 timer->Schedule(event_loop_->monotonic_now(),
78 std::chrono::milliseconds(100));
James Kuszmaul7ad91522020-09-01 19:15:35 -070079 });
80}
Alex Perryb3b50792020-01-18 16:13:45 -080081
82void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070083 std::unique_ptr<ApplicationConnection> connection =
84 std::make_unique<ApplicationConnection>(server_, sock, subscribers_,
85 config_, event_loop_);
86
87 connections_.insert({sock, std::move(connection)});
Alex Perryb3b50792020-01-18 16:13:45 -080088}
89
90void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
91 size_t size) {
Austin Schuh52e5e3a2021-04-24 22:30:02 -070092 const FlatbufferSpan<WebSocketMessage> message({data, size});
93 if (!message.Verify()) {
94 LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
95 return;
96 }
97 VLOG(1) << "Got msg " << aos::FlatbufferToJson(message);
98 switch (message.message().payload_type()) {
99 case Payload::WebSocketSdp: {
100 const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
101 if (offer->type() != SdpType::OFFER) {
102 LOG(WARNING) << "Got the wrong sdp type from client";
103 break;
104 }
105 const flatbuffers::String *sdp = offer->payload();
106 connections_[sock]->OnSdp(sdp->c_str());
107 break;
108 }
109 case Payload::WebSocketIce: {
110 const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
111 connections_[sock]->OnIce(ice);
112 break;
113 }
Brian Silverman225c5072021-11-17 19:56:31 -0800114 default: {
115 break;
116 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700117 }
Alex Perryb3b50792020-01-18 16:13:45 -0800118}
119
120void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
121 connections_.erase(sock);
122}
123
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700124// Global epoll pointer
125static aos::internal::EPoll *global_epoll = nullptr;
126
127static int ReFdListen(int fd, int flags, fd_h *fh, void *arg) {
128 if (flags & 0x1) {
129 global_epoll->OnReadable(fd, [fh, arg]() { (*fh)(0x1, arg); });
130 }
131 if (flags & 0x2) {
132 global_epoll->OnWriteable(fd, [fh, arg]() { (*fh)(0x2, arg); });
133 }
134 if (flags & 0x4) {
135 global_epoll->OnError(fd, [fh, arg]() { (*fh)(0x4, arg); });
136 }
137 return 0;
138}
139
// Called by re when it is done with an fd; stop watching it.
static void ReFdClose(int fd) {
  CHECK(global_epoll != nullptr);
  global_epoll->DeleteFd(fd);
}
144
James Kuszmaul1a29c082022-02-03 14:02:47 -0800145WebProxy::WebProxy(aos::EventLoop *event_loop, StoreHistory store_history,
146 int per_channel_buffer_size_bytes)
147 : WebProxy(event_loop, &internal_epoll_, store_history,
148 per_channel_buffer_size_bytes) {}
James Kuszmaul71a81932020-12-15 21:08:01 -0800149
James Kuszmaul1a29c082022-02-03 14:02:47 -0800150WebProxy::WebProxy(aos::ShmEventLoop *event_loop, StoreHistory store_history,
151 int per_channel_buffer_size_bytes)
152 : WebProxy(event_loop, event_loop->epoll(), store_history,
153 per_channel_buffer_size_bytes) {}
James Kuszmaul71a81932020-12-15 21:08:01 -0800154
// Main constructor: wires rawrtc's fd and timer handling into the provided
// epoll, starts the seasocks server listening on --proxy_port, and -- when
// running on internal_epoll_ -- adds a timer to drain that epoll.
WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
                   StoreHistory store_history,
                   int per_channel_buffer_size_bytes)
    : epoll_(epoll),
      server_(std::make_shared<aos::seasocks::SeasocksLogger>(
          ::seasocks::Logger::Level::Info)),
      websocket_handler_(new WebsocketHandler(
          &server_, event_loop, store_history, per_channel_buffer_size_bytes)) {
  // rawrtc's C callbacks go through global_epoll, so only one WebProxy may
  // exist at a time.
  CHECK(!global_epoll);
  global_epoll = epoll;

  re_fd_set_listen_callback(&ReFdListen);
  re_fd_set_close_callback(&ReFdClose);

  // Service re's timer list before every epoll wait.
  epoll->BeforeWait([]() {
    const uint64_t to = tmr_next_timeout(tmrl_get());
    if (to != 0) {
      VLOG(3) << "Next timeout " << to;
    }
    // Note: this only works because we are spinning on it...
    // TODO(austin): If we choose to actually sleep, use a timerfd reserved just
    // for handling tmr.
    tmr_poll(tmrl_get());
  });

  server_.addWebSocketHandler("/ws", websocket_handler_);
  CHECK(server_.startListening(absl::GetFlag(FLAGS_proxy_port)));

  // Service seasocks whenever its listening socket becomes readable.
  epoll->OnReadable(server_.fd(), [this]() {
    CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
  });

  if (&internal_epoll_ == epoll) {
    TimerHandler *const timer = event_loop->AddTimer([this]() {
      // Run the epoll poller until there are no more events (if we are being
      // backed by a shm event loop, there won't be anything registered to
      // internal_epoll_ and this will just return false).
      // We just deal with clearing all the epoll events using a simulated
      // timer. This does mean that we will spin rather than actually sleeping
      // in any coherent manner, which will be particularly noticeable when past
      // the end of processing other events.
      while (internal_epoll_.Poll(false)) {
        continue;
      }
    });

    event_loop->OnRun([timer, event_loop]() {
      timer->Schedule(event_loop->monotonic_now(),
                      std::chrono::milliseconds(10));
    });
  }
}
207
// Tears down in reverse order of construction: stop watching the server's fd,
// shut seasocks down (draining its final poll), then release the global epoll
// pointer so another WebProxy may be constructed later.
WebProxy::~WebProxy() {
  epoll_->DeleteFd(server_.fd());
  server_.terminate();
  CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
  CHECK(global_epoll == epoll_);
  global_epoll = nullptr;
}
215
James Kuszmaul147b4c12022-07-13 20:35:27 -0700216void WebProxy::StopRecording() { websocket_handler_->StopRecording(); }
Alex Perry5f474f22020-02-01 12:14:24 -0800217
James Kuszmaul147b4c12022-07-13 20:35:27 -0700218void Subscriber::RunIteration(bool fetch_new) {
219 if (fetch_new) {
220 if (channels_.empty() && (buffer_size_ == 0 || !store_history_)) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800221 fetcher_->Fetch();
James Kuszmaul147b4c12022-07-13 20:35:27 -0700222 message_buffer_.clear();
223 return;
James Kuszmaul71a81932020-12-15 21:08:01 -0800224 }
Alex Perry5f474f22020-02-01 12:14:24 -0800225
James Kuszmaul147b4c12022-07-13 20:35:27 -0700226 while (fetcher_->FetchNext()) {
227 // If we aren't building up a buffer, short-circuit the FetchNext().
228 if (buffer_size_ == 0) {
229 fetcher_->Fetch();
Austin Schuhd16ef442021-04-25 14:44:42 -0700230 }
James Kuszmaul147b4c12022-07-13 20:35:27 -0700231 Message message;
232 message.index = fetcher_->context().queue_index;
233 VLOG(2) << "Packing a message with "
234 << GetPacketCount(fetcher_->context()) << "packets";
235 for (int packet_index = 0;
236 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
237 // Pack directly into the mbuffer. This is admittedly a bit painful.
238 const size_t packet_size =
239 PackedMessageSize(fetcher_->context(), packet_index);
240 struct mbuf *mbuffer = mbuf_alloc(packet_size);
Alex Perry5f474f22020-02-01 12:14:24 -0800241
James Kuszmaul147b4c12022-07-13 20:35:27 -0700242 {
243 // Wrap a pre-allocated builder around the mbuffer.
244 PreallocatedAllocator allocator(mbuf_buf(mbuffer), packet_size);
245 flatbuffers::FlatBufferBuilder fbb(packet_size, &allocator);
246 flatbuffers::Offset<MessageHeader> message_offset = PackMessage(
247 &fbb, fetcher_->context(), channel_index_, packet_index);
248 fbb.Finish(message_offset);
249
250 // Now, the flatbuffer is built from the back to the front. So any
Philipp Schradera6712522023-07-05 20:25:11 -0700251 // extra memory will be at the front. Set up the end and start
James Kuszmaul147b4c12022-07-13 20:35:27 -0700252 // pointers on the mbuf.
253 mbuf_set_end(mbuffer, packet_size);
254 mbuf_set_pos(mbuffer, packet_size - fbb.GetSize());
255 }
256
257 message.data.emplace_back(
258 std::shared_ptr<struct mbuf>(mbuffer, mem_deref));
259 }
260 message_buffer_.push_back(std::move(message));
261 // If we aren't keeping a buffer, then we should only do one iteration of
262 // the while loop--otherwise, if additional messages arrive between the
263 // first FetchNext() and the second iteration then we can end up behaving
264 // poorly (since we do a Fetch() when buffer_size_ == 0).
265 if (buffer_size_ == 0) {
266 break;
267 }
James Kuszmaul45139e62021-09-11 11:41:03 -0700268 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800269 }
270 for (auto &conn : channels_) {
James Kuszmaula5822682021-12-23 18:39:28 -0800271 std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first.lock();
272 CHECK(rtc_channel) << "data_channel was destroyed too early.";
James Kuszmaul71a81932020-12-15 21:08:01 -0800273 ChannelInformation *channel_data = &conn.second;
274 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
275 SkipToLastMessage(channel_data);
276 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700277 std::shared_ptr<struct mbuf> buffer = NextBuffer(channel_data);
278 while (buffer) {
279 // TODO(austin): This is a nop so we just buffer forever. Fix this when
280 // we care.
James Kuszmaul71a81932020-12-15 21:08:01 -0800281 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800282 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800283 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800284 }
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700285
286 rtc_channel->Send(buffer.get());
James Kuszmaul71a81932020-12-15 21:08:01 -0800287 buffer = NextBuffer(channel_data);
288 }
289 }
290 if (buffer_size_ >= 0) {
291 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
292 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800293 }
294 }
295}
296
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700297void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
298 TransferMethod transfer_method) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800299 ChannelInformation info;
300 info.transfer_method = transfer_method;
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700301
James Kuszmaula5822682021-12-23 18:39:28 -0800302 channels_.emplace_back(std::make_pair(data_channel, info));
303
304 data_channel->set_on_message(
305 [this, index = channels_.size() - 1](
306 struct mbuf *const buffer,
307 const enum rawrtc_data_channel_message_flag /*flags*/) {
308 FlatbufferSpan<ChannelState> message(
309 {mbuf_buf(buffer), mbuf_get_left(buffer)});
310 if (!message.Verify()) {
311 LOG(ERROR) << "Invalid flatbuffer received from browser client.";
312 return;
313 }
314
315 channels_[index].second.reported_queue_index =
316 message.message().queue_index();
317 channels_[index].second.reported_packet_index =
318 message.message().packet_index();
James Kuszmaule524ed02023-12-09 13:21:03 -0800319 // Note: Uses actual clock to handle simulation time.
320 channels_[index].second.last_report = aos::monotonic_clock::now();
James Kuszmaula5822682021-12-23 18:39:28 -0800321 });
James Kuszmaul71a81932020-12-15 21:08:01 -0800322}
323
Austin Schuh60e77942022-05-16 17:48:24 -0700324void Subscriber::RemoveListener(
325 std::shared_ptr<ScopedDataChannel> data_channel) {
James Kuszmaula5822682021-12-23 18:39:28 -0800326 channels_.erase(
327 std::remove_if(
328 channels_.begin(), channels_.end(),
329 [data_channel](const std::pair<std::weak_ptr<ScopedDataChannel>,
330 ChannelInformation> &channel) {
331 return channel.first.lock().get() == data_channel.get();
332 }),
333 channels_.end());
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700334}
335
// Returns the next packet to send to this client channel, or nullptr when
// there is nothing to send (the client is caught up, or we are throttling
// because the client has not acknowledged enough messages). Advances the
// channel's cursor (current_queue_index / next_packet_number) past the
// returned packet.
std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
    ChannelInformation *channel) {
  CHECK(channel != nullptr);
  if (message_buffer_.empty()) {
    return nullptr;
  }
  const uint32_t earliest_index = message_buffer_.front().index;
  const uint32_t latest_index = message_buffer_.back().index;
  // If the client's cursor points before our buffered history, jump it
  // forward to the oldest message we still have.
  const bool fell_behind = channel->current_queue_index < earliest_index;
  if (fell_behind) {
    channel->current_queue_index = earliest_index;
    channel->next_packet_number = 0;
    return message_buffer_.front().data.at(0);
  }
  // TODO(james): Handle queue index wrapping when >2^32 messages are sent on a
  // channel.
  if (channel->current_queue_index > latest_index) {
    // We are still waiting on the next message to appear; return.
    return nullptr;
  }
  if (absl::GetFlag(FLAGS_pre_send_messages) > 0) {
    // Note: Uses actual clock to handle simulation time.
    const aos::monotonic_clock::time_point now = aos::monotonic_clock::now();
    // If the client has stopped acking for longer than --max_buffer_pause_sec,
    // artificially advance its reported index so we keep sending anyway.
    if (channel->last_report.has_value() &&
        channel->last_report.value() +
                std::chrono::duration_cast<std::chrono::nanoseconds>(
                    std::chrono::duration<double>(
                        absl::GetFlag(FLAGS_max_buffer_pause_sec))) <
            now) {
      // Increment the number of messages that we will send over to the client
      // webpage.
      channel->reported_queue_index +=
          absl::GetFlag(FLAGS_pre_send_messages) / 10;
      channel->reported_packet_index = 0;
      channel->last_report = now;
    }
    // Don't buffer up an excessive number of messages to the client.
    // This currently ignores the packet index (and really, any concept of
    // message size), but the main goal is just to avoid locking up the client
    // browser, not to be ultra precise about anything. It's also not clear that
    // message *size* is necessarily even the determining factor in causing
    // issues.
    if (channel->reported_queue_index + absl::GetFlag(FLAGS_pre_send_messages) <
        channel->current_queue_index) {
      return nullptr;
    }
  }
  // The buffer must hold a contiguous run of queue indices for the math below
  // to index it correctly.
  CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
      << "Inconsistent queue indices.";
  const size_t packets_in_message =
      message_buffer_[channel->current_queue_index - earliest_index]
          .data.size();
  CHECK_LT(0u, packets_in_message);
  CHECK_LT(channel->next_packet_number, packets_in_message);

  std::shared_ptr<struct mbuf> original_data =
      message_buffer_[channel->current_queue_index - earliest_index].data.at(
          channel->next_packet_number);

  // Advance to the next packet, rolling over to the next message when this
  // message's packets are exhausted.
  ++channel->next_packet_number;
  if (channel->next_packet_number == packets_in_message) {
    ++channel->current_queue_index;
    channel->next_packet_number = 0;
  }

  // Trigger a copy of the mbuf without copying the data.
  return std::shared_ptr<struct mbuf>(mbuf_alloc_ref(original_data.get()),
                                      mem_deref);
}
405
406void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
Austin Schuh6bdcc372024-06-27 14:49:11 -0700407 CHECK(channel != nullptr);
James Kuszmaul71a81932020-12-15 21:08:01 -0800408 if (message_buffer_.empty() ||
409 channel->current_queue_index == message_buffer_.back().index) {
410 return;
411 }
412 channel->current_queue_index = message_buffer_.back().index;
James Kuszmaul87200a42022-03-26 18:09:18 -0700413 channel->reported_queue_index = message_buffer_.back().index;
James Kuszmaule524ed02023-12-09 13:21:03 -0800414 channel->last_report.reset();
James Kuszmaul71a81932020-12-15 21:08:01 -0800415 channel->next_packet_number = 0;
416}
417
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700418ApplicationConnection::ApplicationConnection(
419 ::seasocks::Server *server, ::seasocks::WebSocket *sock,
Alex Perry5f474f22020-02-01 12:14:24 -0800420 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800421 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
422 const EventLoop *event_loop)
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700423 : server_(server),
424 sock_(sock),
Alex Perry5f474f22020-02-01 12:14:24 -0800425 subscribers_(subscribers),
James Kuszmaul71a81932020-12-15 21:08:01 -0800426 config_headers_(PackBuffer(config.span())),
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700427 event_loop_(event_loop) {
428 connection_.set_on_negotiation_needed([]() {
429 VLOG(1) << "Negotiation needed, not offering so not creating offer.";
430 });
Alex Perryb3b50792020-01-18 16:13:45 -0800431
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700432 connection_.set_on_local_candidate(
433 [this](struct rawrtc_peer_connection_ice_candidate *const candidate,
434 char const *const url) { LocalCandidate(candidate, url); });
435
436 connection_.set_on_data_channel(
437 [this](std::shared_ptr<ScopedDataChannel> channel) {
438 OnDataChannel(channel);
439 });
440
441 connection_.Open();
442}
443
444ApplicationConnection::~ApplicationConnection() {
445 for (auto &it : channels_) {
446 it.second.data_channel->Close();
447 it.second.data_channel = nullptr;
448 }
449
450 // Eh, we are done, tell the channel to shut down. If we didn't, it would
451 // just hang around until the connection closes, which is rather shortly
452 // after.
453 if (channel_) {
454 channel_->Close();
455 }
456}
457
// Handles an SDP offer forwarded from the browser: installs it as the remote
// description, creates and installs an answer as the local description, then
// ships the answer back to the browser over the websocket.
void ApplicationConnection::OnSdp(const char *sdp) {
  struct rawrtc_peer_connection_description *remote_description = NULL;

  auto error = rawrtc_peer_connection_description_create(
      &remote_description, RAWRTC_SDP_TYPE_OFFER, sdp);
  if (error) {
    // An unparseable offer just gets dropped; the browser gets no answer.
    LOG(WARNING) << "Cannot parse remote description: "
                 << rawrtc_code_to_str(error);
    return;
  }

  CHECK_RAWRTC(rawrtc_peer_connection_set_remote_description(
      connection_.connection(), remote_description));

  struct rawrtc_peer_connection_description *local_description;
  CHECK_RAWRTC(rawrtc_peer_connection_create_answer(&local_description,
                                                    connection_.connection()));
  CHECK_RAWRTC(rawrtc_peer_connection_set_local_description(
      connection_.connection(), local_description));

  enum rawrtc_sdp_type type;
  char *local_sdp = nullptr;
  // Get SDP type & the SDP itself
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp_type(
      &type, local_description));
  CHECK_RAWRTC(rawrtc_peer_connection_description_get_sdp(&local_sdp,
                                                          local_description));

  // Wrap the answer SDP in a WebSocketMessage and queue it for the browser.
  flatbuffers::FlatBufferBuilder fbb;
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, local_sdp);
  flatbuffers::Offset<WebSocketMessage> answer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());

  VLOG(1) << aos::FlatbufferToJson(
      flatbuffers::GetTemporaryPointer(fbb, answer_message));
  fbb.Finish(answer_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
  // Drop our reference to the SDP string rawrtc allocated for us.
  mem_deref(local_sdp);
}
499
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700500void ApplicationConnection::OnIce(const WebSocketIce *ice) {
501 if (!ice->has_candidate()) {
502 return;
503 }
Brian Silverman225c5072021-11-17 19:56:31 -0800504 uint8_t sdp_m_line_index = ice->sdp_m_line_index();
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700505
506 struct rawrtc_peer_connection_ice_candidate *ice_candidate = nullptr;
507 CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_create(
Brian Silverman225c5072021-11-17 19:56:31 -0800508 &ice_candidate, ice->candidate()->c_str(), ice->sdp_mid()->c_str(),
509 &sdp_m_line_index, nullptr));
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700510
511 rawrtc_peer_connection_add_ice_candidate(connection_.connection(),
512 ice_candidate);
513
514 mem_deref(ice_candidate);
515}
516
// Callback from rawrtc for each locally-gathered ICE candidate: packs the
// candidate into a WebSocketIce flatbuffer and sends it to the browser over
// the websocket. A null candidate (end of gathering) is ignored.
void ApplicationConnection::LocalCandidate(
    struct rawrtc_peer_connection_ice_candidate *const candidate,
    char const *const url) {
  struct rawrtc_ice_candidate *ortc_candidate = nullptr;
  if (candidate) {
    // NOTE(review): ortc_candidate is fetched but never used afterwards and
    // never mem_deref'd -- presumably leftover code; confirm whether this
    // leaks a reference.
    CHECK_RAWRTC(rawrtc_peer_connection_ice_candidate_get_ortc_candidate(
        &ortc_candidate, candidate));

    flatbuffers::FlatBufferBuilder fbb;
    char *sdpp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp(&sdpp, candidate));
    char *midp = nullptr;
    CHECK_RAWRTC(
        rawrtc_peer_connection_ice_candidate_get_sdp_mid(&midp, candidate));

    uint8_t media_line_index;
    enum rawrtc_code error =
        rawrtc_peer_connection_ice_candidate_get_sdp_media_line_index(
            &media_line_index, candidate);

    flatbuffers::Offset<flatbuffers::String> sdpp_offset =
        fbb.CreateString(sdpp);
    flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
        fbb.CreateString(midp);

    WebSocketIce::Builder web_socket_ice_builder(fbb);

    web_socket_ice_builder.add_candidate(sdpp_offset);
    web_socket_ice_builder.add_sdp_mid(sdp_mid_offset);

    // The media line index is optional; only set it if rawrtc provided one.
    if (error == RAWRTC_CODE_SUCCESS) {
      web_socket_ice_builder.add_sdp_m_line_index(media_line_index);
    }
    flatbuffers::Offset<WebSocketIce> ice_offset =
        web_socket_ice_builder.Finish();

    flatbuffers::Offset<WebSocketMessage> ice_message =
        CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_offset.Union());
    VLOG(1) << url << ": "
            << aos::FlatbufferToJson(
                   flatbuffers::GetTemporaryPointer(fbb, ice_message));
    fbb.Finish(ice_message);

    server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

    // Drop our references to the strings rawrtc allocated for us.
    mem_deref(sdpp);
    mem_deref(midp);
  }
}
567
Austin Schuh52e5e3a2021-04-24 22:30:02 -0700568void ApplicationConnection::OnDataChannel(
569 std::shared_ptr<ScopedDataChannel> channel) {
570 if (channel->label() == std::string_view("signalling")) {
571 CHECK(!channel_);
572 channel_ = channel;
573
574 channel_->set_on_message(
575 [this](struct mbuf *const buffer,
576 const enum rawrtc_data_channel_message_flag flags) {
577 HandleSignallingData(buffer, flags);
578 });
579
580 channel_->set_on_open([this]() {
581 for (const auto &header : config_headers_) {
582 channel_->Send(header.buffer());
583 }
584 });
585
586 channel_->set_on_error([this]() { LOG(ERROR) << "Error on " << this; });
587
588 // Register an on_close callback which does nothing but keeps channel alive
589 // until it is done. This keeps the memory around until rawrtc can finish
590 // calling the close callback.
591 channel_->set_on_close([channel]() {});
592 } else {
593 channel_->set_on_close([channel]() {});
594 channel->Close();
595 }
596}
597
// Processes a SubscriberRequest received on the signalling channel. The
// request holds the complete set of channels the client wants; we open a
// WebRTC data channel for each newly-requested channel and close the ones no
// longer listed.
void ApplicationConnection::HandleSignallingData(
    struct mbuf *const
        buffer,  // nullable (in case partial delivery has been requested)
    const enum rawrtc_data_channel_message_flag /*flags*/) {
  FlatbufferSpan<SubscriberRequest> message(
      {mbuf_buf(buffer), mbuf_get_left(buffer)});
  if (!message.Verify()) {
    LOG(ERROR) << "Invalid flatbuffer received from browser client.";
    return;
  }
  VLOG(1) << "Got a subscription message "
          << aos::FlatbufferToJson(&message.message());
  if (!message.message().has_channels_to_transfer()) {
    LOG(ERROR) << "No channels requested for transfer.";
    return;
  }

  // The client each time sends a full list of everything it wants to be
  // subscribed to. It is our responsibility to remove channels which aren't
  // in that list and add ones which need to be.
  //
  // Start by clearing a tracking bit on each channel. This takes O(number of
  // open channels), which should be small.
  //
  // Then open any new channels. For any we visit which are already open,
  // don't update those.
  //
  // Finally, iterate over the channel list and purge anything which we didn't
  // touch.
  for (auto &it : channels_) {
    it.second.requested = false;
  }
  for (auto channel_request : *message.message().channels_to_transfer()) {
    const Channel *channel = channel_request->channel();
    if (channel == nullptr) {
      LOG(ERROR) << "Got unpopulated channel.";
      continue;
    }
    const TransferMethod transfer_method = channel_request->method();
    // Call GetChannel() before comparing the channel name/type to each
    // subscriber. This allows us to resolve any node or application
    // specific mappings.
    const Channel *comparison_channel =
        configuration::GetChannel(event_loop_->configuration(), channel,
                                  event_loop_->name(), event_loop_->node());
    if (comparison_channel == nullptr) {
      LOG(ERROR) << "Channel does not exist: "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }
    if (!configuration::ChannelIsReadableOnNode(comparison_channel,
                                                event_loop_->node())) {
      LOG(ERROR) << "Channel not available on node "
                 << event_loop_->node()->name()->string_view() << ": "
                 << configuration::StrippedChannelToString(channel);
      continue;
    }

    size_t channel_index = configuration::ChannelIndex(
        event_loop_->configuration(), comparison_channel);

    auto it = channels_.find(channel_index);
    if (it == channels_.end()) {
      // Not currently open: create a data channel named "<name>/<type>" and
      // register it with the matching Subscriber once it actually opens.
      std::shared_ptr<ScopedDataChannel> data_channel =
          ScopedDataChannel::MakeDataChannel();

      std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;

      data_channel->set_on_open([this, data_channel_weak_ptr, transfer_method,
                                 channel_index]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we started.";
        // Raw pointer inside the subscriber so we don't have a circular
        // reference. AddListener will close it.
        subscribers_[channel_index]->AddListener(data_channel, transfer_method);
      });

      Subscriber *subscriber = subscribers_[channel_index].get();
      data_channel->set_on_close([subscriber, data_channel_weak_ptr]() {
        std::shared_ptr<ScopedDataChannel> data_channel =
            data_channel_weak_ptr.lock();
        CHECK(data_channel) << ": Subscriber got destroyed before we finished.";
        subscriber->RemoveListener(data_channel);
      });

      data_channel->Open(
          connection_.connection(),
          absl::StrCat(channel->name()->str(), "/", channel->type()->str()));

      auto pair = channels_.insert({channel_index, {data_channel, true}});
      it = pair.first;
    }

    it->second.requested = true;

    VLOG(1) << "Subscribe to: " << channel->type()->str();
  }

  // Close every channel the client no longer asked for.
  for (auto &it : channels_) {
    if (!it.second.requested) {
      it.second.data_channel->Close();
      it.second.data_channel = nullptr;
    }
  }
}
704
Stephan Pleinesf63bde82024-01-13 15:59:33 -0800705} // namespace aos::web_proxy