Handle subscription of messages in the webapp.

Messages larger than a threshold are split and reassembled because of the
message size limit in WebRTC. The threshold may need to be adjusted; a
suitable value is probably somewhere between 64KiB and 256KiB.

This also includes a basic handler for a ping message and a more
advanced image handler.

Change-Id: If66acfb1bb84e9d3ff686994a94b1480cb70b2aa
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index c6d7336..77371eb 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -1,5 +1,9 @@
 #include "aos/network/web_proxy.h"
+
+#include "aos/flatbuffer_merge.h"
+#include "aos/network/connect_generated.h"
 #include "aos/network/web_proxy_generated.h"
+#include "aos/network/web_proxy_utils.h"
 #include "api/create_peerconnection_factory.h"
 #include "glog/logging.h"
 
@@ -22,12 +26,15 @@
 
 }  // namespace
 
-WebsocketHandler::WebsocketHandler(::seasocks::Server *server)
-    : server_(server) {}
+WebsocketHandler::WebsocketHandler(
+    ::seasocks::Server *server,
+    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+    : server_(server), subscribers_(subscribers), config_(config) {}
 
 void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
   std::unique_ptr<Connection> conn =
-      std::make_unique<Connection>(sock, server_);
+      std::make_unique<Connection>(sock, server_, subscribers_, config_);
   connections_.insert({sock, std::move(conn)});
 }
 
@@ -40,8 +47,58 @@
   connections_.erase(sock);
 }
 
-Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
-    : sock_(sock), server_(server) {}
+// Fetches from fetcher_ and sends the fetched message, one packet at a time,
+// to every data channel in channels_. Does nothing when nobody is listening.
+void Subscriber::RunIteration() {
+  if (channels_.empty()) {
+    return;
+  }
+
+  // NOTE(review): the result of Fetch() is ignored, so the current context is
+  // packed and sent even if nothing new arrived -- confirm this is intended.
+  fetcher_->Fetch();
+  for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context());
+       ++packet_index) {
+    flatbuffers::Offset<MessageHeader> message =
+        PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
+    fbb_.Finish(message);
+
+    const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+
+    webrtc::DataBuffer data_buffer(
+        rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+        true /* binary array */);
+    // Bind by reference: each element is only used to send, so there is no
+    // need to copy it for every packet.
+    for (const auto &conn : channels_) {
+      conn->Send(data_buffer);
+    }
+  }
+}
+
+// Returns true if |channel| has the same name and type as the channel that
+// fetcher_ reads from.
+bool Subscriber::Compare(const Channel *channel) const {
+  // A channel without a name or type cannot match anything.
+  if (channel->name() == nullptr || channel->type() == nullptr) {
+    return false;
+  }
+  // name() and type() return pointers, so == on them only compares
+  // addresses. Compare the string contents instead.
+  return channel->name()->str() == fetcher_->channel()->name()->str() &&
+         channel->type()->str() == fetcher_->channel()->type()->str();
+}
+
+// Stores the websocket, the server, the subscriber list and the config in the
+// corresponding members.
+Connection::Connection(
+    ::seasocks::WebSocket *sock, ::seasocks::Server *server,
+    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+    : sock_(sock),
+      server_(server),
+      subscribers_(subscribers),
+      config_(config) {}
 
 // Function called for web socket data. Parses the flatbuffer and handles it
 // appropriately.
@@ -111,6 +152,13 @@
   }
 }
 
+void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
+  webrtc::DataBuffer data_buffer(
+      rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+      true /* binary array */);
+  data_channel_->Send(data_buffer);
+}
+
 void Connection::OnDataChannel(
     rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
   data_channel_ = channel;
@@ -150,13 +198,72 @@
   server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
 }
 
-// Receive and respond to a DataChannel message. Temporarily acting as a
-// "PONG", but will change to handle "Connect" subscription messages.
+// Wait until the data channel is ready for data (state kOpen) before sending
+// the config.
+void Connection::OnStateChange() {
+  // Both the state query and Send() dereference data_channel_, so require it
+  // as well as the peer connection.
+  if (peer_connection_.get() != nullptr && data_channel_.get() != nullptr &&
+      data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
+    Send(config_.buffer());
+  }
+}
+
+// Handle DataChannel messages. Each message is a message_bridge::Connect
+// listing the channels the client wants. Every subscriber that matches a
+// requested channel gets this connection's data channel for it added as a
+// listener (creating the data channel on first use); every other subscriber
+// has this connection's data channel, if one exists, removed.
 void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
-  // This is technically disallowed by webrtc, But doesn't seem to cause major
-  // problems. At least for the small data tested manually. Send should be
-  // called from outside this call stack.
-  data_channel_->Send(buffer);
+  // The payload arrives over the data channel, so verify it before reading
+  // any field out of it.
+  flatbuffers::Verifier verifier(buffer.data.data(), buffer.data.size());
+  if (!verifier.VerifyBuffer<message_bridge::Connect>(nullptr)) {
+    LOG(ERROR) << "Ignoring malformed Connect message";
+    return;
+  }
+  const message_bridge::Connect *message =
+      flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
+  // The accessor may return null (flatbuffers does so for an absent field);
+  // that is handled as an empty request.
+  const auto *requested_channels = message->channels_to_transfer();
+
+  for (const auto &subscriber : subscribers_) {
+    const int index = subscriber->index();
+    bool found_match = false;
+    if (requested_channels != nullptr) {
+      for (const auto *channel : *requested_channels) {
+        // name and type are dereferenced below; skip entries missing either.
+        if (channel->name() == nullptr || channel->type() == nullptr) {
+          continue;
+        }
+        if (!subscriber->Compare(channel)) {
+          continue;
+        }
+        auto it = channels_.find(index);
+        if (it == channels_.end()) {
+          const std::string label =
+              channel->name()->str() + "/" + channel->type()->str();
+          auto pair = channels_.insert(
+              {index, peer_connection_->CreateDataChannel(label, nullptr)});
+          it = pair.first;
+        }
+        subscriber->AddListener(it->second);
+
+        VLOG(1) << "Subscribe to: " << channel->type()->str();
+        found_match = true;
+        break;
+      }
+    }
+    if (!found_match) {
+      // Only unsubscribe when a data channel exists for this subscriber;
+      // dereferencing channels_.end() is undefined behavior.
+      auto it = channels_.find(index);
+      if (it != channels_.end()) {
+        subscriber->RemoveListener(it->second);
+      }
+    }
+  }
 }
 
 }  // namespace web_proxy