Handle subscription of messages in the webapp.

Messages larger than a threshold are split and reassembled due to the
size limit in webrtc. Threshold may have to be adjusted somewhere between
64KiB and 256KiB.

This also includes a basic handler for a ping message and a more
advanced image handler.

Change-Id: If66acfb1bb84e9d3ff686994a94b1480cb70b2aa
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 0470b6e..1f7a3d7 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -1,4 +1,4 @@
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
 load("//aos:config.bzl", "aos_config")
 
 package(default_visibility = ["//visibility:public"])
@@ -24,6 +24,11 @@
     gen_reflections = 1,
 )
 
+flatbuffer_ts_library(
+    name = "ping_ts_fbs",
+    srcs = ["ping.fbs"],
+)
+
 flatbuffer_cc_library(
     name = "pong_fbs",
     srcs = ["pong.fbs"],
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 8f049e3..a334a47 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -240,12 +240,33 @@
 flatbuffer_cc_library(
     name = "web_proxy_fbs",
     srcs = ["web_proxy.fbs"],
+    includes = [
+        ":connect_fbs_includes",
+        "//aos:configuration_fbs_includes",
+    ],
     gen_reflections = True,
 )
 
 flatbuffer_ts_library(
     name = "web_proxy_ts_fbs",
     srcs = ["web_proxy.fbs"],
+    includes = [
+        ":connect_fbs_includes",
+        "//aos:configuration_fbs_includes",
+    ],
+)
+
+cc_library(
+    name = "web_proxy_utils",
+    hdrs = ["web_proxy_utils.h"],
+    srcs = ["web_proxy_utils.cc"],
+    deps = [
+        ":connect_fbs",
+        ":web_proxy_fbs",
+        "//aos:configuration_fbs",
+        "//aos/events:event_loop",
+        "@com_github_google_flatbuffers//:flatbuffers",
+    ],
 )
 
 cc_library(
@@ -257,7 +278,10 @@
         "-Wno-unused-parameter",
     ],
     deps = [
+        ":connect_fbs",
         ":web_proxy_fbs",
+        ":web_proxy_utils",
+        "//aos/events:shm_event_loop",
         "//aos/seasocks:seasocks_logger",
         "//third_party/seasocks",
         "//third_party:webrtc",
@@ -277,8 +301,9 @@
     name = "web_proxy_main",
     srcs = ["web_proxy_main.cc"],
     deps = [
-        ":web_proxy",
         ":gen_embedded",
+        ":web_proxy",
+        "//aos/events:shm_event_loop",
         "//aos:init",
         "//aos/seasocks:seasocks_logger",
         "//third_party/seasocks",
@@ -290,8 +315,9 @@
     ],
     data = [
         "//aos/network/www:files",
-        "//aos/network/www:proxy_bundle",
+        "//aos/network/www:main_bundle",
         "//aos/network/www:flatbuffers",
-        "@com_github_google_flatbuffers//:flatjs"
+        "@com_github_google_flatbuffers//:flatjs",
+        "//aos/events:pingpong_config.json",
     ],
 )
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index c6d7336..77371eb 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -1,5 +1,9 @@
 #include "aos/network/web_proxy.h"
+
+#include "aos/flatbuffer_merge.h"
+#include "aos/network/connect_generated.h"
 #include "aos/network/web_proxy_generated.h"
+#include "aos/network/web_proxy_utils.h"
 #include "api/create_peerconnection_factory.h"
 #include "glog/logging.h"
 
@@ -22,12 +26,15 @@
 
 }  // namespace
 
-WebsocketHandler::WebsocketHandler(::seasocks::Server *server)
-    : server_(server) {}
+WebsocketHandler::WebsocketHandler(
+    ::seasocks::Server *server,
+    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+    : server_(server), subscribers_(subscribers), config_(config) {}
 
 void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
   std::unique_ptr<Connection> conn =
-      std::make_unique<Connection>(sock, server_);
+      std::make_unique<Connection>(sock, server_, subscribers_, config_);
   connections_.insert({sock, std::move(conn)});
 }
 
@@ -40,8 +47,42 @@
   connections_.erase(sock);
 }
 
-Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
-    : sock_(sock), server_(server) {}
+void Subscriber::RunIteration() {
+  if (channels_.empty()) {
+    return;
+  }
+
+  fetcher_->Fetch();
+  for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context());
+       ++packet_index) {
+    flatbuffers::Offset<MessageHeader> message =
+        PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
+    fbb_.Finish(message);
+
+    const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+
+    webrtc::DataBuffer data_buffer(
+        rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+        true /* binary array */);
+    for (auto conn : channels_) {
+      conn->Send(data_buffer);
+    }
+  }
+}
+
+bool Subscriber::Compare(const Channel *channel) const {
+  return channel->name() == fetcher_->channel()->name() &&
+         channel->type() == fetcher_->channel()->type();
+}
+
+Connection::Connection(
+    ::seasocks::WebSocket *sock, ::seasocks::Server *server,
+    const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+    : sock_(sock),
+      server_(server),
+      subscribers_(subscribers),
+      config_(config) {}
 
 // Function called for web socket data. Parses the flatbuffer and handles it
 // appropriately.
@@ -111,6 +152,13 @@
   }
 }
 
