Add backpressure to web_proxy
Have the client report back the queue index and packet index it has
most recently processed on each channel. The server then avoids
sending more than --pre_send_messages (default 10000) messages ahead
of that point.
Also, fix up some memory management to ensure that data channels
actually get closed/destroyed at the end of a browser session.
Change-Id: Id1795d7496f410332407624a559d6a16a1698702
Signed-off-by: James Kuszmaul <jabukuszmaul@gmail.com>
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 94473fb..fdd8f1e 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -18,6 +18,12 @@
}
DEFINE_int32(proxy_port, 1180, "Port to use for the web proxy server.");
+DEFINE_int32(pre_send_messages, 10000,
+ "Number of messages / queue to send to a client before waiting on "
+ "confirmation that the initial message was received. If set to "
+ "-1, will not throttle messages at all. This prevents a situation "
+ "where, when run on localhost, the large number of WebRTC packets "
+ "can overwhelm the browser and crash the webpage.");
namespace aos {
namespace web_proxy {
@@ -234,7 +240,8 @@
}
}
for (auto &conn : channels_) {
- std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first;
+ std::shared_ptr<ScopedDataChannel> rtc_channel = conn.first.lock();
+ CHECK(rtc_channel) << "data_channel was destroyed too early.";
ChannelInformation *channel_data = &conn.second;
if (channel_data->transfer_method == TransferMethod::SUBSAMPLE) {
SkipToLastMessage(channel_data);
@@ -261,7 +268,6 @@
void Subscriber::AddListener(std::shared_ptr<ScopedDataChannel> data_channel,
TransferMethod transfer_method) {
- VLOG(1) << "Adding listener for " << data_channel.get();
ChannelInformation info;
info.transfer_method = transfer_method;
@@ -271,13 +277,35 @@
fetcher_->Fetch();
}
- channels_.emplace(data_channel, info);
+ channels_.emplace_back(std::make_pair(data_channel, info));
+
+ data_channel->set_on_message(
+ [this, index = channels_.size() - 1](
+ struct mbuf *const buffer,
+ const enum rawrtc_data_channel_message_flag /*flags*/) {
+ FlatbufferSpan<ChannelState> message(
+ {mbuf_buf(buffer), mbuf_get_left(buffer)});
+ if (!message.Verify()) {
+ LOG(ERROR) << "Invalid flatbuffer received from browser client.";
+ return;
+ }
+
+ channels_[index].second.reported_queue_index =
+ message.message().queue_index();
+ channels_[index].second.reported_packet_index =
+ message.message().packet_index();
+ });
}
-void Subscriber::RemoveListener(
- std::shared_ptr<ScopedDataChannel> data_channel) {
- VLOG(1) << "Removing listener for " << data_channel.get();
- channels_.erase(data_channel);
+void Subscriber::RemoveListener(std::shared_ptr<ScopedDataChannel> data_channel) {
+ channels_.erase(
+ std::remove_if(
+ channels_.begin(), channels_.end(),
+ [data_channel](const std::pair<std::weak_ptr<ScopedDataChannel>,
+ ChannelInformation> &channel) {
+ return channel.first.lock().get() == data_channel.get();
+ }),
+ channels_.end());
}
std::shared_ptr<struct mbuf> Subscriber::NextBuffer(
@@ -294,10 +322,24 @@
channel->next_packet_number = 0;
return message_buffer_.front().data.at(0);
}
+ // TODO(james): Handle queue index wrapping when >2^32 messages are sent on a
+ // channel.
if (channel->current_queue_index > latest_index) {
// We are still waiting on the next message to appear; return.
return nullptr;
}
+ if (FLAGS_pre_send_messages > 0) {
+ // Don't buffer up an excessive number of messages to the client.
+ // This currently ignores the packet index (and really, any concept of
+ // message size), but the main goal is just to avoid locking up the client
+ // browser, not to be ultra precise about anything. It's also not clear that
+ // message *size* is necessarily even the determining factor in causing
+ // issues.
+ if (channel->reported_queue_index + FLAGS_pre_send_messages <
+ channel->current_queue_index) {
+ return nullptr;
+ }
+ }
CHECK_EQ(latest_index - earliest_index + 1, message_buffer_.size())
<< "Inconsistent queue indices.";
const size_t packets_in_message =
@@ -575,7 +617,7 @@
auto it = channels_.find(channel_index);
if (it == channels_.end()) {
std::shared_ptr<ScopedDataChannel> data_channel =
- std::make_shared<ScopedDataChannel>();
+ ScopedDataChannel::MakeDataChannel();
std::weak_ptr<ScopedDataChannel> data_channel_weak_ptr = data_channel;
@@ -584,9 +626,10 @@
std::shared_ptr<ScopedDataChannel> data_channel =
data_channel_weak_ptr.lock();
CHECK(data_channel) << ": Subscriber got destroyed before we started.";
- // Weak ptr inside the subscriber so we don't have a circular
+ // Weak pointer inside the subscriber so we don't have a circular
// reference. AddListener will close it.
- subscribers_[channel_index]->AddListener(data_channel, transfer_method);
+ subscribers_[channel_index]->AddListener(data_channel,
+ transfer_method);
});
Subscriber *subscriber = subscribers_[channel_index].get();