Adding basic support for posting tasks to a process thread.

BUG=
R=magjed@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/41099004

Cr-Commit-Position: refs/heads/master@{#8614}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8614 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/webrtc/modules/utility/interface/mock/mock_process_thread.h b/webrtc/modules/utility/interface/mock/mock_process_thread.h
index a722180..fd108a8 100644
--- a/webrtc/modules/utility/interface/mock/mock_process_thread.h
+++ b/webrtc/modules/utility/interface/mock/mock_process_thread.h
@@ -22,8 +22,16 @@
   MOCK_METHOD0(Start, void());
   MOCK_METHOD0(Stop, void());
   MOCK_METHOD1(WakeUp, void(Module* module));
+  MOCK_METHOD1(PostTask, void(ProcessTask* task));
   MOCK_METHOD1(RegisterModule, void(Module* module));
   MOCK_METHOD1(DeRegisterModule, void(Module* module));
+
+  // MOCK_METHOD1 gets confused with mocking this method, so we work around it
+  // by overriding the method from the interface and forwarding the call to a
+  // mocked, simpler method.
+  void PostTask(rtc::scoped_ptr<ProcessTask> task) override {
+    PostTask(task.get());
+  }
 };
 
 }  // namespace webrtc
diff --git a/webrtc/modules/utility/interface/process_thread.h b/webrtc/modules/utility/interface/process_thread.h
index e30325c..0e84506 100644
--- a/webrtc/modules/utility/interface/process_thread.h
+++ b/webrtc/modules/utility/interface/process_thread.h
@@ -17,6 +17,14 @@
 namespace webrtc {
 class Module;
 
+class ProcessTask {
+ public:
+  ProcessTask() {}
+  virtual ~ProcessTask() {}
+
+  virtual void Run() = 0;
+};
+
 class ProcessThread {
  public:
   virtual ~ProcessThread();
@@ -36,6 +44,14 @@
   // Can be called on any thread.
   virtual void WakeUp(Module* module) = 0;
 
+  // Queues a task object to run on the worker thread.  Ownership of the
+  // task object is transferred to the ProcessThread and the object will
+  // either be deleted after running on the worker thread, or on the
+  // construction thread of the ProcessThread instance, if the task did not
+  // get a chance to run (e.g. posting the task while shutting down or when
+  // the thread never runs).
+  virtual void PostTask(rtc::scoped_ptr<ProcessTask> task) = 0;
+
   // Adds a module that will start to receive callbacks on the worker thread.
   // Can be called from any thread.
   virtual void RegisterModule(Module* module) = 0;
diff --git a/webrtc/modules/utility/source/process_thread_impl.cc b/webrtc/modules/utility/source/process_thread_impl.cc
index c408b2c..8268fff 100644
--- a/webrtc/modules/utility/source/process_thread_impl.cc
+++ b/webrtc/modules/utility/source/process_thread_impl.cc
@@ -51,6 +51,11 @@
   DCHECK(thread_checker_.CalledOnValidThread());
   DCHECK(!thread_.get());
   DCHECK(!stop_);
+
+  while (!queue_.empty()) {
+    delete queue_.front();
+    queue_.pop();
+  }
 }
 
 void ProcessThreadImpl::Start() {
@@ -102,6 +107,15 @@
   wake_up_->Set();
 }
 
+void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) {
+  // Allowed to be called on any thread.
+  {
+    rtc::CritScope lock(&lock_);
+    queue_.push(task.release());
+  }
+  wake_up_->Set();
+}
+
 void ProcessThreadImpl::RegisterModule(Module* module) {
   // Allowed to be called on any thread.
   DCHECK(module);
@@ -155,6 +169,7 @@
 bool ProcessThreadImpl::Process() {
   int64_t now = TickTime::MillisecondTimestamp();
   int64_t next_checkpoint = now + (1000 * 60);
+
   {
     rtc::CritScope lock(&lock_);
     if (stop_)
@@ -180,6 +195,15 @@
       if (m.next_callback < next_checkpoint)
         next_checkpoint = m.next_callback;
     }
+
+    while (!queue_.empty()) {
+      ProcessTask* task = queue_.front();
+      queue_.pop();
+      lock_.Leave();
+      task->Run();
+      delete task;
+      lock_.Enter();
+    }
   }
 
   int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();
diff --git a/webrtc/modules/utility/source/process_thread_impl.h b/webrtc/modules/utility/source/process_thread_impl.h
index 0ba6f1f..1fd2bf3 100644
--- a/webrtc/modules/utility/source/process_thread_impl.h
+++ b/webrtc/modules/utility/source/process_thread_impl.h
@@ -12,6 +12,7 @@
 #define WEBRTC_MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_
 
 #include <list>
+#include <queue>
 
 #include "webrtc/base/criticalsection.h"
 #include "webrtc/base/thread_checker.h"
@@ -31,6 +32,7 @@
   void Stop() override;
 
   void WakeUp(Module* module) override;
+  void PostTask(rtc::scoped_ptr<ProcessTask> task) override;
 
   void RegisterModule(Module* module) override;
   void DeRegisterModule(Module* module) override;
@@ -64,13 +66,15 @@
   // issues, but I haven't figured out what they are, if there are alignment
   // requirements for mutexes on Mac or if there's something else to it.
   // So be careful with changing the layout.
-  rtc::CriticalSection lock_;  // Used to guard modules_ and stop_.
+  rtc::CriticalSection lock_;  // Used to guard modules_, tasks_ and stop_.
 
   rtc::ThreadChecker thread_checker_;
   const rtc::scoped_ptr<EventWrapper> wake_up_;
   rtc::scoped_ptr<ThreadWrapper> thread_;
 
   ModuleList modules_;
+  // TODO(tommi): Support delayed tasks.
+  std::queue<ProcessTask*> queue_;
   bool stop_;
 };
 
diff --git a/webrtc/modules/utility/source/process_thread_impl_unittest.cc b/webrtc/modules/utility/source/process_thread_impl_unittest.cc
index 47af4d8..cd1f956 100644
--- a/webrtc/modules/utility/source/process_thread_impl_unittest.cc
+++ b/webrtc/modules/utility/source/process_thread_impl_unittest.cc
@@ -30,6 +30,15 @@
   MOCK_METHOD1(ProcessThreadAttached, void(ProcessThread*));
 };
 
+class RaiseEventTask : public ProcessTask {
+ public:
+  RaiseEventTask(EventWrapper* event) : event_(event) {}
+  void Run() override { event_->Set(); }
+
+ private:
+  EventWrapper* event_;
+};
+
 ACTION_P(SetEvent, event) {
   event->Set();
 }
@@ -280,4 +289,16 @@
   EXPECT_LE(diff, 100u);
 }
 
+// Tests that we can post a task that gets run straight away on the worker
+// thread.
+TEST(ProcessThreadImpl, PostTask) {
+  ProcessThreadImpl thread;
+  rtc::scoped_ptr<EventWrapper> task_ran(EventWrapper::Create());
+  rtc::scoped_ptr<RaiseEventTask> task(new RaiseEventTask(task_ran.get()));
+  thread.Start();
+  thread.PostTask(task.Pass());
+  EXPECT_EQ(kEventSignaled, task_ran->Wait(100));
+  thread.Stop();
+}
+
 }  // namespace webrtc