Handle subscription of messages in the webapp.

Messages larger than a threshold are split and reassembled due to the
size limit in webrtc. Threshold may have to be adjusted somewhere between
64KiB and 256KiB.

This also includes a basic handler for a ping message and a more
advanced image handler.

Change-Id: If66acfb1bb84e9d3ff686994a94b1480cb70b2aa
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 7c24eae..409e61d 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -2,6 +2,9 @@
 #define AOS_NETWORK_WEB_PROXY_H_
 #include <map>
 #include <set>
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/web_proxy_generated.h"
 #include "aos/seasocks/seasocks_logger.h"
 #include "flatbuffers/flatbuffers.h"
 #include "seasocks/Server.h"
@@ -14,13 +17,17 @@
 namespace web_proxy {
 
 class Connection;
+class Subscriber;
 
 // Basic class that handles receiving new websocket connections. Creates a new
 // Connection to manage the rest of the negotiation and data passing. When the
 // websocket closes, it deletes the Connection.
 class WebsocketHandler : public ::seasocks::WebSocket::Handler {
  public:
-  WebsocketHandler(::seasocks::Server *server);
+  WebsocketHandler(
+      ::seasocks::Server *server,
+      const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+      const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
   void onConnect(::seasocks::WebSocket *sock) override;
   void onData(::seasocks::WebSocket *sock, const uint8_t *data,
               size_t size) override;
@@ -29,6 +36,8 @@
  private:
   std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
   ::seasocks::Server *server_;
+  const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> &config_;
 };
 
 // Seasocks requires that sends happen on the correct thread. This class takes a
@@ -37,7 +46,7 @@
 class UpdateData : public ::seasocks::Server::Runnable {
  public:
   UpdateData(::seasocks::WebSocket *websocket,
-             ::flatbuffers::DetachedBuffer &&buffer)
+             flatbuffers::DetachedBuffer &&buffer)
       : sock_(websocket), buffer_(std::move(buffer)) {}
   ~UpdateData() override = default;
   UpdateData(const UpdateData &) = delete;
@@ -47,7 +56,40 @@
 
  private:
   ::seasocks::WebSocket *sock_;
-  const ::flatbuffers::DetachedBuffer buffer_;
+  const flatbuffers::DetachedBuffer buffer_;
+};
+
+// Represents a fetcher and all the Connections that care about it.
+// Handles building the message and telling each connection to send it.
+// indexed by location of the channel it handles in the config.
+class Subscriber {
+ public:
+  Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index)
+      : fbb_(1024),
+        fetcher_(std::move(fetcher)),
+        channel_index_(channel_index) {}
+
+  void RunIteration();
+
+  void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+    channels_.insert(channel);
+  }
+
+  void RemoveListener(
+      rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+    channels_.erase(channel);
+  }
+
+  // Check if the Channel passed matches the channel this fetchs.
+  bool Compare(const Channel *channel) const;
+
+  int index() const { return channel_index_; }
+
+ private:
+  flatbuffers::FlatBufferBuilder fbb_;
+  std::unique_ptr<RawFetcher> fetcher_;
+  int channel_index_;
+  std::set<rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
 };
 
 // Represents a single connection to a browser for the entire lifetime of the
@@ -56,10 +98,20 @@
                    public webrtc::CreateSessionDescriptionObserver,
                    public webrtc::DataChannelObserver {
  public:
-  Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server);
+  Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
+             const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+             const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
+
+  ~Connection() {
+    // DataChannel may call OnStateChange after this is destroyed, so make sure
+    // it doesn't.
+    data_channel_->UnregisterObserver();
+  }
 
   void HandleWebSocketData(const uint8_t *data, size_t size);
 
+  void Send(const flatbuffers::DetachedBuffer &buffer) const;
+
   // PeerConnectionObserver implementation
   void OnSignalingChange(
       webrtc::PeerConnectionInterface::SignalingState) override {}
@@ -88,13 +140,17 @@
   }
 
   // DataChannelObserver implementation
-  void OnStateChange() override {}
+  void OnStateChange() override;
   void OnMessage(const webrtc::DataBuffer &buffer) override;
   void OnBufferedAmountChange(uint64_t sent_data_size) override {}
 
  private:
   ::seasocks::WebSocket *sock_;
   ::seasocks::Server *server_;
+  const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> &config_;
+  std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
+
   rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
   rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
 };