blob: 02573b3c94f77e65908527d4630314859d796f5b [file] [log] [blame]
Philipp Schradere625ba22020-11-16 20:11:37 -08001import * as configuration from 'org_frc971/aos/configuration_generated';
2import * as web_proxy from 'org_frc971/aos/network/web_proxy_generated';
3import {Builder} from 'org_frc971/external/com_github_google_flatbuffers/ts/builder';
4import {ByteBuffer} from 'org_frc971/external/com_github_google_flatbuffers/ts/byte-buffer';
5
James Kuszmaul527038a2020-12-21 23:40:44 -08006import ChannelFb = configuration.aos.Channel;
Philipp Schradere625ba22020-11-16 20:11:37 -08007import Configuration = configuration.aos.Configuration;
James Kuszmaul5f5e1232020-12-22 20:58:00 -08008import Schema = configuration.reflection.Schema;
Philipp Schradere625ba22020-11-16 20:11:37 -08009import MessageHeader = web_proxy.aos.web_proxy.MessageHeader;
10import WebSocketIce = web_proxy.aos.web_proxy.WebSocketIce;
11import WebSocketMessage = web_proxy.aos.web_proxy.WebSocketMessage;
12import Payload = web_proxy.aos.web_proxy.Payload;
13import WebSocketSdp = web_proxy.aos.web_proxy.WebSocketSdp;
14import SdpType = web_proxy.aos.web_proxy.SdpType;
James Kuszmaul527038a2020-12-21 23:40:44 -080015import SubscriberRequest = web_proxy.aos.web_proxy.SubscriberRequest;
16import ChannelRequestFb = web_proxy.aos.web_proxy.ChannelRequest;
17import TransferMethod = web_proxy.aos.web_proxy.TransferMethod;
Alex Perry5f474f22020-02-01 12:14:24 -080018
// There is one handler for each DataChannel, it maintains the state of
// multi-part messages and delegates to a callback when the message is fully
// assembled.
export class Handler {
  // Reassembly buffer for the multi-part message currently in flight, or null
  // when no multi-part message has been started.
  private dataBuffer: Uint8Array|null = null;
  // Number of payload bytes copied into dataBuffer so far.
  private receivedMessageLength: number = 0;

  /**
   * @param handlerFunc Called with the bytes of every fully assembled message
   *     and the header's monotonicSentTime scaled by 1e-9 (presumably
   *     nanoseconds converted to seconds -- confirm against the sender).
   * @param channel The DataChannel whose 'message' events feed this handler.
   */
  constructor(
      private readonly handlerFunc:
          (data: Uint8Array, sentTime: number) => void,
      private readonly channel: RTCDataChannel) {
    channel.addEventListener('message', (e) => this.handleMessage(e));
  }

  /**
   * Processes one packet received on the DataChannel.
   *
   * Single-packet messages are forwarded immediately. For multi-packet
   * messages, the packet with index 0 allocates a buffer of the full message
   * length, each packet's payload is appended in arrival order, and the
   * buffer is handed to the callback when the packet with the last index
   * arrives.
   *
   * @param e The 'message' event; e.data holds a serialized MessageHeader.
   */
  handleMessage(e: MessageEvent): void {
    const fbBuffer = new ByteBuffer(new Uint8Array(e.data));
    const messageHeader = MessageHeader.getRootAsMessageHeader(
        fbBuffer as unknown as flatbuffers.ByteBuffer);
    const time = messageHeader.monotonicSentTime().toFloat64() * 1e-9;
    // Short circuit if only one packet
    if (messageHeader.packetCount() === 1) {
      this.handlerFunc(messageHeader.dataArray(), time);
      return;
    }

    if (messageHeader.packetIndex() === 0) {
      this.dataBuffer = new Uint8Array(messageHeader.length());
      this.receivedMessageLength = 0;
    }

    const buffer = this.dataBuffer;
    if (buffer === null) {
      // We never saw the first packet of this message (e.g. we attached in
      // the middle of a multi-part message), so there is nothing to append
      // to. Drop the packet instead of dereferencing a null buffer.
      return;
    }

    const chunkLength = messageHeader.dataLength();
    if (chunkLength) {
      if (this.receivedMessageLength + chunkLength > buffer.length) {
        // The packet does not fit in the announced message length.
        // Uint8Array.set() would throw a RangeError here; abandon the
        // partial message instead and wait for the next packet 0.
        console.warn(
            'Dropping multi-part message: received more data than the ' +
            'announced message length.');
        this.dataBuffer = null;
        this.receivedMessageLength = 0;
        return;
      }
      buffer.set(messageHeader.dataArray(), this.receivedMessageLength);
      this.receivedMessageLength += chunkLength;
    }

    // An empty final packet still completes the message, so this check is
    // made regardless of whether the packet carried any data.
    if (messageHeader.packetIndex() === messageHeader.packetCount() - 1) {
      // Reset before invoking the callback so that stray trailing packets
      // cannot be appended to a buffer that has already been handed out.
      this.dataBuffer = null;
      this.receivedMessageLength = 0;
      this.handlerFunc(buffer, time);
    }
  }
}
Alex Perryb3b50792020-01-18 16:13:45 -080060
// A channel described by its name and the type of the messages it carries.
class Channel {
  public readonly name: string;
  public readonly type: string;

