blob: 948f8db53dc28f6370b2b7b73d6437130e02e5b3 [file] [log] [blame]
Philipp Schradere625ba22020-11-16 20:11:37 -08001import * as configuration from 'org_frc971/aos/configuration_generated';
2import * as web_proxy from 'org_frc971/aos/network/web_proxy_generated';
3import {Builder} from 'org_frc971/external/com_github_google_flatbuffers/ts/builder';
4import {ByteBuffer} from 'org_frc971/external/com_github_google_flatbuffers/ts/byte-buffer';
5
James Kuszmaul527038a2020-12-21 23:40:44 -08006import ChannelFb = configuration.aos.Channel;
Philipp Schradere625ba22020-11-16 20:11:37 -08007import Configuration = configuration.aos.Configuration;
James Kuszmaul5f5e1232020-12-22 20:58:00 -08008import Schema = configuration.reflection.Schema;
Philipp Schradere625ba22020-11-16 20:11:37 -08009import MessageHeader = web_proxy.aos.web_proxy.MessageHeader;
10import WebSocketIce = web_proxy.aos.web_proxy.WebSocketIce;
11import WebSocketMessage = web_proxy.aos.web_proxy.WebSocketMessage;
12import Payload = web_proxy.aos.web_proxy.Payload;
13import WebSocketSdp = web_proxy.aos.web_proxy.WebSocketSdp;
14import SdpType = web_proxy.aos.web_proxy.SdpType;
James Kuszmaul527038a2020-12-21 23:40:44 -080015import SubscriberRequest = web_proxy.aos.web_proxy.SubscriberRequest;
16import ChannelRequestFb = web_proxy.aos.web_proxy.ChannelRequest;
17import TransferMethod = web_proxy.aos.web_proxy.TransferMethod;
Alex Perry5f474f22020-02-01 12:14:24 -080018
// There is one handler for each DataChannel. It maintains the state of
// multi-part messages and delegates to a callback when the message is fully
// assembled.
export class Handler {
  // Reassembly buffer for the multi-part message currently in flight, or null
  // when no multi-part message has been started.
  private dataBuffer: Uint8Array|null = null;
  // Number of payload bytes already copied into dataBuffer.
  private receivedMessageLength: number = 0;

  /**
   * @param handlerFunc Callback invoked with the assembled message bytes and
   *     the header's monotonic sent time scaled by 1e-9 (presumably
   *     nanoseconds converted to seconds).
   * @param channel DataChannel whose 'message' events are fed to
   *     handleMessage().
   */
  constructor(
      private readonly handlerFunc:
          (data: Uint8Array, sentTime: number) => void,
      private readonly channel: RTCDataChannel) {
    channel.addEventListener('message', (e) => this.handleMessage(e));
  }

  /**
   * Parses one packet received on the DataChannel. Single-packet messages are
   * forwarded immediately; multi-packet messages are accumulated and forwarded
   * once the final packet arrives.
   */
  handleMessage(e: MessageEvent): void {
    const fbBuffer = new ByteBuffer(new Uint8Array(e.data));
    const messageHeader = MessageHeader.getRootAsMessageHeader(
        fbBuffer as unknown as flatbuffers.ByteBuffer);
    const time = messageHeader.monotonicSentTime().toFloat64() * 1e-9;
    // Short circuit if only one packet
    if (messageHeader.packetCount() === 1) {
      this.handlerFunc(messageHeader.dataArray(), time);
      return;
    }

    // The first packet of a multi-part message (re)starts reassembly.
    if (messageHeader.packetIndex() === 0) {
      this.dataBuffer = new Uint8Array(messageHeader.length());
      this.receivedMessageLength = 0;
    }
    const fragmentLength = messageHeader.dataLength();
    if (!fragmentLength) {
      return;
    }

    const buffer = this.dataBuffer;
    if (buffer === null) {
      // A continuation packet arrived without a preceding packet 0 (e.g. the
      // first packet was lost), so there is nothing to append it to.
      return;
    }
    if (this.receivedMessageLength + fragmentLength > buffer.length) {
      // The fragment does not fit in the advertised message length, so it
      // cannot belong to the message being assembled. Abandon that message
      // rather than letting Uint8Array.set() throw a RangeError.
      this.dataBuffer = null;
      this.receivedMessageLength = 0;
      return;
    }
    buffer.set(messageHeader.dataArray(), this.receivedMessageLength);
    this.receivedMessageLength += fragmentLength;

    if (messageHeader.packetIndex() === messageHeader.packetCount() - 1) {
      // Drop our reference before delivering so that stray fragments of a later
      // message can never be written into a buffer the callback already owns.
      this.dataBuffer = null;
      this.receivedMessageLength = 0;
      this.handlerFunc(buffer, time);
    }
  }
}
Alex Perryb3b50792020-01-18 16:13:45 -080060
// Identifies a channel by its name and its message type.
class Channel {
  public readonly name: string;
  public readonly type: string;

