Handle subscription of messages in the webapp.
Messages larger than a threshold are split and reassembled because of the
message size limit in WebRTC. The threshold may have to be adjusted to somewhere
between 64KiB and 256KiB.
This also includes a basic handler for a ping message and a more
advanced image handler.
Change-Id: If66acfb1bb84e9d3ff686994a94b1480cb70b2aa
diff --git a/aos/network/www/proxy.ts b/aos/network/www/proxy.ts
index 1ef6320..27ffbfe 100644
--- a/aos/network/www/proxy.ts
+++ b/aos/network/www/proxy.ts
@@ -1,4 +1,41 @@
-import {aos.web_proxy} from '../web_proxy_generated';
+import {aos} from 'aos/network/web_proxy_generated';
+import {ConfigHandler} from './config_handler';
+
+// One Handler is created per DataChannel. It reassembles multi-part messages
+// (packets are expected in order, starting at packetIndex 0) and invokes
+// handlerFunc once with the complete payload.
+export class Handler {
+ private dataBuffer: Uint8Array|null = null;  // Partial multi-part message.
+ private receivedMessageLength: number = 0;  // Bytes written to dataBuffer.
+ constructor(
+ private readonly handlerFunc: (data: Uint8Array) => void,
+ private readonly channel: RTCDataChannel) {
+ channel.addEventListener('message', (e) => this.handleMessage(e));
+ }
+
+ handleMessage(e: MessageEvent): void {
+ const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+ const messageHeader =
+ aos.web_proxy.MessageHeader.getRootAsMessageHeader(fbBuffer);
+ // Short circuit if only one packet: nothing needs to be reassembled.
+ if (messageHeader.packetCount() === 1) {
+ this.handlerFunc(messageHeader.dataArray());
+ return;
+ }
+
+ if (messageHeader.packetIndex() === 0) {
+ this.dataBuffer = new Uint8Array(messageHeader.length());
+ this.receivedMessageLength = 0;  // Start writing at the front again.
+ }
+ if (!this.dataBuffer) return;  // Joined mid-message; wait for packet 0.
+ this.dataBuffer.set(messageHeader.dataArray(), this.receivedMessageLength);
+ this.receivedMessageLength += messageHeader.dataLength();
+
+ if (messageHeader.packetIndex() === messageHeader.packetCount() - 1) {
+ this.handlerFunc(this.dataBuffer);
+ }
+ }
+}
// Analogous to the Connection class in //aos/network/web_proxy.h. Because most
// of the apis are native in JS, it is much simpler.
@@ -7,12 +44,20 @@
private rtcPeerConnection: RTCPeerConnection|null = null;
private dataChannel: DataChannel|null = null;
private webSocketUrl: string;
+ private configHandler: ConfigHandler|null = null;  // Set on first message.
+ private config: aos.Configuration|null = null;  // Set with configHandler.
+ private readonly handlerFuncs = new Map<string, (data: Uint8Array) => void>();
+ private readonly handlers = new Set<Handler>();
constructor() {
const server = location.host;
this.webSocketUrl = `ws://${server}/ws`;
}
+ addHandler(id: string, handler: (data: Uint8Array) => void): void {
+ this.handlerFuncs.set(id, handler);  // Keyed by type; see onDataChannel.
+ }
+
connect(): void {
this.webSocketConnection = new WebSocket(this.webSocketUrl);
this.webSocketConnection.binaryType = 'arraybuffer';
@@ -22,14 +67,29 @@
'message', (e) => this.onWebSocketMessage(e));
}
- // Handle messages on the DataChannel. Will delegate to various handlers for
- // different message types.
+ // Handle messages on the DataChannel. Handles the Configuration message as
+ // all other messages are sent on specific DataChannels.
onDataChannelMessage(e: MessageEvent): void {
- console.log(e);
+ // TODO(alex): handle config updates if/when required
+ if (this.configHandler) {
+ return;  // Already configured; skip decoding the payload.
+ }
+ const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+ const config = aos.Configuration.getRootAsConfiguration(fbBuffer);
+ this.config = config;
+ this.configHandler = new ConfigHandler(config, this.dataChannel);
+ this.configHandler.printConfig();
+ }
+
+ onDataChannel(ev: RTCDataChannelEvent): void {
+ const channel = ev.channel;
+ const channelType = channel.label.split('/').pop();  // Last label segment.
+ const handlerFunc = this.handlerFuncs.get(channelType);
+ if (!handlerFunc) return;  // Nothing registered via addHandler for this type.
+ this.handlers.add(new Handler(handlerFunc, channel));
}
onIceCandidate(e: RTCPeerConnectionIceEvent): void {
- console.log('Created ice candidate', e);
if (!e.candidate) {
return;
}
@@ -49,7 +109,6 @@
// Called for new SDPs. Make sure to set it locally and remotely.
onOfferCreated(description: RTCSessionDescription): void {
- console.log('Created offer', description);
this.rtcPeerConnection.setLocalDescription(description);
const builder = new flatbuffers.Builder(512);
const offerString = builder.createString(description.sdp);
@@ -67,7 +126,9 @@
// want a DataChannel, so create it and then create an offer to send.
onWebSocketOpen(): void {
this.rtcPeerConnection = new RTCPeerConnection({});
- this.dataChannel = this.rtcPeerConnection.createDataChannel('dc');
+ this.rtcPeerConnection.addEventListener(
+ 'datachannel', (e) => this.onDataChannel(e));  // Channels opened by the peer.
+ this.dataChannel = this.rtcPeerConnection.createDataChannel('signalling');
this.dataChannel.addEventListener(
'message', (e) => this.onDataChannelMessage(e));
window.dc = this.dataChannel;
@@ -81,14 +142,12 @@
// and handle appropriately. Either by setting the remote description or
// adding the remote ice candidate.
onWebSocketMessage(e: MessageEvent): void {
- console.log('ws: ', e);
const buffer = new Uint8Array(e.data)
const fbBuffer = new flatbuffers.ByteBuffer(buffer);
const message =
aos.web_proxy.WebSocketMessage.getRootAsWebSocketMessage(fbBuffer);
switch (message.payloadType()) {
case aos.web_proxy.Payload.WebSocketSdp:
- console.log('got an sdp message');
const sdpFb = message.payload(new aos.web_proxy.WebSocketSdp());
if (sdpFb.type() !== aos.web_proxy.SdpType.ANSWER) {
console.log('got something other than an answer back');
@@ -98,7 +157,6 @@
{'type': 'answer', 'sdp': sdpFb.payload()}));
break;
case aos.web_proxy.Payload.WebSocketIce:
- console.log('got an ice message');
const iceFb = message.payload(new aos.web_proxy.WebSocketIce());
const candidate = {} as RTCIceCandidateInit;
candidate.candidate = iceFb.candidate();