Handle subscription of messages in the webapp.

Messages larger than a threshold are split into packets and reassembled
in the webapp because of the message size limit in WebRTC. The threshold
may have to be adjusted to somewhere between 64KiB and 256KiB.

This also includes a basic handler for a ping message and a more
advanced image handler.

Change-Id: If66acfb1bb84e9d3ff686994a94b1480cb70b2aa
diff --git a/aos/network/www/BUILD b/aos/network/www/BUILD
index 5faae12..76e8ef4 100644
--- a/aos/network/www/BUILD
+++ b/aos/network/www/BUILD
@@ -5,27 +5,42 @@
     name = "files",
     srcs = glob([
         "**/*.html",
+        "**/*.css",
     ]),
     visibility=["//visibility:public"],
 )
 
 ts_library(
     name = "proxy",
-    srcs = glob([
-        "*.ts",
-    ]),
+    srcs = [
+        "config_handler.ts",
+        "proxy.ts",
+    ],
     deps = [
         "//aos/network:web_proxy_ts_fbs",
     ],
+    visibility=["//visibility:public"],
+)
+
+ts_library(
+    name = "main",
+    srcs = [
+        "main.ts",
+        "ping_handler.ts",
+    ],
+    deps = [
+        ":proxy",
+        "//aos/events:ping_ts_fbs",
+    ],
 )
 
 rollup_bundle(
-    name = "proxy_bundle",
+    name = "main_bundle",
     entry_point = "aos/network/www/main",
     deps = [
-        "proxy",
+        "main",
     ],
-    visibility=["//visibility:public"],
+    visibility=["//aos:__subpackages__"],
 )
 
 genrule(
@@ -37,5 +52,5 @@
         "flatbuffers.js",
     ],
     cmd = "cp $(location @com_github_google_flatbuffers//:flatjs) $@",
-    visibility=["//visibility:public"],
+    visibility=["//aos:__subpackages__"],
 )
diff --git a/aos/network/www/config_handler.ts b/aos/network/www/config_handler.ts
new file mode 100644
index 0000000..0af78b6
--- /dev/null
+++ b/aos/network/www/config_handler.ts
@@ -0,0 +1,85 @@
+import {aos} from 'aos/network/web_proxy_generated';
+
+/**
+ * Renders the channels of an aos.Configuration as a list of checkboxes and
+ * sends the checked channels over the data channel as a Connect message.
+ */
+export class ConfigHandler {
+  // Container element that printConfig() fills with one row per channel.
+  private readonly root_div = document.getElementById('config');
+
+  /**
+   * @param config Configuration whose channels are listed.
+   * @param dataChannel Channel on which the Connect message is sent.
+   */
+  constructor(
+      private readonly config: aos.Configuration,
+      private readonly dataChannel: RTCDataChannel) {}
+
+  /** Adds a checkbox row (name and type) for each channel in the config. */
+  printConfig() {
+    // `let`, not `const`: the index is incremented on every iteration.
+    for (let i = 0; i < this.config.channelsLength(); i++) {
+      const channel_div = document.createElement('div');
+      channel_div.classList.add('channel');
+      this.root_div.appendChild(channel_div);
+
+      const input_el = document.createElement('input');
+      // setAttribute takes a string; handleChange() converts it back.
+      input_el.setAttribute('data-index', String(i));
+      input_el.setAttribute('type', 'checkbox');
+      input_el.addEventListener('click', () => this.handleChange());
+      channel_div.appendChild(input_el);
+
+      const name_div = document.createElement('div');
+      const name_text = document.createTextNode(this.config.channels(i).name());
+      name_div.appendChild(name_text);
+      const type_div = document.createElement('div');
+      const type_text = document.createTextNode(this.config.channels(i).type());
+      type_div.appendChild(type_text);
+      const info_div = document.createElement('div');
+      info_div.appendChild(name_div);
+      info_div.appendChild(type_div);
+
+      channel_div.appendChild(info_div);
+    }
+  }
+
+  /**
+   * Builds a Connect message listing every checked channel and sends it on
+   * the data channel. Runs whenever any checkbox is clicked.
+   */
+  handleChange() {
+    const toggles = this.root_div.getElementsByTagName('input');
+    const builder = new flatbuffers.Builder(512);
+
+    const channels: flatbuffers.Offset[] = [];
+    for (const toggle of toggles) {
+      if (!toggle.checked) {
+        continue;
+      }
+      // The attribute value is a string; channels() is indexed by number.
+      const index = Number(toggle.getAttribute('data-index'));
+      const channel = this.config.channels(index);
+      const namefb = builder.createString(channel.name());
+      const typefb = builder.createString(channel.type());
+      aos.Channel.startChannel(builder);
+      aos.Channel.addName(builder, namefb);
+      aos.Channel.addType(builder, typefb);
+      const channelfb = aos.Channel.endChannel(builder);
+      channels.push(channelfb);
+    }
+
+    const channelsfb =
+        aos.message_bridge.Connect.createChannelsToTransferVector(
+            builder, channels);
+    aos.message_bridge.Connect.startConnect(builder);
+    aos.message_bridge.Connect.addChannelsToTransfer(builder, channelsfb);
+    const connect = aos.message_bridge.Connect.endConnect(builder);
+    builder.finish(connect);
+    const array = builder.asUint8Array();
+    console.log('connect', array);
+    // Slice from byteOffset so bytes before the view's start are not sent.
+    this.dataChannel.send(array.buffer.slice(array.byteOffset));
+  }
+}
diff --git a/aos/network/www/index.html b/aos/network/www/index.html
index bc90d40..97e16d4 100644
--- a/aos/network/www/index.html
+++ b/aos/network/www/index.html
@@ -1,6 +1,11 @@
 <html>
