blob: 5902d30d87aebfb49285a7468b2f007f4e21cc91 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08007#include "aos/seasocks/seasocks_logger.h"
Alex Perryb3b50792020-01-18 16:13:45 -08008#include "api/create_peerconnection_factory.h"
9#include "glog/logging.h"
James Kuszmaul48671362020-12-24 13:54:16 -080010#include "internal/Embedded.h"
Alex Perryb3b50792020-01-18 16:13:45 -080011
12namespace aos {
13namespace web_proxy {
14
15namespace {
16// Based on webrtc examples. In our controlled environment we expect setting sdp
17// to always succeed, and we can't do anything about a failure, so just ignore
18// everything.
19class DummySetSessionDescriptionObserver
20 : public webrtc::SetSessionDescriptionObserver {
21 public:
22 static DummySetSessionDescriptionObserver *Create() {
23 return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
24 }
25 virtual void OnSuccess() {}
James Kuszmaul48671362020-12-24 13:54:16 -080026 virtual void OnFailure(webrtc::RTCError /*error*/) {}
Alex Perryb3b50792020-01-18 16:13:45 -080027};
28
29} // namespace
30
James Kuszmaul7ad91522020-09-01 19:15:35 -070031WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul71a81932020-12-15 21:08:01 -080032 aos::EventLoop *event_loop, int buffer_size)
James Kuszmaul7ad91522020-09-01 19:15:35 -070033 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080034 config_(aos::CopyFlatBuffer(event_loop->configuration())),
35 event_loop_(event_loop) {
James Kuszmaul48671362020-12-24 13:54:16 -080036 // We need to reference findEmbeddedContent() to make the linker happy...
37 findEmbeddedContent("");
James Kuszmaul71a81932020-12-15 21:08:01 -080038 const aos::Node *self = event_loop->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070039
40 for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
41 auto channel = event_loop->configuration()->channels()->Get(i);
42 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
43 auto fetcher = event_loop->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080044 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
45 std::move(fetcher), i, buffer_size));
James Kuszmaul7ad91522020-09-01 19:15:35 -070046 }
47 }
James Kuszmaul7ad91522020-09-01 19:15:35 -070048 TimerHandler *const timer = event_loop->AddTimer([this]() {
49 for (auto &subscriber : subscribers_) {
50 subscriber->RunIteration();
51 }
52 });
53
54 event_loop->OnRun([timer, event_loop]() {
55 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
56 });
57}
Alex Perryb3b50792020-01-18 16:13:45 -080058
59void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
James Kuszmaul71a81932020-12-15 21:08:01 -080060 std::unique_ptr<Connection> conn = std::make_unique<Connection>(
61 sock, server_, subscribers_, config_, event_loop_);
Alex Perryb3b50792020-01-18 16:13:45 -080062 connections_.insert({sock, std::move(conn)});
63}
64
65void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
66 size_t size) {
67 connections_[sock]->HandleWebSocketData(data, size);
68}
69
70void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
71 connections_.erase(sock);
72}
73
James Kuszmaul71a81932020-12-15 21:08:01 -080074WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
75 : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
76
77WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
78 : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
79
80WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
81 int buffer_size)
82 : epoll_(epoll),
83 server_(std::make_shared<aos::seasocks::SeasocksLogger>(
84 ::seasocks::Logger::Level::Info)),
85 websocket_handler_(
86 new WebsocketHandler(&server_, event_loop, buffer_size)) {
87 server_.addWebSocketHandler("/ws", websocket_handler_);
88 CHECK(server_.startListening(8080));
89
90 epoll->OnReadable(server_.fd(), [this]() {
91 CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
92 });
93
94 if (&internal_epoll_ == epoll) {
95 TimerHandler *const timer = event_loop->AddTimer([this]() {
96 // Run the epoll poller until there are no more events (if we are being
97 // backed by a shm event loop, there won't be anything registered to
98 // internal_epoll_ and this will just return false).
99 // We just deal with clearing all the epoll events using a simulated
100 // timer. This does mean that we will spin rather than actually sleeping
101 // in any coherent manner, which will be particularly noticeable when past
102 // the end of processing other events.
103 while (internal_epoll_.Poll(false)) {
104 continue;
105 }
106 });
107
108 event_loop->OnRun([timer, event_loop]() {
109 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
110 });
111 }
112}
113
114WebProxy::~WebProxy() {
115 epoll_->DeleteFd(server_.fd());
116 server_.terminate();
117 CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
118}
119
Alex Perry5f474f22020-02-01 12:14:24 -0800120void Subscriber::RunIteration() {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800121 {
122 // Manage updating the channels_ map given the pending_* members from the
123 // *Listeners() methods.
124 // We handle all the removals first so that we correctly handle the
125 // situation where the user calls RemoveListener() and then AddListener()
126 // between calls to RunIteration(). The reverse order (adding and then
127 // removing before an update) is handled directly in RemoveListener() where
128 // we remove things from the pending_channels_ map directly.
129 MutexLocker lock(&mutex_);
130 for (const auto &channel : pending_removal_) {
131 channels_.erase(channel);
132 }
133 pending_removal_.clear();
134 for (const auto &channel : pending_channels_) {
135 channels_.insert(channel);
136 }
137 pending_channels_.clear();
138 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800139 if (channels_.empty() && buffer_size_ == 0) {
Alex Perry5f474f22020-02-01 12:14:24 -0800140 return;
141 }
142
James Kuszmaul71a81932020-12-15 21:08:01 -0800143 while (fetcher_->FetchNext()) {
144 // If we aren't building up a buffer, short-circuit the FetchNext().
145 if (buffer_size_ == 0) {
146 fetcher_->Fetch();
147 }
148 Message message;
149 message.index = fetcher_->context().queue_index;
150 VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
151 << "packets";
152 for (int packet_index = 0;
153 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
154 flatbuffers::Offset<MessageHeader> message_offset =
155 PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
156 fbb_.Finish(message_offset);
Alex Perry5f474f22020-02-01 12:14:24 -0800157
James Kuszmaul71a81932020-12-15 21:08:01 -0800158 const flatbuffers::DetachedBuffer buffer = fbb_.Release();
Alex Perry5f474f22020-02-01 12:14:24 -0800159
James Kuszmaul71a81932020-12-15 21:08:01 -0800160 message.data.emplace_back(
161 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
162 true /* binary array */);
163 }
164 message_buffer_.push_back(std::move(message));
165 }
166 for (auto &conn : channels_) {
167 rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
168 ChannelInformation *channel_data = &conn.second;
169 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
170 SkipToLastMessage(channel_data);
171 }
172 const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
173 while (buffer != nullptr) {
174 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800175 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800176 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800177 }
James Kuszmaul8d928d02020-12-25 17:47:49 -0800178 // Call Send() from the signalling thread. The Invoke() call blocks until
179 // the handler has been called, so we do not need to handle any
180 // synchronization on this end. The body of the handler should be kept as
181 // short as possible to avoid blocking the signalling thread continuously
182 // for any longer than necessary.
183 channel_data->signaling_thread->Invoke<void>(
184 RTC_FROM_HERE,
185 [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
James Kuszmaul71a81932020-12-15 21:08:01 -0800186 buffer = NextBuffer(channel_data);
187 }
188 }
189 if (buffer_size_ >= 0) {
190 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
191 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800192 }
193 }
194}
195
196bool Subscriber::Compare(const Channel *channel) const {
Alex Perry22824d72020-02-29 17:11:43 -0800197 return channel->name()->string_view() ==
198 fetcher_->channel()->name()->string_view() &&
199 channel->type()->string_view() ==
200 fetcher_->channel()->type()->string_view();
Alex Perry5f474f22020-02-01 12:14:24 -0800201}
202
James Kuszmaul71a81932020-12-15 21:08:01 -0800203void Subscriber::AddListener(
204 rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
James Kuszmaul8d928d02020-12-25 17:47:49 -0800205 TransferMethod transfer_method, rtc::Thread *signaling_thread) {
206 MutexLocker lock(&mutex_);
James Kuszmaul71a81932020-12-15 21:08:01 -0800207 ChannelInformation info;
208 info.transfer_method = transfer_method;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800209 info.signaling_thread = signaling_thread;
210 pending_channels_.emplace(channel, info);
James Kuszmaul71a81932020-12-15 21:08:01 -0800211}
212
213const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
214 CHECK_NOTNULL(channel);
215 if (message_buffer_.empty()) {
216 return nullptr;
217 }
218 const uint32_t earliest_index = message_buffer_.front().index;
219 const uint32_t latest_index = message_buffer_.back().index;
220 const bool fell_behind = channel->current_queue_index < earliest_index;
221 if (fell_behind) {
222 channel->current_queue_index = earliest_index;
223 channel->next_packet_number = 0;
224 return &message_buffer_.front().data.at(0);
225 }
226 if (channel->current_queue_index > latest_index) {
227 // We are still waiting on the next message to appear; return.
228 return nullptr;
229 }
230 CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
231 << "Inconsistent queue indices.";
232 const size_t packets_in_message =
233 message_buffer_[channel->current_queue_index - earliest_index]
234 .data.size();
235 CHECK_LT(0u, packets_in_message);
236 CHECK_LT(channel->next_packet_number, packets_in_message);
237
238 const webrtc::DataBuffer *data =
239 &message_buffer_[channel->current_queue_index - earliest_index].data.at(
240 channel->next_packet_number);
241
242 ++channel->next_packet_number;
243 if (channel->next_packet_number == packets_in_message) {
244 ++channel->current_queue_index;
245 channel->next_packet_number = 0;
246 }
247
248 return data;
249}
250
251void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
252 CHECK_NOTNULL(channel);
253 if (message_buffer_.empty() ||
254 channel->current_queue_index == message_buffer_.back().index) {
255 return;
256 }
257 channel->current_queue_index = message_buffer_.back().index;
258 channel->next_packet_number = 0;
259}
260
261void Subscriber::RemoveListener(
262 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800263 MutexLocker lock(&mutex_);
264 pending_channels_.erase(channel);
265 pending_removal_.push_back(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -0800266}
267
Alex Perry5f474f22020-02-01 12:14:24 -0800268Connection::Connection(
269 ::seasocks::WebSocket *sock, ::seasocks::Server *server,
270 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800271 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
272 const EventLoop *event_loop)
Alex Perry5f474f22020-02-01 12:14:24 -0800273 : sock_(sock),
274 server_(server),
275 subscribers_(subscribers),
James Kuszmaul71a81932020-12-15 21:08:01 -0800276 config_headers_(PackBuffer(config.span())),
277 event_loop_(event_loop) {}
Alex Perryb3b50792020-01-18 16:13:45 -0800278
James Kuszmaul71a81932020-12-15 21:08:01 -0800279// Function called for web socket data. Parses the flatbuffer and
280// handles it appropriately.
Alex Perryb3b50792020-01-18 16:13:45 -0800281void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
James Kuszmaul48671362020-12-24 13:54:16 -0800282 const FlatbufferSpan<WebSocketMessage> message({data, size});
283 if (!message.Verify()) {
284 LOG(ERROR) << "Invalid WebsocketMessage received from browser.";
285 return;
286 }
287 switch (message.message().payload_type()) {
Alex Perryb3b50792020-01-18 16:13:45 -0800288 case Payload::WebSocketSdp: {
James Kuszmaul48671362020-12-24 13:54:16 -0800289 const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp();
Alex Perryb3b50792020-01-18 16:13:45 -0800290 if (offer->type() != SdpType::OFFER) {
291 LOG(WARNING) << "Got the wrong sdp type from client";
292 break;
293 }
294 const flatbuffers::String *sdp = offer->payload();
295 webrtc::SdpParseError error;
296 std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
297 CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
298 if (!desc) {
299 LOG(WARNING) << "Failed to parse sdp description: "
300 << error.description;
301 // TODO(alex): send a message back to browser for failure.
302 break;
303 }
304
James Kuszmaul71a81932020-12-15 21:08:01 -0800305 // We can only start creating the PeerConnection once we have
306 // something to give it, so we wait until we get an offer before
307 // starting.
Alex Perryb3b50792020-01-18 16:13:45 -0800308 webrtc::PeerConnectionInterface::RTCConfiguration config;
309 config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
310 config.enable_dtls_srtp = true;
311
312 std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
313 signaling_thread->SetName("signaling_thread", nullptr);
314 signaling_thread->Start();
315
James Kuszmaul8d928d02020-12-25 17:47:49 -0800316 signaling_thread_ = signaling_thread.get();
317
Alex Perryb3b50792020-01-18 16:13:45 -0800318 webrtc::PeerConnectionFactoryDependencies factory_deps;
319 factory_deps.signaling_thread = signaling_thread.release();
320 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
321 CreateModularPeerConnectionFactory(std::move(factory_deps));
322
323 peer_connection_ =
324 factory->CreatePeerConnection(config, nullptr, nullptr, this);
325
326 peer_connection_->SetRemoteDescription(
327 DummySetSessionDescriptionObserver::Create(), desc.release());
328
329 peer_connection_->CreateAnswer(
330 this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
331 break;
332 }
333 case Payload::WebSocketIce: {
James Kuszmaul48671362020-12-24 13:54:16 -0800334 const WebSocketIce *ice = message.message().payload_as_WebSocketIce();
Alex Perryb3b50792020-01-18 16:13:45 -0800335 std::string candidate = ice->candidate()->str();
336 std::string sdpMid = ice->sdpMid()->str();
337 int sdpMLineIndex = ice->sdpMLineIndex();
338 webrtc::SdpParseError error;
339 webrtc::IceCandidateInterface *ice_candidate =
340 webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
341 if (!ice_candidate) {
342 LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
343 // TODO(alex): send a message back to browser for failure.
344 break;
345 }
346 peer_connection_->AddIceCandidate(ice_candidate);
347 break;
348 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800349 default: {
350 break;
351 }
Alex Perryb3b50792020-01-18 16:13:45 -0800352 }
353}
354
Alex Perry5f474f22020-02-01 12:14:24 -0800355void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
356 webrtc::DataBuffer data_buffer(
357 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
358 true /* binary array */);
Alex Perry3dfcb812020-03-04 19:32:17 -0800359 VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
Alex Perry5f474f22020-02-01 12:14:24 -0800360 data_channel_->Send(data_buffer);
361}
362
Alex Perryb3b50792020-01-18 16:13:45 -0800363void Connection::OnDataChannel(
364 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
365 data_channel_ = channel;
366 data_channel_->RegisterObserver(this);
367}
368
369void Connection::OnIceCandidate(
370 const webrtc::IceCandidateInterface *candidate) {
371 flatbuffers::FlatBufferBuilder fbb(512);
372 std::string ice_string;
373 candidate->ToString(&ice_string);
374
375 flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
376 fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
377 candidate->sdp_mline_index());
378 flatbuffers::Offset<WebSocketMessage> ice_message =
379 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
380 fbb.Finish(ice_message);
381
382 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
383}
384
385// This is the callback for creating an sdp. We have to manually assign it
386// locally and send it to the client.
387void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
388 peer_connection_->SetLocalDescription(
389 DummySetSessionDescriptionObserver::Create(), desc);
390 flatbuffers::FlatBufferBuilder fbb(512);
391 std::string answer_string;
392 desc->ToString(&answer_string);
393 flatbuffers::Offset<WebSocketSdp> sdp_fb =
394 CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
395 flatbuffers::Offset<WebSocketMessage> answer_message =
396 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
397 fbb.Finish(answer_message);
398
399 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
400}
401
Alex Perry5f474f22020-02-01 12:14:24 -0800402// Wait until the data channel is ready for data before sending the config.
403void Connection::OnStateChange() {
404 if (peer_connection_.get() != nullptr &&
405 data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800406 for (const auto &header : config_headers_) {
James Kuszmaul1ec74432020-07-30 20:26:45 -0700407 Send(header.buffer());
408 }
Alex Perry5f474f22020-02-01 12:14:24 -0800409 }
410}
411
412// Handle DataChannel messages. Subscribe to each listener that matches the
413// subscribe message
Alex Perryb3b50792020-01-18 16:13:45 -0800414void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800415 // Sanity check--we are relying on the Add/RemoveListener calls being made
416 // from the correct thread.
417 CHECK(signaling_thread_->IsCurrent());
James Kuszmaul71a81932020-12-15 21:08:01 -0800418 FlatbufferSpan<SubscriberRequest> message(
419 {buffer.data.data(), buffer.data.size()});
420 if (!message.Verify()) {
421 LOG(ERROR) << "Invalid flatbuffer received from browser client.";
422 return;
423 }
424 VLOG(2) << "Got a subscription message "
425 << aos::FlatbufferToJson(&message.message());
426 if (!message.message().has_channels_to_transfer()) {
427 LOG(ERROR) << "No channels requested for transfer.";
428 return;
429 }
Alex Perry5f474f22020-02-01 12:14:24 -0800430 for (auto &subscriber : subscribers_) {
431 bool found_match = false;
James Kuszmaul71a81932020-12-15 21:08:01 -0800432 for (auto channel_request : *message.message().channels_to_transfer()) {
433 const Channel *channel = channel_request->channel();
434 if (channel == nullptr) {
435 LOG(ERROR) << "Got unpopulated channel.";
436 continue;
437 }
438 const TransferMethod transfer_method = channel_request->method();
439 // Call GetChannel() before comparing the channel name/type to each
440 // subscriber. This allows us to resolve any node or application specific
441 // mappings.
442 const Channel *comparison_channel =
443 configuration::GetChannel(event_loop_->configuration(), channel,
444 event_loop_->name(), event_loop_->node());
445 if (comparison_channel == nullptr) {
446 LOG(ERROR) << "Channel not available: "
447 << configuration::StrippedChannelToString(channel);
448 continue;
449 }
450 if (subscriber->Compare(comparison_channel)) {
Alex Perry5f474f22020-02-01 12:14:24 -0800451 int index = subscriber->index();
452 auto it = channels_.find(index);
453 if (it == channels_.end()) {
454 auto pair = channels_.insert(
455 {index, peer_connection_->CreateDataChannel(
456 channel->name()->str() + "/" + channel->type()->str(),
457 nullptr)});
458 it = pair.first;
459 }
James Kuszmaul8d928d02020-12-25 17:47:49 -0800460 subscriber->AddListener(it->second, transfer_method, signaling_thread_);
Alex Perry5f474f22020-02-01 12:14:24 -0800461
462 VLOG(1) << "Subscribe to: " << channel->type()->str();
463 found_match = true;
464 break;
465 }
466 }
467 if (!found_match) {
468 int index = subscriber->index();
469 auto it = channels_.find(index);
470 subscriber->RemoveListener(it->second);
471 }
472 }
Alex Perryb3b50792020-01-18 16:13:45 -0800473}
474
475} // namespace web_proxy
476} // namespace aos