Fix a problem in Thread::Send.
Previously if thread A->Send is called on thread B, B->ReceiveSends will be called, which enables an arbitrary thread to invoke calls on B while B is wait for A->Send to return. This caused mutliple problems like issue 3559, 3579.
The fix is to limit B->ReceiveSends to only process requests from A.
Also disallow the worker thread invoking other threads.

BUG=3559
R=juberti@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/15089004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@7290 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/talk/app/webrtc/peerconnection_unittest.cc b/talk/app/webrtc/peerconnection_unittest.cc
index 0d3e426..977fc11 100644
--- a/talk/app/webrtc/peerconnection_unittest.cc
+++ b/talk/app/webrtc/peerconnection_unittest.cc
@@ -481,9 +481,8 @@
     if (!allocator_factory_) {
       return false;
     }
-    audio_thread_.Start();
     fake_audio_capture_module_ = FakeAudioCaptureModule::Create(
-        &audio_thread_);
+        rtc::Thread::Current());
 
     if (fake_audio_capture_module_ == NULL) {
       return false;
@@ -557,12 +556,6 @@
   }
 
   std::string id_;
-  // Separate thread for executing |fake_audio_capture_module_| tasks. Audio
-  // processing must not be performed on the same thread as signaling due to
-  // signaling time constraints and relative complexity of the audio pipeline.
-  // This is consistent with the video pipeline that us a a separate thread for
-  // encoding and decoding.
-  rtc::Thread audio_thread_;
 
   rtc::scoped_refptr<webrtc::PortAllocatorFactoryInterface>
       allocator_factory_;
diff --git a/talk/app/webrtc/peerconnectionfactory.cc b/talk/app/webrtc/peerconnectionfactory.cc
index 5dccba8..862ceda 100644
--- a/talk/app/webrtc/peerconnectionfactory.cc
+++ b/talk/app/webrtc/peerconnectionfactory.cc
@@ -41,6 +41,7 @@
 #include "talk/media/webrtc/webrtcmediaengine.h"
 #include "talk/media/webrtc/webrtcvideodecoderfactory.h"
 #include "talk/media/webrtc/webrtcvideoencoderfactory.h"
+#include "webrtc/base/bind.h"
 #include "webrtc/modules/audio_device/include/audio_device.h"
 
 using rtc::scoped_refptr;
diff --git a/talk/app/webrtc/test/peerconnectiontestwrapper.cc b/talk/app/webrtc/test/peerconnectiontestwrapper.cc
index 8a4f45c..24932b8 100644
--- a/talk/app/webrtc/test/peerconnectiontestwrapper.cc
+++ b/talk/app/webrtc/test/peerconnectiontestwrapper.cc
@@ -75,9 +75,8 @@
     return false;
   }
 
-  audio_thread_.Start();
   fake_audio_capture_module_ = FakeAudioCaptureModule::Create(
-      &audio_thread_);
+      rtc::Thread::Current());
   if (fake_audio_capture_module_ == NULL) {
     return false;
   }
diff --git a/talk/app/webrtc/test/peerconnectiontestwrapper.h b/talk/app/webrtc/test/peerconnectiontestwrapper.h
index f3477ce..d4a0e4e 100644
--- a/talk/app/webrtc/test/peerconnectiontestwrapper.h
+++ b/talk/app/webrtc/test/peerconnectiontestwrapper.h
@@ -111,7 +111,6 @@
       bool video, const webrtc::FakeConstraints& video_constraints);
 
   std::string name_;
-  rtc::Thread audio_thread_;
   rtc::scoped_refptr<webrtc::PortAllocatorFactoryInterface>
       allocator_factory_;
   rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
diff --git a/talk/session/media/channelmanager.cc b/talk/session/media/channelmanager.cc
index 45e7e47..199bc86 100644
--- a/talk/session/media/channelmanager.cc
+++ b/talk/session/media/channelmanager.cc
@@ -137,6 +137,12 @@
       this, &ChannelManager::OnVideoCaptureStateChange);
   capture_manager_->SignalCapturerStateChange.connect(
       this, &ChannelManager::OnVideoCaptureStateChange);