+void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
+  webrtc::DataBuffer data_buffer(
+      rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+      true /* binary array */);
+  data_channel_->Send(data_buffer);
+}
+
 void Connection::OnDataChannel(
     rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
   data_channel_ = channel;
@@ -150,13 +198,45 @@
   server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
 }
 
-// Receive and respond to a DataChannel message. Temporarily acting as a
-// "PONG", but will change to handle "Connect" subscription messages.
+// Wait until the data channel is ready for data before sending the config.
+void Connection::OnStateChange() {
+  if (peer_connection_.get() != nullptr &&
+      data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
+    Send(config_.buffer());
+  }
+}
+
+// Handle DataChannel messages. Subscribe to each listener that matches the
+// subscribe message
 void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
-  // This is technically disallowed by webrtc, But doesn't seem to cause major
-  // problems. At least for the small data tested manually. Send should be
-  // called from outside this call stack.
-  data_channel_->Send(buffer);
+  const message_bridge::Connect *message =
+      flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
+  for (auto &subscriber : subscribers_) {
+    bool found_match = false;
+    for (auto channel : *message->channels_to_transfer()) {
+      if (subscriber->Compare(channel)) {
+        int index = subscriber->index();
+        auto it = channels_.find(index);
+        if (it == channels_.end()) {
+          auto pair = channels_.insert(
+              {index, peer_connection_->CreateDataChannel(
+                          channel->name()->str() + "/" + channel->type()->str(),
+                          nullptr)});
+          it = pair.first;
+        }
+        subscriber->AddListener(it->second);
+
+        VLOG(1) << "Subscribe to: " << channel->type()->str();
+        found_match = true;
+        break;
+      }
+    }
+    if (!found_match) {
+      int index = subscriber->index();
+      auto it = channels_.find(index);
+      subscriber->RemoveListener(it->second);
+    }
+  }
 }
 
 }  // namespace web_proxy
diff --git a/aos/network/web_proxy.fbs b/aos/network/web_proxy.fbs
index e712622..d5e027e 100644
--- a/aos/network/web_proxy.fbs
+++ b/aos/network/web_proxy.fbs
@@ -1,3 +1,9 @@
+// Typescript namespaces are weird when coming from multiple files. We generate
+// all transitive dependencies into the same file in typescript so we can
+// include all 'aos' flatbuffers we care about here.
+include "aos/configuration.fbs";
+include "aos/network/connect.fbs";
+
 namespace aos.web_proxy;
 
 // SDP is Session Description Protocol. We only handle OFFER (starting a
@@ -29,3 +35,26 @@
 table WebSocketMessage {
   payload:Payload;
 }
+
+// WebRTC has size limits on the messages sent on datachannels. This message
+// ensures that parts are recieved in the correct order. If there is any
+// mismatch, all the existing work should be dropped and restart when reasonable
+// data starts again.
+table MessageHeader {
+  // Index of the channel in config
+  channel_index:uint;
+
+  // How many packets will be required for the message being sent.
+  packet_count:uint;
+  // What index into the the total packets for the multipart message, this
+  // header is parts of.
+  packet_index:uint;
+
+  // Total number of bytes in the message
+  length:uint;
+
+  // Index into the sequence of messages. This will not always increase.
+  queue_index:uint;
+
+  data:[ubyte];
+}
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 7c24eae..409e61d 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -2,6 +2,9 @@
 #define AOS_NETWORK_WEB_PROXY_H_
 #include <map>
 #include <set>
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/web_proxy_generated.h"
 #include "aos/seasocks/seasocks_logger.h"
 #include "flatbuffers/flatbuffers.h"
 #include "seasocks/Server.h"
