Send webpage config in multiple parts
Change-Id: I87b90ea5ec498e93647bb04012a51f687c05a4a0
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 5b50072..3e9cd8f 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -90,7 +90,7 @@
: sock_(sock),
server_(server),
subscribers_(subscribers),
- config_(config) {}
+ config_headers_(PackBuffer(config.span())) {}
// Function called for web socket data. Parses the flatbuffer and handles it
// appropriately.
@@ -211,7 +211,9 @@
void Connection::OnStateChange() {
if (peer_connection_.get() != nullptr &&
data_channel_->state() == webrtc::DataChannelInterface::kOpen) {
- Send(config_.buffer());
+ for (const auto &header: config_headers_) {  // One Send() per packet.
+ Send(header.buffer());
+ }
}
}
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 409e61d..549a7ae 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -105,7 +105,9 @@
~Connection() {
// DataChannel may call OnStateChange after this is destroyed, so make sure
// it doesn't.
- data_channel_->UnregisterObserver();
+ if (data_channel_) {  // NOTE(review): presumably unset if no channel was ever created - confirm.
+ data_channel_->UnregisterObserver();
+ }
}
void HandleWebSocketData(const uint8_t *data, size_t size);
@@ -148,7 +150,7 @@
::seasocks::WebSocket *sock_;
::seasocks::Server *server_;
const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
- const aos::FlatbufferDetachedBuffer<aos::Configuration> &config_;
+ const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;
rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
diff --git a/aos/network/web_proxy_main.cc b/aos/network/web_proxy_main.cc
index c674520..227f980 100644
--- a/aos/network/web_proxy_main.cc
+++ b/aos/network/web_proxy_main.cc
@@ -62,14 +62,6 @@
aos::FlatbufferDetachedBuffer<aos::Configuration> config =
aos::configuration::ReadConfig(FLAGS_config);
- for (size_t i = 0; i < config.message().channels()->size(); ++i) {
- aos::Channel *channel =
- config.mutable_message()->mutable_channels()->GetMutableObject(i);
- channel->clear_schema();
- }
-
- config = aos::CopyFlatBuffer(&config.message());
-
std::vector<std::unique_ptr<aos::web_proxy::Subscriber>> subscribers;
std::thread data_thread{
diff --git a/aos/network/web_proxy_utils.cc b/aos/network/web_proxy_utils.cc
index 15aedf2..21550f8 100644
--- a/aos/network/web_proxy_utils.cc
+++ b/aos/network/web_proxy_utils.cc
@@ -3,30 +3,48 @@
namespace aos {
namespace web_proxy {
+namespace {
// Recommended max size is 64KiB for compatibility reasons. 256KiB theoretically
// works on chrome but seemed to have some consistency issues. Picked a size in
// the middle which seems to work.
constexpr size_t kPacketSize = 125000;
+// Returns how many packets are needed to send |data_size| bytes in chunks of
+// at most kPacketSize bytes. The result is always at least 1; when
+// |data_size| is an exact multiple of kPacketSize the last packet is empty.
+//
+// Takes a size_t because every caller passes a size_t byte count; going
+// through int would narrow the value only to widen it again for the division.
+int GetPacketCountFromSize(const size_t data_size) {
+  return static_cast<int>(data_size / kPacketSize) + 1;
+}
+
+// Copies chunk number |packet_index| of |span| (chunks are kPacketSize bytes
+// long, the last one holds whatever remains) into |fbb| as a byte vector and
+// returns the vector's offset.
+//
+// An index that lies outside the data (negative, or past the end of |span|)
+// produces an empty vector rather than reading out of bounds.
+flatbuffers::Offset<flatbuffers::Vector<uint8_t>> FillOutPacketVector(
+    flatbuffers::FlatBufferBuilder *fbb, absl::Span<const uint8_t> span,
+    const int packet_index) {
+  // Keep the offset math in size_t so it is never narrowed through int.
+  const size_t requested_offset =
+      packet_index < 0 ? span.size()
+                       : kPacketSize * static_cast<size_t>(packet_index);
+  // Clamp so that subspan() is always handed a position inside the span.
+  const size_t offset =
+      requested_offset < span.size() ? requested_offset : span.size();
+  // subspan() truncates the length to the bytes remaining after |offset|, so
+  // full and trailing chunks need no separate branches.
+  const absl::Span<const uint8_t> chunk = span.subspan(offset, kPacketSize);
+  return fbb->CreateVector(chunk.data(), chunk.size());
+}
+} // namespace
+
int GetPacketCount(const Context &context) {
- return context.size / kPacketSize + 1;
+ return GetPacketCountFromSize(context.size);  // Same math PackBuffer uses.
}
flatbuffers::Offset<MessageHeader> PackMessage(
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, int packet_index) {
- int packet_count = GetPacketCount(context);
- flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset;
- if (kPacketSize * (packet_index + 1) < context.size) {
- data_offset = fbb->CreateVector(
- static_cast<const uint8_t *>(context.data) + kPacketSize * packet_index,
- kPacketSize);
- } else {
- int prefix_size = kPacketSize * packet_index;
- data_offset = fbb->CreateVector(
- static_cast<const uint8_t *>(context.data) + prefix_size,
- context.size - prefix_size);
- }
+ const int packet_count = GetPacketCount(context);
+ const flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+ FillOutPacketVector(
+ fbb,
+ absl::Span<const uint8_t>{static_cast<const uint8_t *>(context.data),
+ context.size},
+ packet_index);
MessageHeader::Builder message_header_builder(*fbb);
message_header_builder.add_channel_index(channel_index);
@@ -39,5 +57,27 @@
return message_header_builder.Finish();
}
+// Splits |span| into packets and wraps each one in its own finished
+// MessageHeader flatbuffer. Every header records the total packet count, its
+// own packet index, its slice of the data and the total length of |span|, and
+// the headers are returned in packet order.
+std::vector<FlatbufferDetachedBuffer<MessageHeader>> PackBuffer(
+    absl::Span<const uint8_t> span) {
+  const int packet_count = GetPacketCountFromSize(span.size());
+
+  std::vector<FlatbufferDetachedBuffer<MessageHeader>> buffers;
+  buffers.reserve(packet_count);
+
+  for (int ii = 0; ii < packet_count; ++ii) {
+    // FlatBufferBuilder must not be used again after Release(), so each
+    // packet is built with a fresh builder instead of sharing one.
+    flatbuffers::FlatBufferBuilder fbb;
+
+    // The data vector has to be created before the table builder is started.
+    const flatbuffers::Offset<flatbuffers::Vector<uint8_t>> data_offset =
+        FillOutPacketVector(&fbb, span, ii);
+
+    MessageHeader::Builder message_header_builder(fbb);
+    message_header_builder.add_packet_count(packet_count);
+    message_header_builder.add_packet_index(ii);
+    message_header_builder.add_data(data_offset);
+    message_header_builder.add_length(span.size());
+
+    fbb.Finish(message_header_builder.Finish());
+
+    buffers.emplace_back(fbb.Release());
+  }
+  return buffers;
+}
+
} // namespace web_proxy
} // namespace aos
diff --git a/aos/network/web_proxy_utils.h b/aos/network/web_proxy_utils.h
index 0672ddc..09ad333 100644
--- a/aos/network/web_proxy_utils.h
+++ b/aos/network/web_proxy_utils.h
@@ -1,5 +1,7 @@
+#include "absl/types/span.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/events/event_loop.h"
+#include "aos/flatbuffers.h"
namespace aos {
namespace web_proxy {
@@ -15,6 +17,10 @@
flatbuffers::FlatBufferBuilder *fbb, const Context &context,
int channel_index, int packet_index);
+// Packs the provided raw data into a series of MessageHeaders, each holding
+// one packet-sized piece of the data.
+std::vector<FlatbufferDetachedBuffer<MessageHeader>> PackBuffer(
+ absl::Span<const uint8_t> span);
} // namespace web_proxy
} // namespace aos
diff --git a/aos/network/www/config_handler.ts b/aos/network/www/config_handler.ts
index 6615f39..048c2ba 100644
--- a/aos/network/www/config_handler.ts
+++ b/aos/network/www/config_handler.ts
@@ -27,7 +27,7 @@
}
printConfig() {
- for (const i = 0; i < this.config.channelsLength(); i++) {
+ for (let i = 0; i < this.config.channelsLength(); i++) {
const channel_div = document.createElement('div');
channel_div.classList.add('channel');
this.tree_div.appendChild(channel_div);
diff --git a/aos/network/www/proxy.ts b/aos/network/www/proxy.ts
index 7bcf575..7491ad5 100644
--- a/aos/network/www/proxy.ts
+++ b/aos/network/www/proxy.ts
@@ -90,8 +90,8 @@
// Handle messages on the DataChannel. Handles the Configuration message as
// all other messages are sent on specific DataChannels.
- onDataChannelMessage(e: MessageEvent): void {
- const fbBuffer = new flatbuffers.ByteBuffer(new Uint8Array(e.data));
+ onConfigMessage(data: Uint8Array): void {
+ const fbBuffer = new flatbuffers.ByteBuffer(data);
this.configInternal = Configuration.getRootAsConfiguration(fbBuffer);
for (const handler of Array.from(this.configHandlers)) {
handler(this.configInternal);
@@ -146,8 +146,8 @@
this.rtcPeerConnection.addEventListener(
'datachannel', (e) => this.onDataChannel(e));
this.dataChannel = this.rtcPeerConnection.createDataChannel('signalling');
- this.dataChannel.addEventListener(
- 'message', (e) => this.onDataChannelMessage(e));
+ this.handlers.add(
+ new Handler((data) => this.onConfigMessage(data), this.dataChannel));
window.dc = this.dataChannel;
this.rtcPeerConnection.addEventListener(
'icecandidate', (e) => this.onIceCandidate(e));