  constructor(name: string, type: string) {
    this.name = name;
    this.type = type;
  }

  // Returns "<name>/<type>", the string form used to look this channel up.
  key(): string {
    return `${this.name}/${this.type}`;
  }
}
67
// Pairs a channel with the transfer method requested for it.
class ChannelRequest {
  public readonly channel: Channel;
  public readonly transferMethod: TransferMethod;

  constructor(channel: Channel, transferMethod: TransferMethod) {
    this.channel = channel;
    this.transferMethod = transferMethod;
  }
}
73
// Analogous to the Connection class in //aos/network/web_proxy.h. Because most
// of the apis are native in JS, it is much simpler.
export class Connection {
  private webSocketConnection: WebSocket|null = null;
  private rtcPeerConnection: RTCPeerConnection|null = null;
  // The 'signalling' DataChannel: carries the config to us and subscription
  // requests from us.
  private dataChannel: RTCDataChannel|null = null;
  private webSocketUrl: string;

  // The most recently received config, or null until one has arrived.
  private configInternal: Configuration|null = null;
  // A set of functions that accept the config to handle.
  private readonly configHandlers = new Set<(config: Configuration) => void>();

  // Message callbacks keyed by Channel.key() ("<name>/<type>").
  private readonly handlerFuncs =
      new Map<string, ((data: Uint8Array, sentTime: number) => void)[]>();
  // Keeps every per-DataChannel Handler alive.
  private readonly handlers = new Set<Handler>();

  // Every channel requested so far; the full list is re-sent on each
  // subscription.
  private subscribedChannels: ChannelRequest[] = [];

  constructor() {
    const server = location.host;
    // Pages served over https may only open secure WebSockets; a plain ws://
    // URL would be blocked as mixed content.
    const scheme = location.protocol === 'https:' ? 'wss' : 'ws';
    this.webSocketUrl = `${scheme}://${server}/ws`;
  }

  /**
   * Registers a callback that is run with the config each time a config
   * message is received.
   */
  addConfigHandler(handler: (config: Configuration) => void): void {
    this.configHandlers.add(handler);
  }

  /**
   * Add a handler for a specific message type, with reliable delivery of all
   * messages.
   */
  addReliableHandler(
      name: string, type: string,
      handler: (data: Uint8Array, sentTime: number) => void): void {
    this.addHandlerImpl(
        name, type, TransferMethod.EVERYTHING_WITH_HISTORY, handler);
  }

  /**
   * Add a handler for a specific message type.
   */
  addHandler(
      name: string, type: string,
      handler: (data: Uint8Array, sentTime: number) => void): void {
    this.addHandlerImpl(name, type, TransferMethod.SUBSAMPLE, handler);
  }

  /**
   * Records the handler under the channel's key and subscribes to the channel
   * with the given transfer method.
   * @throws Error if the config has not been received yet (raised by
   *     subscribeToChannel after the handler has already been recorded).
   */
  addHandlerImpl(
      name: string, type: string, method: TransferMethod,
      handler: (data: Uint8Array, sentTime: number) => void): void {
    const channel = new Channel(name, type);
    const request = new ChannelRequest(channel, method);
    const key = channel.key();
    let funcs = this.handlerFuncs.get(key);
    if (funcs === undefined) {
      funcs = [];
      this.handlerFuncs.set(key, funcs);
    }
    funcs.push(handler);
    this.subscribeToChannel(request);
  }

  /**
   * Returns the schema of the channel in the config whose type is typeName. If
   * several channels share the type, the last one in the config is used.
   * @throws Error if the config has not arrived yet or no channel matches.
   */
  getSchema(typeName: string): Schema {
    const config = this.getConfig();
    if (config === null) {
      throw new Error(
          'Unable to find schema for ' + typeName +
          ': the config has not been received yet.');
    }
    let schema: Schema|null = null;
    for (let ii = 0; ii < config.channelsLength(); ++ii) {
      const configChannel = config.channels(ii);
      if (configChannel.type() === typeName) {
        schema = configChannel.schema();
      }
    }
    if (schema === null) {
      throw new Error('Unable to find schema for ' + typeName);
    }
    return schema;
  }

