Fix up synchronization with WebRTC calls
We were both calling one of the WebRTC functions from the wrong thread
and were modifying data structures from multiple threads simultaneously.
Change-Id: I76c1dd080b8b63d3b1155733f31ee6d1e3b30949
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 7c34b22..af1b646 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -115,8 +115,24 @@
}
void Subscriber::RunIteration() {
- // TODO(james): The channels_ struct gets accessed here, but modified by
- // Add/RemoveListener, which are called from separate threads.
+ {
+ // Manage updating the channels_ map given the pending_* members from the
+ // *Listeners() methods.
+ // We handle all the removals first so that we correctly handle the
+ // situation where the user calls RemoveListener() and then AddListener()
+ // between calls to RunIteration(). The reverse order (adding and then
+ // removing before an update) is handled directly in RemoveListener() where
+ // we remove things from the pending_channels_ map directly.
+ MutexLocker lock(&mutex_);
+ for (const auto &channel : pending_removal_) {
+ channels_.erase(channel);
+ }
+ pending_removal_.clear();
+ for (const auto &channel : pending_channels_) {
+ channels_.insert(channel);
+ }
+ pending_channels_.clear();
+ }
if (channels_.empty() && buffer_size_ == 0) {
return;
}
@@ -156,9 +172,14 @@
VLOG(1) << "skipping a send because buffered amount is too high";
break;
}
- // TODO(james): This Send() should be called from the signalling_thread
- // created by the Connection.
- rtc_channel->Send(*buffer);
+ // Call Send() from the signalling thread. The Invoke() call blocks until
+ // the handler has been called, so we do not need to handle any
+ // synchronization on this end. The body of the handler should be kept as
+ // short as possible to avoid blocking the signalling thread continuously
+ // for any longer than necessary.
+ channel_data->signaling_thread->Invoke<void>(
+ RTC_FROM_HERE,
+ [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
buffer = NextBuffer(channel_data);
}
}
@@ -178,10 +199,12 @@
void Subscriber::AddListener(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
- TransferMethod transfer_method) {
+ TransferMethod transfer_method, rtc::Thread *signaling_thread) {
+ MutexLocker lock(&mutex_);
ChannelInformation info;
info.transfer_method = transfer_method;
- channels_.emplace(channel, info);
+ info.signaling_thread = signaling_thread;
+ pending_channels_.emplace(channel, info);
}
const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
@@ -234,7 +257,9 @@
void Subscriber::RemoveListener(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- channels_.erase(channel);
+ MutexLocker lock(&mutex_);
+ pending_channels_.erase(channel);
+ pending_removal_.push_back(channel);
}
Connection::Connection(
@@ -282,6 +307,8 @@
signaling_thread->SetName("signaling_thread", nullptr);
signaling_thread->Start();
+ signaling_thread_ = signaling_thread.get();
+
webrtc::PeerConnectionFactoryDependencies factory_deps;
factory_deps.signaling_thread = signaling_thread.release();
rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
@@ -379,6 +406,9 @@
// Handle DataChannel messages. Subscribe to each listener that matches the
// subscribe message
void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
+ // Sanity check--we are relying on the Add/RemoveListener calls being made
+ // from the correct thread.
+ CHECK(signaling_thread_->IsCurrent());
FlatbufferSpan<SubscriberRequest> message(
{buffer.data.data(), buffer.data.size()});
if (!message.Verify()) {
@@ -421,7 +451,7 @@
nullptr)});
it = pair.first;
}
- subscriber->AddListener(it->second, transfer_method);
+ subscriber->AddListener(it->second, transfer_method, signaling_thread_);
VLOG(1) << "Subscribe to: " << channel->type()->str();
found_match = true;