-  <body>
+  <head>
     <script src="flatbuffers.js"></script>
-    <script src="proxy_bundle.min.js"></script>
+    <script src="main_bundle.min.js" defer></script>
+    <link rel="stylesheet" href="styles.css">
+  </head>
+  <body>
+    <div id="config">
+    </div>
   </body>
 </html>
diff --git a/aos/network/www/main.ts b/aos/network/www/main.ts
index 5a3165e..1840ffb 100644
--- a/aos/network/www/main.ts
+++ b/aos/network/www/main.ts
@@ -1,5 +1,10 @@
 import {Connection} from './proxy';
+import * as PingHandler from './ping_handler';
 
 const conn = new Connection();
 
 conn.connect();
+
+PingHandler.SetupDom();
+
+conn.addHandler(PingHandler.GetId(), PingHandler.HandlePing);
diff --git a/aos/network/www/ping_handler.ts b/aos/network/www/ping_handler.ts
new file mode 100644
index 0000000..9b37d70
--- /dev/null
+++ b/aos/network/www/ping_handler.ts
@@ -0,0 +1,34 @@
+import {aos} from 'aos/events/ping_generated';
+
+/**
+ * Decodes a Ping flatbuffer and shows its value and send time in the
+ * elements created by SetupDom().
+ */
+export function HandlePing(data: Uint8Array) {
+  const fbBuffer = new flatbuffers.ByteBuffer(data);
+  const ping = aos.examples.Ping.getRootAsPing(fbBuffer);
+
+  // textContent takes a string and, unlike innerHTML, is not parsed as markup.
+  document.getElementById('val').textContent = String(ping.value());
+  // NOTE(review): `.low` is presumably only the low 32 bits of the send time.
+  document.getElementById('time').textContent = String(ping.sendTime().low);
+}
+
+/** Appends the 'val' and 'time' display elements to the document body. */
+export function SetupDom() {
+  const ping_div = document.createElement('div');
+  document.body.appendChild(ping_div);
+
+  const value_div = document.createElement('div');
+  value_div.setAttribute('id', 'val');
+  const time_div = document.createElement('div');
+  time_div.setAttribute('id', 'time');
+
+  ping_div.appendChild(value_div);
+  ping_div.appendChild(time_div);
+}
+
+/** Returns the fully qualified name of the Ping type. */
+export function GetId() {
+  return aos.examples.Ping.getFullyQualifiedName();
+}
diff --git a/aos/network/www/proxy.ts b/aos/network/www/proxy.ts
index 1ef6320..27ffbfe 100644
--- a/aos/network/www/proxy.ts
+++ b/aos/network/www/proxy.ts
@@ -1,4 +1,41 @@
-import {aos.web_proxy} from '../web_proxy_generated';
+import {aos} from 'aos/network/web_proxy_generated';
+import {ConfigHandler} from './config_handler';
+
+// There is one Handler per DataChannel. It reassembles messages that were
+// split into several packets and passes each complete message to handlerFunc.
+export class Handler {
+  private dataBuffer: Uint8Array|null = null;
+  private receivedMessageLength: number = 0;
+  constructor(
+      private readonly handlerFunc: (data: Uint8Array) => void,
+      private readonly channel: RTCDataChannel) {
+    channel.addEventListener('message', (e) => this.handleMessage(e));
+  }
+
+  // Handles one packet; calls handlerFunc once the last packet has arrived.
+  handleMessage(e: MessageEvent): void {
+    const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+    const messageHeader =
+        aos.web_proxy.MessageHeader.getRootAsMessageHeader(fbBuffer);
+    // Short circuit if only one packet
+    if (messageHeader.packetCount() === 1) {
+      this.handlerFunc(messageHeader.dataArray());
+      return;
+    }
+
+    if (messageHeader.packetIndex() === 0) {
+      // Start of a new message: fresh buffer and reset the write offset.
+      this.dataBuffer = new Uint8Array(messageHeader.length());
+      this.receivedMessageLength = 0;
+    }
+    this.dataBuffer.set(messageHeader.dataArray(), this.receivedMessageLength);
+    this.receivedMessageLength += messageHeader.dataLength();
+
+    if (messageHeader.packetIndex() === messageHeader.packetCount() - 1) {
+      this.handlerFunc(this.dataBuffer);
+    }
+  }
+}
 
 // Analogous to the Connection class in //aos/network/web_proxy.h. Because most
 // of the apis are native in JS, it is much simpler.
