blob: 06daf68b8376a8d98b17384773b470341ea936c8 [file] [log] [blame]
/*
Copyright (C) 2012 Samsung Electronics
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
#include "config.h"
#include "WorkQueue.h"
#include <sys/timerfd.h>
#include <wtf/Assertions.h>
#include <wtf/CurrentTime.h>
static const int invalidSocketDescriptor = -1;
static const int threadMessageSize = 1;
static const char finishThreadMessage[] = "F";
static const char wakupThreadMessage[] = "W";
PassOwnPtr<WorkQueue::TimerWorkItem> WorkQueue::TimerWorkItem::create(Function<void()> function, double expireTime)
{
if (expireTime < 0)
return nullptr;
return adoptPtr(new TimerWorkItem(function, expireTime));
}
WorkQueue::TimerWorkItem::TimerWorkItem(Function<void()> function, double expireTime)
: m_function(function)
, m_expireTime(expireTime)
{
}
void WorkQueue::platformInitialize(const char* name)
{
int fds[2];
if (pipe(fds))
ASSERT_NOT_REACHED();
m_readFromPipeDescriptor = fds[0];
m_writeToPipeDescriptor = fds[1];
FD_ZERO(&m_fileDescriptorSet);
FD_SET(m_readFromPipeDescriptor, &m_fileDescriptorSet);
m_maxFileDescriptor = m_readFromPipeDescriptor;
m_socketDescriptor = invalidSocketDescriptor;
m_threadLoop = true;
createThread(reinterpret_cast<WTF::ThreadFunction>(&WorkQueue::workQueueThread), this, name);
}
void WorkQueue::platformInvalidate()
{
sendMessageToThread(finishThreadMessage);
}
void WorkQueue::performWork()
{
while (true) {
Vector<Function<void()> > workItemQueue;
{
MutexLocker locker(m_workItemQueueLock);
if (m_workItemQueue.isEmpty())
return;
m_workItemQueue.swap(workItemQueue);
}
for (size_t i = 0; i < workItemQueue.size(); ++i)
workItemQueue[i]();
}
}
void WorkQueue::performFileDescriptorWork()
{
fd_set readFileDescriptorSet = m_fileDescriptorSet;
if (select(m_maxFileDescriptor + 1, &readFileDescriptorSet, 0, 0, getNextTimeOut()) >= 0) {
if (FD_ISSET(m_readFromPipeDescriptor, &readFileDescriptorSet)) {
char readBuf[threadMessageSize];
if (read(m_readFromPipeDescriptor, readBuf, threadMessageSize) == -1)
LOG_ERROR("Failed to read from WorkQueueThread pipe");
if (!strncmp(readBuf, finishThreadMessage, threadMessageSize))
m_threadLoop = false;
}
if (m_socketDescriptor != invalidSocketDescriptor && FD_ISSET(m_socketDescriptor, &readFileDescriptorSet))
m_socketEventHandler();
}
}
struct timeval* WorkQueue::getNextTimeOut()
{
MutexLocker locker(m_timerWorkItemsLock);
if (m_timerWorkItems.isEmpty())
return 0;
static struct timeval timeValue;
timeValue.tv_sec = 0;
timeValue.tv_usec = 0;
double timeOut = m_timerWorkItems[0]->expireTime() - currentTime();
if (timeOut > 0) {
timeValue.tv_sec = static_cast<long>(timeOut);
timeValue.tv_usec = static_cast<long>((timeOut - timeValue.tv_sec) * 1000000);
}
return &timeValue;
}
void WorkQueue::insertTimerWorkItem(PassOwnPtr<TimerWorkItem> item)
{
if (!item)
return;
size_t position = 0;
MutexLocker locker(m_timerWorkItemsLock);
// m_timerWorkItems should be ordered by expire time.
for (; position < m_timerWorkItems.size(); ++position)
if (item->expireTime() < m_timerWorkItems[position]->expireTime())
break;
m_timerWorkItems.insert(position, item);
}
void WorkQueue::performTimerWork()
{
Vector<OwnPtr<TimerWorkItem> > timerWorkItems;
{
// Protects m_timerWorkItems.
MutexLocker locker(m_timerWorkItemsLock);
if (m_timerWorkItems.isEmpty())
return;
// Copies all the timer work items in m_timerWorkItems to local vector.
m_timerWorkItems.swap(timerWorkItems);
}
double current = currentTime();
for (size_t i = 0; i < timerWorkItems.size(); ++i) {
if (!timerWorkItems[i]->expired(current)) {
// If a timer work item does not expired, keep it to the m_timerWorkItems.
// m_timerWorkItems should be ordered by expire time.
insertTimerWorkItem(timerWorkItems[i].release());
continue;
}
// If a timer work item expired, dispatch the function of the work item.
timerWorkItems[i]->dispatch();
}
}
void WorkQueue::sendMessageToThread(const char* message)
{
MutexLocker locker(m_writeToPipeDescriptorLock);
if (write(m_writeToPipeDescriptor, message, threadMessageSize) == -1)
LOG_ERROR("Failed to wake up WorkQueue Thread");
}
void* WorkQueue::workQueueThread(WorkQueue* workQueue)
{
while (workQueue->m_threadLoop) {
workQueue->performWork();
workQueue->performTimerWork();
workQueue->performFileDescriptorWork();
}
close(workQueue->m_readFromPipeDescriptor);
close(workQueue->m_writeToPipeDescriptor);
return 0;
}
void WorkQueue::registerSocketEventHandler(int fileDescriptor, const Function<void()>& function)
{
if (m_socketDescriptor != invalidSocketDescriptor)
LOG_ERROR("%d is already registerd.", fileDescriptor);
m_socketDescriptor = fileDescriptor;
m_socketEventHandler = function;
if (fileDescriptor > m_maxFileDescriptor)
m_maxFileDescriptor = fileDescriptor;
FD_SET(fileDescriptor, &m_fileDescriptorSet);
}
void WorkQueue::unregisterSocketEventHandler(int fileDescriptor)
{
m_socketDescriptor = invalidSocketDescriptor;
if (fileDescriptor == m_maxFileDescriptor)
m_maxFileDescriptor = m_readFromPipeDescriptor;
FD_CLR(fileDescriptor, &m_fileDescriptorSet);
}
void WorkQueue::dispatch(const Function<void()>& function)
{
{
MutexLocker locker(m_workItemQueueLock);
m_workItemQueue.append(function);
}
sendMessageToThread(wakupThreadMessage);
}
void WorkQueue::dispatchAfterDelay(const Function<void()>& function, double delay)
{
if (delay < 0)
return;
OwnPtr<TimerWorkItem> timerWorkItem = TimerWorkItem::create(function, currentTime() + delay);
if (!timerWorkItem)
return;
insertTimerWorkItem(timerWorkItem.release());
sendMessageToThread(wakupThreadMessage);
}