blob: af1b646508b434b0084e20002fafda3ec3b0c70d [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08007#include "aos/seasocks/seasocks_logger.h"
Alex Perryb3b50792020-01-18 16:13:45 -08008#include "api/create_peerconnection_factory.h"
9#include "glog/logging.h"
10
11namespace aos {
12namespace web_proxy {
13
14namespace {
15// Based on webrtc examples. In our controlled environment we expect setting sdp
16// to always succeed, and we can't do anything about a failure, so just ignore
17// everything.
18class DummySetSessionDescriptionObserver
19 : public webrtc::SetSessionDescriptionObserver {
20 public:
21 static DummySetSessionDescriptionObserver *Create() {
22 return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
23 }
24 virtual void OnSuccess() {}
25 virtual void OnFailure(webrtc::RTCError error) {}
26};
27
28} // namespace
29
James Kuszmaul7ad91522020-09-01 19:15:35 -070030WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul71a81932020-12-15 21:08:01 -080031 aos::EventLoop *event_loop, int buffer_size)
James Kuszmaul7ad91522020-09-01 19:15:35 -070032 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080033 config_(aos::CopyFlatBuffer(event_loop->configuration())),
34 event_loop_(event_loop) {
35 const aos::Node *self = event_loop->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070036
37 for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
38 auto channel = event_loop->configuration()->channels()->Get(i);
39 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
40 auto fetcher = event_loop->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080041 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
42 std::move(fetcher), i, buffer_size));
James Kuszmaul7ad91522020-09-01 19:15:35 -070043 }
44 }
James Kuszmaul7ad91522020-09-01 19:15:35 -070045 TimerHandler *const timer = event_loop->AddTimer([this]() {
46 for (auto &subscriber : subscribers_) {
47 subscriber->RunIteration();
48 }
49 });
50
51 event_loop->OnRun([timer, event_loop]() {
52 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
53 });
54}
Alex Perryb3b50792020-01-18 16:13:45 -080055
56void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
James Kuszmaul71a81932020-12-15 21:08:01 -080057 std::unique_ptr<Connection> conn = std::make_unique<Connection>(
58 sock, server_, subscribers_, config_, event_loop_);
Alex Perryb3b50792020-01-18 16:13:45 -080059 connections_.insert({sock, std::move(conn)});
60}
61
62void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
63 size_t size) {
64 connections_[sock]->HandleWebSocketData(data, size);
65}
66
67void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
68 connections_.erase(sock);
69}
70
James Kuszmaul71a81932020-12-15 21:08:01 -080071WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
72 : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
73
74WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
75 : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
76
77WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
78 int buffer_size)
79 : epoll_(epoll),
80 server_(std::make_shared<aos::seasocks::SeasocksLogger>(
81 ::seasocks::Logger::Level::Info)),
82 websocket_handler_(
83 new WebsocketHandler(&server_, event_loop, buffer_size)) {
84 server_.addWebSocketHandler("/ws", websocket_handler_);
85 CHECK(server_.startListening(8080));
86
87 epoll->OnReadable(server_.fd(), [this]() {
88 CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
89 });
90
91 if (&internal_epoll_ == epoll) {
92 TimerHandler *const timer = event_loop->AddTimer([this]() {
93 // Run the epoll poller until there are no more events (if we are being
94 // backed by a shm event loop, there won't be anything registered to
95 // internal_epoll_ and this will just return false).
96 // We just deal with clearing all the epoll events using a simulated
97 // timer. This does mean that we will spin rather than actually sleeping
98 // in any coherent manner, which will be particularly noticeable when past
99 // the end of processing other events.
100 while (internal_epoll_.Poll(false)) {
101 continue;
102 }
103 });
104
105 event_loop->OnRun([timer, event_loop]() {
106 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
107 });
108 }
109}
110
111WebProxy::~WebProxy() {
112 epoll_->DeleteFd(server_.fd());
113 server_.terminate();
114 CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
115}
116
Alex Perry5f474f22020-02-01 12:14:24 -0800117void Subscriber::RunIteration() {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800118 {
119 // Manage updating the channels_ map given the pending_* members from the
120 // *Listeners() methods.
121 // We handle all the removals first so that we correctly handle the
122 // situation where the user calls RemoveListener() and then AddListener()
123 // between calls to RunIteration(). The reverse order (adding and then
124 // removing before an update) is handled directly in RemoveListener() where
125 // we remove things from the pending_channels_ map directly.
126 MutexLocker lock(&mutex_);
127 for (const auto &channel : pending_removal_) {
128 channels_.erase(channel);
129 }
130 pending_removal_.clear();
131 for (const auto &channel : pending_channels_) {
132 channels_.insert(channel);
133 }
134 pending_channels_.clear();
135 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800136 if (channels_.empty() && buffer_size_ == 0) {
Alex Perry5f474f22020-02-01 12:14:24 -0800137 return;
138 }
139
James Kuszmaul71a81932020-12-15 21:08:01 -0800140 while (fetcher_->FetchNext()) {
141 // If we aren't building up a buffer, short-circuit the FetchNext().
142 if (buffer_size_ == 0) {
143 fetcher_->Fetch();
144 }
145 Message message;
146 message.index = fetcher_->context().queue_index;
147 VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
148 << "packets";
149 for (int packet_index = 0;
150 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
151 flatbuffers::Offset<MessageHeader> message_offset =
152 PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
153 fbb_.Finish(message_offset);
Alex Perry5f474f22020-02-01 12:14:24 -0800154
James Kuszmaul71a81932020-12-15 21:08:01 -0800155 const flatbuffers::DetachedBuffer buffer = fbb_.Release();
Alex Perry5f474f22020-02-01 12:14:24 -0800156
James Kuszmaul71a81932020-12-15 21:08:01 -0800157 message.data.emplace_back(
158 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
159 true /* binary array */);
160 }
161 message_buffer_.push_back(std::move(message));
162 }
163 for (auto &conn : channels_) {
164 rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
165 ChannelInformation *channel_data = &conn.second;
166 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
167 SkipToLastMessage(channel_data);
168 }
169 const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
170 while (buffer != nullptr) {
171 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800172 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800173 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800174 }
James Kuszmaul8d928d02020-12-25 17:47:49 -0800175 // Call Send() from the signalling thread. The Invoke() call blocks until
176 // the handler has been called, so we do not need to handle any
177 // synchronization on this end. The body of the handler should be kept as
178 // short as possible to avoid blocking the signalling thread continuously
179 // for any longer than necessary.
180 channel_data->signaling_thread->Invoke<void>(
181 RTC_FROM_HERE,
182 [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
James Kuszmaul71a81932020-12-15 21:08:01 -0800183 buffer = NextBuffer(channel_data);
184 }
185 }
186 if (buffer_size_ >= 0) {
187 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
188 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800189 }
190 }
191}
192
193bool Subscriber::Compare(const Channel *channel) const {
Alex Perry22824d72020-02-29 17:11:43 -0800194 return channel->name()->string_view() ==
195 fetcher_->channel()->name()->string_view() &&
196 channel->type()->string_view() ==
197 fetcher_->channel()->type()->string_view();
Alex Perry5f474f22020-02-01 12:14:24 -0800198}
199
James Kuszmaul71a81932020-12-15 21:08:01 -0800200void Subscriber::AddListener(
201 rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
James Kuszmaul8d928d02020-12-25 17:47:49 -0800202 TransferMethod transfer_method, rtc::Thread *signaling_thread) {
203 MutexLocker lock(&mutex_);
James Kuszmaul71a81932020-12-15 21:08:01 -0800204 ChannelInformation info;
205 info.transfer_method = transfer_method;
James Kuszmaul8d928d02020-12-25 17:47:49 -0800206 info.signaling_thread = signaling_thread;
207 pending_channels_.emplace(channel, info);
James Kuszmaul71a81932020-12-15 21:08:01 -0800208}
209
210const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
211 CHECK_NOTNULL(channel);
212 if (message_buffer_.empty()) {
213 return nullptr;
214 }
215 const uint32_t earliest_index = message_buffer_.front().index;
216 const uint32_t latest_index = message_buffer_.back().index;
217 const bool fell_behind = channel->current_queue_index < earliest_index;
218 if (fell_behind) {
219 channel->current_queue_index = earliest_index;
220 channel->next_packet_number = 0;
221 return &message_buffer_.front().data.at(0);
222 }
223 if (channel->current_queue_index > latest_index) {
224 // We are still waiting on the next message to appear; return.
225 return nullptr;
226 }
227 CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
228 << "Inconsistent queue indices.";
229 const size_t packets_in_message =
230 message_buffer_[channel->current_queue_index - earliest_index]
231 .data.size();
232 CHECK_LT(0u, packets_in_message);
233 CHECK_LT(channel->next_packet_number, packets_in_message);
234
235 const webrtc::DataBuffer *data =
236 &message_buffer_[channel->current_queue_index - earliest_index].data.at(
237 channel->next_packet_number);
238
239 ++channel->next_packet_number;
240 if (channel->next_packet_number == packets_in_message) {
241 ++channel->current_queue_index;
242 channel->next_packet_number = 0;
243 }
244
245 return data;
246}
247
248void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
249 CHECK_NOTNULL(channel);
250 if (message_buffer_.empty() ||
251 channel->current_queue_index == message_buffer_.back().index) {
252 return;
253 }
254 channel->current_queue_index = message_buffer_.back().index;
255 channel->next_packet_number = 0;
256}
257
258void Subscriber::RemoveListener(
259 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800260 MutexLocker lock(&mutex_);
261 pending_channels_.erase(channel);
262 pending_removal_.push_back(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -0800263}
264
Alex Perry5f474f22020-02-01 12:14:24 -0800265Connection::Connection(
266 ::seasocks::WebSocket *sock, ::seasocks::Server *server,
267 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800268 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
269 const EventLoop *event_loop)
Alex Perry5f474f22020-02-01 12:14:24 -0800270 : sock_(sock),
271 server_(server),
272 subscribers_(subscribers),
James Kuszmaul71a81932020-12-15 21:08:01 -0800273 config_headers_(PackBuffer(config.span())),
274 event_loop_(event_loop) {}
Alex Perryb3b50792020-01-18 16:13:45 -0800275
James Kuszmaul71a81932020-12-15 21:08:01 -0800276// Function called for web socket data. Parses the flatbuffer and
277// handles it appropriately.
Alex Perryb3b50792020-01-18 16:13:45 -0800278void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
279 const WebSocketMessage *message =
280 flatbuffers::GetRoot<WebSocketMessage>(data);
281 switch (message->payload_type()) {
282 case Payload::WebSocketSdp: {
283 const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
284 if (offer->type() != SdpType::OFFER) {
285 LOG(WARNING) << "Got the wrong sdp type from client";
286 break;
287 }
288 const flatbuffers::String *sdp = offer->payload();
289 webrtc::SdpParseError error;
290 std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
291 CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
292 if (!desc) {
293 LOG(WARNING) << "Failed to parse sdp description: "
294 << error.description;
295 // TODO(alex): send a message back to browser for failure.
296 break;
297 }
298
James Kuszmaul71a81932020-12-15 21:08:01 -0800299 // We can only start creating the PeerConnection once we have
300 // something to give it, so we wait until we get an offer before
301 // starting.
Alex Perryb3b50792020-01-18 16:13:45 -0800302 webrtc::PeerConnectionInterface::RTCConfiguration config;
303 config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
304 config.enable_dtls_srtp = true;
305
306 std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
307 signaling_thread->SetName("signaling_thread", nullptr);
308 signaling_thread->Start();
309
James Kuszmaul8d928d02020-12-25 17:47:49 -0800310 signaling_thread_ = signaling_thread.get();
311
Alex Perryb3b50792020-01-18 16:13:45 -0800312 webrtc::PeerConnectionFactoryDependencies factory_deps;
313 factory_deps.signaling_thread = signaling_thread.release();
314 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
315 CreateModularPeerConnectionFactory(std::move(factory_deps));
316
317 peer_connection_ =
318 factory->CreatePeerConnection(config, nullptr, nullptr, this);
319
320 peer_connection_->SetRemoteDescription(
321 DummySetSessionDescriptionObserver::Create(), desc.release());
322
323 peer_connection_->CreateAnswer(
324 this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
325 break;
326 }
327 case Payload::WebSocketIce: {
328 const WebSocketIce *ice = message->payload_as_WebSocketIce();
329 std::string candidate = ice->candidate()->str();
330 std::string sdpMid = ice->sdpMid()->str();
331 int sdpMLineIndex = ice->sdpMLineIndex();
332 webrtc::SdpParseError error;
333 webrtc::IceCandidateInterface *ice_candidate =
334 webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
335 if (!ice_candidate) {
336 LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
337 // TODO(alex): send a message back to browser for failure.
338 break;
339 }
340 peer_connection_->AddIceCandidate(ice_candidate);
341 break;
342 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800343 default: {
344 break;
345 }
Alex Perryb3b50792020-01-18 16:13:45 -0800346 }
347}
348
Alex Perry5f474f22020-02-01 12:14:24 -0800349void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
350 webrtc::DataBuffer data_buffer(
351 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
352 true /* binary array */);
Alex Perry3dfcb812020-03-04 19:32:17 -0800353 VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
Alex Perry5f474f22020-02-01 12:14:24 -0800354 data_channel_->Send(data_buffer);
355}
356
Alex Perryb3b50792020-01-18 16:13:45 -0800357void Connection::OnDataChannel(
358 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
359 data_channel_ = channel;
360 data_channel_->RegisterObserver(this);
361}
362
363void Connection::OnIceCandidate(
364 const webrtc::IceCandidateInterface *candidate) {
365 flatbuffers::FlatBufferBuilder fbb(512);
366 std::string ice_string;
367 candidate->ToString(&ice_string);
368
369 flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
370 fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
371 candidate->sdp_mline_index());
372 flatbuffers::Offset<WebSocketMessage> ice_message =
373 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
374 fbb.Finish(ice_message);
375
376 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
377}
378
379// This is the callback for creating an sdp. We have to manually assign it
380// locally and send it to the client.
381void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
382 peer_connection_->SetLocalDescription(
383 DummySetSessionDescriptionObserver::Create(), desc);
384 flatbuffers::FlatBufferBuilder fbb(512);
385 std::string answer_string;
386 desc->ToString(&answer_string);
387 flatbuffers::Offset<WebSocketSdp> sdp_fb =
388 CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
389 flatbuffers::Offset<WebSocketMessage> answer_message =
390 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
391 fbb.Finish(answer_message);
392
393 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
394}
395
Alex Perry5f474f22020-02-01 12:14:24 -0800396// Wait until the data channel is ready for data before sending the config.
397void Connection::OnStateChange() {
398 if (peer_connection_.get() != nullptr &&
399 data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800400 for (const auto &header : config_headers_) {
James Kuszmaul1ec74432020-07-30 20:26:45 -0700401 Send(header.buffer());
402 }
Alex Perry5f474f22020-02-01 12:14:24 -0800403 }
404}
405
406// Handle DataChannel messages. Subscribe to each listener that matches the
407// subscribe message
Alex Perryb3b50792020-01-18 16:13:45 -0800408void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
James Kuszmaul8d928d02020-12-25 17:47:49 -0800409 // Sanity check--we are relying on the Add/RemoveListener calls being made
410 // from the correct thread.
411 CHECK(signaling_thread_->IsCurrent());
James Kuszmaul71a81932020-12-15 21:08:01 -0800412 FlatbufferSpan<SubscriberRequest> message(
413 {buffer.data.data(), buffer.data.size()});
414 if (!message.Verify()) {
415 LOG(ERROR) << "Invalid flatbuffer received from browser client.";
416 return;
417 }
418 VLOG(2) << "Got a subscription message "
419 << aos::FlatbufferToJson(&message.message());
420 if (!message.message().has_channels_to_transfer()) {
421 LOG(ERROR) << "No channels requested for transfer.";
422 return;
423 }
Alex Perry5f474f22020-02-01 12:14:24 -0800424 for (auto &subscriber : subscribers_) {
425 bool found_match = false;
James Kuszmaul71a81932020-12-15 21:08:01 -0800426 for (auto channel_request : *message.message().channels_to_transfer()) {
427 const Channel *channel = channel_request->channel();
428 if (channel == nullptr) {
429 LOG(ERROR) << "Got unpopulated channel.";
430 continue;
431 }
432 const TransferMethod transfer_method = channel_request->method();
433 // Call GetChannel() before comparing the channel name/type to each
434 // subscriber. This allows us to resolve any node or application specific
435 // mappings.
436 const Channel *comparison_channel =
437 configuration::GetChannel(event_loop_->configuration(), channel,
438 event_loop_->name(), event_loop_->node());
439 if (comparison_channel == nullptr) {
440 LOG(ERROR) << "Channel not available: "
441 << configuration::StrippedChannelToString(channel);
442 continue;
443 }
444 if (subscriber->Compare(comparison_channel)) {
Alex Perry5f474f22020-02-01 12:14:24 -0800445 int index = subscriber->index();
446 auto it = channels_.find(index);
447 if (it == channels_.end()) {
448 auto pair = channels_.insert(
449 {index, peer_connection_->CreateDataChannel(
450 channel->name()->str() + "/" + channel->type()->str(),
451 nullptr)});
452 it = pair.first;
453 }
James Kuszmaul8d928d02020-12-25 17:47:49 -0800454 subscriber->AddListener(it->second, transfer_method, signaling_thread_);
Alex Perry5f474f22020-02-01 12:14:24 -0800455
456 VLOG(1) << "Subscribe to: " << channel->type()->str();
457 found_match = true;
458 break;
459 }
460 }
461 if (!found_match) {
462 int index = subscriber->index();
463 auto it = channels_.find(index);
464 subscriber->RemoveListener(it->second);
465 }
466 }
Alex Perryb3b50792020-01-18 16:13:45 -0800467}
468
469} // namespace web_proxy
470} // namespace aos