Fix a problem in Thread::Send.
Previously if thread A->Send is called on thread B, B->ReceiveSends will be called, which enables an arbitrary thread to invoke calls on B while B is wait for A->Send to return. This caused mutliple problems like issue 3559, 3579.
The fix is to limit B->ReceiveSends to only process requests from A.
Also disallow the worker thread invoking other threads.

BUG=3559
R=juberti@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/15089004

git-svn-id: http://webrtc.googlecode.com/svn/trunk/webrtc@7290 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/base/thread.cc b/base/thread.cc
index 9d2917d..40257ab 100644
--- a/base/thread.cc
+++ b/base/thread.cc
@@ -411,15 +411,12 @@
 }
 
 void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
-  AssertBlockingIsAllowedOnCurrentThread();
-
   if (fStop_)
     return;
 
   // Sent messages are sent to the MessageHandler directly, in the context
   // of "thread", like Win32 SendMessage. If in the right context,
   // call the handler directly.
-
   Message msg;
   msg.phandler = phandler;
   msg.message_id = id;
@@ -429,6 +426,8 @@
     return;
   }
 
+  AssertBlockingIsAllowedOnCurrentThread();
+
   AutoThread thread;
   Thread *current_thread = Thread::Current();
   ASSERT(current_thread != NULL);  // AutoThread ensures this