@@ -14,13 +17,17 @@
 namespace web_proxy {
 
 class Connection;
+class Subscriber;
 
 // Basic class that handles receiving new websocket connections. Creates a new
 // Connection to manage the rest of the negotiation and data passing. When the
 // websocket closes, it deletes the Connection.
 class WebsocketHandler : public ::seasocks::WebSocket::Handler {
  public:
-  WebsocketHandler(::seasocks::Server *server);
+  WebsocketHandler(
+      ::seasocks::Server *server,
+      const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+      const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
   void onConnect(::seasocks::WebSocket *sock) override;
   void onData(::seasocks::WebSocket *sock, const uint8_t *data,
               size_t size) override;
@@ -29,6 +36,8 @@
  private:
   std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
   ::seasocks::Server *server_;
+  const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> &config_;
 };
 
 // Seasocks requires that sends happen on the correct thread. This class takes a
@@ -37,7 +46,7 @@
 class UpdateData : public ::seasocks::Server::Runnable {
  public:
   UpdateData(::seasocks::WebSocket *websocket,
-             ::flatbuffers::DetachedBuffer &&buffer)
+             flatbuffers::DetachedBuffer &&buffer)
       : sock_(websocket), buffer_(std::move(buffer)) {}
   ~UpdateData() override = default;
   UpdateData(const UpdateData &) = delete;
@@ -47,7 +56,40 @@
 
  private:
   ::seasocks::WebSocket *sock_;
-  const ::flatbuffers::DetachedBuffer buffer_;
+  const flatbuffers::DetachedBuffer buffer_;
+};
+
+// Represents a fetcher and all the Connections that care about it.
+// Handles building the message and telling each connection to send it.
+// indexed by location of the channel it handles in the config.
+class Subscriber {
+ public:
+  Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index)
+      : fbb_(1024),
+        fetcher_(std::move(fetcher)),
+        channel_index_(channel_index) {}
+
+  void RunIteration();
+
+  void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+    channels_.insert(channel);
+  }
+
+  void RemoveListener(
+      rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+    channels_.erase(channel);
+  }
+
+  // Check if the Channel passed matches the channel this fetchs.
+  bool Compare(const Channel *channel) const;
+
+  int index() const { return channel_index_; }
+
+ private:
+  flatbuffers::FlatBufferBuilder fbb_;
+  std::unique_ptr<RawFetcher> fetcher_;
+  int channel_index_;
+  std::set<rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
 };
 
 // Represents a single connection to a browser for the entire lifetime of the
@@ -56,10 +98,20 @@
                    public webrtc::CreateSessionDescriptionObserver,
                    public webrtc::DataChannelObserver {
  public:
-  Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server);
+  Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
+             const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+             const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
+
+  ~Connection() {
+    // DataChannel may call OnStateChange after this is destroyed, so make sure
+    // it doesn't.
+    data_channel_->UnregisterObserver();
+  }
 
   void HandleWebSocketData(const uint8_t *data, size_t size);
 
+  void Send(const flatbuffers::DetachedBuffer &buffer) const;
+
   // PeerConnectionObserver implementation
   void OnSignalingChange(
       webrtc::PeerConnectionInterface::SignalingState) override {}
@@ -88,13 +140,17 @@
   }
 
   // DataChannelObserver implementation
-  void OnStateChange() override {}
+  void OnStateChange() override;
   void OnMessage(const webrtc::DataBuffer &buffer) override;
   void OnBufferedAmountChange(uint64_t sent_data_size) override {}
 
  private:
   ::seasocks::WebSocket *sock_;
   ::seasocks::Server *server_;
+  const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
+  const aos::FlatbufferDetachedBuffer<aos::Configuration> &config_;
+  std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
+
   rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
   rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
 };
diff --git a/aos/network/web_proxy_main.cc b/aos/network/web_proxy_main.cc
index 37b3d1e..21b6f9c 100644
--- a/aos/network/web_proxy_main.cc
+++ b/aos/network/web_proxy_main.cc
@@ -1,23 +1,65 @@
+#include "aos/events/shm_event_loop.h"
 #include "aos/init.h"
-#include "aos/seasocks/seasocks_logger.h"
 #include "aos/network/web_proxy.h"
+#include "aos/seasocks/seasocks_logger.h"
+#include "gflags/gflags.h"
 
 #include "internal/Embedded.h"
 #include "seasocks/Server.h"
 #include "seasocks/WebSocket.h"
 
-int main() {
+DEFINE_string(config, "./config.json", "File path of aos configuration");
+DEFINE_string(data_dir, "www", "Directory to serve data files from");
+
+void RunDataThread(
+    std::vector<std::unique_ptr<aos::web_proxy::Subscriber>> *subscribers,
+    const aos::FlatbufferDetachedBuffer<aos::Configuration> &config) {
+  aos::ShmEventLoop event_loop(&config.message());
+
+  // TODO(alex): skip fetchers on the wrong node.
+  for (uint i = 0; i < config.message().channels()->size(); ++i) {
+    auto channel = config.message().channels()->Get(i);
+    auto fetcher = event_loop.MakeRawFetcher(channel);
+    subscribers->emplace_back(
+        std::make_unique<aos::web_proxy::Subscriber>(std::move(fetcher), i));
+  }
+
+  flatbuffers::FlatBufferBuilder fbb(1024);
+
+  auto timer = event_loop.AddTimer([&]() {
+    for (auto &subscriber : *subscribers) {
+      subscriber->RunIteration();
+    }
+  });
+
+  event_loop.OnRun([&]() {
+    timer->Setup(event_loop.monotonic_now(), std::chrono::milliseconds(100));
+  });
+
+  event_loop.Run();
+}
+
+int main(int argc, char **argv) {
   // Make sure to reference this to force the linker to include it.
+  aos::InitGoogle(&argc, &argv);
   findEmbeddedContent("");
 
   aos::InitNRT();
 
-  seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
-      new ::aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));
+  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+      aos::configuration::ReadConfig(FLAGS_config);
 