  /**
   * Adds the channel to the subscription list and sends the complete list to
   * the server.
   * @throws Error if the config has not been received yet.
   */
  subscribeToChannel(channel: ChannelRequest): void {
    // NOTE(review): the request is recorded before the config check, so a
    // request made too early is still included in the next successful
    // subscription. Preserved as-is; confirm this is intended.
    this.subscribedChannels.push(channel);
    if (this.configInternal === null) {
      throw new Error(
          'Must call subscribeToChannel after we\'ve received the config.');
    }
    const builder = new Builder(512) as unknown as flatbuffers.Builder;
    const channels: flatbuffers.Offset[] = [];
    for (const subscribed of this.subscribedChannels) {
      // Strings must be created before the table that refers to them is
      // started.
      const nameFb = builder.createString(subscribed.channel.name);
      const typeFb = builder.createString(subscribed.channel.type);
      ChannelFb.startChannel(builder);
      ChannelFb.addName(builder, nameFb);
      ChannelFb.addType(builder, typeFb);
      const channelFb = ChannelFb.endChannel(builder);
      ChannelRequestFb.startChannelRequest(builder);
      ChannelRequestFb.addChannel(builder, channelFb);
      ChannelRequestFb.addMethod(builder, subscribed.transferMethod);
      channels.push(ChannelRequestFb.endChannelRequest(builder));
    }

    const channelsFb =
        SubscriberRequest.createChannelsToTransferVector(builder, channels);
    SubscriberRequest.startSubscriberRequest(builder);
    SubscriberRequest.addChannelsToTransfer(builder, channelsFb);
    const connect = SubscriberRequest.endSubscriberRequest(builder);
    builder.finish(connect);
    this.sendConnectMessage(builder);
  }

  /**
   * Opens the WebSocket used for signalling. The peer connection is set up
   * once the socket reports that it is open.
   */
  connect(): void {
    this.webSocketConnection = new WebSocket(this.webSocketUrl);
    this.webSocketConnection.binaryType = 'arraybuffer';
    this.webSocketConnection.addEventListener(
        'open', () => this.onWebSocketOpen());
    this.webSocketConnection.addEventListener(
        'message', (e) => this.onWebSocketMessage(e));
  }

  /** Returns the most recently received config, or null if none has arrived. */
  getConfig(): Configuration|null {
    return this.configInternal;
  }

  // Handle messages on the DataChannel. Handles the Configuration message as
  // all other messages are sent on specific DataChannels.
  onConfigMessage(data: Uint8Array): void {
    const fbBuffer = new ByteBuffer(data);
    this.configInternal = Configuration.getRootAsConfiguration(
        fbBuffer as unknown as flatbuffers.ByteBuffer);
    // Iterate over a snapshot of the registered handlers.
    for (const handler of Array.from(this.configHandlers)) {
      handler(this.configInternal);
    }
  }

  // Called when the remote side opens a DataChannel. The channel's label is
  // used to look up the callbacks registered for it.
  onDataChannel(ev: RTCDataChannelEvent): void {
    const channel = ev.channel;
    const name = channel.label;
    const handlers = this.handlerFuncs.get(name);
    if (handlers === undefined) {
      // Nothing was registered under this label; ignore the channel instead of
      // throwing while iterating undefined.
      console.log('got a data channel with no registered handlers: ' + name);
      return;
    }
    for (const handler of handlers) {
      this.handlers.add(new Handler(handler, channel));
    }
  }

  // Forwards each locally gathered ICE candidate to the server over the
  // WebSocket.
  onIceCandidate(e: RTCPeerConnectionIceEvent): void {
    if (!e.candidate) {
      return;
    }
    const candidate = e.candidate;
    const builder = new Builder(512);
    const candidateString = builder.createString(candidate.candidate);
    const sdpMidString = builder.createString(candidate.sdpMid);

    const iceFb = WebSocketIce.createWebSocketIce(
        builder as unknown as flatbuffers.Builder, candidateString,
        sdpMidString, candidate.sdpMLineIndex);
    const messageFb = WebSocketMessage.createWebSocketMessage(
        builder as unknown as flatbuffers.Builder, Payload.WebSocketIce, iceFb);
    builder.finish(messageFb);
    this.sendWebSocketMessage(builder);
  }