  constructor(name: string, type: string) {
    this.name = name;
    this.type = type;
  }

  // Returns the string "<name>/<type>", which identifies this channel.
  key(): string {
    return `${this.name}/${this.type}`;
  }
}
67
// Pairs a channel with the transfer method that was requested for it.
class ChannelRequest {
  public readonly channel: Channel;
  public readonly transferMethod: TransferMethod;

  constructor(channel: Channel, transferMethod: TransferMethod) {
    this.channel = channel;
    this.transferMethod = transferMethod;
  }
}
73
// Analogous to the Connection class in //aos/network/web_proxy.h. Because most
// of the apis are native in JS, it is much simpler.
//
// connect() opens a WebSocket that is used to exchange SDP and ICE messages
// for an RTCPeerConnection. The config arrives on the 'signalling'
// DataChannel; message handlers may only be registered after it has arrived.
export class Connection {
  // WebSocket carrying the SDP/ICE exchange; created by connect().
  private webSocketConnection: WebSocket|null = null;
  // Created once the WebSocket has opened.
  private rtcPeerConnection: RTCPeerConnection|null = null;
  // The 'signalling' DataChannel: the config is received on it and
  // subscription requests are sent on it.
  private dataChannel: RTCDataChannel|null = null;
  private webSocketUrl: string;

  // The most recently received config, or null if none has arrived yet.
  private configInternal: Configuration|null = null;
  // A set of functions that accept the config to handle.
  private readonly configHandlers = new Set<(config: Configuration) => void>();

  // Message callbacks, keyed by Channel.key().
  private readonly handlerFuncs =
      new Map<string, ((data: Uint8Array, sentTime: number) => void)[]>();
  // Keeps the per-DataChannel Handler objects referenced.
  private readonly handlers = new Set<Handler>();

  // Every channel requested so far; the full list is re-sent on each
  // subscription.
  private subscribedChannels: ChannelRequest[] = [];

  constructor() {
    const server = location.host;
    this.webSocketUrl = `ws://${server}/ws`;
  }

  /**
   * Registers a function that is called with the config each time a config
   * message is received.
   */
  addConfigHandler(handler: (config: Configuration) => void): void {
    this.configHandlers.add(handler);
  }

  /**
   * Add a handler for a specific message type, with reliable delivery of all
   * messages.
   */
  addReliableHandler(
      name: string, type: string,
      handler: (data: Uint8Array, sentTime: number) => void): void {
    this.addHandlerImpl(
        name, type, TransferMethod.EVERYTHING_WITH_HISTORY, handler);
  }

  /**
   * Add a handler for a specific message type.
   */
  addHandler(
      name: string, type: string,
      handler: (data: Uint8Array, sentTime: number) => void): void {
    this.addHandlerImpl(name, type, TransferMethod.SUBSAMPLE, handler);
  }

  /**
   * Registers handler for the channel identified by name and type and
   * subscribes to that channel with the given transfer method.
   *
   * @throws Error if the config has not been received yet. In that case no
   *     state is modified.
   */
  addHandlerImpl(
      name: string, type: string, method: TransferMethod,
      handler: (data: Uint8Array, sentTime: number) => void): void {
    // Check the precondition up front so that a failed call does not leave a
    // registered handler behind.
    this.assertConfigReceived();
    const channel = new Channel(name, type);
    const key = channel.key();
    let funcs = this.handlerFuncs.get(key);
    if (funcs === undefined) {
      funcs = [];
      this.handlerFuncs.set(key, funcs);
    } else if (method === TransferMethod.EVERYTHING_WITH_HISTORY) {
      console.warn(
          'Behavior of multiple reliable handlers is currently poorly ' +
          'defined and may not actually deliver all of the messages.');
    }
    funcs.push(handler);
    this.subscribeToChannel(new ChannelRequest(channel, method));
  }