-  auto websocket_handler =
-      std::make_shared<aos::web_proxy::WebsocketHandler>(&server);
+  std::vector<std::unique_ptr<aos::web_proxy::Subscriber>> subscribers;
+
+  std::thread data_thread{
+      [&subscribers, &config]() { RunDataThread(&subscribers, config); }};
+
+  seasocks::Server server(std::shared_ptr<seasocks::Logger>(
+      new aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));
+
+  auto websocket_handler = std::make_shared<aos::web_proxy::WebsocketHandler>(
+      &server, subscribers, config);
   server.addWebSocketHandler("/ws", websocket_handler);
 
-  server.serve("aos/network/www", 8080);
+  server.serve(FLAGS_data_dir.c_str(), 8080);
 }
diff --git a/aos/network/web_proxy_utils.cc b/aos/network/web_proxy_utils.cc
new file mode 100644
index 0000000..dbcc2e6
--- /dev/null
+++ b/aos/network/web_proxy_utils.cc
@@ -0,0 +1,43 @@
+#include "aos/network/web_proxy_utils.h"
+
+namespace aos {
+namespace web_proxy {
+
+// Recommended max size is 64KiB for compatibility reasons. 256KiB theoretically
+// works on chrome but seemed to have some consistency issues. Picked a size in
+// the middle which seems to work.
+constexpr size_t kPacketSize = 125000;
+
+int GetPacketCount(const Context &context) {
+  return context.size / kPacketSize + 1;
+}
+
+flatbuffers::Offset<MessageHeader> PackMessage(
+    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+    int channel_index, int packet_index) {
+  int packet_count = GetPacketCount(context);
+  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+  if (kPacketSize * (packet_index + 1) < context.size) {
+    data_offset = fbb->CreateVector(
+        static_cast<uint8_t *>(context.data) + kPacketSize * packet_index,
+        kPacketSize);
+  } else {
+    int prefix_size = kPacketSize * packet_index;
+    data_offset =
+        fbb->CreateVector(static_cast<uint8_t *>(context.data) + prefix_size,
+                          context.size - prefix_size);
+  }
+
+  MessageHeader::Builder message_header_builder(*fbb);
+  message_header_builder.add_channel_index(channel_index);
+  message_header_builder.add_queue_index(context.queue_index);
+  message_header_builder.add_packet_count(packet_count);
+  message_header_builder.add_packet_index(packet_index);
+  message_header_builder.add_data(data_offset);
+  message_header_builder.add_length(context.size);
+
+  return message_header_builder.Finish();
+}
+
+}  // namespace web_proxy
+}  // namespace aos
diff --git a/aos/network/web_proxy_utils.h b/aos/network/web_proxy_utils.h
new file mode 100644
index 0000000..0672ddc
--- /dev/null
+++ b/aos/network/web_proxy_utils.h
@@ -0,0 +1,20 @@
+#include "aos/network/web_proxy_generated.h"
+#include "aos/events/event_loop.h"
+
+namespace aos {
+namespace web_proxy {
+
+int GetPacketCount(const Context &context);
+
+/*
+ * Packs a message embedded in context into a MessageHeader on fbb. Handles
+ * multipart messages by use of the packet_index.
+ * TODO(alex): make this an iterator that returns each packet sequentially
+ */
+flatbuffers::Offset<MessageHeader> PackMessage(
+    flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+    int channel_index, int packet_index);
+
+
+}  // namespace web_proxy
+}  // namespace aos
diff --git a/aos/network/www/BUILD b/aos/network/www/BUILD
index 5faae12..76e8ef4 100644
--- a/aos/network/www/BUILD
+++ b/aos/network/www/BUILD
@@ -5,27 +5,42 @@
     name = "files",
     srcs = glob([
         "**/*.html",
+        "**/*.css",
     ]),
     visibility=["//visibility:public"],
 )
 
 ts_library(
     name = "proxy",
-    srcs = glob([
-        "*.ts",
-    ]),
+    srcs = [
+        "config_handler.ts",
+        "proxy.ts",
+    ],
     deps = [
         "//aos/network:web_proxy_ts_fbs",
     ],
+    visibility=["//visibility:public"],
+)
+
+ts_library(
+    name = "main",
+    srcs = [
+        "main.ts",
+        "ping_handler.ts",
+    ],
+    deps = [
+        ":proxy",
+        "//aos/events:ping_ts_fbs",
+    ],
 )
 
 rollup_bundle(
-    name = "proxy_bundle",
+    name = "main_bundle",
     entry_point = "aos/network/www/main",
     deps = [
-        "proxy",
+        "main",
     ],
-    visibility=["//visibility:public"],
+    visibility=["//aos:__subpackages__"],
 )
 
 genrule(
@@ -37,5 +52,5 @@
         "flatbuffers.js",
     ],
     cmd = "cp $(location @com_github_google_flatbuffers//:flatjs) $@",
-    visibility=["//visibility:public"],
+    visibility=["//aos:__subpackages__"],
 )
