Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 1 | #include "aos/network/web_proxy.h" |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 2 | |
| 3 | #include "aos/flatbuffer_merge.h" |
| 4 | #include "aos/network/connect_generated.h" |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 5 | #include "aos/network/web_proxy_generated.h" |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 6 | #include "aos/network/web_proxy_utils.h" |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 7 | #include "aos/seasocks/seasocks_logger.h" |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 8 | #include "api/create_peerconnection_factory.h" |
| 9 | #include "glog/logging.h" |
James Kuszmaul | 4867136 | 2020-12-24 13:54:16 -0800 | [diff] [blame] | 10 | #include "internal/Embedded.h" |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 11 | |
James Kuszmaul | 1e95bed | 2021-01-09 21:02:49 -0800 | [diff] [blame^] | 12 | DEFINE_int32(proxy_port, 8080, "Port to use for the web proxy server."); |
| 13 | |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 14 | namespace aos { |
| 15 | namespace web_proxy { |
| 16 | |
| 17 | namespace { |
| 18 | // Based on webrtc examples. In our controlled environment we expect setting sdp |
| 19 | // to always succeed, and we can't do anything about a failure, so just ignore |
| 20 | // everything. |
| 21 | class DummySetSessionDescriptionObserver |
| 22 | : public webrtc::SetSessionDescriptionObserver { |
| 23 | public: |
| 24 | static DummySetSessionDescriptionObserver *Create() { |
| 25 | return new rtc::RefCountedObject<DummySetSessionDescriptionObserver>(); |
| 26 | } |
| 27 | virtual void OnSuccess() {} |
James Kuszmaul | 4867136 | 2020-12-24 13:54:16 -0800 | [diff] [blame] | 28 | virtual void OnFailure(webrtc::RTCError /*error*/) {} |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 29 | }; |
| 30 | |
| 31 | } // namespace |
| 32 | |
James Kuszmaul | 7ad9152 | 2020-09-01 19:15:35 -0700 | [diff] [blame] | 33 | WebsocketHandler::WebsocketHandler(::seasocks::Server *server, |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 34 | aos::EventLoop *event_loop, int buffer_size) |
James Kuszmaul | 7ad9152 | 2020-09-01 19:15:35 -0700 | [diff] [blame] | 35 | : server_(server), |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 36 | config_(aos::CopyFlatBuffer(event_loop->configuration())), |
| 37 | event_loop_(event_loop) { |
James Kuszmaul | 4867136 | 2020-12-24 13:54:16 -0800 | [diff] [blame] | 38 | // We need to reference findEmbeddedContent() to make the linker happy... |
| 39 | findEmbeddedContent(""); |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 40 | const aos::Node *self = event_loop->node(); |
James Kuszmaul | 7ad9152 | 2020-09-01 19:15:35 -0700 | [diff] [blame] | 41 | |
| 42 | for (uint i = 0; i < event_loop->configuration()->channels()->size(); ++i) { |
| 43 | auto channel = event_loop->configuration()->channels()->Get(i); |
| 44 | if (aos::configuration::ChannelIsReadableOnNode(channel, self)) { |
| 45 | auto fetcher = event_loop->MakeRawFetcher(channel); |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 46 | subscribers_.emplace_back(std::make_unique<aos::web_proxy::Subscriber>( |
| 47 | std::move(fetcher), i, buffer_size)); |
James Kuszmaul | 7ad9152 | 2020-09-01 19:15:35 -0700 | [diff] [blame] | 48 | } |
| 49 | } |
James Kuszmaul | 7ad9152 | 2020-09-01 19:15:35 -0700 | [diff] [blame] | 50 | TimerHandler *const timer = event_loop->AddTimer([this]() { |
| 51 | for (auto &subscriber : subscribers_) { |
| 52 | subscriber->RunIteration(); |
| 53 | } |
| 54 | }); |
| 55 | |
| 56 | event_loop->OnRun([timer, event_loop]() { |
| 57 | timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(100)); |
| 58 | }); |
| 59 | } |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 60 | |
| 61 | void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) { |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 62 | std::unique_ptr<Connection> conn = std::make_unique<Connection>( |
| 63 | sock, server_, subscribers_, config_, event_loop_); |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 64 | connections_.insert({sock, std::move(conn)}); |
| 65 | } |
| 66 | |
| 67 | void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data, |
| 68 | size_t size) { |
| 69 | connections_[sock]->HandleWebSocketData(data, size); |
| 70 | } |
| 71 | |
| 72 | void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) { |
| 73 | connections_.erase(sock); |
| 74 | } |
| 75 | |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 76 | WebProxy::WebProxy(aos::EventLoop *event_loop, int buffer_size) |
| 77 | : WebProxy(event_loop, &internal_epoll_, buffer_size) {} |
| 78 | |
| 79 | WebProxy::WebProxy(aos::ShmEventLoop *event_loop, int buffer_size) |
| 80 | : WebProxy(event_loop, event_loop->epoll(), buffer_size) {} |
| 81 | |
| 82 | WebProxy::WebProxy(aos::EventLoop *event_loop, aos::internal::EPoll *epoll, |
| 83 | int buffer_size) |
| 84 | : epoll_(epoll), |
| 85 | server_(std::make_shared<aos::seasocks::SeasocksLogger>( |
| 86 | ::seasocks::Logger::Level::Info)), |
| 87 | websocket_handler_( |
| 88 | new WebsocketHandler(&server_, event_loop, buffer_size)) { |
| 89 | server_.addWebSocketHandler("/ws", websocket_handler_); |
James Kuszmaul | 1e95bed | 2021-01-09 21:02:49 -0800 | [diff] [blame^] | 90 | CHECK(server_.startListening(FLAGS_proxy_port)); |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 91 | |
| 92 | epoll->OnReadable(server_.fd(), [this]() { |
| 93 | CHECK(::seasocks::Server::PollResult::Continue == server_.poll(0)); |
| 94 | }); |
| 95 | |
| 96 | if (&internal_epoll_ == epoll) { |
| 97 | TimerHandler *const timer = event_loop->AddTimer([this]() { |
| 98 | // Run the epoll poller until there are no more events (if we are being |
| 99 | // backed by a shm event loop, there won't be anything registered to |
| 100 | // internal_epoll_ and this will just return false). |
| 101 | // We just deal with clearing all the epoll events using a simulated |
| 102 | // timer. This does mean that we will spin rather than actually sleeping |
| 103 | // in any coherent manner, which will be particularly noticeable when past |
| 104 | // the end of processing other events. |
| 105 | while (internal_epoll_.Poll(false)) { |
| 106 | continue; |
| 107 | } |
| 108 | }); |
| 109 | |
| 110 | event_loop->OnRun([timer, event_loop]() { |
| 111 | timer->Setup(event_loop->monotonic_now(), std::chrono::milliseconds(10)); |
| 112 | }); |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | WebProxy::~WebProxy() { |
| 117 | epoll_->DeleteFd(server_.fd()); |
| 118 | server_.terminate(); |
| 119 | CHECK(::seasocks::Server::PollResult::Terminated == server_.poll(0)); |
| 120 | } |
| 121 | |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 122 | void Subscriber::RunIteration() { |
James Kuszmaul | 8d928d0 | 2020-12-25 17:47:49 -0800 | [diff] [blame] | 123 | { |
| 124 | // Manage updating the channels_ map given the pending_* members from the |
| 125 | // *Listeners() methods. |
| 126 | // We handle all the removals first so that we correctly handle the |
| 127 | // situation where the user calls RemoveListener() and then AddListener() |
| 128 | // between calls to RunIteration(). The reverse order (adding and then |
| 129 | // removing before an update) is handled directly in RemoveListener() where |
| 130 | // we remove things from the pending_channels_ map directly. |
| 131 | MutexLocker lock(&mutex_); |
| 132 | for (const auto &channel : pending_removal_) { |
| 133 | channels_.erase(channel); |
| 134 | } |
| 135 | pending_removal_.clear(); |
| 136 | for (const auto &channel : pending_channels_) { |
| 137 | channels_.insert(channel); |
| 138 | } |
| 139 | pending_channels_.clear(); |
| 140 | } |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 141 | if (channels_.empty() && buffer_size_ == 0) { |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 142 | return; |
| 143 | } |
| 144 | |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 145 | while (fetcher_->FetchNext()) { |
| 146 | // If we aren't building up a buffer, short-circuit the FetchNext(). |
| 147 | if (buffer_size_ == 0) { |
| 148 | fetcher_->Fetch(); |
| 149 | } |
| 150 | Message message; |
| 151 | message.index = fetcher_->context().queue_index; |
| 152 | VLOG(2) << "Packing a message with " << GetPacketCount(fetcher_->context()) |
| 153 | << "packets"; |
| 154 | for (int packet_index = 0; |
| 155 | packet_index < GetPacketCount(fetcher_->context()); ++packet_index) { |
| 156 | flatbuffers::Offset<MessageHeader> message_offset = |
| 157 | PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index); |
| 158 | fbb_.Finish(message_offset); |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 159 | |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 160 | const flatbuffers::DetachedBuffer buffer = fbb_.Release(); |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 161 | |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 162 | message.data.emplace_back( |
| 163 | rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()), |
| 164 | true /* binary array */); |
| 165 | } |
| 166 | message_buffer_.push_back(std::move(message)); |
| 167 | } |
| 168 | for (auto &conn : channels_) { |
| 169 | rtc::scoped_refptr<webrtc::DataChannelInterface> rtc_channel = conn.first; |
| 170 | ChannelInformation *channel_data = &conn.second; |
| 171 | if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) { |
| 172 | SkipToLastMessage(channel_data); |
| 173 | } |
| 174 | const webrtc::DataBuffer *buffer = NextBuffer(channel_data); |
| 175 | while (buffer != nullptr) { |
| 176 | if (rtc_channel->buffered_amount() > 14000000) { |
Alex Perry | 3dfcb81 | 2020-03-04 19:32:17 -0800 | [diff] [blame] | 177 | VLOG(1) << "skipping a send because buffered amount is too high"; |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 178 | break; |
Alex Perry | 3dfcb81 | 2020-03-04 19:32:17 -0800 | [diff] [blame] | 179 | } |
James Kuszmaul | 8d928d0 | 2020-12-25 17:47:49 -0800 | [diff] [blame] | 180 | // Call Send() from the signalling thread. The Invoke() call blocks until |
| 181 | // the handler has been called, so we do not need to handle any |
| 182 | // synchronization on this end. The body of the handler should be kept as |
| 183 | // short as possible to avoid blocking the signalling thread continuously |
| 184 | // for any longer than necessary. |
| 185 | channel_data->signaling_thread->Invoke<void>( |
| 186 | RTC_FROM_HERE, |
| 187 | [rtc_channel, buffer]() { rtc_channel->Send(*buffer); }); |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 188 | buffer = NextBuffer(channel_data); |
| 189 | } |
| 190 | } |
| 191 | if (buffer_size_ >= 0) { |
| 192 | while (message_buffer_.size() > static_cast<size_t>(buffer_size_)) { |
| 193 | message_buffer_.pop_front(); |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 194 | } |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | bool Subscriber::Compare(const Channel *channel) const { |
Alex Perry | 22824d7 | 2020-02-29 17:11:43 -0800 | [diff] [blame] | 199 | return channel->name()->string_view() == |
| 200 | fetcher_->channel()->name()->string_view() && |
| 201 | channel->type()->string_view() == |
| 202 | fetcher_->channel()->type()->string_view(); |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 203 | } |
| 204 | |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 205 | void Subscriber::AddListener( |
| 206 | rtc::scoped_refptr<webrtc::DataChannelInterface> channel, |
James Kuszmaul | 8d928d0 | 2020-12-25 17:47:49 -0800 | [diff] [blame] | 207 | TransferMethod transfer_method, rtc::Thread *signaling_thread) { |
| 208 | MutexLocker lock(&mutex_); |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 209 | ChannelInformation info; |
| 210 | info.transfer_method = transfer_method; |
James Kuszmaul | 8d928d0 | 2020-12-25 17:47:49 -0800 | [diff] [blame] | 211 | info.signaling_thread = signaling_thread; |
| 212 | pending_channels_.emplace(channel, info); |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 213 | } |
| 214 | |
| 215 | const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) { |
| 216 | CHECK_NOTNULL(channel); |
| 217 | if (message_buffer_.empty()) { |
| 218 | return nullptr; |
| 219 | } |
| 220 | const uint32_t earliest_index = message_buffer_.front().index; |
| 221 | const uint32_t latest_index = message_buffer_.back().index; |
| 222 | const bool fell_behind = channel->current_queue_index < earliest_index; |
| 223 | if (fell_behind) { |
| 224 | channel->current_queue_index = earliest_index; |
| 225 | channel->next_packet_number = 0; |
| 226 | return &message_buffer_.front().data.at(0); |
| 227 | } |
| 228 | if (channel->current_queue_index > latest_index) { |
| 229 | // We are still waiting on the next message to appear; return. |
| 230 | return nullptr; |
| 231 | } |
| 232 | CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size()) |
| 233 | << "Inconsistent queue indices."; |
| 234 | const size_t packets_in_message = |
| 235 | message_buffer_[channel->current_queue_index - earliest_index] |
| 236 | .data.size(); |
| 237 | CHECK_LT(0u, packets_in_message); |
| 238 | CHECK_LT(channel->next_packet_number, packets_in_message); |
| 239 | |
| 240 | const webrtc::DataBuffer *data = |
| 241 | &message_buffer_[channel->current_queue_index - earliest_index].data.at( |
| 242 | channel->next_packet_number); |
| 243 | |
| 244 | ++channel->next_packet_number; |
| 245 | if (channel->next_packet_number == packets_in_message) { |
| 246 | ++channel->current_queue_index; |
| 247 | channel->next_packet_number = 0; |
| 248 | } |
| 249 | |
| 250 | return data; |
| 251 | } |
| 252 | |
| 253 | void Subscriber::SkipToLastMessage(ChannelInformation *channel) { |
| 254 | CHECK_NOTNULL(channel); |
| 255 | if (message_buffer_.empty() || |
| 256 | channel->current_queue_index == message_buffer_.back().index) { |
| 257 | return; |
| 258 | } |
| 259 | channel->current_queue_index = message_buffer_.back().index; |
| 260 | channel->next_packet_number = 0; |
| 261 | } |
| 262 | |
| 263 | void Subscriber::RemoveListener( |
| 264 | rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { |
James Kuszmaul | 8d928d0 | 2020-12-25 17:47:49 -0800 | [diff] [blame] | 265 | MutexLocker lock(&mutex_); |
| 266 | pending_channels_.erase(channel); |
| 267 | pending_removal_.push_back(channel); |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 268 | } |
| 269 | |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 270 | Connection::Connection( |
| 271 | ::seasocks::WebSocket *sock, ::seasocks::Server *server, |
| 272 | const std::vector<std::unique_ptr<Subscriber>> &subscribers, |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 273 | const aos::FlatbufferDetachedBuffer<aos::Configuration> &config, |
| 274 | const EventLoop *event_loop) |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 275 | : sock_(sock), |
| 276 | server_(server), |
| 277 | subscribers_(subscribers), |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 278 | config_headers_(PackBuffer(config.span())), |
| 279 | event_loop_(event_loop) {} |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 280 | |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 281 | // Function called for web socket data. Parses the flatbuffer and |
| 282 | // handles it appropriately. |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 283 | void Connection::HandleWebSocketData(const uint8_t *data, size_t size) { |
James Kuszmaul | 4867136 | 2020-12-24 13:54:16 -0800 | [diff] [blame] | 284 | const FlatbufferSpan<WebSocketMessage> message({data, size}); |
| 285 | if (!message.Verify()) { |
| 286 | LOG(ERROR) << "Invalid WebsocketMessage received from browser."; |
| 287 | return; |
| 288 | } |
| 289 | switch (message.message().payload_type()) { |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 290 | case Payload::WebSocketSdp: { |
James Kuszmaul | 4867136 | 2020-12-24 13:54:16 -0800 | [diff] [blame] | 291 | const WebSocketSdp *offer = message.message().payload_as_WebSocketSdp(); |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 292 | if (offer->type() != SdpType::OFFER) { |
| 293 | LOG(WARNING) << "Got the wrong sdp type from client"; |
| 294 | break; |
| 295 | } |
| 296 | const flatbuffers::String *sdp = offer->payload(); |
| 297 | webrtc::SdpParseError error; |
| 298 | std::unique_ptr<webrtc::SessionDescriptionInterface> desc = |
| 299 | CreateSessionDescription(webrtc::SdpType::kOffer, sdp->str(), &error); |
| 300 | if (!desc) { |
| 301 | LOG(WARNING) << "Failed to parse sdp description: " |
| 302 | << error.description; |
| 303 | // TODO(alex): send a message back to browser for failure. |
| 304 | break; |
| 305 | } |
| 306 | |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 307 | // We can only start creating the PeerConnection once we have |
| 308 | // something to give it, so we wait until we get an offer before |
| 309 | // starting. |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 310 | webrtc::PeerConnectionInterface::RTCConfiguration config; |
| 311 | config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan; |
| 312 | config.enable_dtls_srtp = true; |
James Kuszmaul | 54424d0 | 2020-12-26 18:09:20 -0800 | [diff] [blame] | 313 | { |
| 314 | webrtc::PeerConnectionInterface::IceServer ice_server; |
| 315 | ice_server.urls.push_back("stun:stun.l.google.com:19302"); |
| 316 | config.servers.push_back(ice_server); |
| 317 | } |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 318 | |
| 319 | std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create(); |
| 320 | signaling_thread->SetName("signaling_thread", nullptr); |
| 321 | signaling_thread->Start(); |
| 322 | |
James Kuszmaul | 8d928d0 | 2020-12-25 17:47:49 -0800 | [diff] [blame] | 323 | signaling_thread_ = signaling_thread.get(); |
| 324 | |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 325 | webrtc::PeerConnectionFactoryDependencies factory_deps; |
| 326 | factory_deps.signaling_thread = signaling_thread.release(); |
| 327 | rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory = |
| 328 | CreateModularPeerConnectionFactory(std::move(factory_deps)); |
James Kuszmaul | 54424d0 | 2020-12-26 18:09:20 -0800 | [diff] [blame] | 329 | { |
| 330 | // Don't ignore *any* networks--by default, the loopback interface is |
| 331 | // ignored, which makes it impossible to use WebRTC on devices with no |
| 332 | // network. |
| 333 | webrtc::PeerConnectionFactoryInterface::Options options; |
| 334 | options.network_ignore_mask = 0; |
| 335 | factory->SetOptions(options); |
| 336 | } |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 337 | |
| 338 | peer_connection_ = |
| 339 | factory->CreatePeerConnection(config, nullptr, nullptr, this); |
| 340 | |
| 341 | peer_connection_->SetRemoteDescription( |
| 342 | DummySetSessionDescriptionObserver::Create(), desc.release()); |
| 343 | |
| 344 | peer_connection_->CreateAnswer( |
| 345 | this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions()); |
| 346 | break; |
| 347 | } |
| 348 | case Payload::WebSocketIce: { |
James Kuszmaul | 4867136 | 2020-12-24 13:54:16 -0800 | [diff] [blame] | 349 | const WebSocketIce *ice = message.message().payload_as_WebSocketIce(); |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 350 | std::string candidate = ice->candidate()->str(); |
| 351 | std::string sdpMid = ice->sdpMid()->str(); |
| 352 | int sdpMLineIndex = ice->sdpMLineIndex(); |
| 353 | webrtc::SdpParseError error; |
| 354 | webrtc::IceCandidateInterface *ice_candidate = |
| 355 | webrtc::CreateIceCandidate(sdpMid, sdpMLineIndex, candidate, &error); |
| 356 | if (!ice_candidate) { |
| 357 | LOG(WARNING) << "Failed to parse ice candidate: " << error.description; |
| 358 | // TODO(alex): send a message back to browser for failure. |
| 359 | break; |
| 360 | } |
| 361 | peer_connection_->AddIceCandidate(ice_candidate); |
| 362 | break; |
| 363 | } |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 364 | default: { |
| 365 | break; |
| 366 | } |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 367 | } |
| 368 | } |
| 369 | |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 370 | void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const { |
| 371 | webrtc::DataBuffer data_buffer( |
| 372 | rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()), |
| 373 | true /* binary array */); |
Alex Perry | 3dfcb81 | 2020-03-04 19:32:17 -0800 | [diff] [blame] | 374 | VLOG(1) << "Sending " << buffer.size() << "bytes to a client"; |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 375 | data_channel_->Send(data_buffer); |
| 376 | } |
| 377 | |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 378 | void Connection::OnDataChannel( |
| 379 | rtc::scoped_refptr<webrtc::DataChannelInterface> channel) { |
| 380 | data_channel_ = channel; |
| 381 | data_channel_->RegisterObserver(this); |
| 382 | } |
| 383 | |
| 384 | void Connection::OnIceCandidate( |
| 385 | const webrtc::IceCandidateInterface *candidate) { |
| 386 | flatbuffers::FlatBufferBuilder fbb(512); |
| 387 | std::string ice_string; |
| 388 | candidate->ToString(&ice_string); |
| 389 | |
| 390 | flatbuffers::Offset<WebSocketIce> ice_fb = CreateWebSocketIceDirect( |
| 391 | fbb, ice_string.c_str(), candidate->sdp_mid().c_str(), |
| 392 | candidate->sdp_mline_index()); |
| 393 | flatbuffers::Offset<WebSocketMessage> ice_message = |
| 394 | CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union()); |
| 395 | fbb.Finish(ice_message); |
| 396 | |
| 397 | server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release())); |
| 398 | } |
| 399 | |
| 400 | // This is the callback for creating an sdp. We have to manually assign it |
| 401 | // locally and send it to the client. |
| 402 | void Connection::OnSuccess(webrtc::SessionDescriptionInterface *desc) { |
| 403 | peer_connection_->SetLocalDescription( |
| 404 | DummySetSessionDescriptionObserver::Create(), desc); |
| 405 | flatbuffers::FlatBufferBuilder fbb(512); |
| 406 | std::string answer_string; |
| 407 | desc->ToString(&answer_string); |
| 408 | flatbuffers::Offset<WebSocketSdp> sdp_fb = |
| 409 | CreateWebSocketSdpDirect(fbb, SdpType::ANSWER, answer_string.c_str()); |
| 410 | flatbuffers::Offset<WebSocketMessage> answer_message = |
| 411 | CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union()); |
| 412 | fbb.Finish(answer_message); |
| 413 | |
| 414 | server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release())); |
| 415 | } |
| 416 | |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 417 | // Wait until the data channel is ready for data before sending the config. |
| 418 | void Connection::OnStateChange() { |
| 419 | if (peer_connection_.get() != nullptr && |
| 420 | data_channel_->state() == webrtc::DataChannelInterface::kOpen) { |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 421 | for (const auto &header : config_headers_) { |
James Kuszmaul | 1ec7443 | 2020-07-30 20:26:45 -0700 | [diff] [blame] | 422 | Send(header.buffer()); |
| 423 | } |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 424 | } |
| 425 | } |
| 426 | |
| 427 | // Handle DataChannel messages. Subscribe to each listener that matches the |
| 428 | // subscribe message |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 429 | void Connection::OnMessage(const webrtc::DataBuffer &buffer) { |
James Kuszmaul | 8d928d0 | 2020-12-25 17:47:49 -0800 | [diff] [blame] | 430 | // Sanity check--we are relying on the Add/RemoveListener calls being made |
| 431 | // from the correct thread. |
| 432 | CHECK(signaling_thread_->IsCurrent()); |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 433 | FlatbufferSpan<SubscriberRequest> message( |
| 434 | {buffer.data.data(), buffer.data.size()}); |
| 435 | if (!message.Verify()) { |
| 436 | LOG(ERROR) << "Invalid flatbuffer received from browser client."; |
| 437 | return; |
| 438 | } |
| 439 | VLOG(2) << "Got a subscription message " |
| 440 | << aos::FlatbufferToJson(&message.message()); |
| 441 | if (!message.message().has_channels_to_transfer()) { |
| 442 | LOG(ERROR) << "No channels requested for transfer."; |
| 443 | return; |
| 444 | } |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 445 | for (auto &subscriber : subscribers_) { |
| 446 | bool found_match = false; |
James Kuszmaul | 71a8193 | 2020-12-15 21:08:01 -0800 | [diff] [blame] | 447 | for (auto channel_request : *message.message().channels_to_transfer()) { |
| 448 | const Channel *channel = channel_request->channel(); |
| 449 | if (channel == nullptr) { |
| 450 | LOG(ERROR) << "Got unpopulated channel."; |
| 451 | continue; |
| 452 | } |
| 453 | const TransferMethod transfer_method = channel_request->method(); |
| 454 | // Call GetChannel() before comparing the channel name/type to each |
| 455 | // subscriber. This allows us to resolve any node or application specific |
| 456 | // mappings. |
| 457 | const Channel *comparison_channel = |
| 458 | configuration::GetChannel(event_loop_->configuration(), channel, |
| 459 | event_loop_->name(), event_loop_->node()); |
| 460 | if (comparison_channel == nullptr) { |
| 461 | LOG(ERROR) << "Channel not available: " |
| 462 | << configuration::StrippedChannelToString(channel); |
| 463 | continue; |
| 464 | } |
| 465 | if (subscriber->Compare(comparison_channel)) { |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 466 | int index = subscriber->index(); |
| 467 | auto it = channels_.find(index); |
| 468 | if (it == channels_.end()) { |
| 469 | auto pair = channels_.insert( |
| 470 | {index, peer_connection_->CreateDataChannel( |
| 471 | channel->name()->str() + "/" + channel->type()->str(), |
| 472 | nullptr)}); |
| 473 | it = pair.first; |
| 474 | } |
James Kuszmaul | 8d928d0 | 2020-12-25 17:47:49 -0800 | [diff] [blame] | 475 | subscriber->AddListener(it->second, transfer_method, signaling_thread_); |
Alex Perry | 5f474f2 | 2020-02-01 12:14:24 -0800 | [diff] [blame] | 476 | |
| 477 | VLOG(1) << "Subscribe to: " << channel->type()->str(); |
| 478 | found_match = true; |
| 479 | break; |
| 480 | } |
| 481 | } |
| 482 | if (!found_match) { |
| 483 | int index = subscriber->index(); |
| 484 | auto it = channels_.find(index); |
| 485 | subscriber->RemoveListener(it->second); |
| 486 | } |
| 487 | } |
Alex Perry | b3b5079 | 2020-01-18 16:13:45 -0800 | [diff] [blame] | 488 | } |
| 489 | |
| 490 | } // namespace web_proxy |
| 491 | } // namespace aos |