Handle subscription of messages in the webapp.
Messages larger than a threshold are split and reassembled due to the
size limit in webrtc. Threshold may have to be adjusted somewhere between
64KiB and 256KiB.
This also includes a basic handler for a ping message and a more
advanced image handler.
Change-Id: If66acfb1bb84e9d3ff686994a94b1480cb70b2aa
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index c6d7336..77371eb 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -1,5 +1,9 @@
#include "aos/network/web_proxy.h"
+
+#include "aos/flatbuffer_merge.h"
+#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
+#include "aos/network/web_proxy_utils.h"
#include "api/create_peerconnection_factory.h"
#include "glog/logging.h"
@@ -22,12 +26,15 @@
} // namespace
-WebsocketHandler::WebsocketHandler(::seasocks::Server *server)
- : server_(server) {}
+WebsocketHandler::WebsocketHandler(
+ ::seasocks::Server *server,
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+ : server_(server), subscribers_(subscribers), config_(config) {}
void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
std::unique_ptr<Connection> conn =
- std::make_unique<Connection>(sock, server_);
+ std::make_unique<Connection>(sock, server_, subscribers_, config_);
connections_.insert({sock, std::move(conn)});
}
@@ -40,8 +47,42 @@
connections_.erase(sock);
}
-Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
- : sock_(sock), server_(server) {}
+void Subscriber::RunIteration() {
+ if (channels_.empty()) {
+ return;
+ }
+
+ fetcher_->Fetch();
+ for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context());
+ ++packet_index) {
+ flatbuffers::Offset<MessageHeader> message =
+ PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
+ fbb_.Finish(message);
+
+ const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+
+ webrtc::DataBuffer data_buffer(
+ rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+ true /* binary array */);
+ for (auto conn : channels_) {
+ conn->Send(data_buffer);
+ }
+ }
+}
+
+bool Subscriber::Compare(const Channel *channel) const {
+ return channel->name() == fetcher_->channel()->name() &&
+ channel->type() == fetcher_->channel()->type();
+}
+
+Connection::Connection(
+ ::seasocks::WebSocket *sock, ::seasocks::Server *server,
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+ : sock_(sock),
+ server_(server),
+ subscribers_(subscribers),
+ config_(config) {}
// Function called for web socket data. Parses the flatbuffer and handles it
// appropriately.
@@ -111,6 +152,13 @@
}
}
+void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
+ webrtc::DataBuffer data_buffer(
+ rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+ true /* binary array */);
+ data_channel_->Send(data_buffer);
+}
+
void Connection::OnDataChannel(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
data_channel_ = channel;
@@ -150,13 +198,45 @@
server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
}
-// Receive and respond to a DataChannel message. Temporarily acting as a
-// "PONG", but will change to handle "Connect" subscription messages.
+// Wait until the data channel is ready for data before sending the config.
+void Connection::OnStateChange() {
+ if (peer_connection_.get() != nullptr &&
+ data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
+ Send(config_.buffer());
+ }
+}
+
+// Handle DataChannel messages. Subscribe to each listener that matches the
+// subscribe message
void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
- // This is technically disallowed by webrtc, But doesn't seem to cause major
- // problems. At least for the small data tested manually. Send should be
- // called from outside this call stack.
- data_channel_->Send(buffer);
+ const message_bridge::Connect *message =
+ flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
+ for (auto &subscriber : subscribers_) {
+ bool found_match = false;
+ for (auto channel : *message->channels_to_transfer()) {
+ if (subscriber->Compare(channel)) {
+ int index = subscriber->index();
+ auto it = channels_.find(index);
+ if (it == channels_.end()) {
+ auto pair = channels_.insert(
+ {index, peer_connection_->CreateDataChannel(
+ channel->name()->str() + "/" + channel->type()->str(),
+ nullptr)});
+ it = pair.first;
+ }
+ subscriber->AddListener(it->second);
+
+ VLOG(1) << "Subscribe to: " << channel->type()->str();
+ found_match = true;
+ break;
+ }
+ }
+ if (!found_match) {
+ int index = subscriber->index();
+ auto it = channels_.find(index);
+ subscriber->RemoveListener(it->second);
+ }
+ }
}
} // namespace web_proxy