Fix up synchronization with WebRTC calls
We were both calling one of the WebRTC functions from the wrong thread
and were modifying data structures from multiple threads simultaneously.
Change-Id: I76c1dd080b8b63d3b1155733f31ee6d1e3b30949
diff --git a/aos/network/BUILD b/aos/network/BUILD
index be7e5eb..b430d90 100644
--- a/aos/network/BUILD
+++ b/aos/network/BUILD
@@ -397,6 +397,7 @@
":web_proxy_fbs",
":web_proxy_utils",
"//aos/events:shm_event_loop",
+ "//aos/mutex",
"//aos/seasocks:seasocks_logger",
"//third_party:webrtc",
"//third_party/seasocks",
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 7c34b22..af1b646 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -115,8 +115,24 @@
}
void Subscriber::RunIteration() {
- // TODO(james): The channels_ struct gets accessed here, but modified by
- // Add/RemoveListener, which are called from separate threads.
+ {
+ // Manage updating the channels_ map given the pending_* members from the
+ // *Listeners() methods.
+ // We handle all the removals first so that we correctly handle the
+ // situation where the user calls RemoveListener() and then AddListener()
+ // between calls to RunIteration(). The reverse order (adding and then
+ // removing before an update) is handled directly in RemoveListener() where
+ // we remove things from the pending_channels_ map directly.
+ MutexLocker lock(&mutex_);
+ for (const auto &channel : pending_removal_) {
+ channels_.erase(channel);
+ }
+ pending_removal_.clear();
+ for (const auto &channel : pending_channels_) {
+ channels_.insert(channel);
+ }
+ pending_channels_.clear();
+ }
if (channels_.empty() && buffer_size_ == 0) {
return;
}
@@ -156,9 +172,14 @@
VLOG(1) << "skipping a send because buffered amount is too high";
break;
}
- // TODO(james): This Send() should be called from the signalling_thread
- // created by the Connection.
- rtc_channel->Send(*buffer);
+ // Call Send() from the signalling thread. The Invoke() call blocks until
+ // the handler has been called, so we do not need to handle any
+ // synchronization on this end. The body of the handler should be kept as
+ // short as possible to avoid blocking the signalling thread continuously
+ // for any longer than necessary.
+ channel_data->signaling_thread->Invoke<void>(
+ RTC_FROM_HERE,
+ [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
buffer = NextBuffer(channel_data);
}
}
@@ -178,10 +199,12 @@
void Subscriber::AddListener(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
- TransferMethod transfer_method) {
+ TransferMethod transfer_method, rtc::Thread *signaling_thread) {
+ MutexLocker lock(&mutex_);
ChannelInformation info;
info.transfer_method = transfer_method;
- channels_.emplace(channel, info);
+ info.signaling_thread = signaling_thread;
+ pending_channels_.emplace(channel, info);
}
const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
@@ -234,7 +257,9 @@
void Subscriber::RemoveListener(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
- channels_.erase(channel);
+ MutexLocker lock(&mutex_);
+ pending_channels_.erase(channel);
+ pending_removal_.push_back(channel);
}
Connection::Connection(
@@ -282,6 +307,8 @@
signaling_thread->SetName("signaling_thread", nullptr);
signaling_thread->Start();
+ signaling_thread_ = signaling_thread.get();
+
webrtc::PeerConnectionFactoryDependencies factory_deps;
factory_deps.signaling_thread = signaling_thread.release();
rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
@@ -379,6 +406,9 @@
// Handle DataChannel messages. Subscribe to each listener that matches the
// subscribe message
void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
+ // Sanity check--we are relying on the Add/RemoveListener calls being made
+ // from the correct thread.
+ CHECK(signaling_thread_->IsCurrent());
FlatbufferSpan<SubscriberRequest> message(
{buffer.data.data(), buffer.data.size()});
if (!message.Verify()) {
@@ -421,7 +451,7 @@
nullptr)});
it = pair.first;
}
- subscriber->AddListener(it->second, transfer_method);
+ subscriber->AddListener(it->second, transfer_method, signaling_thread_);
VLOG(1) << "Subscribe to: " << channel->type()->str();
found_match = true;
diff --git a/aos/network/web_proxy.h b/aos/network/web_proxy.h
index 85cfd6f..ab524de 100644
--- a/aos/network/web_proxy.h
+++ b/aos/network/web_proxy.h
@@ -4,6 +4,7 @@
#include <set>
#include "aos/events/event_loop.h"
#include "aos/events/shm_event_loop.h"
+#include "aos/mutex/mutex.h"
#include "aos/network/connect_generated.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/seasocks/seasocks_logger.h"
@@ -99,7 +100,8 @@
void RunIteration();
void AddListener(rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
- TransferMethod transfer_method);
+ TransferMethod transfer_method,
+ rtc::Thread *signaling_thread);
void RemoveListener(
rtc::scoped_refptr<webrtc::DataChannelInterface> channel);
@@ -114,6 +116,8 @@
TransferMethod transfer_method;
uint32_t current_queue_index = 0;
size_t next_packet_number = 0;
+ // Thread to use for making calls to the DataChannelInterface.
+ rtc::Thread *signaling_thread;
};
struct Message {
uint32_t index = 0xffffffff;
@@ -130,6 +134,15 @@
std::deque<Message> message_buffer_;
std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
channels_;
+ // In order to enable the Connection class to add/remove listeners
+ // asyncrhonously, queue up all the newly added listeners in pending_*
+ // members. Access to these members is controlled by mutex_.
+ std::map<rtc::scoped_refptr<webrtc::DataChannelInterface>, ChannelInformation>
+ pending_channels_;
+ std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>>
+ pending_removal_;
+
+ aos::Mutex mutex_;
};
// Represents a single connection to a browser for the entire lifetime of the
@@ -190,6 +203,10 @@
private:
::seasocks::WebSocket *sock_;
::seasocks::Server *server_;
+ // The signaling thread is the thread on which most/all of the work we do with
+ // WebRTC will happen--it is both where the handlers we register should be
+ // called and where we should be calling Send() from.
+ rtc::Thread *signaling_thread_;
const std::vector<std::unique_ptr<Subscriber>> &subscribers_;
const std::vector<FlatbufferDetachedBuffer<MessageHeader>> config_headers_;
std::map<int, rtc::scoped_refptr<webrtc::DataChannelInterface>> channels_;