blob: 507f73fe5436037fa9af008f7f1b2c575220a1ac [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
Alex Perryb3b50792020-01-18 16:13:45 -08007#include "api/create_peerconnection_factory.h"
8#include "glog/logging.h"
9
10namespace aos {
11namespace web_proxy {
12
13namespace {
14// Based on webrtc examples. In our controlled environment we expect setting sdp
15// to always succeed, and we can't do anything about a failure, so just ignore
16// everything.
17class DummySetSessionDescriptionObserver
18 : public webrtc::SetSessionDescriptionObserver {
19 public:
20 static DummySetSessionDescriptionObserver *Create() {
21 return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
22 }
23 virtual void OnSuccess() {}
24 virtual void OnFailure(webrtc::RTCError error) {}
25};
26
27} // namespace
28
James Kuszmaul7ad91522020-09-01 19:15:35 -070029WebsocketHandler::WebsocketHandler(::seasocks::Server *server,
30 aos::EventLoop *event_loop)
31 : server_(server),
32 config_(aos::CopyFlatBuffer(event_loop->configuration())) {
33 const bool is_multi_node =
34 aos::configuration::MultiNode(event_loop->configuration());
35 const aos::Node *self =
36 is_multi_node ? aos::configuration::GetMyNode(event_loop->configuration())
37 : nullptr;
38
39 for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) {
40 auto channel = event_loop->configuration()->channels()->Get(i);
41 if (aos::configuration::ChannelIsReadableOnNode(channel, self)) {
42 auto fetcher = event_loop->MakeRawFetcher(channel);
43 subscribers_.emplace_back(
44 std::make_unique<aos::web_proxy::Subscriber>(std::move(fetcher), i));
45 }
46 }
47
48 TimerHandler *const timer = event_loop->AddTimer([this]() {
49 for (auto &subscriber : subscribers_) {
50 subscriber->RunIteration();
51 }
52 });
53
54 event_loop->OnRun([timer, event_loop]() {
55 timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100));
56 });
57}
Alex Perryb3b50792020-01-18 16:13:45 -080058
59void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
60 std::unique_ptr<Connection> conn =
Alex Perry5f474f22020-02-01 12:14:24 -080061 std::make_unique<Connection>(sock, server_, subscribers_, config_);
Alex Perryb3b50792020-01-18 16:13:45 -080062 connections_.insert({sock, std::move(conn)});
63}
64
65void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
66 size_t size) {
67 connections_[sock]->HandleWebSocketData(data, size);
68}
69
70void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
71 connections_.erase(sock);
72}
73
Alex Perry5f474f22020-02-01 12:14:24 -080074void Subscriber::RunIteration() {
75 if (channels_.empty()) {
76 return;
77 }
78
79 fetcher_->Fetch();
Alex Perry22824d72020-02-29 17:11:43 -080080 VLOG(2) << "Sending a message with " << GetPacketCount(fetcher_->context())
81 << "packets";
Alex Perry5f474f22020-02-01 12:14:24 -080082 for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context());
83 ++packet_index) {
84 flatbuffers::Offset<MessageHeader> message =
85 PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
86 fbb_.Finish(message);
87
88 const flatbuffers::DetachedBuffer buffer = fbb_.Release();
89
90 webrtc::DataBuffer data_buffer(
91 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
92 true /* binary array */);
93 for (auto conn : channels_) {
Alex Perry3dfcb812020-03-04 19:32:17 -080094 if (conn->buffered_amount() > 14000000) {
95 VLOG(1) << "skipping a send because buffered amount is too high";
96 continue;
97 }
Alex Perry5f474f22020-02-01 12:14:24 -080098 conn->Send(data_buffer);
99 }
100 }
101}
102
103bool Subscriber::Compare(const Channel *channel) const {
Alex Perry22824d72020-02-29 17:11:43 -0800104 return channel->name()->string_view() ==
105 fetcher_->channel()->name()->string_view() &&
106 channel->type()->string_view() ==
107 fetcher_->channel()->type()->string_view();
Alex Perry5f474f22020-02-01 12:14:24 -0800108}
109
110Connection::Connection(
111 ::seasocks::WebSocket *sock, ::seasocks::Server *server,
112 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
113 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
114 : sock_(sock),
115 server_(server),
116 subscribers_(subscribers),
James Kuszmaul1ec74432020-07-30 20:26:45 -0700117 config_headers_(PackBuffer(config.span())) {}
Alex Perryb3b50792020-01-18 16:13:45 -0800118
119// Function called for web socket data. Parses the flatbuffer and handles it
120// appropriately.
121void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
122 const WebSocketMessage *message =
123 flatbuffers::GetRoot<WebSocketMessage>(data);
124 switch (message->payload_type()) {
125 case Payload::WebSocketSdp: {
126 const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
127 if (offer->type() != SdpType::OFFER) {
128 LOG(WARNING) << "Got the wrong sdp type from client";
129 break;
130 }
131 const flatbuffers::String *sdp = offer->payload();
132 webrtc::SdpParseError error;
133 std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
134 CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
135 if (!desc) {
136 LOG(WARNING) << "Failed to parse sdp description: "
137 << error.description;
138 // TODO(alex): send a message back to browser for failure.
139 break;
140 }
141
142 // We can only start creating the PeerConnection once we have something to
143 // give it, so we wait until we get an offer before starting.
144 webrtc::PeerConnectionInterface::RTCConfiguration config;
145 config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
146 config.enable_dtls_srtp = true;
147
148 std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
149 signaling_thread->SetName("signaling_thread", nullptr);
150 signaling_thread->Start();
151
152 webrtc::PeerConnectionFactoryDependencies factory_deps;
153 factory_deps.signaling_thread = signaling_thread.release();
154 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
155 CreateModularPeerConnectionFactory(std::move(factory_deps));
156
157 peer_connection_ =
158 factory->CreatePeerConnection(config, nullptr, nullptr, this);
159
160 peer_connection_->SetRemoteDescription(
161 DummySetSessionDescriptionObserver::Create(), desc.release());
162
163 peer_connection_->CreateAnswer(
164 this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
165 break;
166 }
167 case Payload::WebSocketIce: {
168 const WebSocketIce *ice = message->payload_as_WebSocketIce();
169 std::string candidate = ice->candidate()->str();
170 std::string sdpMid = ice->sdpMid()->str();
171 int sdpMLineIndex = ice->sdpMLineIndex();
172 webrtc::SdpParseError error;
173 webrtc::IceCandidateInterface *ice_candidate =
174 webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
175 if (!ice_candidate) {
176 LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
177 // TODO(alex): send a message back to browser for failure.
178 break;
179 }
180 peer_connection_->AddIceCandidate(ice_candidate);
181 break;
182 }
183 default: { break; }
184 }
185}
186
Alex Perry5f474f22020-02-01 12:14:24 -0800187void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
188 webrtc::DataBuffer data_buffer(
189 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
190 true /* binary array */);
Alex Perry3dfcb812020-03-04 19:32:17 -0800191 VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
Alex Perry5f474f22020-02-01 12:14:24 -0800192 data_channel_->Send(data_buffer);
193}
194
Alex Perryb3b50792020-01-18 16:13:45 -0800195void Connection::OnDataChannel(
196 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
197 data_channel_ = channel;
198 data_channel_->RegisterObserver(this);
199}
200
201void Connection::OnIceCandidate(
202 const webrtc::IceCandidateInterface *candidate) {
203 flatbuffers::FlatBufferBuilder fbb(512);
204 std::string ice_string;
205 candidate->ToString(&ice_string);
206
207 flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
208 fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
209 candidate->sdp_mline_index());
210 flatbuffers::Offset<WebSocketMessage> ice_message =
211 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
212 fbb.Finish(ice_message);
213
214 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
215}
216
217// This is the callback for creating an sdp. We have to manually assign it
218// locally and send it to the client.
219void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
220 peer_connection_->SetLocalDescription(
221 DummySetSessionDescriptionObserver::Create(), desc);
222 flatbuffers::FlatBufferBuilder fbb(512);
223 std::string answer_string;
224 desc->ToString(&answer_string);
225 flatbuffers::Offset<WebSocketSdp> sdp_fb =
226 CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
227 flatbuffers::Offset<WebSocketMessage> answer_message =
228 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
229 fbb.Finish(answer_message);
230
231 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
232}
233
Alex Perry5f474f22020-02-01 12:14:24 -0800234// Wait until the data channel is ready for data before sending the config.
235void Connection::OnStateChange() {
236 if (peer_connection_.get() != nullptr &&
237 data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
James Kuszmaul1ec74432020-07-30 20:26:45 -0700238 for (const auto &header: config_headers_) {
239 Send(header.buffer());
240 }
Alex Perry5f474f22020-02-01 12:14:24 -0800241 }
242}
243
244// Handle DataChannel messages. Subscribe to each listener that matches the
245// subscribe message
Alex Perryb3b50792020-01-18 16:13:45 -0800246void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
Alex Perry5f474f22020-02-01 12:14:24 -0800247 const message_bridge::Connect *message =
248 flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
Alex Perry22824d72020-02-29 17:11:43 -0800249 VLOG(2) << "Got a connect message " << aos::FlatbufferToJson(message);
Alex Perry5f474f22020-02-01 12:14:24 -0800250 for (auto &subscriber : subscribers_) {
251 bool found_match = false;
252 for (auto channel : *message->channels_to_transfer()) {
253 if (subscriber->Compare(channel)) {
254 int index = subscriber->index();
255 auto it = channels_.find(index);
256 if (it == channels_.end()) {
257 auto pair = channels_.insert(
258 {index, peer_connection_->CreateDataChannel(
259 channel->name()->str() + "/" + channel->type()->str(),
260 nullptr)});
261 it = pair.first;
262 }
263 subscriber->AddListener(it->second);
264
265 VLOG(1) << "Subscribe to: " << channel->type()->str();
266 found_match = true;
267 break;
268 }
269 }
270 if (!found_match) {
271 int index = subscriber->index();
272 auto it = channels_.find(index);
273 subscriber->RemoveListener(it->second);
274 }
275 }
Alex Perryb3b50792020-01-18 16:13:45 -0800276}
277
278} // namespace web_proxy
279} // namespace aos