blob: 443ad073a29f766000a68b8ec3730879efbb42ba [file] [log] [blame] [edit]
/* ------------------------------------------------------------------
* Copyright (C) 1998-2009 PacketVideo
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied.
* See the License for the specific language governing permissions
* and limitations under the License.
* -------------------------------------------------------------------
*/
#include "threadsafe_queue.h"
//Only do sem wait on pre-emptive thread OS.
//The reason is that sem wait is not always available on
//non-pre-emptive OS (such as under Brew applet thread)
#if !(OSCL_HAS_NON_PREEMPTIVE_THREAD_SUPPORT)
#define USE_SEM_WAIT 1
#else
#define USE_SEM_WAIT 0
#endif
OSCL_EXPORT_REF ThreadSafeQueue::ThreadSafeQueue()
: OsclActiveObject(OsclActiveObject::EPriorityNominal, "ThreadSafeQueue")
{
iObserver = NULL;
iCounter = 1;
if (OsclThread::GetId(iThreadId) != OsclProcStatus::SUCCESS_ERROR)
OsclError::Leave(OsclErrSystemCallFailed);
#if USE_SEM_WAIT
iQueueReadySem.Create();
#endif
iQueueMut.Create();
AddToScheduler();
PendForExec();
iQueueReadySem.Signal();
}
OSCL_EXPORT_REF ThreadSafeQueue::~ThreadSafeQueue()
{
RemoveFromScheduler();
#if USE_SEM_WAIT
iQueueReadySem.Close();
#endif
iQueueMut.Close();
}
OSCL_EXPORT_REF bool ThreadSafeQueue::IsInThread()
{
TOsclThreadId id;
if (OsclThread::GetId(id) == OsclProcStatus::SUCCESS_ERROR)
{
return OsclThread::CompareId(id, iThreadId);
}
return false;
}
OSCL_EXPORT_REF void ThreadSafeQueue::Configure(ThreadSafeQueueObserver* aObs, uint32 aReserve, uint32 aId)
{
iQueueMut.Lock();
iObserver = aObs;
iQueue.reserve(aReserve);
iCounter = aId;
iQueueMut.Unlock();
}
OSCL_EXPORT_REF ThreadSafeQueueId ThreadSafeQueue::AddToQueue(OsclAny *EventData, ThreadSafeQueueId* aId)
{
iQueueMut.Lock();
uint32 count = (aId) ? *aId : ++iCounter;
ThreadSafeQueueElement elem(count, EventData);
iQueue.push_back(elem);
uint32 size = iQueue.size();
iQueueMut.Unlock();
//Signal the AO. Only signal when the queue was previously empty in order
// to minimize the amount of blocking in this call.
if (size == 1)
{
#if USE_SEM_WAIT
//Wait on the AO to be ready to be signaled.
iQueueReadySem.Wait();
PendComplete(OSCL_REQUEST_ERR_NONE);
#else
//To avoid problems under brew applet, don't do a sem wait here.
//instead just check AO status and signal if needed. It should
//not be possible to lose data, since the only time AO is *not* ready
//to be signaled is when a notification is already pending.
//The reason to not do this in all platforms is that Status() call
//is not thread-safe, but on non-preemptive OS it's ok here.
if (Status() == OSCL_REQUEST_PENDING)
PendComplete(OSCL_REQUEST_ERR_NONE);
#endif
}
return count;
}
OSCL_EXPORT_REF uint32 ThreadSafeQueue::DeQueue(ThreadSafeQueueId& aId, OsclAny*& aData)
{
uint32 num = 0;
iQueueMut.Lock();
if (iQueue.size())
{
aId = iQueue[0].iId;
aData = iQueue[0].iData;
iQueue.erase(&iQueue[0]);
num++;
}
iQueueMut.Unlock();
return num;
}
void ThreadSafeQueue::Run()
{
iQueueMut.Lock();
PendForExec();
#if USE_SEM_WAIT
iQueueReadySem.Signal();
#endif
uint32 count = iQueue.size();
ThreadSafeQueueObserver* obs = iObserver;
iQueueMut.Unlock();
//note: don't do the callback under the lock, in order to allow
//de-queueing the data in the callback. this creates the possibility
//that queue size may not equal "count" in the callback.
if (count && obs)
obs->ThreadSafeQueueDataAvailable(this);
}