blob: 5b500725748167b22d55d07ecb1729490182f1ac [file] [log] [blame]
Alex Perryb3b50792020-01-18 16:13:45 -08001#include "aos/network/web_proxy.h"
Alex Perry5f474f22020-02-01 12:14:24 -08002
3#include "aos/flatbuffer_merge.h"
4#include "aos/network/connect_generated.h"
Alex Perryb3b50792020-01-18 16:13:45 -08005#include "aos/network/web_proxy_generated.h"
Alex Perry5f474f22020-02-01 12:14:24 -08006#include "aos/network/web_proxy_utils.h"
Alex Perryb3b50792020-01-18 16:13:45 -08007#include "api/create_peerconnection_factory.h"
8#include "glog/logging.h"
9
10namespace aos {
11namespace web_proxy {
12
13namespace {
14// Based on webrtc examples. In our controlled environment we expect setting sdp
15// to always succeed, and we can't do anything about a failure, so just ignore
16// everything.
17class DummySetSessionDescriptionObserver
18 : public webrtc::SetSessionDescriptionObserver {
19 public:
20 static DummySetSessionDescriptionObserver *Create() {
21 return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>();
22 }
23 virtual void OnSuccess() {}
24 virtual void OnFailure(webrtc::RTCError error) {}
25};
26
27} // namespace
28
Alex Perry5f474f22020-02-01 12:14:24 -080029WebsocketHandler::WebsocketHandler(
30 ::seasocks::Server *server,
31 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
32 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
33 : server_(server), subscribers_(subscribers), config_(config) {}
Alex Perryb3b50792020-01-18 16:13:45 -080034
35void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
36 std::unique_ptr<Connection> conn =
Alex Perry5f474f22020-02-01 12:14:24 -080037 std::make_unique<Connection>(sock, server_, subscribers_, config_);
Alex Perryb3b50792020-01-18 16:13:45 -080038 connections_.insert({sock, std::move(conn)});
39}
40
41void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
42 size_t size) {
43 connections_[sock]->HandleWebSocketData(data, size);
44}
45
46void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
47 connections_.erase(sock);
48}
49
Alex Perry5f474f22020-02-01 12:14:24 -080050void Subscriber::RunIteration() {
51 if (channels_.empty()) {
52 return;
53 }
54
55 fetcher_->Fetch();
Alex Perry22824d72020-02-29 17:11:43 -080056 VLOG(2) << "Sending a message with " << GetPacketCount(fetcher_->context())
57 << "packets";
Alex Perry5f474f22020-02-01 12:14:24 -080058 for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context());
59 ++packet_index) {
60 flatbuffers::Offset<MessageHeader> message =
61 PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
62 fbb_.Finish(message);
63
64 const flatbuffers::DetachedBuffer buffer = fbb_.Release();
65
66 webrtc::DataBuffer data_buffer(
67 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
68 true /* binary array */);
69 for (auto conn : channels_) {
Alex Perry3dfcb812020-03-04 19:32:17 -080070 if (conn->buffered_amount() > 14000000) {
71 VLOG(1) << "skipping a send because buffered amount is too high";
72 continue;
73 }
Alex Perry5f474f22020-02-01 12:14:24 -080074 conn->Send(data_buffer);
75 }
76 }
77}
78
79bool Subscriber::Compare(const Channel *channel) const {
Alex Perry22824d72020-02-29 17:11:43 -080080 return channel->name()->string_view() ==
81 fetcher_->channel()->name()->string_view() &&
82 channel->type()->string_view() ==
83 fetcher_->channel()->type()->string_view();
Alex Perry5f474f22020-02-01 12:14:24 -080084}
85
86Connection::Connection(
87 ::seasocks::WebSocket *sock, ::seasocks::Server *server,
88 const std::vector<std::unique_ptr<Subscriber>> &subscribers,
89 const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
90 : sock_(sock),
91 server_(server),
92 subscribers_(subscribers),
93 config_(config) {}
Alex Perryb3b50792020-01-18 16:13:45 -080094
95// Function called for web socket data. Parses the flatbuffer and handles it
96// appropriately.
97void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
98 const WebSocketMessage *message =
99 flatbuffers::GetRoot<WebSocketMessage>(data);
100 switch (message->payload_type()) {
101 case Payload::WebSocketSdp: {
102 const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
103 if (offer->type() != SdpType::OFFER) {
104 LOG(WARNING) << "Got the wrong sdp type from client";
105 break;
106 }
107 const flatbuffers::String *sdp = offer->payload();
108 webrtc::SdpParseError error;
109 std::unique_ptr<webrtc::SessionDescriptionInterface> desc =
110 CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error);
111 if (!desc) {
112 LOG(WARNING) << "Failed to parse sdp description: "
113 << error.description;
114 // TODO(alex): send a message back to browser for failure.
115 break;
116 }
117
118 // We can only start creating the PeerConnection once we have something to
119 // give it, so we wait until we get an offer before starting.
120 webrtc::PeerConnectionInterface::RTCConfiguration config;
121 config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
122 config.enable_dtls_srtp = true;
123
124 std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
125 signaling_thread->SetName("signaling_thread", nullptr);
126 signaling_thread->Start();
127
128 webrtc::PeerConnectionFactoryDependencies factory_deps;
129 factory_deps.signaling_thread = signaling_thread.release();
130 rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
131 CreateModularPeerConnectionFactory(std::move(factory_deps));
132
133 peer_connection_ =
134 factory->CreatePeerConnection(config, nullptr, nullptr, this);
135
136 peer_connection_->SetRemoteDescription(
137 DummySetSessionDescriptionObserver::Create(), desc.release());
138
139 peer_connection_->CreateAnswer(
140 this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
141 break;
142 }
143 case Payload::WebSocketIce: {
144 const WebSocketIce *ice = message->payload_as_WebSocketIce();
145 std::string candidate = ice->candidate()->str();
146 std::string sdpMid = ice->sdpMid()->str();
147 int sdpMLineIndex = ice->sdpMLineIndex();
148 webrtc::SdpParseError error;
149 webrtc::IceCandidateInterface *ice_candidate =
150 webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error);
151 if (!ice_candidate) {
152 LOG(WARNING) << "Failed to parse ice candidate: " << error.description;
153 // TODO(alex): send a message back to browser for failure.
154 break;
155 }
156 peer_connection_->AddIceCandidate(ice_candidate);
157 break;
158 }
159 default: { break; }
160 }
161}
162
Alex Perry5f474f22020-02-01 12:14:24 -0800163void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
164 webrtc::DataBuffer data_buffer(
165 rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
166 true /* binary array */);
Alex Perry3dfcb812020-03-04 19:32:17 -0800167 VLOG(1) << "Sending " << buffer.size() << "bytes to a client";
Alex Perry5f474f22020-02-01 12:14:24 -0800168 data_channel_->Send(data_buffer);
169}
170
Alex Perryb3b50792020-01-18 16:13:45 -0800171void Connection::OnDataChannel(
172 rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
173 data_channel_ = channel;
174 data_channel_->RegisterObserver(this);
175}
176
177void Connection::OnIceCandidate(
178 const webrtc::IceCandidateInterface *candidate) {
179 flatbuffers::FlatBufferBuilder fbb(512);
180 std::string ice_string;
181 candidate->ToString(&ice_string);
182
183 flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect(
184 fbb, ice_string.c_str(), candidate->sdp_mid().c_str(),
185 candidate->sdp_mline_index());
186 flatbuffers::Offset<WebSocketMessage> ice_message =
187 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
188 fbb.Finish(ice_message);
189
190 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
191}
192
193// This is the callback for creating an sdp. We have to manually assign it
194// locally and send it to the client.
195void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) {
196 peer_connection_->SetLocalDescription(
197 DummySetSessionDescriptionObserver::Create(), desc);
198 flatbuffers::FlatBufferBuilder fbb(512);
199 std::string answer_string;
200 desc->ToString(&answer_string);
201 flatbuffers::Offset<WebSocketSdp> sdp_fb =
202 CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str());
203 flatbuffers::Offset<WebSocketMessage> answer_message =
204 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
205 fbb.Finish(answer_message);
206
207 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
208}
209
Alex Perry5f474f22020-02-01 12:14:24 -0800210// Wait until the data channel is ready for data before sending the config.
211void Connection::OnStateChange() {
212 if (peer_connection_.get() != nullptr &&
213 data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
214 Send(config_.buffer());
215 }
216}
217
218// Handle DataChannel messages. Subscribe to each listener that matches the
219// subscribe message
Alex Perryb3b50792020-01-18 16:13:45 -0800220void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
Alex Perry5f474f22020-02-01 12:14:24 -0800221 const message_bridge::Connect *message =
222 flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
Alex Perry22824d72020-02-29 17:11:43 -0800223 VLOG(2) << "Got a connect message " << aos::FlatbufferToJson(message);
Alex Perry5f474f22020-02-01 12:14:24 -0800224 for (auto &subscriber : subscribers_) {
Alex Perry656dd5b2020-02-16 17:20:42 -0800225 // Make sure the subscriber is for a channel on this node.
226 if (subscriber.get() == nullptr) {
Alex Perry22824d72020-02-29 17:11:43 -0800227 VLOG(2) << ": Null subscriber";
Alex Perry656dd5b2020-02-16 17:20:42 -0800228 continue;
229 }
Alex Perry5f474f22020-02-01 12:14:24 -0800230 bool found_match = false;
231 for (auto channel : *message->channels_to_transfer()) {
232 if (subscriber->Compare(channel)) {
233 int index = subscriber->index();
234 auto it = channels_.find(index);
235 if (it == channels_.end()) {
236 auto pair = channels_.insert(
237 {index, peer_connection_->CreateDataChannel(
238 channel->name()->str() + "/" + channel->type()->str(),
239 nullptr)});
240 it = pair.first;
241 }
242 subscriber->AddListener(it->second);
243
244 VLOG(1) << "Subscribe to: " << channel->type()->str();
245 found_match = true;
246 break;
247 }
248 }
249 if (!found_match) {
250 int index = subscriber->index();
251 auto it = channels_.find(index);
252 subscriber->RemoveListener(it->second);
253 }
254 }
Alex Perryb3b50792020-01-18 16:13:45 -0800255}
256
257} // namespace web_proxy
258} // namespace aos