+
+  if (worker_thread_ != rtc::Thread::Current()) {
+    // Do not allow invoking calls to other threads on the worker thread.
+    worker_thread_->Invoke<bool>(
+        rtc::Bind(&rtc::Thread::SetAllowBlockingCalls, worker_thread_, false));
+  }
 }
 
 ChannelManager::~ChannelManager() {
diff --git a/webrtc/base/thread.cc b/webrtc/base/thread.cc
index 9d2917d..40257ab 100644
--- a/webrtc/base/thread.cc
+++ b/webrtc/base/thread.cc
@@ -411,15 +411,12 @@
 }
 
 void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
-  AssertBlockingIsAllowedOnCurrentThread();
-
   if (fStop_)
     return;
 
   // Sent messages are sent to the MessageHandler directly, in the context
   // of "thread", like Win32 SendMessage. If in the right context,
   // call the handler directly.
-
   Message msg;
   msg.phandler = phandler;
   msg.message_id = id;
@@ -429,6 +426,8 @@
     return;
   }
 
+  AssertBlockingIsAllowedOnCurrentThread();
+
   AutoThread thread;
   Thread *current_thread = Thread::Current();
   ASSERT(current_thread != NULL);  // AutoThread ensures this
@@ -451,7 +450,9 @@
   crit_.Enter();
   while (!ready) {
     crit_.Leave();
-    current_thread->ReceiveSends();
+    // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
+    // thread invoking calls on the current thread.
+    current_thread->ReceiveSendsFromThread(this);
     current_thread->socketserver()->Wait(kForever, false);
     waited = true;
     crit_.Enter();
@@ -475,17 +476,23 @@
 }
 
 void Thread::ReceiveSends() {
+  ReceiveSendsFromThread(NULL);
+}
+
+void Thread::ReceiveSendsFromThread(const Thread* source) {
   // Receive a sent message. Cleanup scenarios:
   // - thread sending exits: We don't allow this, since thread can exit
   //   only via Join, so Send must complete.
   // - thread receiving exits: Wakeup/set ready in Thread::Clear()
   // - object target cleared: Wakeup/set ready in Thread::Clear()
+  _SendMessage smsg;
+
   crit_.Enter();
-  while (!sendlist_.empty()) {
-    _SendMessage smsg = sendlist_.front();
-    sendlist_.pop_front();
+  while (PopSendMessageFromThread(source, &smsg)) {
     crit_.Leave();
+
     smsg.msg.phandler->OnMessage(&smsg.msg);
+
     crit_.Enter();
     *smsg.ready = true;
     smsg.thread->socketserver()->WakeUp();
@@ -493,6 +500,18 @@
   crit_.Leave();
 }
 
+bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
+  for (std::list<_SendMessage>::iterator it = sendlist_.begin();
+       it != sendlist_.end(); ++it) {
+    if (it->thread == source || source == NULL) {
+      *msg = *it;
+      sendlist_.erase(it);
+      return true;
+    }
+  }
+  return false;
+}
+
 void Thread::Clear(MessageHandler *phandler, uint32 id,
                    MessageList* removed) {
   CritScope cs(&crit_);
diff --git a/webrtc/base/thread.h b/webrtc/base/thread.h
index 25b0f56..34ec45e 100644
--- a/webrtc/base/thread.h
+++ b/webrtc/base/thread.h
@@ -165,7 +165,6 @@
   // See ScopedDisallowBlockingCalls for details.
   template <class ReturnT, class FunctorT>
   ReturnT Invoke(const FunctorT& functor) {
-    AssertBlockingIsAllowedOnCurrentThread();
     FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
     Send(&handler);
     return handler.result();
@@ -210,6 +209,10 @@
   // of whatever code is conditionally executing because of the return value!
   bool RunningForTest() { return running(); }
 
+  // Sets the per-thread allow-blocking-calls flag and returns the previous
+  // value.
+  bool SetAllowBlockingCalls(bool allow);
+
  protected:
   // This method should be called when thread is created using non standard
   // method, like derived implementation of rtc::Thread and it can not be
@@ -226,10 +229,6 @@
   // Blocks the calling thread until this thread has terminated.
   void Join();
 
-  // Sets the per-thread allow-blocking-calls flag and returns the previous
-  // value.
-  bool SetAllowBlockingCalls(bool allow);
-
   static void AssertBlockingIsAllowedOnCurrentThread();
 
   friend class ScopedDisallowBlockingCalls;
@@ -248,6 +247,16 @@
   // Return true if the thread was started and hasn't yet stopped.
   bool running() { return running_.Wait(0); }
 
+  // Processes received "Send" requests. If |source| is not NULL, only requests
+  // from |source| are processed, otherwise, all requests are processed.
+  void ReceiveSendsFromThread(const Thread* source);
+
+  // If |source| is not NULL, pops the first "Send" message from |source| in
+  // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
+  // The caller must lock |crit_| before calling.
+  // Returns true if there is such a message.
+  bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
+
   std::list<_SendMessage> sendlist_;
   std::string name_;
   ThreadPriority priority_;
diff --git a/webrtc/base/thread_unittest.cc b/webrtc/base/thread_unittest.cc
index 4229df2..57b6df6 100644
--- a/webrtc/base/thread_unittest.cc
+++ b/webrtc/base/thread_unittest.cc
@@ -276,6 +276,78 @@
   thread.Invoke<void>(&LocalFuncs::Func2);
 }
 
+// Verifies that two threads calling Invoke on each other at the same time does
+// not deadlock.
+TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
+  AutoThread thread;
+  Thread* current_thread = Thread::Current();
+  ASSERT_TRUE(current_thread != NULL);
+
+  Thread other_thread;
+  other_thread.Start();
+
+  struct LocalFuncs {
+    static void Set(bool* out) { *out = true; }
+    static void InvokeSet(Thread* thread, bool* out) {
+      thread->Invoke<void>(Bind(&Set, out));
+    }
+  };
+
+  bool called = false;
+  other_thread.Invoke<void>(
+      Bind(&LocalFuncs::InvokeSet, current_thread, &called));
+
+  EXPECT_TRUE(called);
+}
+
+// Verifies that if thread A invokes a call on thread B and thread C is trying
+// to invoke A at the same time, thread A does not handle C's invoke while
+// invoking B.
+TEST(ThreadTest, ThreeThreadsInvoke) {
+  AutoThread thread;
+  Thread* thread_a = Thread::Current();
+  Thread thread_b, thread_c;
+  thread_b.Start();
+  thread_c.Start();
+
+  struct LocalFuncs {
+    static void Set(bool* out) { *out = true; }
+    static void InvokeSet(Thread* thread, bool* out) {
+      thread->Invoke<void>(Bind(&Set, out));
+    }
+
+    // Set |out| true and call InvokeSet on |thread|.
+    static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) {
+      *out = true;
+      InvokeSet(thread, out_inner);
+    }
+
+    // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
+    // |thread1| starts the call.
+    static void AsyncInvokeSetAndWait(
+        Thread* thread1, Thread* thread2, bool* out) {
+      bool async_invoked = false;
+
+      AsyncInvoker invoker;
+      invoker.AsyncInvoke<void>(
+          thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out));
+
+      EXPECT_TRUE_WAIT(async_invoked, 2000);
+    }
+  };
+
+  bool thread_a_called = false;
+
+  // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
+  // Thread B returns when C receives the call and C should be blocked until A
+  // starts to process messages.
+  thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait,
+                             &thread_c, thread_a, &thread_a_called));
+  EXPECT_FALSE(thread_a_called);
+
+  EXPECT_TRUE_WAIT(thread_a_called, 2000);
+}
+
 class AsyncInvokeTest : public testing::Test {
  public:
   void IntCallback(int value) {