Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 1 | #include "aos/network/web_proxy.h" |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 2 | |
| 3 | #include "aos/flatbuffer_merge.h" |
| 4 | #include "aos/network/connect_generated.h" |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 5 | #include "aos/network/web_proxy_generated.h" |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 6 | #include "aos/network/web_proxy_utils.h" |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 7 | #include "api/create_peerconnection_factory.h" |
| 8 | #include "glog/logging.h" |
| 9 | |
| 10 | namespace aos { |
| 11 | namespace web_proxy { |
| 12 | |
| 13 | namespace { |
| 14 | // Based on webrtc examples. In our controlled environment we expect setting sdp |
| 15 | // to always succeed, and we can't do anything about a failure, so just ignore |
| 16 | // everything. |
| 17 | class DummySetSessionDescriptionObserver |
| 18 | : public webrtc::SetSessionDescriptionObserver { |
| 19 | public: |
| 20 | static DummySetSessionDescriptionObserver *Create() { |
| 21 | return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>(); |
| 22 | } |
| 23 | virtual void OnSuccess() {} |
| 24 | virtual void OnFailure(webrtc::RTCError error) {} |
| 25 | }; |
| 26 | |
| 27 | } // namespace |
| 28 | |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 29 | WebsocketHandler::WebsocketHandler( |
| 30 | ::seasocks::Server *server, |
| 31 | const std::vector<std::unique_ptr<Subscriber>> &subscribers, |
| 32 | const aos::FlatbufferDetachedBuffer<aos::Configuration> &config) |
| 33 | : server_(server), subscribers_(subscribers), config_(config) {} |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 34 | |
| 35 | void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) { |
| 36 | std::unique_ptr<Connection> conn = |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 37 | std::make_unique<Connection>(sock, server_, subscribers_, config_); |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 38 | connections_.insert({sock, std::move(conn)}); |
| 39 | } |
| 40 | |
| 41 | void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data, |
| 42 | size_t size) { |
| 43 | connections_[sock]->HandleWebSocketData(data, size); |
| 44 | } |
| 45 | |
| 46 | void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) { |
| 47 | connections_.erase(sock); |
| 48 | } |
| 49 | |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 50 | void Subscriber::RunIteration() { |
| 51 | if (channels_.empty()) { |
| 52 | return; |
| 53 | } |
| 54 | |
| 55 | fetcher_->Fetch(); |
Alex Perry | 22824d7 | 2020-02-29 17:11:43 -0800 | [diff] [blame] | 56 | VLOG(2) << "Sending a message with " << GetPacketCount(fetcher_->context()) |
| 57 | << "packets"; |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 58 | for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context()); |
| 59 | ++packet_index) { |
| 60 | flatbuffers::Offset<MessageHeader> message = |
| 61 | PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index); |
| 62 | fbb_.Finish(message); |
| 63 | |
| 64 | const flatbuffers::DetachedBuffer buffer = fbb_.Release(); |
| 65 | |
| 66 | webrtc::DataBuffer data_buffer( |
| 67 | rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()), |
| 68 | true /* binary array */); |
| 69 | for (auto conn : channels_) { |
Alex Perry | 3dfcb81 | 2020-03-04 19:32:17 -0800 | [diff] [blame^] | 70 | if (conn->buffered_amount() > 14000000) { |
| 71 | VLOG(1) << "skipping a send because buffered amount is too high"; |
| 72 | continue; |
| 73 | } |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 74 | conn->Send(data_buffer); |
| 75 | } |
| 76 | } |
| 77 | } |
| 78 | |
| 79 | bool Subscriber::Compare(const Channel *channel) const { |
Alex Perry | 22824d7 | 2020-02-29 17:11:43 -0800 | [diff] [blame] | 80 | return channel->name()->string_view() == |
| 81 | fetcher_->channel()->name()->string_view() && |
| 82 | channel->type()->string_view() == |
| 83 | fetcher_->channel()->type()->string_view(); |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 84 | } |
| 85 | |
| 86 | Connection::Connection( |
| 87 | ::seasocks::WebSocket *sock, ::seasocks::Server *server, |
| 88 | const std::vector<std::unique_ptr<Subscriber>> &subscribers, |
| 89 | const aos::FlatbufferDetachedBuffer<aos::Configuration> &config) |
| 90 | : sock_(sock), |
| 91 | server_(server), |
| 92 | subscribers_(subscribers), |
| 93 | config_(config) {} |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 94 | |
| 95 | // Function called for web socket data. Parses the flatbuffer and handles it |
| 96 | // appropriately. |
| 97 | void Connection::HandleWebSocketData(const uint8_t *data, size_t size) { |
| 98 | const WebSocketMessage *message = |
| 99 | flatbuffers::GetRoot<WebSocketMessage>(data); |
| 100 | switch (message->payload_type()) { |
| 101 | case Payload::WebSocketSdp: { |
| 102 | const WebSocketSdp *offer = message->payload_as_WebSocketSdp(); |
| 103 | if (offer->type() != SdpType::OFFER) { |
| 104 | LOG(WARNING) << "Got the wrong sdp type from client"; |
| 105 | break; |
| 106 | } |
| 107 | const flatbuffers::String *sdp = offer->payload(); |
| 108 | webrtc::SdpParseError error; |
| 109 | std::unique_ptr<webrtc::SessionDescriptionInterface> desc = |
| 110 | CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error); |
| 111 | if (!desc) { |
| 112 | LOG(WARNING) << "Failed to parse sdp description: " |
| 113 | << error.description; |
| 114 | // TODO(alex): send a message back to browser for failure. |
| 115 | break; |
| 116 | } |
| 117 | |
| 118 | // We can only start creating the PeerConnection once we have something to |
| 119 | // give it, so we wait until we get an offer before starting. |
| 120 | webrtc::PeerConnectionInterface::RTCConfiguration config; |
| 121 | config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan; |
| 122 | config.enable_dtls_srtp = true; |
| 123 | |
| 124 | std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create(); |
| 125 | signaling_thread->SetName("signaling_thread", nullptr); |
| 126 | signaling_thread->Start(); |
| 127 | |
| 128 | webrtc::PeerConnectionFactoryDependencies factory_deps; |
| 129 | factory_deps.signaling_thread = signaling_thread.release(); |
| 130 | rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory = |
| 131 | CreateModularPeerConnectionFactory(std::move(factory_deps)); |
| 132 | |
| 133 | peer_connection_ = |
| 134 | factory->CreatePeerConnection(config, nullptr, nullptr, this); |
| 135 | |
| 136 | peer_connection_->SetRemoteDescription( |
| 137 | DummySetSessionDescriptionObserver::Create(), desc.release()); |
| 138 | |
| 139 | peer_connection_->CreateAnswer( |
| 140 | this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions()); |
| 141 | break; |
| 142 | } |
| 143 | case Payload::WebSocketIce: { |
| 144 | const WebSocketIce *ice = message->payload_as_WebSocketIce(); |
| 145 | std::string candidate = ice->candidate()->str(); |
| 146 | std::string sdpMid = ice->sdpMid()->str(); |
| 147 | int sdpMLineIndex = ice->sdpMLineIndex(); |
| 148 | webrtc::SdpParseError error; |
| 149 | webrtc::IceCandidateInterface *ice_candidate = |
| 150 | webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error); |
| 151 | if (!ice_candidate) { |
| 152 | LOG(WARNING) << "Failed to parse ice candidate: " << error.description; |
| 153 | // TODO(alex): send a message back to browser for failure. |
| 154 | break; |
| 155 | } |
| 156 | peer_connection_->AddIceCandidate(ice_candidate); |
| 157 | break; |
| 158 | } |
| 159 | default: { break; } |
| 160 | } |
| 161 | } |
| 162 | |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 163 | void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const { |
| 164 | webrtc::DataBuffer data_buffer( |
| 165 | rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()), |
| 166 | true /* binary array */); |
Alex Perry | 3dfcb81 | 2020-03-04 19:32:17 -0800 | [diff] [blame^] | 167 | VLOG(1) << "Sending " << buffer.size() << "bytes to a client"; |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 168 | data_channel_->Send(data_buffer); |
| 169 | } |
| 170 | |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 171 | void Connection::OnDataChannel( |
| 172 | rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { |
| 173 | data_channel_ = channel; |
| 174 | data_channel_->RegisterObserver(this); |
| 175 | } |
| 176 | |
| 177 | void Connection::OnIceCandidate( |
| 178 | const webrtc::IceCandidateInterface *candidate) { |
| 179 | flatbuffers::FlatBufferBuilder fbb(512); |
| 180 | std::string ice_string; |
| 181 | candidate->ToString(&ice_string); |
| 182 | |
| 183 | flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect( |
| 184 | fbb, ice_string.c_str(), candidate->sdp_mid().c_str(), |
| 185 | candidate->sdp_mline_index()); |
| 186 | flatbuffers::Offset<WebSocketMessage> ice_message = |
| 187 | CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union()); |
| 188 | fbb.Finish(ice_message); |
| 189 | |
| 190 | server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release())); |
| 191 | } |
| 192 | |
| 193 | // This is the callback for creating an sdp. We have to manually assign it |
| 194 | // locally and send it to the client. |
| 195 | void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) { |
| 196 | peer_connection_->SetLocalDescription( |
| 197 | DummySetSessionDescriptionObserver::Create(), desc); |
| 198 | flatbuffers::FlatBufferBuilder fbb(512); |
| 199 | std::string answer_string; |
| 200 | desc->ToString(&answer_string); |
| 201 | flatbuffers::Offset<WebSocketSdp> sdp_fb = |
| 202 | CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str()); |
| 203 | flatbuffers::Offset<WebSocketMessage> answer_message = |
| 204 | CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union()); |
| 205 | fbb.Finish(answer_message); |
| 206 | |
| 207 | server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release())); |
| 208 | } |
| 209 | |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 210 | // Wait until the data channel is ready for data before sending the config. |
| 211 | void Connection::OnStateChange() { |
| 212 | if (peer_connection_.get() != nullptr && |
| 213 | data_channel_->state() == webrtc::DataChannelInterface::kOpen) { |
| 214 | Send(config_.buffer()); |
| 215 | } |
| 216 | } |
| 217 | |
| 218 | // Handle DataChannel messages. Subscribe to each listener that matches the |
| 219 | // subscribe message |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 220 | void Connection::OnMessage(const webrtc::DataBuffer &buffer) { |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 221 | const message_bridge::Connect *message = |
| 222 | flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data()); |
Alex Perry | 22824d7 | 2020-02-29 17:11:43 -0800 | [diff] [blame] | 223 | VLOG(2) << "Got a connect message " << aos::FlatbufferToJson(message); |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 224 | for (auto &subscriber : subscribers_) { |
Alex Perry | 656dd5b | 2020-02-16 17:20:42 -0800 | [diff] [blame] | 225 | // Make sure the subscriber is for a channel on this node. |
| 226 | if (subscriber.get() == nullptr) { |
Alex Perry | 22824d7 | 2020-02-29 17:11:43 -0800 | [diff] [blame] | 227 | VLOG(2) << ": Null subscriber"; |
Alex Perry | 656dd5b | 2020-02-16 17:20:42 -0800 | [diff] [blame] | 228 | continue; |
| 229 | } |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 230 | bool found_match = false; |
| 231 | for (auto channel : *message->channels_to_transfer()) { |
| 232 | if (subscriber->Compare(channel)) { |
| 233 | int index = subscriber->index(); |
| 234 | auto it = channels_.find(index); |
| 235 | if (it == channels_.end()) { |
| 236 | auto pair = channels_.insert( |
| 237 | {index, peer_connection_->CreateDataChannel( |
| 238 | channel->name()->str() + "/" + channel->type()->str(), |
| 239 | nullptr)}); |
| 240 | it = pair.first; |
| 241 | } |
| 242 | subscriber->AddListener(it->second); |
| 243 | |
| 244 | VLOG(1) << "Subscribe to: " << channel->type()->str(); |
| 245 | found_match = true; |
| 246 | break; |
| 247 | } |
| 248 | } |
| 249 | if (!found_match) { |
| 250 | int index = subscriber->index(); |
| 251 | auto it = channels_.find(index); |
| 252 | subscriber->RemoveListener(it->second); |
| 253 | } |
| 254 | } |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 255 | } |
| 256 | |
| 257 | } // namespace web_proxy |
| 258 | } // namespace aos |