Fix up synchronization with WebRTC calls

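Previously we had two threading bugs: we called a WebRTC function (the
data channel's Send()) from the wrong thread instead of the signaling
thread, and we modified the Subscriber's channels_ map from multiple
threads simultaneously without any locking.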

Change-Id: I76c1dd080b8b63d3b1155733f31ee6d1e3b30949
diff --git a/aos/network/web_proxy.cc b/aos/network/web_proxy.cc
index 7c34b22..af1b646 100644
--- a/aos/network/web_proxy.cc
+++ b/aos/network/web_proxy.cc
@@ -115,8 +115,24 @@
 }
 
 void Subscriber::RunIteration() {
-  // TODO(james): The channels_ struct gets accessed here, but modified by
-  // Add/RemoveListener, which are called from separate threads.
+  {
+    // Manage updating the channels_ map given the pending_* members from the
+    // *Listeners() methods.
+    // We handle all the removals first so that we correctly handle the
+    // situation where the user calls RemoveListener() and then AddListener()
+    // between calls to RunIteration(). The reverse order (adding and then
+    // removing before an update) is handled in RemoveListener(), which erases
+    // the channel from the pending_channels_ map directly.
+    MutexLocker lock(&mutex_);
+    for (const auto &channel : pending_removal_) {
+      channels_.erase(channel);
+    }
+    pending_removal_.clear();
+    for (const auto &channel : pending_channels_) {
+      channels_.insert(channel);
+    }
+    pending_channels_.clear();
+  }
   if (channels_.empty() && buffer_size_ == 0) {
     return;
   }
@@ -156,9 +172,14 @@
         VLOG(1) << "skipping a send because buffered amount is too high";
         break;
       }
-      // TODO(james): This Send() should be called from the signalling_thread
-      // created by the Connection.
-      rtc_channel->Send(*buffer);
+      // Call Send() from the signalling thread. The Invoke() call blocks until
+      // the handler has been called, so we do not need to handle any
+      // synchronization on this end. The body of the handler should be kept as
+      // short as possible to avoid blocking the signalling thread continuously
+      // for any longer than necessary.
+      channel_data->signaling_thread->Invoke<void>(
+          RTC_FROM_HERE,
+          [rtc_channel, buffer]() { rtc_channel->Send(*buffer); });
       buffer = NextBuffer(channel_data);
     }
   }
@@ -178,10 +199,12 @@
 
 void Subscriber::AddListener(
     rtc::scoped_refptr<webrtc::DataChannelInterface> channel,
-    TransferMethod transfer_method) {
+    TransferMethod transfer_method, rtc::Thread *signaling_thread) {
+  MutexLocker lock(&mutex_);
   ChannelInformation info;
   info.transfer_method = transfer_method;
-  channels_.emplace(channel, info);
+  info.signaling_thread = signaling_thread;
+  pending_channels_.emplace(channel, info);
 }
 
 const webrtc::DataBuffer *Subscriber::NextBuffer(ChannelInformation *channel) {
@@ -234,7 +257,9 @@
 
 void Subscriber::RemoveListener(
     rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
-  channels_.erase(channel);
+  MutexLocker lock(&mutex_);
+  pending_channels_.erase(channel);
+  pending_removal_.push_back(channel);
 }
 
 Connection::Connection(
@@ -282,6 +307,8 @@
       signaling_thread->SetName("signaling_thread", nullptr);
       signaling_thread->Start();
 
+      signaling_thread_ = signaling_thread.get();
+
       webrtc::PeerConnectionFactoryDependencies factory_deps;
       factory_deps.signaling_thread = signaling_thread.release();
       rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface> factory =
@@ -379,6 +406,9 @@
 // Handle DataChannel messages. Subscribe to each listener that matches the
 // subscribe message
 void Connection::OnMessage(const webrtc::DataBuffer &buffer) {
+  // Sanity check--we are relying on the Add/RemoveListener calls being made
+  // from the correct thread.
+  CHECK(signaling_thread_->IsCurrent());
   FlatbufferSpan<SubscriberRequest> message(
       {buffer.data.data(), buffer.data.size()});
   if (!message.Verify()) {
@@ -421,7 +451,7 @@
                           nullptr)});
           it = pair.first;
         }
-        subscriber->AddListener(it->second, transfer_method);
+        subscriber->AddListener(it->second, transfer_method, signaling_thread_);
 
         VLOG(1) << "Subscribe to: " << channel->type()->str();
         found_match = true;