@@ -451,7 +450,9 @@
   crit_.Enter();
   while (!ready) {
     crit_.Leave();
-    current_thread->ReceiveSends();
+    // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
+    // thread invoking calls on the current thread.
+    current_thread->ReceiveSendsFromThread(this);
     current_thread->socketserver()->Wait(kForever, false);
     waited = true;
     crit_.Enter();
@@ -475,17 +476,23 @@
 }
 
 void Thread::ReceiveSends() {
+  ReceiveSendsFromThread(NULL);
+}
+
+void Thread::ReceiveSendsFromThread(const Thread* source) {
   // Receive a sent message. Cleanup scenarios:
   // - thread sending exits: We don't allow this, since thread can exit
   //   only via Join, so Send must complete.
   // - thread receiving exits: Wakeup/set ready in Thread::Clear()
   // - object target cleared: Wakeup/set ready in Thread::Clear()
+  _SendMessage smsg;
+
   crit_.Enter();
-  while (!sendlist_.empty()) {
-    _SendMessage smsg = sendlist_.front();
-    sendlist_.pop_front();
+  while (PopSendMessageFromThread(source, &smsg)) {
     crit_.Leave();
+
     smsg.msg.phandler->OnMessage(&smsg.msg);
+
     crit_.Enter();
     *smsg.ready = true;
     smsg.thread->socketserver()->WakeUp();
@@ -493,6 +500,18 @@
   crit_.Leave();
 }
 
+bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
+  for (std::list<_SendMessage>::iterator it = sendlist_.begin();
+       it != sendlist_.end(); ++it) {
+    if (it->thread == source || source == NULL) {
+      *msg = *it;
+      sendlist_.erase(it);
+      return true;
+    }
+  }
+  return false;
+}
+
 void Thread::Clear(MessageHandler *phandler, uint32 id,
                    MessageList* removed) {
   CritScope cs(&crit_);
diff --git a/base/thread.h b/base/thread.h
index 25b0f56..34ec45e 100644
--- a/base/thread.h
+++ b/base/thread.h
@@ -165,7 +165,6 @@
   // See ScopedDisallowBlockingCalls for details.
   template <class ReturnT, class FunctorT>
   ReturnT Invoke(const FunctorT& functor) {
-    AssertBlockingIsAllowedOnCurrentThread();
     FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
     Send(&handler);
     return handler.result();
@@ -210,6 +209,10 @@
   // of whatever code is conditionally executing because of the return value!
   bool RunningForTest() { return running(); }
 
+  // Sets the per-thread allow-blocking-calls flag and returns the previous
+  // value.
+  bool SetAllowBlockingCalls(bool allow);
+
  protected:
   // This method should be called when thread is created using non standard
   // method, like derived implementation of rtc::Thread and it can not be
@@ -226,10 +229,6 @@
   // Blocks the calling thread until this thread has terminated.
   void Join();
 
-  // Sets the per-thread allow-blocking-calls flag and returns the previous
-  // value.
-  bool SetAllowBlockingCalls(bool allow);
-
   static void AssertBlockingIsAllowedOnCurrentThread();
 
   friend class ScopedDisallowBlockingCalls;
@@ -248,6 +247,16 @@
   // Return true if the thread was started and hasn't yet stopped.
   bool running() { return running_.Wait(0); }
 
+  // Processes received "Send" requests. If |source| is not NULL, only requests
+  // from |source| are processed, otherwise, all requests are processed.
+  void ReceiveSendsFromThread(const Thread* source);
+
+  // If |source| is not NULL, pops the first "Send" message from |source| in
+  // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
+  // The caller must lock |crit_| before calling.
+  // Returns true if there is such a message.
+  bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
+
   std::list<_SendMessage> sendlist_;
   std::string name_;
   ThreadPriority priority_;
diff --git a/base/thread_unittest.cc b/base/thread_unittest.cc
index 4229df2..57b6df6 100644
--- a/base/thread_unittest.cc
+++ b/base/thread_unittest.cc
@@ -276,6 +276,78 @@
   thread.Invoke<void>(&LocalFuncs::Func2);
 }
 
+// Verifies that two threads calling Invoke on each other at the same time does
+// not deadlock.
+TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
+  AutoThread thread;
+  Thread* current_thread = Thread::Current();
+  ASSERT_TRUE(current_thread != NULL);
+
+  Thread other_thread;
+  other_thread.Start();
+
+  struct LocalFuncs {
+    static void Set(bool* out) { *out = true; }
+    static void InvokeSet(Thread* thread, bool* out) {
+      thread->Invoke<void>(Bind(&Set, out));
+    }
+  };
+
+  bool called = false;
+  other_thread.Invoke<void>(
+      Bind(&LocalFuncs::InvokeSet, current_thread, &called));
+
+  EXPECT_TRUE(called);
+}
+
+// Verifies that if thread A invokes a call on thread B and thread C is trying
+// to invoke A at the same time, thread A does not handle C's invoke while
+// invoking B.
+TEST(ThreadTest, ThreeThreadsInvoke) {
+  AutoThread thread;
+  Thread* thread_a = Thread::Current();
+  Thread thread_b, thread_c;
+  thread_b.Start();
+  thread_c.Start();
+
+  struct LocalFuncs {
+    static void Set(bool* out) { *out = true; }
+    static void InvokeSet(Thread* thread, bool* out) {
+      thread->Invoke<void>(Bind(&Set, out));
+    }
+
+    // Set |out| true and call InvokeSet on |thread|.
+    static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) {
+      *out = true;
+      InvokeSet(thread, out_inner);
+    }
+
+    // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
+    // |thread1| starts the call.
+    static void AsyncInvokeSetAndWait(
+        Thread* thread1, Thread* thread2, bool* out) {
+      bool async_invoked = false;
+
+      AsyncInvoker invoker;
+      invoker.AsyncInvoke<void>(
+          thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out));
+
+      EXPECT_TRUE_WAIT(async_invoked, 2000);
+    }
+  };
+
+  bool thread_a_called = false;
+
+  // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
+  // Thread B returns when C receives the call and C should be blocked until A
+  // starts to process messages.
+  thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait,
+                             &thread_c, thread_a, &thread_a_called));
+  EXPECT_FALSE(thread_a_called);
+
+  EXPECT_TRUE_WAIT(thread_a_called, 2000);
+}
+
 class AsyncInvokeTest : public testing::Test {
  public:
   void IntCallback(int value) {