blob: 7c34b228433177d4aa34cd0e48c93c377d825112 [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
James Kuszmaul71a81932020-12-15 21:08:01 -08007#include "aos/seasocks/seasocks_logger.h"
Alex Perryb3b50792020-01-18 16:13:45 -08008#include "api/create_peerconnection_factory.h"
9#include "glog/logging.h"
10
11namespace aos {
12namespace web_proxy {
13
14namespace {
15// Based on webrtc examples. In our controlled environment we expect setting sdp
16// to always succeed, and we can't do anything about a failure, so just ignore
17// everything.
18class DummySetSessionDescriptionObserver
19 : public webrtc::SetSessionDescriptionObserver {
20 public:
21 static DummySetSessionDescriptionObserver *Create() {
22 return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
23 }
24 virtual void OnSuccess() {}
25 virtual void OnFailure(webrtc::RTCError error) {}
26};
27
28} // namespace
29
James Kuszmaul7ad91522020-09-01 19:15:35 -070030WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
James Kuszmaul71a81932020-12-15 21:08:01 -080031 aos::EventLoop *event_loop, int buffer_size)
James Kuszmaul7ad91522020-09-01 19:15:35 -070032 : server_(server),
James Kuszmaul71a81932020-12-15 21:08:01 -080033 config_(aos::CopyFlatBuffer(event_loop->configuration())),
34 event_loop_(event_loop) {
35 const aos::Node *self = event_loop->node();
James Kuszmaul7ad91522020-09-01 19:15:35 -070036
37 for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
38 auto channel = event_loop->configuration()->channels()->Get(i);
39 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
40 auto fetcher = event_loop->MakeRawFetcher(channel);
James Kuszmaul71a81932020-12-15 21:08:01 -080041 subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>(
42 std::move(fetcher), i, buffer_size));
James Kuszmaul7ad91522020-09-01 19:15:35 -070043 }
44 }
James Kuszmaul7ad91522020-09-01 19:15:35 -070045 TimerHandler *const timer = event_loop->AddTimer([this]() {
46 for (auto &subscriber : subscribers_) {
47 subscriber->RunIteration();
48 }
49 });
50
51 event_loop->OnRun([timer, event_loop]() {
52 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
53 });
54}
Alex Perryb3b50792020-01-18 16:13:45 -080055
56void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
James Kuszmaul71a81932020-12-15 21:08:01 -080057 std::unique_ptr<Connection> conn = std::make_unique<Connection>(
58 sock, server_, subscribers_, config_, event_loop_);
Alex Perryb3b50792020-01-18 16:13:45 -080059 connections_.insert({sock, std::move(conn)});
60}
61
62void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
63 size_t size) {
64 connections_[sock]->HandleWebSocketData(data, size);
65}
66
67void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
68 connections_.erase(sock);
69}
70
James Kuszmaul71a81932020-12-15 21:08:01 -080071WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size)
72 : WebProxy(event_loop, &internal_epoll_, buffer_size) {}
73
74WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size)
75 : WebProxy(event_loop, event_loop->epoll(), buffer_size) {}
76
77WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll,
78 int buffer_size)
79 : epoll_(epoll),
80 server_(std::make_shared<aos::seasocks::SeasocksLogger>(
81 ::seasocks::Logger::Level::Info)),
82 websocket_handler_(
83 new WebsocketHandler(&server_, event_loop, buffer_size)) {
84 server_.addWebSocketHandler("/ws", websocket_handler_);
85 CHECK(server_.startListening(8080));
86
87 epoll->OnReadable(server_.fd(), [this]() {
88 CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0));
89 });
90
91 if (&internal_epoll_ == epoll) {
92 TimerHandler *const timer = event_loop->AddTimer([this]() {
93 // Run the epoll poller until there are no more events (if we are being
94 // backed by a shm event loop, there won't be anything registered to
95 // internal_epoll_ and this will just return false).
96 // We just deal with clearing all the epoll events using a simulated
97 // timer. This does mean that we will spin rather than actually sleeping
98 // in any coherent manner, which will be particularly noticeable when past
99 // the end of processing other events.
100 while (internal_epoll_.Poll(false)) {
101 continue;
102 }
103 });
104
105 event_loop->OnRun([timer, event_loop]() {
106 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10));
107 });
108 }
109}
110
111WebProxy::~WebProxy() {
112 epoll_->DeleteFd(server_.fd());
113 server_.terminate();
114 CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0));
115}
116
Alex Perry5f474f22020-02-01 12:14:24 -0800117void Subscriber::RunIteration() {
James Kuszmaul71a81932020-12-15 21:08:01 -0800118 // TODO(james): The channels_ struct gets accessed here, but modified by
119 // Add/RemoveListener, which are called from separate threads.
120 if (channels_.empty() && buffer_size_ == 0) {
Alex Perry5f474f22020-02-01 12:14:24 -0800121 return;
122 }
123
James Kuszmaul71a81932020-12-15 21:08:01 -0800124 while (fetcher_->FetchNext()) {
125 // If we aren't building up a buffer, short-circuit the FetchNext().
126 if (buffer_size_ == 0) {
127 fetcher_->Fetch();
128 }
129 Message message;
130 message.index = fetcher_->context().queue_index;
131 VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context())
132 << "packets";
133 for (int packet_index = 0;
134 packet_index < GetPacketCount(fetcher_->context()); ++packet_index) {
135 flatbuffers::Offset<MessageHeader> message_offset =
136 PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
137 fbb_.Finish(message_offset);
Alex Perry5f474f22020-02-01 12:14:24 -0800138
James Kuszmaul71a81932020-12-15 21:08:01 -0800139 const flatbuffers::DetachedBuffer buffer = fbb_.Release();
Alex Perry5f474f22020-02-01 12:14:24 -0800140
James Kuszmaul71a81932020-12-15 21:08:01 -0800141 message.data.emplace_back(
142 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
143 true /* binary array */);
144 }
145 message_buffer_.push_back(std::move(message));
146 }
147 for (auto &conn : channels_) {
148 rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first;
149 ChannelInformation *channel_data = &conn.second;
150 if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
151 SkipToLastMessage(channel_data);
152 }
153 const webrtc::DataBuffer *buffer = NextBuffer(channel_data);
154 while (buffer != nullptr) {
155 if (rtc_channel->buffered_amount() > 14000000) {
Alex Perry3dfcb812020-03-04 19:32:17 -0800156 VLOG(1) << "skipping a send because buffered amount is too high";
James Kuszmaul71a81932020-12-15 21:08:01 -0800157 break;
Alex Perry3dfcb812020-03-04 19:32:17 -0800158 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800159 // TODO(james): This Send() should be called from the signalling_thread
160 // created by the Connection.
161 rtc_channel->Send(*buffer);
162 buffer = NextBuffer(channel_data);
163 }
164 }
165 if (buffer_size_ >= 0) {
166 while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) {
167 message_buffer_.pop_front();
Alex Perry5f474f22020-02-01 12:14:24 -0800168 }
169 }
170}
171
172bool Subscriber::Compare(const Channel *channel) const {
Alex Perry22824d72020-02-29 17:11:43 -0800173 return channel->name()->string_view() ==
174 fetcher_->channel()->name()->string_view() &&
175 channel->type()->string_view() ==
176 fetcher_->channel()->type()->string_view();
Alex Perry5f474f22020-02-01 12:14:24 -0800177}
178
James Kuszmaul71a81932020-12-15 21:08:01 -0800179void Subscriber::AddListener(
180 rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
181 TransferMethod transfer_method) {
182 ChannelInformation info;
183 info.transfer_method = transfer_method;
184 channels_.emplace(channel, info);
185}
186
187const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
188 CHECK_NOTNULL(channel);
189 if (message_buffer_.empty()) {
190 return nullptr;
191 }
192 const uint32_t earliest_index = message_buffer_.front().index;
193 const uint32_t latest_index = message_buffer_.back().index;
194 const bool fell_behind = channel->current_queue_index < earliest_index;
195 if (fell_behind) {
196 channel->current_queue_index = earliest_index;
197 channel->next_packet_number = 0;
198 return &message_buffer_.front().data.at(0);
199 }
200 if (channel->current_queue_index > latest_index) {
201 // We are still waiting on the next message to appear; return.
202 return nullptr;
203 }
204 CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
205 << "Inconsistent queue indices.";
206 const size_t packets_in_message =
207 message_buffer_[channel->current_queue_index - earliest_index]
208 .data.size();
209 CHECK_LT(0u, packets_in_message);
210 CHECK_LT(channel->next_packet_number, packets_in_message);
211
212 const webrtc::DataBuffer *data =
213 &message_buffer_[channel->current_queue_index - earliest_index].data.at(
214 channel->next_packet_number);
215
216 ++channel->next_packet_number;
217 if (channel->next_packet_number == packets_in_message) {
218 ++channel->current_queue_index;
219 channel->next_packet_number = 0;
220 }
221
222 return data;
223}
224
225void Subscriber::SkipToLastMessage(ChannelInformation *channel) {
226 CHECK_NOTNULL(channel);
227 if (message_buffer_.empty() ||
228 channel->current_queue_index == message_buffer_.back().index) {
229 return;
230 }
231 channel->current_queue_index = message_buffer_.back().index;
232 channel->next_packet_number = 0;
233}
234
235void Subscriber::RemoveListener(
236 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
237 channels_.erase(channel);
238}
239
Alex Perry5f474f22020-02-01 12:14:24 -0800240Connection::Connection(
241 ::seasocks::WebSocket *sock, ::seasocks::Server *server,
242 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
James Kuszmaul71a81932020-12-15 21:08:01 -0800243 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config,
244 const EventLoop *event_loop)
Alex Perry5f474f22020-02-01 12:14:24 -0800245 : sock_(sock),
246 server_(server),
247 subscribers_(subscribers),
James Kuszmaul71a81932020-12-15 21:08:01 -0800248 config_headers_(PackBuffer(config.span())),
249 event_loop_(event_loop) {}
Alex Perryb3b50792020-01-18 16:13:45 -0800250
James Kuszmaul71a81932020-12-15 21:08:01 -0800251// Function called for web socket data. Parses the flatbuffer and
252// handles it appropriately.
Alex Perryb3b50792020-01-18 16:13:45 -0800253void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
254 const WebSocketMessage *message =
255 flatbuffers::GetRoot<WebSocketMessage>(data);
256 switch (message->payload_type()) {
257 case Payload::WebSocketSdp: {
258 const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
259 if (offer->type() != SdpType::OFFER) {
260 LOG(WARNING) << "Got the wrong sdp type from client";
261 break;
262 }
263 const flatbuffers::String *sdp = offer->payload();
264 webrtc::SdpParseError error;
265 std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
266 CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
267 if (!desc) {
268 LOG(WARNING) << "Failed to parse sdp description: "
269 << error.description;
270 // TODO(alex): send a message back to browser for failure.
271 break;
272 }
273
James Kuszmaul71a81932020-12-15 21:08:01 -0800274 // We can only start creating the PeerConnection once we have
275 // something to give it, so we wait until we get an offer before
276 // starting.
Alex Perryb3b50792020-01-18 16:13:45 -0800277 webrtc::PeerConnectionInterface::RTCConfiguration config;
278 config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
279 config.enable_dtls_srtp = true;
280
281 std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
282 signaling_thread->SetName("signaling_thread", nullptr);
283 signaling_thread->Start();
284
285 webrtc::PeerConnectionFactoryDependencies factory_deps;
286 factory_deps.signaling_thread = signaling_thread.release();
287 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
288 CreateModularPeerConnectionFactory(std::move(factory_deps));
289
290 peer_connection_ =
291 factory->CreatePeerConnection(config, nullptr, nullptr, this);
292
293 peer_connection_->SetRemoteDescription(
294 DummySetSessionDescriptionObserver::Create(), desc.release());
295
296 peer_connection_->CreateAnswer(
297 this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
298 break;
299 }
300 case Payload::WebSocketIce: {
301 const WebSocketIce *ice = message->payload_as_WebSocketIce();
302 std::string candidate = ice->candidate()->str();
303 std::string sdpMid = ice->sdpMid()->str();
304 int sdpMLineIndex = ice->sdpMLineIndex();
305 webrtc::SdpParseError error;
306 webrtc::IceCandidateInterface *ice_candidate =
307 webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
308 if (!ice_candidate) {
309 LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
310 // TODO(alex): send a message back to browser for failure.
311 break;
312 }
313 peer_connection_->AddIceCandidate(ice_candidate);
314 break;
315 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800316 default: {
317 break;
318 }
Alex Perryb3b50792020-01-18 16:13:45 -0800319 }
320}
321
Alex Perry5f474f22020-02-01 12:14:24 -0800322void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
323 webrtc::DataBuffer data_buffer(
324 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
325 true /* binary array */);
Alex Perry3dfcb812020-03-04 19:32:17 -0800326 VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
Alex Perry5f474f22020-02-01 12:14:24 -0800327 data_channel_->Send(data_buffer);
328}
329
Alex Perryb3b50792020-01-18 16:13:45 -0800330void Connection::OnDataChannel(
331 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
332 data_channel_ = channel;
333 data_channel_->RegisterObserver(this);
334}
335
336void Connection::OnIceCandidate(
337 const webrtc::IceCandidateInterface *candidate) {
338 flatbuffers::FlatBufferBuilder fbb(512);
339 std::string ice_string;
340 candidate->ToString(&ice_string);
341
342 flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
343 fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
344 candidate->sdp_mline_index());
345 flatbuffers::Offset<WebSocketMessage> ice_message =
346 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
347 fbb.Finish(ice_message);
348
349 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
350}
351
352// This is the callback for creating an sdp. We have to manually assign it
353// locally and send it to the client.
354void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
355 peer_connection_->SetLocalDescription(
356 DummySetSessionDescriptionObserver::Create(), desc);
357 flatbuffers::FlatBufferBuilder fbb(512);
358 std::string answer_string;
359 desc->ToString(&answer_string);
360 flatbuffers::Offset<WebSocketSdp> sdp_fb =
361 CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
362 flatbuffers::Offset<WebSocketMessage> answer_message =
363 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
364 fbb.Finish(answer_message);
365
366 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
367}
368
Alex Perry5f474f22020-02-01 12:14:24 -0800369// Wait until the data channel is ready for data before sending the config.
370void Connection::OnStateChange() {
371 if (peer_connection_.get() != nullptr &&
372 data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800373 for (const auto &header : config_headers_) {
James Kuszmaul1ec74432020-07-30 20:26:45 -0700374 Send(header.buffer());
375 }
Alex Perry5f474f22020-02-01 12:14:24 -0800376 }
377}
378
379// Handle DataChannel messages. Subscribe to each listener that matches the
380// subscribe message
Alex Perryb3b50792020-01-18 16:13:45 -0800381void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
James Kuszmaul71a81932020-12-15 21:08:01 -0800382 FlatbufferSpan<SubscriberRequest> message(
383 {buffer.data.data(), buffer.data.size()});
384 if (!message.Verify()) {
385 LOG(ERROR) << "Invalid flatbuffer received from browser client.";
386 return;
387 }
388 VLOG(2) << "Got a subscription message "
389 << aos::FlatbufferToJson(&message.message());
390 if (!message.message().has_channels_to_transfer()) {
391 LOG(ERROR) << "No channels requested for transfer.";
392 return;
393 }
Alex Perry5f474f22020-02-01 12:14:24 -0800394 for (auto &subscriber : subscribers_) {
395 bool found_match = false;
James Kuszmaul71a81932020-12-15 21:08:01 -0800396 for (auto channel_request : *message.message().channels_to_transfer()) {
397 const Channel *channel = channel_request->channel();
398 if (channel == nullptr) {
399 LOG(ERROR) << "Got unpopulated channel.";
400 continue;
401 }
402 const TransferMethod transfer_method = channel_request->method();
403 // Call GetChannel() before comparing the channel name/type to each
404 // subscriber. This allows us to resolve any node or application specific
405 // mappings.
406 const Channel *comparison_channel =
407 configuration::GetChannel(event_loop_->configuration(), channel,
408 event_loop_->name(), event_loop_->node());
409 if (comparison_channel == nullptr) {
410 LOG(ERROR) << "Channel not available: "
411 << configuration::StrippedChannelToString(channel);
412 continue;
413 }
414 if (subscriber->Compare(comparison_channel)) {
Alex Perry5f474f22020-02-01 12:14:24 -0800415 int index = subscriber->index();
416 auto it = channels_.find(index);
417 if (it == channels_.end()) {
418 auto pair = channels_.insert(
419 {index, peer_connection_->CreateDataChannel(
420 channel->name()->str() + "/" + channel->type()->str(),
421 nullptr)});
422 it = pair.first;
423 }
James Kuszmaul71a81932020-12-15 21:08:01 -0800424 subscriber->AddListener(it->second, transfer_method);
Alex Perry5f474f22020-02-01 12:14:24 -0800425
426 VLOG(1) << "Subscribe to: " << channel->type()->str();
427 found_match = true;
428 break;
429 }
430 }
431 if (!found_match) {
432 int index = subscriber->index();
433 auto it = channels_.find(index);
434 subscriber->RemoveListener(it->second);
435 }
436 }
Alex Perryb3b50792020-01-18 16:13:45 -0800437}
438
439} // namespace web_proxy
440} // namespace aos