  /**
   * Returns the schema of the last channel in the config whose type equals
   * typeName.
   *
   * @throws Error if the config has not been received yet or no schema is
   *     found for typeName.
   */
  getSchema(typeName: string): Schema {
    const config = this.getConfig();
    if (config === null) {
      throw new Error(
          'Unable to find schema for ' + typeName +
          ': the config has not been received yet.');
    }
    let schema: Schema|null = null;
    for (let ii = 0; ii < config.channelsLength(); ++ii) {
      const channelFb = config.channels(ii);
      if (channelFb !== null && channelFb.type() === typeName) {
        schema = channelFb.schema();
      }
    }
    if (schema === null) {
      throw new Error('Unable to find schema for ' + typeName);
    }
    return schema;
  }

  /**
   * Subscribes to channel by sending a request that lists every channel
   * subscribed so far plus the new one.
   *
   * @throws Error if the config has not been received yet. In that case the
   *     list of subscribed channels is left untouched.
   */
  subscribeToChannel(channel: ChannelRequest): void {
    this.assertConfigReceived();
    // Build the request from the would-be subscription list and only record
    // the new channel after the request has been sent, so that a failure
    // does not leave a channel recorded that was never requested.
    const requests = this.subscribedChannels.concat([channel]);
    const builder = new Builder(512) as unknown as flatbuffers.Builder;
    const channels: flatbuffers.Offset[] = [];
    for (const request of requests) {
      const nameFb = builder.createString(request.channel.name);
      const typeFb = builder.createString(request.channel.type);
      ChannelFb.startChannel(builder);
      ChannelFb.addName(builder, nameFb);
      ChannelFb.addType(builder, typeFb);
      const channelFb = ChannelFb.endChannel(builder);
      ChannelRequestFb.startChannelRequest(builder);
      ChannelRequestFb.addChannel(builder, channelFb);
      ChannelRequestFb.addMethod(builder, request.transferMethod);
      channels.push(ChannelRequestFb.endChannelRequest(builder));
    }

    const channelsFb =
        SubscriberRequest.createChannelsToTransferVector(builder, channels);
    SubscriberRequest.startSubscriberRequest(builder);
    SubscriberRequest.addChannelsToTransfer(builder, channelsFb);
    const connect = SubscriberRequest.endSubscriberRequest(builder);
    builder.finish(connect);
    this.sendConnectMessage(builder);
    this.subscribedChannels.push(channel);
  }

  /**
   * Opens the WebSocket. The peer connection is set up once the socket has
   * opened (see onWebSocketOpen()).
   */
  connect(): void {
    this.webSocketConnection = new WebSocket(this.webSocketUrl);
    this.webSocketConnection.binaryType = 'arraybuffer';
    this.webSocketConnection.addEventListener(
        'open', () => this.onWebSocketOpen());
    this.webSocketConnection.addEventListener(
        'message', (e) => this.onWebSocketMessage(e));
  }

  /**
   * Returns the most recently received config, or null if none has been
   * received yet.
   */
  getConfig(): Configuration|null {
    return this.configInternal;
  }

  // Handle messages on the DataChannel. Handles the Configuration message as
  // all other messages are sent on specific DataChannels.
  onConfigMessage(data: Uint8Array): void {
    const fbBuffer = new ByteBuffer(data);
    this.configInternal = Configuration.getRootAsConfiguration(
        fbBuffer as unknown as flatbuffers.ByteBuffer);
    // Iterate over a snapshot so that handlers may register further config
    // handlers without affecting this loop.
    for (const handler of Array.from(this.configHandlers)) {
      handler(this.configInternal);
    }
  }

  // Called when the remote side opens a DataChannel. Attaches every callback
  // registered under the channel's label to the new DataChannel.
  onDataChannel(ev: RTCDataChannelEvent): void {
    const channel = ev.channel;
    const name = channel.label;
    const handlers = this.handlerFuncs.get(name);
    if (handlers === undefined) {
      // Nothing was registered under this label; iterating undefined would
      // throw a TypeError.
      console.warn(
          'Ignoring DataChannel "' + name + '": no handlers are registered.');
      return;
    }
    for (const handler of handlers) {
      this.handlers.add(new Handler(handler, channel));
    }
  }

  // Forwards a locally gathered ICE candidate to the server over the
  // WebSocket. Events without a candidate are ignored.
  onIceCandidate(e: RTCPeerConnectionIceEvent): void {
    if (!e.candidate) {
      return;
    }
    const candidate = e.candidate;
    const builder = new Builder(512);
    const candidateString = builder.createString(candidate.candidate);
    const sdpMidString = builder.createString(candidate.sdpMid);

    const iceFb = WebSocketIce.createWebSocketIce(
        builder as unknown as flatbuffers.Builder, candidateString,
        sdpMidString, candidate.sdpMLineIndex);
    const messageFb = WebSocketMessage.createWebSocketMessage(
        builder as unknown as flatbuffers.Builder, Payload.WebSocketIce, iceFb);
    builder.finish(messageFb);
    const array = builder.asUint8Array();
    // Copy exactly the bytes covered by the view into a standalone buffer.
    this.webSocketConnection.send(array.buffer.slice(
        array.byteOffset, array.byteOffset + array.byteLength));
  }