  // Called for new SDPs. Make sure to set it locally and remotely.
  onOfferCreated(description: RTCSessionDescriptionInit): void {
    this.rtcPeerConnection.setLocalDescription(description)
        .catch((err) => {
          console.error('failed to set the local description', err);
        });
    const builder = new Builder(512);
    const offerString = builder.createString(description.sdp);

    const webSocketSdp = WebSocketSdp.createWebSocketSdp(
        builder as unknown as flatbuffers.Builder, SdpType.OFFER, offerString);
    const message = WebSocketMessage.createWebSocketMessage(
        builder as unknown as flatbuffers.Builder, Payload.WebSocketSdp,
        webSocketSdp);
    builder.finish(message);
    this.sendWebSocketMessage(builder);
  }

  // We now have a websocket, so start setting up the peer connection. We only
  // want a DataChannel, so create it and then create an offer to send.
  onWebSocketOpen(): void {
    const peerConnection = new RTCPeerConnection({});
    this.rtcPeerConnection = peerConnection;
    peerConnection.addEventListener(
        'datachannel', (e) => this.onDataChannel(e));
    this.dataChannel = peerConnection.createDataChannel('signalling');
    this.handlers.add(
        new Handler((data) => this.onConfigMessage(data), this.dataChannel));
    // TODO(james): Is this used? Can we delete it?
    // window.dc = this.dataChannel;
    peerConnection.addEventListener(
        'icecandidate', (e) => this.onIceCandidate(e));
    peerConnection.createOffer()
        .then((offer) => this.onOfferCreated(offer))
        .catch((err) => {
          console.error('failed to create or send the offer', err);
        });
  }

  // When we receive a websocket message, we need to determine what type it is
  // and handle appropriately. Either by setting the remote description or
  // adding the remote ice candidate.
  onWebSocketMessage(e: MessageEvent): void {
    const buffer = new Uint8Array(e.data);
    const fbBuffer = new ByteBuffer(buffer);
    const message = WebSocketMessage.getRootAsWebSocketMessage(
        fbBuffer as unknown as flatbuffers.ByteBuffer);
    switch (message.payloadType()) {
      case Payload.WebSocketSdp: {
        const sdpFb = message.payload(new WebSocketSdp());
        if (sdpFb.type() !== SdpType.ANSWER) {
          console.log('got something other than an answer back');
          break;
        }
        this.rtcPeerConnection
            .setRemoteDescription(new RTCSessionDescription(
                {'type': 'answer', 'sdp': sdpFb.payload()}))
            .catch((err) => {
              console.error('failed to set the remote description', err);
            });
        break;
      }
      case Payload.WebSocketIce: {
        const iceFb = message.payload(new WebSocketIce());
        const candidate = {} as RTCIceCandidateInit;
        candidate.candidate = iceFb.candidate();
        candidate.sdpMid = iceFb.sdpMid();
        candidate.sdpMLineIndex = iceFb.sdpMLineIndex();
        this.rtcPeerConnection.addIceCandidate(candidate).catch((err) => {
          console.error('failed to add the remote ice candidate', err);
        });
        break;
      }
      default:
        console.log('got an unknown message');
        break;
    }
  }

  /**
   * Subscribes to messages. Only the most recent connect message is in use. Any
   * channels not specified in the message are implicitly unsubscribed.
   * @param builder A finished flatbuffers Builder containing the connect
   *     message to send over the signalling DataChannel.
   */
  sendConnectMessage(builder: any): void {
    const array = builder.asUint8Array();
    // The finished message starts at byteOffset within the builder's backing
    // buffer, so send only from there onwards.
    this.dataChannel.send(array.buffer.slice(array.byteOffset));
  }

  // Sends the finished flatbuffer held by builder over the signalling
  // WebSocket.
  private sendWebSocketMessage(builder: Builder): void {
    const array = builder.asUint8Array();
    // The finished message starts at byteOffset within the builder's backing
    // buffer, so send only from there onwards.
    this.webSocketConnection.send(array.buffer.slice(array.byteOffset));
  }
}
Alex Perryb3b50792020-01-18 16:13:45 -0800303}