Handle subscription of messages in the webapp.
Messages larger than a threshold are split and reassembled due to the
size limit in webrtc. Threshold may have to be adjusted somewhere between
64KiB and 256KiB.
This also includes a basic handler for a ping message and a more
advanced image handler.
Change-Id: If66acfb1bb84e9d3ff686994a94b1480cb70b2aa
diff --git a/aos/events/BUILD b/aos/events/BUILD
index 0470b6e..1f7a3d7 100644
--- a/aos/events/BUILD
+++ b/aos/events/BUILD
@@ -1,4 +1,4 @@
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
load("//aos:config.bzl", "aos_config")
package(default_visibility = ["//visibility:public"])
@@ -24,6 +24,11 @@
gen_reflections = 1,
)
+flatbuffer_ts_library(
+ name = "ping_ts_fbs",
+ srcs = ["ping.fbs"],
+)
+
flatbuffer_cc_library(
name = "pong_fbs",
srcs = ["pong.fbs"],
diff --git a/aos/network/BUILD b/aos/network/BUILD
index 8f049e3..a334a47 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -240,12 +240,33 @@
flatbuffer_cc_library(
name = "web_proxy_fbs",
srcs = ["web_proxy.fbs"],
+ includes = [
+ ":connect_fbs_includes",
+ "//aos:configuration_fbs_includes",
+ ],
gen_reflections = True,
)
flatbuffer_ts_library(
name = "web_proxy_ts_fbs",
srcs = ["web_proxy.fbs"],
+ includes = [
+ ":connect_fbs_includes",
+ "//aos:configuration_fbs_includes",
+ ],
+)
+
+cc_library(
+ name = "web_proxy_utils",
+ hdrs = ["web_proxy_utils.h"],
+ srcs = ["web_proxy_utils.cc"],
+ deps = [
+ ":connect_fbs",
+ ":web_proxy_fbs",
+ "//aos:configuration_fbs",
+ "//aos/events:event_loop",
+ "@com_github_google_flatbuffers//:flatbuffers",
+ ],
)
cc_library(
@@ -257,7 +278,10 @@
"-Wno-unused-parameter",
],
deps = [
+ ":connect_fbs",
":web_proxy_fbs",
+ ":web_proxy_utils",
+ "//aos/events:shm_event_loop",
"//aos/seasocks:seasocks_logger",
"//third_party/seasocks",
"//third_party:webrtc",
@@ -277,8 +301,9 @@
name = "web_proxy_main",
srcs = ["web_proxy_main.cc"],
deps = [
- ":web_proxy",
":gen_embedded",
+ ":web_proxy",
+ "//aos/events:shm_event_loop",
"//aos:init",
"//aos/seasocks:seasocks_logger",
"//third_party/seasocks",
@@ -290,8 +315,9 @@
],
data = [
"//aos/network/www:files",
- "//aos/network/www:proxy_bundle",
+ "//aos/network/www:main_bundle",
"//aos/network/www:flatbuffers",
- "@com_github_google_flatbuffers//:flatjs"
+ "@com_github_google_flatbuffers//:flatjs",
+ "//aos/events:pingpong_config.json",
],
)
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index c6d7336..77371eb 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -1,5 +1,9 @@
#include "aos/network/web_proxy.h"
+
+#include "aos/flatbuffer_merge.h"
+#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
+#include "aos/network/web_proxy_utils.h"
#include "api/create_peerconnection_factory.h"
#include "glog/logging.h"
@@ -22,12 +26,15 @@
} // namespace
-WebsocketHandler::WebsocketHandler(::seasocks::Server *server)
- : server_(server) {}
+WebsocketHandler::WebsocketHandler(
+ ::seasocks::Server *server,
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+ : server_(server), subscribers_(subscribers), config_(config) {}
void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
std::unique_ptr<Connection> conn =
- std::make_unique<Connection>(sock, server_);
+ std::make_unique<Connection>(sock, server_, subscribers_, config_);
connections_.insert({sock, std::move(conn)});
}
@@ -40,8 +47,42 @@
connections_.erase(sock);
}
-Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
- : sock_(sock), server_(server) {}
+void Subscriber::RunIteration() {
+ if (channels_.empty()) {
+ return;
+ }
+
+ fetcher_->Fetch();
+ for (int packet_index = 0; packet_index < GetPacketCount(fetcher_->context());
+ ++packet_index) {
+ flatbuffers::Offset<MessageHeader> message =
+ PackMessage(&fbb_, fetcher_->context(), channel_index_, packet_index);
+ fbb_.Finish(message);
+
+ const flatbuffers::DetachedBuffer buffer = fbb_.Release();
+
+ webrtc::DataBuffer data_buffer(
+ rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+ true /* binary array */);
+ for (auto conn : channels_) {
+ conn->Send(data_buffer);
+ }
+ }
+}
+
+bool Subscriber::Compare(const Channel *channel) const {
+ return channel->name() == fetcher_->channel()->name() &&
+ channel->type() == fetcher_->channel()->type();
+}
+
+Connection::Connection(
+ ::seasocks::WebSocket *sock, ::seasocks::Server *server,
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config)
+ : sock_(sock),
+ server_(server),
+ subscribers_(subscribers),
+ config_(config) {}
// Function called for web socket data. Parses the flatbuffer and handles it
// appropriately.
@@ -111,6 +152,13 @@
}
}
+void Connection::Send(const ::flatbuffers::DetachedBuffer &buffer) const {
+ webrtc::DataBuffer data_buffer(
+ rtc::CopyOnWriteBuffer(buffer.data(), buffer.size()),
+ true /* binary array */);
+ data_channel_->Send(data_buffer);
+}
+
void Connection::OnDataChannel(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
data_channel_ = channel;
@@ -150,13 +198,45 @@
server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
}
-// Receive and respond to a DataChannel message. Temporarily acting as a
-// "PONG", but will change to handle "Connect" subscription messages.
+// Wait until the data channel is ready for data before sending the config.
+void Connection::OnStateChange() {
+ if (peer_connection_.get() != nullptr &&
+ data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
+ Send(config_.buffer());
+ }
+}
+
+// Handle DataChannel messages. Subscribe to each listener that matches the
+// subscribe message
void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
- // This is technically disallowed by webrtc, But doesn't seem to cause major
- // problems. At least for the small data tested manually. Send should be
- // called from outside this call stack.
- data_channel_->Send(buffer);
+ const message_bridge::Connect *message =
+ flatbuffers::GetRoot<message_bridge::Connect>(buffer.data.data());
+ for (auto &subscriber : subscribers_) {
+ bool found_match = false;
+ for (auto channel : *message->channels_to_transfer()) {
+ if (subscriber->Compare(channel)) {
+ int index = subscriber->index();
+ auto it = channels_.find(index);
+ if (it == channels_.end()) {
+ auto pair = channels_.insert(
+ {index, peer_connection_->CreateDataChannel(
+ channel->name()->str() + "/" + channel->type()->str(),
+ nullptr)});
+ it = pair.first;
+ }
+ subscriber->AddListener(it->second);
+
+ VLOG(1) << "Subscribe to: " << channel->type()->str();
+ found_match = true;
+ break;
+ }
+ }
+ if (!found_match) {
+ int index = subscriber->index();
+ auto it = channels_.find(index);
+ subscriber->RemoveListener(it->second);
+ }
+ }
}
} // namespace web_proxy
diff --git a/aos/network/web_proxy.fbs b/aos/network/web_proxy.fbs
index e712622..d5e027e 100644
--- a/aos/network/web_proxy.fbs
+++ b/aos/network/web_proxy.fbs
@@ -1,3 +1,9 @@
+// Typescript namespaces are weird when coming from multiple files. We generate
+// all transitive dependencies into the same file in typescript so we can
+// include all 'aos' flatbuffers we care about here.
+include "aos/configuration.fbs";
+include "aos/network/connect.fbs";
+
namespace aos.web_proxy;
// SDP is Session Description Protocol. We only handle OFFER (starting a
@@ -29,3 +35,26 @@
table WebSocketMessage {
payload:Payload;
}
+
+// WebRTC has size limits on the messages sent on datachannels. This message
+// ensures that parts are recieved in the correct order. If there is any
+// mismatch, all the existing work should be dropped and restart when reasonable
+// data starts again.
+table MessageHeader {
+ // Index of the channel in config
+ channel_index:uint;
+
+ // How many packets will be required for the message being sent.
+ packet_count:uint;
+ // What index into the the total packets for the multipart message, this
+ // header is parts of.
+ packet_index:uint;
+
+ // Total number of bytes in the message
+ length:uint;
+
+ // Index into the sequence of messages. This will not always increase.
+ queue_index:uint;
+
+ data:[ubyte];
+}
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 7c24eae..409e61d 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -2,6 +2,9 @@
#define AOS_NETWORK_WEB_PROXY_H_
#include <map>
#include <set>
+#include "aos/events/shm_event_loop.h"
+#include "aos/network/connect_generated.h"
+#include "aos/network/web_proxy_generated.h"
#include "aos/seasocks/seasocks_logger.h"
#include "flatbuffers/flatbuffers.h"
#include "seasocks/Server.h"
@@ -14,13 +17,17 @@
namespace web_proxy {
class Connection;
+class Subscriber;
// Basic class that handles receiving new websocket connections. Creates a new
// Connection to manage the rest of the negotiation and data passing. When the
// websocket closes, it deletes the Connection.
class WebsocketHandler : public ::seasocks::WebSocket::Handler {
public:
- WebsocketHandler(::seasocks::Server *server);
+ WebsocketHandler(
+ ::seasocks::Server *server,
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
void onConnect(::seasocks::WebSocket *sock) override;
void onData(::seasocks::WebSocket *sock, const uint8_t *data,
size_t size) override;
@@ -29,6 +36,8 @@
private:
std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
::seasocks::Server *server_;
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config_;
};
// Seasocks requires that sends happen on the correct thread. This class takes a
@@ -37,7 +46,7 @@
class UpdateData : public ::seasocks::Server::Runnable {
public:
UpdateData(::seasocks::WebSocket *websocket,
- ::flatbuffers::DetachedBuffer &&buffer)
+ flatbuffers::DetachedBuffer &&buffer)
: sock_(websocket), buffer_(std::move(buffer)) {}
~UpdateData() override = default;
UpdateData(const UpdateData &) = delete;
@@ -47,7 +56,40 @@
private:
::seasocks::WebSocket *sock_;
- const ::flatbuffers::DetachedBuffer buffer_;
+ const flatbuffers::DetachedBuffer buffer_;
+};
+
+// Represents a fetcher and all the Connections that care about it.
+// Handles building the message and telling each connection to send it.
+// indexed by location of the channel it handles in the config.
+class Subscriber {
+ public:
+ Subscriber(std::unique_ptr<RawFetcher> fetcher, int channel_index)
+ : fbb_(1024),
+ fetcher_(std::move(fetcher)),
+ channel_index_(channel_index) {}
+
+ void RunIteration();
+
+ void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+ channels_.insert(channel);
+ }
+
+ void RemoveListener(
+ rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+ channels_.erase(channel);
+ }
+
+ // Check if the Channel passed matches the channel this fetchs.
+ bool Compare(const Channel *channel) const;
+
+ int index() const { return channel_index_; }
+
+ private:
+ flatbuffers::FlatBufferBuilder fbb_;
+ std::unique_ptr<RawFetcher> fetcher_;
+ int channel_index_;
+ std::set<rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
};
// Represents a single connection to a browser for the entire lifetime of the
@@ -56,10 +98,20 @@
public webrtc::CreateSessionDescriptionObserver,
public webrtc::DataChannelObserver {
public:
- Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server);
+ Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server,
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config);
+
+ ~Connection() {
+ // DataChannel may call OnStateChange after this is destroyed, so make sure
+ // it doesn't.
+ data_channel_->UnregisterObserver();
+ }
void HandleWebSocketData(const uint8_t *data, size_t size);
+ void Send(const flatbuffers::DetachedBuffer &buffer) const;
+
// PeerConnectionObserver implementation
void OnSignalingChange(
webrtc::PeerConnectionInterface::SignalingState) override {}
@@ -88,13 +140,17 @@
}
// DataChannelObserver implementation
- void OnStateChange() override {}
+ void OnStateChange() override;
void OnMessage(const webrtc::DataBuffer &buffer) override;
void OnBufferedAmountChange(uint64_t sent_data_size) override {}
private:
::seasocks::WebSocket *sock_;
::seasocks::Server *server_;
+ const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config_;
+ std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
+
rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel_;
};
diff --git a/aos/network/web_proxy_main.cc b/aos/network/web_proxy_main.cc
index 37b3d1e..21b6f9c 100644
--- a/aos/network/web_proxy_main.cc
+++ b/aos/network/web_proxy_main.cc
@@ -1,23 +1,65 @@
+#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
-#include "aos/seasocks/seasocks_logger.h"
#include "aos/network/web_proxy.h"
+#include "aos/seasocks/seasocks_logger.h"
+#include "gflags/gflags.h"
#include "internal/Embedded.h"
#include "seasocks/Server.h"
#include "seasocks/WebSocket.h"
-int main() {
+DEFINE_string(config, "./config.json", "File path of aos configuration");
+DEFINE_string(data_dir, "www", "Directory to serve data files from");
+
+void RunDataThread(
+ std::vector<std::unique_ptr<aos::web_proxy::Subscriber>> *subscribers,
+ const aos::FlatbufferDetachedBuffer<aos::Configuration> &config) {
+ aos::ShmEventLoop event_loop(&config.message());
+
+ // TODO(alex): skip fetchers on the wrong node.
+ for (uint i = 0; i < config.message().channels()->size(); ++i) {
+ auto channel = config.message().channels()->Get(i);
+ auto fetcher = event_loop.MakeRawFetcher(channel);
+ subscribers->emplace_back(
+ std::make_unique<aos::web_proxy::Subscriber>(std::move(fetcher), i));
+ }
+
+ flatbuffers::FlatBufferBuilder fbb(1024);
+
+ auto timer = event_loop.AddTimer([&]() {
+ for (auto &subscriber : *subscribers) {
+ subscriber->RunIteration();
+ }
+ });
+
+ event_loop.OnRun([&]() {
+ timer->Setup(event_loop.monotonic_now(), std::chrono::milliseconds(100));
+ });
+
+ event_loop.Run();
+}
+
+int main(int argc, char **argv) {
// Make sure to reference this to force the linker to include it.
+ aos::InitGoogle(&argc, &argv);
findEmbeddedContent("");
aos::InitNRT();
- seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
- new ::aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));
+ aos::FlatbufferDetachedBuffer<aos::Configuration> config =
+ aos::configuration::ReadConfig(FLAGS_config);
- auto websocket_handler =
- std::make_shared<aos::web_proxy::WebsocketHandler>(&server);
+ std::vector<std::unique_ptr<aos::web_proxy::Subscriber>> subscribers;
+
+ std::thread data_thread{
+ [&subscribers, &config]() { RunDataThread(&subscribers, config); }};
+
+ seasocks::Server server(std::shared_ptr<seasocks::Logger>(
+ new aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));
+
+ auto websocket_handler = std::make_shared<aos::web_proxy::WebsocketHandler>(
+ &server, subscribers, config);
server.addWebSocketHandler("/ws", websocket_handler);
- server.serve("aos/network/www", 8080);
+ server.serve(FLAGS_data_dir.c_str(), 8080);
}
diff --git a/aos/network/web_proxy_utils.cc b/aos/network/web_proxy_utils.cc
new file mode 100644
index 0000000..dbcc2e6
--- /dev/null
+++ b/aos/network/web_proxy_utils.cc
@@ -0,0 +1,43 @@
+#include "aos/network/web_proxy_utils.h"
+
+namespace aos {
+namespace web_proxy {
+
+// Recommended max size is 64KiB for compatibility reasons. 256KiB theoretically
+// works on chrome but seemed to have some consistency issues. Picked a size in
+// the middle which seems to work.
+constexpr size_t kPacketSize = 125000;
+
+int GetPacketCount(const Context &context) {
+ return context.size / kPacketSize + 1;
+}
+
+flatbuffers::Offset<MessageHeader> PackMessage(
+ flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+ int channel_index, int packet_index) {
+ int packet_count = GetPacketCount(context);
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
+ if (kPacketSize * (packet_index + 1) < context.size) {
+ data_offset = fbb->CreateVector(
+ static_cast<uint8_t *>(context.data) + kPacketSize * packet_index,
+ kPacketSize);
+ } else {
+ int prefix_size = kPacketSize * packet_index;
+ data_offset =
+ fbb->CreateVector(static_cast<uint8_t *>(context.data) + prefix_size,
+ context.size - prefix_size);
+ }
+
+ MessageHeader::Builder message_header_builder(*fbb);
+ message_header_builder.add_channel_index(channel_index);
+ message_header_builder.add_queue_index(context.queue_index);
+ message_header_builder.add_packet_count(packet_count);
+ message_header_builder.add_packet_index(packet_index);
+ message_header_builder.add_data(data_offset);
+ message_header_builder.add_length(context.size);
+
+ return message_header_builder.Finish();
+}
+
+} // namespace web_proxy
+} // namespace aos
diff --git a/aos/network/web_proxy_utils.h b/aos/network/web_proxy_utils.h
new file mode 100644
index 0000000..0672ddc
--- /dev/null
+++ b/aos/network/web_proxy_utils.h
@@ -0,0 +1,20 @@
+#include "aos/network/web_proxy_generated.h"
+#include "aos/events/event_loop.h"
+
+namespace aos {
+namespace web_proxy {
+
+int GetPacketCount(const Context &context);
+
+/*
+ * Packs a message embedded in context into a MessageHeader on fbb. Handles
+ * multipart messages by use of the packet_index.
+ * TODO(alex): make this an iterator that returns each packet sequentially
+ */
+flatbuffers::Offset<MessageHeader> PackMessage(
+ flatbuffers::FlatBufferBuilder *fbb, const Context &context,
+ int channel_index, int packet_index);
+
+
+} // namespace web_proxy
+} // namespace aos
diff --git a/aos/network/www/BUILD b/aos/network/www/BUILD
index 5faae12..76e8ef4 100644
--- a/aos/network/www/BUILD
+++ b/aos/network/www/BUILD
@@ -5,27 +5,42 @@
name = "files",
srcs = glob([
"**/*.html",
+ "**/*.css",
]),
visibility=["//visibility:public"],
)
ts_library(
name = "proxy",
- srcs = glob([
- "*.ts",
- ]),
+ srcs = [
+ "config_handler.ts",
+ "proxy.ts",
+ ],
deps = [
"//aos/network:web_proxy_ts_fbs",
],
+ visibility=["//visibility:public"],
+)
+
+ts_library(
+ name = "main",
+ srcs = [
+ "main.ts",
+ "ping_handler.ts",
+ ],
+ deps = [
+ ":proxy",
+ "//aos/events:ping_ts_fbs",
+ ],
)
rollup_bundle(
- name = "proxy_bundle",
+ name = "main_bundle",
entry_point = "aos/network/www/main",
deps = [
- "proxy",
+ "main",
],
- visibility=["//visibility:public"],
+ visibility=["//aos:__subpackages__"],
)
genrule(
@@ -37,5 +52,5 @@
"flatbuffers.js",
],
cmd = "cp $(location @com_github_google_flatbuffers//:flatjs) $@",
- visibility=["//visibility:public"],
+ visibility=["//aos:__subpackages__"],
)
diff --git a/aos/network/www/config_handler.ts b/aos/network/www/config_handler.ts
new file mode 100644
index 0000000..0af78b6
--- /dev/null
+++ b/aos/network/www/config_handler.ts
@@ -0,0 +1,67 @@
+import {aos} from 'aos/network/web_proxy_generated';
+
+export class ConfigHandler {
+ private readonly root_div = document.getElementById('config');
+
+ constructor(
+ private readonly config: aos.Configuration,
+ private readonly dataChannel: RTCDataChannel) {}
+
+ printConfig() {
+ for (const i = 0; i < this.config.channelsLength(); i++) {
+ const channel_div = document.createElement('div');
+ channel_div.classList.add('channel');
+ this.root_div.appendChild(channel_div);
+
+ const input_el = document.createElement('input');
+ input_el.setAttribute('data-index', i);
+ input_el.setAttribute('type', 'checkbox');
+ input_el.addEventListener('click', () => this.handleChange());
+ channel_div.appendChild(input_el);
+
+ const name_div = document.createElement('div');
+ const name_text = document.createTextNode(this.config.channels(i).name());
+ name_div.appendChild(name_text);
+ const type_div = document.createElement('div');
+ const type_text = document.createTextNode(this.config.channels(i).type());
+ type_div.appendChild(type_text);
+ const info_div = document.createElement('div');
+ info_div.appendChild(name_div);
+ info_div.appendChild(type_div);
+
+ channel_div.appendChild(info_div);
+ }
+ }
+
+ handleChange() {
+ const toggles = this.root_div.getElementsByTagName('input');
+ const builder = new flatbuffers.Builder(512);
+
+ const channels: flatbuffers.Offset[] = [];
+ for (const toggle of toggles) {
+ if (!toggle.checked) {
+ continue;
+ }
+ const index = toggle.getAttribute('data-index');
+ const channel = this.config.channels(index);
+ const namefb = builder.createString(channel.name());
+ const typefb = builder.createString(channel.type());
+ aos.Channel.startChannel(builder);
+ aos.Channel.addName(builder, namefb);
+ aos.Channel.addType(builder, typefb);
+ const channelfb = aos.Channel.endChannel(builder);
+ channels.push(channelfb);
+ }
+
+ const channelsfb =
+ aos.message_bridge.Connect.createChannelsToTransferVector(
+ builder, channels);
+ aos.message_bridge.Connect.startConnect(builder);
+ aos.message_bridge.Connect.addChannelsToTransfer(builder, channelsfb);
+ const connect = aos.message_bridge.Connect.endConnect(builder);
+ builder.finish(connect);
+ const array = builder.asUint8Array();
+ console.log('connect', array);
+ this.dataChannel.send(array.buffer.slice(array.byteOffset));
+ }
+}
diff --git a/aos/network/www/index.html b/aos/network/www/index.html
index bc90d40..97e16d4 100644
--- a/aos/network/www/index.html
+++ b/aos/network/www/index.html
@@ -1,6 +1,11 @@
<html>
- <body>
+ <head>
<script src="flatbuffers.js"></script>
- <script src="proxy_bundle.min.js"></script>
+ <script src="main_bundle.min.js" defer></script>
+ <link rel="stylesheet" href="styles.css">
+ </head>
+ <body>
+ <div id="config">
+ </div>
</body>
</html>
diff --git a/aos/network/www/main.ts b/aos/network/www/main.ts
index 5a3165e..1840ffb 100644
--- a/aos/network/www/main.ts
+++ b/aos/network/www/main.ts
@@ -1,5 +1,10 @@
import {Connection} from './proxy';
+import * as PingHandler from './ping_handler';
const conn = new Connection();
conn.connect();
+
+PingHandler.SetupDom();
+
+conn.addHandler(PingHandler.GetId(), PingHandler.HandlePing);
diff --git a/aos/network/www/ping_handler.ts b/aos/network/www/ping_handler.ts
new file mode 100644
index 0000000..9b37d70
--- /dev/null
+++ b/aos/network/www/ping_handler.ts
@@ -0,0 +1,26 @@
+import {aos} from 'aos/events/ping_generated';
+
+export function HandlePing(data: Uint8Array) {
+ const fbBuffer = new flatbuffers.ByteBuffer(data);
+ const ping = aos.examples.Ping.getRootAsPing(fbBuffer);
+
+ document.getElementById('val').innerHTML = ping.value();
+ document.getElementById('time').innerHTML = ping.sendTime().low;
+}
+
+export function SetupDom() {
+ const ping_div = document.createElement('div');
+ document.body.appendChild(ping_div);
+
+ const value_div = document.createElement('div');
+ value_div.setAttribute('id', 'val');
+ const time_div = document.createElement('div');
+ time_div.setAttribute('id', 'time');
+
+ ping_div.appendChild(value_div);
+ ping_div.appendChild(time_div);
+}
+
+export function GetId() {
+ return aos.examples.Ping.getFullyQualifiedName();
+}
diff --git a/aos/network/www/proxy.ts b/aos/network/www/proxy.ts
index 1ef6320..27ffbfe 100644
--- a/aos/network/www/proxy.ts
+++ b/aos/network/www/proxy.ts
@@ -1,4 +1,41 @@
-import {aos.web_proxy} from '../web_proxy_generated';
+import {aos} from 'aos/network/web_proxy_generated';
+import {ConfigHandler} from './config_handler';
+
+// There is one handler for each DataChannel, it maintains the state of
+// multi-part messages and delegates to a callback when the message is fully
+// assembled.
+export class Handler {
+ private dataBuffer: Uint8Array|null = null;
+ private receivedMessageLength: number = 0;
+ constructor(
+ private readonly handlerFunc: (data: Uint8Array) => void,
+ private readonly channel: RTCPeerConnection) {
+ channel.addEventListener('message', (e) => this.handleMessage(e));
+ }
+
+ handleMessage(e: MessageEvent): void {
+ const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+ const messageHeader =
+ aos.web_proxy.MessageHeader.getRootAsMessageHeader(fbBuffer);
+ // Short circuit if only one packet
+ if (messageHeader.packetCount === 1) {
+ this.handlerFunc(messageHeader.dataArray());
+ return;
+ }
+
+ if (messageHeader.packetIndex() === 0) {
+ this.dataBuffer = new Uint8Array(messageHeader.length());
+ }
+ this.dataBuffer.set(
+ messageHeader.dataArray(),
+ this.receivedMessageLength);
+ this.receivedMessageLength += messageHeader.dataLength();
+
+ if (messageHeader.packetIndex() === messageHeader.packetCount() - 1) {
+ this.handlerFunc(this.dataBuffer);
+ }
+ }
+}
// Analogous to the Connection class in //aos/network/web_proxy.h. Because most
// of the apis are native in JS, it is much simpler.
@@ -7,12 +44,20 @@
private rtcPeerConnection: RTCPeerConnection|null = null;
private dataChannel: DataChannel|null = null;
private webSocketUrl: string;
+ private configHandler: ConfigHandler|null =
+ null private config: aos.Configuration|null = null;
+ private readonly handlerFuncs = new Map<string, (data: Uint8Array) => void>();
+ private readonly handlers = new Set<Handler>();
constructor() {
const server = location.host;
this.webSocketUrl = `ws://${server}/ws`;
}
+ addHandler(id: string, handler: (data: Uint8Array) => void): void {
+ this.handlerFuncs.set(id, handler);
+ }
+
connect(): void {
this.webSocketConnection = new WebSocket(this.webSocketUrl);
this.webSocketConnection.binaryType = 'arraybuffer';
@@ -22,14 +67,29 @@
'message', (e) => this.onWebSocketMessage(e));
}
- // Handle messages on the DataChannel. Will delegate to various handlers for
- // different message types.
+ // Handle messages on the DataChannel. Handles the Configuration message as
+ // all other messages are sent on specific DataChannels.
onDataChannelMessage(e: MessageEvent): void {
- console.log(e);
+ const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+ // TODO(alex): handle config updates if/when required
+ if (!this.configHandler) {
+ const config = aos.Configuration.getRootAsConfiguration(fbBuffer);
+ this.config = config;
+ this.configHandler = new ConfigHandler(config, this.dataChannel);
+ this.configHandler.printConfig();
+ return;
+ }
+ }
+
+ onDataChannel(ev: RTCDataChannelEvent): void {
+ const channel = ev.channel;
+ const name = channel.label;
+ const channelType = name.split('/').pop();
+ const handlerFunc = this.handlerFuncs.get(channelType);
+ this.handlers.add(new Handler(handlerFunc, channel));
}
onIceCandidate(e: RTCPeerConnectionIceEvent): void {
- console.log('Created ice candidate', e);
if (!e.candidate) {
return;
}
@@ -49,7 +109,6 @@
// Called for new SDPs. Make sure to set it locally and remotely.
onOfferCreated(description: RTCSessionDescription): void {
- console.log('Created offer', description);
this.rtcPeerConnection.setLocalDescription(description);
const builder = new flatbuffers.Builder(512);
const offerString = builder.createString(description.sdp);
@@ -67,7 +126,9 @@
// want a DataChannel, so create it and then create an offer to send.
onWebSocketOpen(): void {
this.rtcPeerConnection = new RTCPeerConnection({});
- this.dataChannel = this.rtcPeerConnection.createDataChannel('dc');
+ this.rtcPeerConnection.addEventListener(
+ 'datachannel', (e) => this.onDataCnannel(e));
+ this.dataChannel = this.rtcPeerConnection.createDataChannel('signalling');
this.dataChannel.addEventListener(
'message', (e) => this.onDataChannelMessage(e));
window.dc = this.dataChannel;
@@ -81,14 +142,12 @@
// and handle appropriately. Either by setting the remote description or
// adding the remote ice candidate.
onWebSocketMessage(e: MessageEvent): void {
- console.log('ws: ', e);
const buffer = new Uint8Array(e.data)
const fbBuffer = new flatbuffers.ByteBuffer(buffer);
const message =
aos.web_proxy.WebSocketMessage.getRootAsWebSocketMessage(fbBuffer);
switch (message.payloadType()) {
case aos.web_proxy.Payload.WebSocketSdp:
- console.log('got an sdp message');
const sdpFb = message.payload(new aos.web_proxy.WebSocketSdp());
if (sdpFb.type() !== aos.web_proxy.SdpType.ANSWER) {
console.log('got something other than an answer back');
@@ -98,7 +157,6 @@
{'type': 'answer', 'sdp': sdpFb.payload()}));
break;
case aos.web_proxy.Payload.WebSocketIce:
- console.log('got an ice message');
const iceFb = message.payload(new aos.web_proxy.WebSocketIce());
const candidate = {} as RTCIceCandidateInit;
candidate.candidate = iceFb.candidate();
diff --git a/aos/network/www/styles.css b/aos/network/www/styles.css
new file mode 100644
index 0000000..23ceb21
--- /dev/null
+++ b/aos/network/www/styles.css
@@ -0,0 +1,5 @@
+.channel {
+ display: flex;
+ border-bottom: 1px solid;
+ font-size: 24px;
+}
diff --git a/third_party/flatbuffers/build_defs.bzl b/third_party/flatbuffers/build_defs.bzl
index 2ddc65b..a7e5576 100644
--- a/third_party/flatbuffers/build_defs.bzl
+++ b/third_party/flatbuffers/build_defs.bzl
@@ -33,6 +33,7 @@
"--no-ts-reexport",
"--reflect-names",
"--reflect-types",
+ "--gen-name-strings",
]
def flatbuffer_library_public(
diff --git a/third_party/flatbuffers/src/idl_gen_js_ts.cpp b/third_party/flatbuffers/src/idl_gen_js_ts.cpp
index 9c89c1a..be0a205 100644
--- a/third_party/flatbuffers/src/idl_gen_js_ts.cpp
+++ b/third_party/flatbuffers/src/idl_gen_js_ts.cpp
@@ -768,6 +768,14 @@
code += "');\n};\n\n";
}
+ // Generate the name method
+ if (parser_.opts.generate_name_strings) {
+ code +=
+ "static getFullyQualifiedName(): string {\n"
+ " return '" + object_namespace + "." + struct_def.name + "';\n"
+ "}\n";
+ }
+
// Emit field accessors
for (auto it = struct_def.fields.vec.begin();
it != struct_def.fields.vec.end(); ++it) {
diff --git a/y2020/BUILD b/y2020/BUILD
index 1d281bb..c80550b 100644
--- a/y2020/BUILD
+++ b/y2020/BUILD
@@ -125,3 +125,15 @@
srcs = ["__init__.py"],
visibility = ["//visibility:public"],
)
+
+sh_binary(
+ name = "web_proxy",
+ srcs = ["web_proxy.sh"],
+ data = [
+ ":config.json",
+ "//aos/network:web_proxy_main",
+ "//y2020/www:main_bundle",
+ "//y2020/www:files",
+ "//y2020/www:flatbuffers",
+ ],
+)
diff --git a/y2020/vision/BUILD b/y2020/vision/BUILD
index 2e3723d..fa30531 100644
--- a/y2020/vision/BUILD
+++ b/y2020/vision/BUILD
@@ -1,4 +1,4 @@
-load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library")
+load("@com_github_google_flatbuffers//:build_defs.bzl", "flatbuffer_cc_library", "flatbuffer_ts_library")
load("//aos:config.bzl", "aos_config")
flatbuffer_cc_library(
@@ -36,3 +36,9 @@
"//aos/events:shm_event_loop",
],
)
+
+flatbuffer_ts_library(
+ name = "vision_ts_fbs",
+ srcs = ["vision.fbs"],
+ visibility = ["//y2020:__subpackages__"],
+)
diff --git a/y2020/vision/vision.fbs b/y2020/vision/vision.fbs
index 66f695b..b8d8bd9 100644
--- a/y2020/vision/vision.fbs
+++ b/y2020/vision/vision.fbs
@@ -14,7 +14,7 @@
// The number of columns in the image.
cols:int;
// The image data.
- data:[byte];
+ data:[ubyte];
// Timestamp when the frame was captured.
monotonic_timestamp_ns:long;
realtime_timestamp_ns:long;
diff --git a/y2020/web_proxy.sh b/y2020/web_proxy.sh
new file mode 100755
index 0000000..8e1a570
--- /dev/null
+++ b/y2020/web_proxy.sh
@@ -0,0 +1 @@
+./aos/network/web_proxy_main --config=y2020/config.json --data_dir=y2020/www
diff --git a/y2020/www/BUILD b/y2020/www/BUILD
new file mode 100644
index 0000000..146456f
--- /dev/null
+++ b/y2020/www/BUILD
@@ -0,0 +1,45 @@
+load("@build_bazel_rules_typescript//:defs.bzl", "ts_library")
+load("@build_bazel_rules_nodejs//:defs.bzl", "rollup_bundle")
+
+ts_library(
+ name = "main",
+ srcs = [
+ "main.ts",
+ "image_handler.ts",
+ ],
+ deps = [
+ "//aos/network/www:proxy",
+ "//y2020/vision:vision_ts_fbs",
+ ],
+ visibility = ["//y2020:__subpackages__"],
+)
+
+rollup_bundle(
+ name = "main_bundle",
+ entry_point = "y2020/www/main",
+ deps = [
+ "main",
+ ],
+ visibility = ["//y2020:__subpackages__"],
+)
+
+filegroup(
+ name = "files",
+ srcs = glob([
+ "**/*.html",
+ "**/*.css",
+ ]),
+ visibility=["//visibility:public"],
+)
+
+genrule(
+ name = "flatbuffers",
+ srcs = [
+ "@com_github_google_flatbuffers//:flatjs",
+ ],
+ outs = [
+ "flatbuffers.js",
+ ],
+ cmd = "cp $(location @com_github_google_flatbuffers//:flatjs) $@",
+ visibility=["//y2020:__subpackages__"],
+)
diff --git a/y2020/www/image_handler.ts b/y2020/www/image_handler.ts
new file mode 100644
index 0000000..abaf831
--- /dev/null
+++ b/y2020/www/image_handler.ts
@@ -0,0 +1,61 @@
+import {frc971} from 'y2020/vision/vision_generated';
+
+export class ImageHandler {
+ private canvas = document.createElement('canvas');
+
+ constructor() {
+ document.body.appendChild(this.canvas);
+ }
+
+ handleImage(data: Uint8Array) {
+ const fbBuffer = new flatbuffers.ByteBuffer(data);
+ const image = frc971.vision.CameraImage.getRootAsCameraImage(fbBuffer);
+
+ const width = image.cols();
+ const height = image.rows();
+ if (width === 0 || height === 0) {
+ return;
+ }
+ const imageBuffer = new Uint8ClampedArray(width * height * 4); // RGBA
+
+ // Read four bytes (YUYV) from the data and transform into two pixels of
+ // RGBA for canvas
+ for (const j = 0; j < height; j++) {
+ for (const i = 0; i < width; i += 2) {
+ const y1 = image.data((j * width + i) * 2);
+ const u = image.data((j * width + i) * 2 + 1);
+ const y2 = image.data((j * width + i + 1) * 2);
+ const v = image.data((j * width + i + 1) * 2 + 1);
+
+ // Based on https://en.wikipedia.org/wiki/YUV#Converting_between_Y%E2%80%B2UV_and_RGB
+ const c1 = y1 - 16;
+ const c2 = y2 - 16;
+ const d = u - 128;
+ const e = v - 128;
+
+ imageBuffer[(j * width + i) * 4 + 0] = (298 * c1 + 409 * e + 128) >> 8;
+ imageBuffer[(j * width + i) * 4 + 1] =
+ (298 * c1 - 100 * d - 208 * e + 128) >> 8;
+ imageBuffer[(j * width + i) * 4 + 2] = (298 * c1 + 516 * d + 128) >> 8;
+ imageBuffer[(j * width + i) * 4 + 3] = 255;
+ imageBuffer[(j * width + i) * 4 + 4] = (298 * c2 + 409 * e + 128) >> 8;
+ imageBuffer[(j * width + i) * 4 + 5] =
+ (298 * c2 - 100 * d - 208 * e + 128) >> 8;
+ imageBuffer[(j * width + i) * 4 + 6] = (298 * c2 + 516 * d + 128) >> 8;
+ imageBuffer[(j * width + i) * 4 + 7] = 255;
+ }
+ }
+
+ const ctx = this.canvas.getContext('2d');
+
+ this.canvas.width = width;
+ this.canvas.height = height;
+ const idata = ctx.createImageData(width, height);
+ idata.data.set(imageBuffer);
+ ctx.putImageData(idata, 0, 0);
+ }
+
+ getId() {
+ return frc971.vision.CameraImage.getFullyQualifiedName();
+ }
+}
diff --git a/y2020/www/index.html b/y2020/www/index.html
new file mode 100644
index 0000000..97e16d4
--- /dev/null
+++ b/y2020/www/index.html
@@ -0,0 +1,11 @@
+<html>
+ <head>
+ <script src="flatbuffers.js"></script>
+ <script src="main_bundle.min.js" defer></script>
+ <link rel="stylesheet" href="styles.css">
+ </head>
+ <body>
+ <div id="config">
+ </div>
+ </body>
+</html>
diff --git a/y2020/www/main.ts b/y2020/www/main.ts
new file mode 100644
index 0000000..7831713
--- /dev/null
+++ b/y2020/www/main.ts
@@ -0,0 +1,11 @@
+import {Connection} from 'aos/network/www/proxy';
+
+import {ImageHandler} from './image_handler';
+
+const conn = new Connection();
+
+conn.connect();
+
+const iHandler = new ImageHandler();
+
+conn.addHandler(iHandler.getId(), (data) => iHandler.handleImage(data));
diff --git a/y2020/www/styles.css b/y2020/www/styles.css
new file mode 100644
index 0000000..23ceb21
--- /dev/null
+++ b/y2020/www/styles.css
@@ -0,0 +1,5 @@
+.channel {
+ display: flex;
+ border-bottom: 1px solid;
+ font-size: 24px;
+}