@@ -7,12 +44,20 @@
   private rtcPeerConnection: RTCPeerConnection|null = null;
   private dataChannel: DataChannel|null = null;
   private webSocketUrl: string;
+  private configHandler: ConfigHandler|null = null;
+  private config: aos.Configuration|null = null;
+  private readonly handlerFuncs = new Map<string, (data: Uint8Array) => void>();
+  private readonly handlers = new Set<Handler>();
 
   constructor() {
     const server = location.host;
     this.webSocketUrl = `ws://${server}/ws`;
   }
 
+  addHandler(id: string, handler: (data: Uint8Array) => void): void {
+    this.handlerFuncs.set(id, handler);
+  }
+
   connect(): void {
     this.webSocketConnection = new WebSocket(this.webSocketUrl);
     this.webSocketConnection.binaryType = 'arraybuffer';
@@ -22,14 +67,29 @@
         'message', (e) => this.onWebSocketMessage(e));
   }
 
-  // Handle messages on the DataChannel. Will delegate to various handlers for
-  // different message types.
+  // Handle messages on the DataChannel. Handles the Configuration message as
+  // all other messages are sent on specific DataChannels.
   onDataChannelMessage(e: MessageEvent): void {
-    console.log(e);
+    const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+    // TODO(alex): handle config updates if/when required
+    if (!this.configHandler) {
+      const config = aos.Configuration.getRootAsConfiguration(fbBuffer);
+      this.config = config;
+      this.configHandler = new ConfigHandler(config, this.dataChannel);
+      this.configHandler.printConfig();
+      return;
+    }
+  }
+
+  onDataChannel(ev: RTCDataChannelEvent): void {
+    const channelType = ev.channel.label.split('/').pop();
+    const handlerFunc = this.handlerFuncs.get(channelType);
+    // Ignore channels for which no handler was registered via addHandler().
+    if (!handlerFunc) return;
+    this.handlers.add(new Handler(handlerFunc, ev.channel));
   }
 
   onIceCandidate(e: RTCPeerConnectionIceEvent): void {
-    console.log('Created ice candidate', e);
     if (!e.candidate) {
       return;
     }
@@ -49,7 +109,6 @@
 
   // Called for new SDPs. Make sure to set it locally and remotely.
   onOfferCreated(description: RTCSessionDescription): void {
-    console.log('Created offer', description);
     this.rtcPeerConnection.setLocalDescription(description);
     const builder = new flatbuffers.Builder(512);
     const offerString = builder.createString(description.sdp);
@@ -67,7 +126,9 @@
   // want a DataChannel, so create it and then create an offer to send.
   onWebSocketOpen(): void {
     this.rtcPeerConnection = new RTCPeerConnection({});
-    this.dataChannel = this.rtcPeerConnection.createDataChannel('dc');
+    this.rtcPeerConnection.addEventListener(
+        'datachannel', (e) => this.onDataChannel(e));
+    this.dataChannel = this.rtcPeerConnection.createDataChannel('signalling');
     this.dataChannel.addEventListener(
         'message', (e) => this.onDataChannelMessage(e));
     window.dc = this.dataChannel;
@@ -81,14 +142,12 @@
   // and handle appropriately. Either by setting the remote description or
   // adding the remote ice candidate.
   onWebSocketMessage(e: MessageEvent): void {
-    console.log('ws: ', e);
     const buffer = new Uint8Array(e.data)
     const fbBuffer = new flatbuffers.ByteBuffer(buffer);
     const message =
         aos.web_proxy.WebSocketMessage.getRootAsWebSocketMessage(fbBuffer);
     switch (message.payloadType()) {
       case aos.web_proxy.Payload.WebSocketSdp:
-        console.log('got an sdp message');
         const sdpFb = message.payload(new aos.web_proxy.WebSocketSdp());
         if (sdpFb.type() !== aos.web_proxy.SdpType.ANSWER) {
           console.log('got something other than an answer back');
@@ -98,7 +157,6 @@
             {'type': 'answer', 'sdp': sdpFb.payload()}));
         break;
       case aos.web_proxy.Payload.WebSocketIce:
-        console.log('got an ice message');
         const iceFb = message.payload(new aos.web_proxy.WebSocketIce());
         const candidate = {} as RTCIceCandidateInit;
         candidate.candidate = iceFb.candidate();
diff --git a/aos/network/www/styles.css b/aos/network/www/styles.css
new file mode 100644
index 0000000..23ceb21
--- /dev/null
+++ b/aos/network/www/styles.css
@@ -0,0 +1,5 @@
+.channel {
+  display: flex;
+  border-bottom: 1px solid;
+  font-size: 24px;
+}