  // Called for new SDPs. Make sure to set it locally and remotely.
  onOfferCreated(description: RTCSessionDescriptionInit): void {
    this.rtcPeerConnection.setLocalDescription(description)
        .catch((err: unknown) => {
          console.error('Failed to set the local description:', err);
        });
    const builder = new Builder(512);
    const offerString = builder.createString(description.sdp);

    const webSocketSdp = WebSocketSdp.createWebSocketSdp(
        builder as unknown as flatbuffers.Builder, SdpType.OFFER, offerString);
    const message = WebSocketMessage.createWebSocketMessage(
        builder as unknown as flatbuffers.Builder, Payload.WebSocketSdp,
        webSocketSdp);
    builder.finish(message);
    const array = builder.asUint8Array();
    // Copy exactly the bytes covered by the view into a standalone buffer.
    this.webSocketConnection.send(array.buffer.slice(
        array.byteOffset, array.byteOffset + array.byteLength));
  }

  // We now have a websocket, so start setting up the peer connection. We only
  // want a DataChannel, so create it and then create an offer to send.
  onWebSocketOpen(): void {
    this.rtcPeerConnection = new RTCPeerConnection({});
    this.rtcPeerConnection.addEventListener(
        'datachannel', (e) => this.onDataChannel(e));
    this.dataChannel = this.rtcPeerConnection.createDataChannel('signalling');
    this.handlers.add(
        new Handler((data) => this.onConfigMessage(data), this.dataChannel));
    // TODO(james): Is this used? Can we delete it?
    // window.dc = this.dataChannel;
    this.rtcPeerConnection.addEventListener(
        'icecandidate', (e) => this.onIceCandidate(e));
    this.rtcPeerConnection.createOffer()
        .then((offer) => this.onOfferCreated(offer))
        .catch((err: unknown) => {
          console.error('Failed to create or send the offer:', err);
        });
  }

  // When we receive a websocket message, we need to determine what type it is
  // and handle appropriately. Either by setting the remote description or
  // adding the remote ice candidate.
  onWebSocketMessage(e: MessageEvent): void {
    const buffer = new Uint8Array(e.data);
    const fbBuffer = new ByteBuffer(buffer);
    const message = WebSocketMessage.getRootAsWebSocketMessage(
        fbBuffer as unknown as flatbuffers.ByteBuffer);
    // Each case gets its own block so that its const declarations are scoped
    // to that case only.
    switch (message.payloadType()) {
      case Payload.WebSocketSdp: {
        const sdpFb = message.payload(new WebSocketSdp());
        if (sdpFb.type() !== SdpType.ANSWER) {
          console.log('got something other than an answer back');
          break;
        }
        this.rtcPeerConnection
            .setRemoteDescription(new RTCSessionDescription(
                {'type': 'answer', 'sdp': sdpFb.payload()}))
            .catch((err: unknown) => {
              console.error('Failed to set the remote description:', err);
            });
        break;
      }
      case Payload.WebSocketIce: {
        const iceFb = message.payload(new WebSocketIce());
        const candidate = {} as RTCIceCandidateInit;
        candidate.candidate = iceFb.candidate();
        candidate.sdpMid = iceFb.sdpMid();
        candidate.sdpMLineIndex = iceFb.sdpMLineIndex();
        this.rtcPeerConnection.addIceCandidate(candidate)
            .catch((err: unknown) => {
              console.error('Failed to add the remote ICE candidate:', err);
            });
        break;
      }
      default:
        console.log('got an unknown message');
        break;
    }
  }

  /**
   * Subscribes to messages. Only the most recent connect message is in use. Any
   * channels not specified in the message are implicitly unsubscribed.
   * @param builder Finished flatbuffer.Builder containing a Connect message to
   *     send.
   * @throws Error if the signalling DataChannel has not been created yet.
   */
  sendConnectMessage(builder: any) {
    if (this.dataChannel === null) {
      throw new Error(
          'Cannot send a connect message before the signalling DataChannel ' +
          'has been created.');
    }
    const array = builder.asUint8Array();
    // Copy exactly the bytes covered by the view into a standalone buffer.
    this.dataChannel.send(array.buffer.slice(
        array.byteOffset, array.byteOffset + array.byteLength));
  }

  // Throws unless the config has been received; subscriptions are only
  // allowed after that point.
  private assertConfigReceived(): void {
    if (this.configInternal === null) {
      throw new Error(
          'Must call subscribeToChannel after we\'ve received the config.');
    }
  }
}