diff --git a/aos/network/www/config_handler.ts b/aos/network/www/config_handler.ts
new file mode 100644
index 0000000..0af78b6
--- /dev/null
+++ b/aos/network/www/config_handler.ts
@@ -0,0 +1,67 @@
+import {aos} from 'aos/network/web_proxy_generated';
+
+export class ConfigHandler {
+  private readonly root_div = document.getElementById('config');
+
+  constructor(
+      private readonly config: aos.Configuration,
+      private readonly dataChannel: RTCDataChannel) {}
+
+  printConfig() {
+    for (const i = 0; i < this.config.channelsLength(); i++) {
+      const channel_div = document.createElement('div');
+      channel_div.classList.add('channel');
+      this.root_div.appendChild(channel_div);
+
+      const input_el = document.createElement('input');
+      input_el.setAttribute('data-index', i);
+      input_el.setAttribute('type', 'checkbox');
+      input_el.addEventListener('click', () => this.handleChange());
+      channel_div.appendChild(input_el);
+
+      const name_div = document.createElement('div');
+      const name_text = document.createTextNode(this.config.channels(i).name());
+      name_div.appendChild(name_text);
+      const type_div = document.createElement('div');
+      const type_text = document.createTextNode(this.config.channels(i).type());
+      type_div.appendChild(type_text);
+      const info_div = document.createElement('div');
+      info_div.appendChild(name_div);
+      info_div.appendChild(type_div);
+
+      channel_div.appendChild(info_div);
+    }
+  }
+
+  handleChange() {
+    const toggles = this.root_div.getElementsByTagName('input');
+    const builder = new flatbuffers.Builder(512);
+
+    const channels: flatbuffers.Offset[] = [];
+    for (const toggle of toggles) {
+      if (!toggle.checked) {
+        continue;
+      }
+      const index = toggle.getAttribute('data-index');
+      const channel = this.config.channels(index);
+      const namefb = builder.createString(channel.name());
+      const typefb = builder.createString(channel.type());
+      aos.Channel.startChannel(builder);
+      aos.Channel.addName(builder, namefb);
+      aos.Channel.addType(builder, typefb);
+      const channelfb = aos.Channel.endChannel(builder);
+      channels.push(channelfb);
+    }
+
+    const channelsfb =
+        aos.message_bridge.Connect.createChannelsToTransferVector(
+            builder, channels);
+    aos.message_bridge.Connect.startConnect(builder);
+    aos.message_bridge.Connect.addChannelsToTransfer(builder, channelsfb);
+    const connect = aos.message_bridge.Connect.endConnect(builder);
+    builder.finish(connect);
+    const array = builder.asUint8Array();
+    console.log('connect', array);
+    this.dataChannel.send(array.buffer.slice(array.byteOffset));
+  }
+}
diff --git a/aos/network/www/index.html b/aos/network/www/index.html
index bc90d40..97e16d4 100644
--- a/aos/network/www/index.html
+++ b/aos/network/www/index.html
@@ -1,6 +1,11 @@
 <html>
-  <body>
+  <head>
     <script src="flatbuffers.js"></script>
-    <script src="proxy_bundle.min.js"></script>
+    <script src="main_bundle.min.js" defer></script>
+    <link rel="stylesheet" href="styles.css">
+  </head>
+  <body>
+    <div id="config">
+    </div>
   </body>
 </html>
diff --git a/aos/network/www/main.ts b/aos/network/www/main.ts
index 5a3165e..1840ffb 100644
--- a/aos/network/www/main.ts
+++ b/aos/network/www/main.ts
@@ -1,5 +1,10 @@
 import {Connection} from './proxy';
+import * as PingHandler from './ping_handler';
 
 const conn = new Connection();
 
 conn.connect();
+
+PingHandler.SetupDom();
+
+conn.addHandler(PingHandler.GetId(), PingHandler.HandlePing);
diff --git a/aos/network/www/ping_handler.ts b/aos/network/www/ping_handler.ts
new file mode 100644
index 0000000..9b37d70
--- /dev/null
+++ b/aos/network/www/ping_handler.ts
@@ -0,0 +1,26 @@
+import {aos} from 'aos/events/ping_generated';
+
+export function HandlePing(data: Uint8Array) {
+  const fbBuffer = new flatbuffers.ByteBuffer(data);
+  const ping = aos.examples.Ping.getRootAsPing(fbBuffer);
+
+  document.getElementById('val').innerHTML = ping.value();
+  document.getElementById('time').innerHTML = ping.sendTime().low;
+}
+
+export function SetupDom() {
+  const ping_div = document.createElement('div');
+  document.body.appendChild(ping_div);
+
+  const value_div = document.createElement('div');
+  value_div.setAttribute('id', 'val');
+  const time_div = document.createElement('div');
+  time_div.setAttribute('id', 'time');
+
+  ping_div.appendChild(value_div);
+  ping_div.appendChild(time_div);
+}
+
+export function GetId() {
+  return aos.examples.Ping.getFullyQualifiedName();
+}
diff --git a/aos/network/www/proxy.ts b/aos/network/www/proxy.ts
index 1ef6320..27ffbfe 100644
--- a/aos/network/www/proxy.ts
+++ b/aos/network/www/proxy.ts
@@ -1,4 +1,41 @@
-import {aos.web_proxy} from '../web_proxy_generated';
+import {aos} from 'aos/network/web_proxy_generated';
+import {ConfigHandler} from './config_handler';
+
+// There is one handler for each DataChannel, it maintains the state of
+// multi-part messages and delegates to a callback when the message is fully
+// assembled.
+export class Handler {
+  private dataBuffer: Uint8Array|null = null;
+  private receivedMessageLength: number = 0;
+  constructor(
+      private readonly handlerFunc: (data: Uint8Array) => void,
+      private readonly channel: RTCPeerConnection) {
+    channel.addEventListener('message', (e) => this.handleMessage(e));
+  }
+
+  handleMessage(e: MessageEvent): void {
+    const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+    const messageHeader =
+        aos.web_proxy.MessageHeader.getRootAsMessageHeader(fbBuffer);
+    // Short circuit if only one packet
+    if (messageHeader.packetCount === 1) {
+      this.handlerFunc(messageHeader.dataArray());
+      return;
+    }
+
+    if (messageHeader.packetIndex() === 0) {
+      this.dataBuffer = new Uint8Array(messageHeader.length());
+    }
+    this.dataBuffer.set(
+        messageHeader.dataArray(),
+        this.receivedMessageLength);
+    this.receivedMessageLength += messageHeader.dataLength();
+
+    if (messageHeader.packetIndex() === messageHeader.packetCount() - 1) {
+      this.handlerFunc(this.dataBuffer);
+    }
+  }
+}
 
 // Analogous to the Connection class in //aos/network/web_proxy.h. Because most
 // of the apis are native in JS, it is much simpler.
