Adding basic support for posting tasks to a process thread.
BUG=
R=magjed@webrtc.org
Review URL: https://webrtc-codereview.appspot.com/41099004
Cr-Commit-Position: refs/heads/master@{#8614}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8614 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/webrtc/modules/utility/interface/mock/mock_process_thread.h b/webrtc/modules/utility/interface/mock/mock_process_thread.h
index a722180..fd108a8 100644
--- a/webrtc/modules/utility/interface/mock/mock_process_thread.h
+++ b/webrtc/modules/utility/interface/mock/mock_process_thread.h
@@ -22,8 +22,16 @@
MOCK_METHOD0(Start, void());
MOCK_METHOD0(Stop, void());
MOCK_METHOD1(WakeUp, void(Module* module));
+ MOCK_METHOD1(PostTask, void(ProcessTask* task));
MOCK_METHOD1(RegisterModule, void(Module* module));
MOCK_METHOD1(DeRegisterModule, void(Module* module));
+
+ // MOCK_METHOD1 gets confused with mocking this method, so we work around it
+ // by overriding the method from the interface and forwarding the call to a
+ // mocked, simpler method.
+ void PostTask(rtc::scoped_ptr<ProcessTask> task) override {
+ PostTask(task.get());
+ }
};
} // namespace webrtc
diff --git a/webrtc/modules/utility/interface/process_thread.h b/webrtc/modules/utility/interface/process_thread.h
index e30325c..0e84506 100644
--- a/webrtc/modules/utility/interface/process_thread.h
+++ b/webrtc/modules/utility/interface/process_thread.h
@@ -17,6 +17,14 @@
namespace webrtc {
class Module;
+class ProcessTask {
+ public:
+ ProcessTask() {}
+ virtual ~ProcessTask() {}
+
+ virtual void Run() = 0;
+};
+
class ProcessThread {
public:
virtual ~ProcessThread();
@@ -36,6 +44,14 @@
// Can be called on any thread.
virtual void WakeUp(Module* module) = 0;
+ // Queues a task object to run on the worker thread. Ownership of the
+ // task object is transferred to the ProcessThread and the object will
+ // either be deleted after running on the worker thread, or on the
+ // construction thread of the ProcessThread instance, if the task did not
+ // get a chance to run (e.g. posting the task while shutting down or when
+ // the thread never runs).
+ virtual void PostTask(rtc::scoped_ptr<ProcessTask> task) = 0;
+
// Adds a module that will start to receive callbacks on the worker thread.
// Can be called from any thread.
virtual void RegisterModule(Module* module) = 0;
diff --git a/webrtc/modules/utility/source/process_thread_impl.cc b/webrtc/modules/utility/source/process_thread_impl.cc
index c408b2c..8268fff 100644
--- a/webrtc/modules/utility/source/process_thread_impl.cc
+++ b/webrtc/modules/utility/source/process_thread_impl.cc
@@ -51,6 +51,11 @@
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(!thread_.get());
DCHECK(!stop_);
+
+ while (!queue_.empty()) {
+ delete queue_.front();
+ queue_.pop();
+ }
}
void ProcessThreadImpl::Start() {
@@ -102,6 +107,15 @@
wake_up_->Set();
}
+void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) {
+ // Allowed to be called on any thread.
+ {
+ rtc::CritScope lock(&lock_);
+ queue_.push(task.release());
+ }
+ wake_up_->Set();
+}
+
void ProcessThreadImpl::RegisterModule(Module* module) {
// Allowed to be called on any thread.
DCHECK(module);
@@ -155,6 +169,7 @@
bool ProcessThreadImpl::Process() {
int64_t now = TickTime::MillisecondTimestamp();
int64_t next_checkpoint = now + (1000 * 60);
+
{
rtc::CritScope lock(&lock_);
if (stop_)
@@ -180,6 +195,15 @@
if (m.next_callback < next_checkpoint)
next_checkpoint = m.next_callback;
}
+
+ while (!queue_.empty()) {
+ ProcessTask* task = queue_.front();
+ queue_.pop();
+ lock_.Leave();
+ task->Run();
+ delete task;
+ lock_.Enter();
+ }
}
int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();
diff --git a/webrtc/modules/utility/source/process_thread_impl.h b/webrtc/modules/utility/source/process_thread_impl.h
index 0ba6f1f..1fd2bf3 100644
--- a/webrtc/modules/utility/source/process_thread_impl.h
+++ b/webrtc/modules/utility/source/process_thread_impl.h
@@ -12,6 +12,7 @@
#define WEBRTC_MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_
#include <list>
+#include <queue>
#include "webrtc/base/criticalsection.h"
#include "webrtc/base/thread_checker.h"
@@ -31,6 +32,7 @@
void Stop() override;
void WakeUp(Module* module) override;
+ void PostTask(rtc::scoped_ptr<ProcessTask> task) override;
void RegisterModule(Module* module) override;
void DeRegisterModule(Module* module) override;
@@ -64,13 +66,15 @@
// issues, but I haven't figured out what they are, if there are alignment
// requirements for mutexes on Mac or if there's something else to it.
// So be careful with changing the layout.
- rtc::CriticalSection lock_; // Used to guard modules_ and stop_.
+ rtc::CriticalSection lock_; // Used to guard modules_, tasks_ and stop_.
rtc::ThreadChecker thread_checker_;
const rtc::scoped_ptr<EventWrapper> wake_up_;
rtc::scoped_ptr<ThreadWrapper> thread_;
ModuleList modules_;
+ // TODO(tommi): Support delayed tasks.
+ std::queue<ProcessTask*> queue_;
bool stop_;
};
diff --git a/webrtc/modules/utility/source/process_thread_impl_unittest.cc b/webrtc/modules/utility/source/process_thread_impl_unittest.cc
index 47af4d8..cd1f956 100644
--- a/webrtc/modules/utility/source/process_thread_impl_unittest.cc
+++ b/webrtc/modules/utility/source/process_thread_impl_unittest.cc
@@ -30,6 +30,15 @@
MOCK_METHOD1(ProcessThreadAttached, void(ProcessThread*));
};
+class RaiseEventTask : public ProcessTask {
+ public:
+ RaiseEventTask(EventWrapper* event) : event_(event) {}
+ void Run() override { event_->Set(); }
+
+ private:
+ EventWrapper* event_;
+};
+
ACTION_P(SetEvent, event) {
event->Set();
}
@@ -280,4 +289,16 @@
EXPECT_LE(diff, 100u);
}
+// Tests that we can post a task that gets run straight away on the worker
+// thread.
+TEST(ProcessThreadImpl, PostTask) {
+ ProcessThreadImpl thread;
+ rtc::scoped_ptr<EventWrapper> task_ran(EventWrapper::Create());
+ rtc::scoped_ptr<RaiseEventTask> task(new RaiseEventTask(task_ran.get()));
+ thread.Start();
+ thread.PostTask(task.Pass());
+ EXPECT_EQ(kEventSignaled, task_ran->Wait(100));
+ thread.Stop();
+}
+
} // namespace webrtc