@@ -7,12 +44,20 @@
   private rtcPeerConnection: RTCPeerConnection|null = null;
   private dataChannel: DataChannel|null = null;
   private webSocketUrl: string;
+  private configHandler: ConfigHandler|null =
+      null private config: aos.Configuration|null = null;
+  private readonly handlerFuncs = new Map<string, (data: Uint8Array) => void>();
+  private readonly handlers = new Set<Handler>();
 
   constructor() {
     const server = location.host;
     this.webSocketUrl = `ws://${server}/ws`;
   }
 
+  addHandler(id: string, handler: (data: Uint8Array) => void): void {
+    this.handlerFuncs.set(id, handler);
+  }
+
   connect(): void {
     this.webSocketConnection = new WebSocket(this.webSocketUrl);
     this.webSocketConnection.binaryType = 'arraybuffer';
@@ -22,14 +67,29 @@
         'message', (e) => this.onWebSocketMessage(e));
   }
 
-  // Handle messages on the DataChannel. Will delegate to various handlers for
-  // different message types.
+  // Handle messages on the DataChannel. Handles the Configuration message as
+  // all other messages are sent on specific DataChannels.
   onDataChannelMessage(e: MessageEvent): void {
-    console.log(e);
+    const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+    // TODO(alex): handle config updates if/when required
+    if (!this.configHandler) {
+      const config = aos.Configuration.getRootAsConfiguration(fbBuffer);
+      this.config = config;
+      this.configHandler = new ConfigHandler(config, this.dataChannel);
+      this.configHandler.printConfig();
+      return;
+    }
+  }
+
+  onDataChannel(ev: RTCDataChannelEvent): void {
+    const channel = ev.channel;
+    const name = channel.label;
+    const channelType = name.split('/').pop();
+    const handlerFunc = this.handlerFuncs.get(channelType);
+    this.handlers.add(new Handler(handlerFunc, channel));
   }
 
   onIceCandidate(e: RTCPeerConnectionIceEvent): void {
-    console.log('Created ice candidate', e);
     if (!e.candidate) {
       return;
     }
@@ -49,7 +109,6 @@
 
   // Called for new SDPs. Make sure to set it locally and remotely.
   onOfferCreated(description: RTCSessionDescription): void {
-    console.log('Created offer', description);
     this.rtcPeerConnection.setLocalDescription(description);
     const builder = new flatbuffers.Builder(512);
     const offerString = builder.createString(description.sdp);
@@ -67,7 +126,9 @@
   // want a DataChannel, so create it and then create an offer to send.
   onWebSocketOpen(): void {
     this.rtcPeerConnection = new RTCPeerConnection({});
-    this.dataChannel = this.rtcPeerConnection.createDataChannel('dc');
+    this.rtcPeerConnection.addEventListener(
+        'datachannel', (e) => this.onDataCnannel(e));
+    this.dataChannel = this.rtcPeerConnection.createDataChannel('signalling');
     this.dataChannel.addEventListener(
         'message', (e) => this.onDataChannelMessage(e));
     window.dc = this.dataChannel;
@@ -81,14 +142,12 @@
   // and handle appropriately. Either by setting the remote description or
   // adding the remote ice candidate.
   onWebSocketMessage(e: MessageEvent): void {
-    console.log('ws: ', e);
     const buffer = new Uint8Array(e.data)
     const fbBuffer = new flatbuffers.ByteBuffer(buffer);
     const message =
         aos.web_proxy.WebSocketMessage.getRootAsWebSocketMessage(fbBuffer);
     switch (message.payloadType()) {
       case aos.web_proxy.Payload.WebSocketSdp:
-        console.log('got an sdp message');
         const sdpFb = message.payload(new aos.web_proxy.WebSocketSdp());
         if (sdpFb.type() !== aos.web_proxy.SdpType.ANSWER) {
           console.log('got something other than an answer back');
@@ -98,7 +157,6 @@
             {'type': 'answer', 'sdp': sdpFb.payload()}));
         break;
       case aos.web_proxy.Payload.WebSocketIce:
-        console.log('got an ice message');
         const iceFb = message.payload(new aos.web_proxy.WebSocketIce());
         const candidate = {} as RTCIceCandidateInit;
         candidate.candidate = iceFb.candidate();
diff --git a/aos/network/www/styles.css b/aos/network/www/styles.css
new file mode 100644
index 0000000..23ceb21
--- /dev/null
+++ b/aos/network/www/styles.css
@@ -0,0 +1,5 @@
+.channel {
+  display: flex;
+  border-bottom: 1px solid;
+  font-size: 24px;
+}
diff --git a/third_party/flatbuffers/build_defs.bzl b/third_party/flatbuffers/build_defs.bzl
index 2ddc65b..a7e5576 100644
--- a/third_party/flatbuffers/build_defs.bzl
+++ b/third_party/flatbuffers/build_defs.bzl
@@ -33,6 +33,7 @@
     "--no-ts-reexport",
     "--reflect-names",
     "--reflect-types",
+    "--gen-name-strings",
 ]
 
 def flatbuffer_library_public(
diff --git a/third_party/flatbuffers/src/idl_gen_js_ts.cpp b/third_party/flatbuffers/src/idl_gen_js_ts.cpp
index 9c89c1a..be0a205 100644
--- a/third_party/flatbuffers/src/idl_gen_js_ts.cpp
+++ b/third_party/flatbuffers/src/idl_gen_js_ts.cpp
@@ -768,6 +768,14 @@
       code += "');\n};\n\n";
     }
 
+    // Generate the name method
+    if (parser_.opts.generate_name_strings) {
+      code +=
+        "static getFullyQualifiedName(): string {\n"
+        "  return '" + object_namespace + "." + struct_def.name + "';\n"
+        "}\n";
+    }
+
     // Emit field accessors
     for (auto it = struct_def.fields.vec.begin();
          it != struct_def.fields.vec.end(); ++it) {
diff --git a/y2020/BUILD b/y2020/BUILD
index 1d281bb..c80550b 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -125,3 +125,15 @@
     srcs = ["__init__.py"],
     visibility = ["//visibility:public"],
 )
+
+sh_binary(
+    name = "web_proxy",
+    srcs = ["web_proxy.sh"],
+    data = [
+        ":config.json",
+        "//aos/network:web_proxy_main",
+        "//y2020/www:main_bundle",
+        "//y2020/www:files",
+        "//y2020/www:flatbuffers",
+    ],
+)
diff --git a/y2020/vision/BUILD b/y2020/vision/BUILD
index 2e3723d..fa30531 100644
--- a/y2020/vision/BUILD
+++ b/y2020/vision/BUILD
@@ -1,4 +1,4 @@
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
 load("//aos:config.bzl", "aos_config")
 
 flatbuffer_cc_library(
@@ -36,3 +36,9 @@
         "//aos/events:shm_event_loop",
     ],
 )
+
+flatbuffer_ts_library(
+    name = "vision_ts_fbs",
+    srcs = ["vision.fbs"],
+    visibility = ["//y2020:__subpackages__"],
+)
diff --git a/y2020/vision/vision.fbs b/y2020/vision/vision.fbs
index 66f695b..b8d8bd9 100644
--- a/y2020/vision/vision.fbs
+++ b/y2020/vision/vision.fbs
@@ -14,7 +14,7 @@
   // The number of columns in the image.
   cols:int;
   // The image data.
-  data:[byte];
+  data:[ubyte];
   // Timestamp when the frame was captured.
   monotonic_timestamp_ns:long;
   realtime_timestamp_ns:long;
diff --git a/y2020/web_proxy.sh b/y2020/web_proxy.sh
new file mode 100755
index 0000000..8e1a570
--- /dev/null
+++ b/y2020/web_proxy.sh
@@ -0,0 +1 @@
+./aos/network/web_proxy_main --config=y2020/config.json --data_dir=y2020/www
diff --git a/y2020/www/BUILD b/y2020/www/BUILD
new file mode 100644
index 0000000..146456f
--- /dev/null
+++ b/y2020/www/BUILD
@@ -0,0 +1,45 @@
+load("@build_bazel_rules_typescript//:defs.bzl", "ts_library")
+load("@build_bazel_rules_nodejs//:defs.bzl", "rollup_bundle")
+
+ts_library(
+    name = "main",
+    srcs = [
+        "main.ts",
+        "image_handler.ts",
+    ],
+    deps = [
+        "//aos/network/www:proxy",
+        "//y2020/vision:vision_ts_fbs",
+    ],
+    visibility = ["//y2020:__subpackages__"],
+)
+
+rollup_bundle(
+    name = "main_bundle",
+    entry_point = "y2020/www/main",
+    deps = [
+        "main",
+    ],
+    visibility = ["//y2020:__subpackages__"],
+)
+
+filegroup(
+    name = "files",
+    srcs = glob([
+        "**/*.html",
+        "**/*.css",
+    ]),
+    visibility=["//visibility:public"],
+)
+
+genrule(
+    name = "flatbuffers",
+    srcs = [
+        "@com_github_google_flatbuffers//:flatjs",
+    ],
+    outs = [
+        "flatbuffers.js",
+    ],
+    cmd = "cp $(location @com_github_google_flatbuffers//:flatjs) $@",
+    visibility=["//y2020:__subpackages__"],
+)
diff --git a/y2020/www/image_handler.ts b/y2020/www/image_handler.ts
new file mode 100644
index 0000000..abaf831
--- /dev/null
+++ b/y2020/www/image_handler.ts
@@ -0,0 +1,61 @@
+import {frc971} from 'y2020/vision/vision_generated';
+
+export class ImageHandler {
+  private canvas = document.createElement('canvas');
+
+  constructor() {
+    document.body.appendChild(this.canvas);
+  }
+
+  handleImage(data: Uint8Array) {
+    const fbBuffer = new flatbuffers.ByteBuffer(data);
+    const image = frc971.vision.CameraImage.getRootAsCameraImage(fbBuffer);
+
+    const width = image.cols();
+    const height = image.rows();
+    if (width === 0 || height === 0) {
+      return;
+    }
+    const imageBuffer = new Uint8ClampedArray(width * height * 4); // RGBA
+
+    // Read four bytes (YUYV) from the data and transform into two pixels of
+    // RGBA for canvas
+    for (const j = 0; j < height; j++) {
+      for (const i = 0; i < width; i += 2) {
+        const y1 = image.data((j * width + i) * 2);
+        const u = image.data((j * width + i) * 2 + 1);
+        const y2 = image.data((j * width + i + 1) * 2);
+        const v = image.data((j * width + i + 1) * 2 + 1);
+
+        // Based on https://en.wikipedia.org/wiki/YUV#Converting_between_Y%E2%80%B2UV_and_RGB
+        const c1 = y1 - 16;
+        const c2 = y2 - 16;
+        const d = u - 128;
+        const e = v - 128;
+
+        imageBuffer[(j * width + i) * 4 + 0] = (298 * c1 + 409 * e + 128) >> 8;
+        imageBuffer[(j * width + i) * 4 + 1] =
+            (298 * c1 - 100 * d - 208 * e + 128) >> 8;
+        imageBuffer[(j * width + i) * 4 + 2] = (298 * c1 + 516 * d + 128) >> 8;
+        imageBuffer[(j * width + i) * 4 + 3] = 255;
+        imageBuffer[(j * width + i) * 4 + 4] = (298 * c2 + 409 * e + 128) >> 8;
+        imageBuffer[(j * width + i) * 4 + 5] =
+            (298 * c2 - 100 * d - 208 * e + 128) >> 8;
+        imageBuffer[(j * width + i) * 4 + 6] = (298 * c2 + 516 * d + 128) >> 8;
+        imageBuffer[(j * width + i) * 4 + 7] = 255;
+      }
+    }
+
+    const ctx = this.canvas.getContext('2d');
+
+    this.canvas.width = width;
+    this.canvas.height = height;
+    const idata = ctx.createImageData(width, height);
+    idata.data.set(imageBuffer);
+    ctx.putImageData(idata, 0, 0);
+  }
+
+  getId() {
+    return frc971.vision.CameraImage.getFullyQualifiedName();
+  }
+}
diff --git a/y2020/www/index.html b/y2020/www/index.html
new file mode 100644
index 0000000..97e16d4
--- /dev/null
+++ b/y2020/www/index.html
@@ -0,0 +1,11 @@
+<html>
+  <head>
+    <script src="flatbuffers.js"></script>
+    <script src="main_bundle.min.js" defer></script>
+    <link rel="stylesheet" href="styles.css">
+  </head>
+  <body>
+    <div id="config">
+    </div>
+  </body>
+</html>
diff --git a/y2020/www/main.ts b/y2020/www/main.ts
new file mode 100644
index 0000000..7831713
--- /dev/null
+++ b/y2020/www/main.ts
@@ -0,0 +1,11 @@
+import {Connection} from 'aos/network/www/proxy';
+
+import {ImageHandler} from './image_handler';
+
+const conn = new Connection();
+
+conn.connect();
+
+const iHandler = new ImageHandler();
+
+conn.addHandler(iHandler.getId(), (data) => iHandler.handleImage(data));
diff --git a/y2020/www/styles.css b/y2020/www/styles.css
new file mode 100644
index 0000000..23ceb21
--- /dev/null
+++ b/y2020/www/styles.css
@@ -0,0 +1,5 @@
+.channel {
+  display: flex;
+  border-bottom: 1px solid;
+  font-size: 24px;
+}