/* MtCoder.c -- Multi-thread Coder | |
2018-07-04 : Igor Pavlov : Public domain */ | |
#include "Precomp.h" | |
#include "MtCoder.h" | |
#ifndef _7ZIP_ST | |
SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize) | |
{ | |
CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt); | |
UInt64 inSize2 = 0; | |
UInt64 outSize2 = 0; | |
if (inSize != (UInt64)(Int64)-1) | |
{ | |
inSize2 = inSize - thunk->inSize; | |
thunk->inSize = inSize; | |
} | |
if (outSize != (UInt64)(Int64)-1) | |
{ | |
outSize2 = outSize - thunk->outSize; | |
thunk->outSize = outSize; | |
} | |
return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2); | |
} | |
void MtProgressThunk_CreateVTable(CMtProgressThunk *p) | |
{ | |
p->vt.Progress = MtProgressThunk_Progress; | |
} | |
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; } | |
static WRes ArEvent_OptCreate_And_Reset(CEvent *p) | |
{ | |
if (Event_IsCreated(p)) | |
return Event_Reset(p); | |
return AutoResetEvent_CreateNotSignaled(p); | |
} | |
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp); | |
static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t) | |
{ | |
WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent); | |
if (wres == 0) | |
{ | |
t->stop = False; | |
if (!Thread_WasCreated(&t->thread)) | |
wres = Thread_Create(&t->thread, ThreadFunc, t); | |
if (wres == 0) | |
wres = Event_Set(&t->startEvent); | |
} | |
if (wres == 0) | |
return SZ_OK; | |
return MY_SRes_HRESULT_FROM_WRes(wres); | |
} | |
static void MtCoderThread_Destruct(CMtCoderThread *t) | |
{ | |
if (Thread_WasCreated(&t->thread)) | |
{ | |
t->stop = 1; | |
Event_Set(&t->startEvent); | |
Thread_Wait(&t->thread); | |
Thread_Close(&t->thread); | |
} | |
Event_Close(&t->startEvent); | |
if (t->inBuf) | |
{ | |
ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf); | |
t->inBuf = NULL; | |
} | |
} | |
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize) | |
{ | |
size_t size = *processedSize; | |
*processedSize = 0; | |
while (size != 0) | |
{ | |
size_t cur = size; | |
SRes res = ISeqInStream_Read(stream, data, &cur); | |
*processedSize += cur; | |
data += cur; | |
size -= cur; | |
RINOK(res); | |
if (cur == 0) | |
return SZ_OK; | |
} | |
return SZ_OK; | |
} | |
/* | |
ThreadFunc2() returns: | |
SZ_OK - in all normal cases (even for stream error or memory allocation error) | |
SZ_ERROR_THREAD - in case of failure in system synch function | |
*/ | |
static SRes ThreadFunc2(CMtCoderThread *t) | |
{ | |
CMtCoder *mtc = t->mtCoder; | |
for (;;) | |
{ | |
unsigned bi; | |
SRes res; | |
SRes res2; | |
BoolInt finished; | |
unsigned bufIndex; | |
size_t size; | |
const Byte *inData; | |
UInt64 readProcessed = 0; | |
RINOK_THREAD(Event_Wait(&mtc->readEvent)) | |
/* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */ | |
if (mtc->stopReading) | |
{ | |
return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD; | |
} | |
res = MtProgress_GetError(&mtc->mtProgress); | |
size = 0; | |
inData = NULL; | |
finished = True; | |
if (res == SZ_OK) | |
{ | |
size = mtc->blockSize; | |
if (mtc->inStream) | |
{ | |
if (!t->inBuf) | |
{ | |
t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize); | |
if (!t->inBuf) | |
res = SZ_ERROR_MEM; | |
} | |
if (res == SZ_OK) | |
{ | |
res = FullRead(mtc->inStream, t->inBuf, &size); | |
readProcessed = mtc->readProcessed + size; | |
mtc->readProcessed = readProcessed; | |
} | |
if (res != SZ_OK) | |
{ | |
mtc->readRes = res; | |
/* after reading error - we can stop encoding of previous blocks */ | |
MtProgress_SetError(&mtc->mtProgress, res); | |
} | |
else | |
finished = (size != mtc->blockSize); | |
} | |
else | |
{ | |
size_t rem; | |
readProcessed = mtc->readProcessed; | |
rem = mtc->inDataSize - (size_t)readProcessed; | |
if (size > rem) | |
size = rem; | |
inData = mtc->inData + (size_t)readProcessed; | |
readProcessed += size; | |
mtc->readProcessed = readProcessed; | |
finished = (mtc->inDataSize == (size_t)readProcessed); | |
} | |
} | |
/* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */ | |
res2 = SZ_OK; | |
if (Semaphore_Wait(&mtc->blocksSemaphore) != 0) | |
{ | |
res2 = SZ_ERROR_THREAD; | |
if (res == SZ_OK) | |
{ | |
res = res2; | |
// MtProgress_SetError(&mtc->mtProgress, res); | |
} | |
} | |
bi = mtc->blockIndex; | |
if (++mtc->blockIndex >= mtc->numBlocksMax) | |
mtc->blockIndex = 0; | |
bufIndex = (unsigned)(int)-1; | |
if (res == SZ_OK) | |
res = MtProgress_GetError(&mtc->mtProgress); | |
if (res != SZ_OK) | |
finished = True; | |
if (!finished) | |
{ | |
if (mtc->numStartedThreads < mtc->numStartedThreadsLimit | |
&& mtc->expectedDataSize != readProcessed) | |
{ | |
res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]); | |
if (res == SZ_OK) | |
mtc->numStartedThreads++; | |
else | |
{ | |
MtProgress_SetError(&mtc->mtProgress, res); | |
finished = True; | |
} | |
} | |
} | |
if (finished) | |
mtc->stopReading = True; | |
RINOK_THREAD(Event_Set(&mtc->readEvent)) | |
if (res2 != SZ_OK) | |
return res2; | |
if (res == SZ_OK) | |
{ | |
CriticalSection_Enter(&mtc->cs); | |
bufIndex = mtc->freeBlockHead; | |
mtc->freeBlockHead = mtc->freeBlockList[bufIndex]; | |
CriticalSection_Leave(&mtc->cs); | |
res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex, | |
mtc->inStream ? t->inBuf : inData, size, finished); | |
// MtProgress_Reinit(&mtc->mtProgress, t->index); | |
if (res != SZ_OK) | |
MtProgress_SetError(&mtc->mtProgress, res); | |
} | |
{ | |
CMtCoderBlock *block = &mtc->blocks[bi]; | |
block->res = res; | |
block->bufIndex = bufIndex; | |
block->finished = finished; | |
} | |
#ifdef MTCODER__USE_WRITE_THREAD | |
RINOK_THREAD(Event_Set(&mtc->writeEvents[bi])) | |
#else | |
{ | |
unsigned wi; | |
{ | |
CriticalSection_Enter(&mtc->cs); | |
wi = mtc->writeIndex; | |
if (wi == bi) | |
mtc->writeIndex = (unsigned)(int)-1; | |
else | |
mtc->ReadyBlocks[bi] = True; | |
CriticalSection_Leave(&mtc->cs); | |
} | |
if (wi != bi) | |
{ | |
if (res != SZ_OK || finished) | |
return 0; | |
continue; | |
} | |
if (mtc->writeRes != SZ_OK) | |
res = mtc->writeRes; | |
for (;;) | |
{ | |
if (res == SZ_OK && bufIndex != (unsigned)(int)-1) | |
{ | |
res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex); | |
if (res != SZ_OK) | |
{ | |
mtc->writeRes = res; | |
MtProgress_SetError(&mtc->mtProgress, res); | |
} | |
} | |
if (++wi >= mtc->numBlocksMax) | |
wi = 0; | |
{ | |
BoolInt isReady; | |
CriticalSection_Enter(&mtc->cs); | |
if (bufIndex != (unsigned)(int)-1) | |
{ | |
mtc->freeBlockList[bufIndex] = mtc->freeBlockHead; | |
mtc->freeBlockHead = bufIndex; | |
} | |
isReady = mtc->ReadyBlocks[wi]; | |
if (isReady) | |
mtc->ReadyBlocks[wi] = False; | |
else | |
mtc->writeIndex = wi; | |
CriticalSection_Leave(&mtc->cs); | |
RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore)) | |
if (!isReady) | |
break; | |
} | |
{ | |
CMtCoderBlock *block = &mtc->blocks[wi]; | |
if (res == SZ_OK && block->res != SZ_OK) | |
res = block->res; | |
bufIndex = block->bufIndex; | |
finished = block->finished; | |
} | |
} | |
} | |
#endif | |
if (finished || res != SZ_OK) | |
return 0; | |
} | |
} | |
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp) | |
{ | |
CMtCoderThread *t = (CMtCoderThread *)pp; | |
for (;;) | |
{ | |
if (Event_Wait(&t->startEvent) != 0) | |
return SZ_ERROR_THREAD; | |
if (t->stop) | |
return 0; | |
{ | |
SRes res = ThreadFunc2(t); | |
CMtCoder *mtc = t->mtCoder; | |
if (res != SZ_OK) | |
{ | |
MtProgress_SetError(&mtc->mtProgress, res); | |
} | |
#ifndef MTCODER__USE_WRITE_THREAD | |
{ | |
unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads); | |
if (numFinished == mtc->numStartedThreads) | |
if (Event_Set(&mtc->finishedEvent) != 0) | |
return SZ_ERROR_THREAD; | |
} | |
#endif | |
} | |
} | |
} | |
void MtCoder_Construct(CMtCoder *p) | |
{ | |
unsigned i; | |
p->blockSize = 0; | |
p->numThreadsMax = 0; | |
p->expectedDataSize = (UInt64)(Int64)-1; | |
p->inStream = NULL; | |
p->inData = NULL; | |
p->inDataSize = 0; | |
p->progress = NULL; | |
p->allocBig = NULL; | |
p->mtCallback = NULL; | |
p->mtCallbackObject = NULL; | |
p->allocatedBufsSize = 0; | |
Event_Construct(&p->readEvent); | |
Semaphore_Construct(&p->blocksSemaphore); | |
for (i = 0; i < MTCODER__THREADS_MAX; i++) | |
{ | |
CMtCoderThread *t = &p->threads[i]; | |
t->mtCoder = p; | |
t->index = i; | |
t->inBuf = NULL; | |
t->stop = False; | |
Event_Construct(&t->startEvent); | |
Thread_Construct(&t->thread); | |
} | |
#ifdef MTCODER__USE_WRITE_THREAD | |
for (i = 0; i < MTCODER__BLOCKS_MAX; i++) | |
Event_Construct(&p->writeEvents[i]); | |
#else | |
Event_Construct(&p->finishedEvent); | |
#endif | |
CriticalSection_Init(&p->cs); | |
CriticalSection_Init(&p->mtProgress.cs); | |
} | |
static void MtCoder_Free(CMtCoder *p) | |
{ | |
unsigned i; | |
/* | |
p->stopReading = True; | |
if (Event_IsCreated(&p->readEvent)) | |
Event_Set(&p->readEvent); | |
*/ | |
for (i = 0; i < MTCODER__THREADS_MAX; i++) | |
MtCoderThread_Destruct(&p->threads[i]); | |
Event_Close(&p->readEvent); | |
Semaphore_Close(&p->blocksSemaphore); | |
#ifdef MTCODER__USE_WRITE_THREAD | |
for (i = 0; i < MTCODER__BLOCKS_MAX; i++) | |
Event_Close(&p->writeEvents[i]); | |
#else | |
Event_Close(&p->finishedEvent); | |
#endif | |
} | |
void MtCoder_Destruct(CMtCoder *p) | |
{ | |
MtCoder_Free(p); | |
CriticalSection_Delete(&p->cs); | |
CriticalSection_Delete(&p->mtProgress.cs); | |
} | |
SRes MtCoder_Code(CMtCoder *p) | |
{ | |
unsigned numThreads = p->numThreadsMax; | |
unsigned numBlocksMax; | |
unsigned i; | |
SRes res = SZ_OK; | |
if (numThreads > MTCODER__THREADS_MAX) | |
numThreads = MTCODER__THREADS_MAX; | |
numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads); | |
if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++; | |
if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++; | |
if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++; | |
if (numBlocksMax > MTCODER__BLOCKS_MAX) | |
numBlocksMax = MTCODER__BLOCKS_MAX; | |
if (p->blockSize != p->allocatedBufsSize) | |
{ | |
for (i = 0; i < MTCODER__THREADS_MAX; i++) | |
{ | |
CMtCoderThread *t = &p->threads[i]; | |
if (t->inBuf) | |
{ | |
ISzAlloc_Free(p->allocBig, t->inBuf); | |
t->inBuf = NULL; | |
} | |
} | |
p->allocatedBufsSize = p->blockSize; | |
} | |
p->readRes = SZ_OK; | |
MtProgress_Init(&p->mtProgress, p->progress); | |
#ifdef MTCODER__USE_WRITE_THREAD | |
for (i = 0; i < numBlocksMax; i++) | |
{ | |
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i])); | |
} | |
#else | |
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent)); | |
#endif | |
{ | |
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent)); | |
if (Semaphore_IsCreated(&p->blocksSemaphore)) | |
{ | |
RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore)); | |
} | |
RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax)); | |
} | |
for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++) | |
p->freeBlockList[i] = i + 1; | |
p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1; | |
p->freeBlockHead = 0; | |
p->readProcessed = 0; | |
p->blockIndex = 0; | |
p->numBlocksMax = numBlocksMax; | |
p->stopReading = False; | |
#ifndef MTCODER__USE_WRITE_THREAD | |
p->writeIndex = 0; | |
p->writeRes = SZ_OK; | |
for (i = 0; i < MTCODER__BLOCKS_MAX; i++) | |
p->ReadyBlocks[i] = False; | |
p->numFinishedThreads = 0; | |
#endif | |
p->numStartedThreadsLimit = numThreads; | |
p->numStartedThreads = 0; | |
// for (i = 0; i < numThreads; i++) | |
{ | |
CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++]; | |
RINOK(MtCoderThread_CreateAndStart(nextThread)); | |
} | |
RINOK_THREAD(Event_Set(&p->readEvent)) | |
#ifdef MTCODER__USE_WRITE_THREAD | |
{ | |
unsigned bi = 0; | |
for (;; bi++) | |
{ | |
if (bi >= numBlocksMax) | |
bi = 0; | |
RINOK_THREAD(Event_Wait(&p->writeEvents[bi])) | |
{ | |
const CMtCoderBlock *block = &p->blocks[bi]; | |
unsigned bufIndex = block->bufIndex; | |
BoolInt finished = block->finished; | |
if (res == SZ_OK && block->res != SZ_OK) | |
res = block->res; | |
if (bufIndex != (unsigned)(int)-1) | |
{ | |
if (res == SZ_OK) | |
{ | |
res = p->mtCallback->Write(p->mtCallbackObject, bufIndex); | |
if (res != SZ_OK) | |
MtProgress_SetError(&p->mtProgress, res); | |
} | |
CriticalSection_Enter(&p->cs); | |
{ | |
p->freeBlockList[bufIndex] = p->freeBlockHead; | |
p->freeBlockHead = bufIndex; | |
} | |
CriticalSection_Leave(&p->cs); | |
} | |
RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore)) | |
if (finished) | |
break; | |
} | |
} | |
} | |
#else | |
{ | |
WRes wres = Event_Wait(&p->finishedEvent); | |
res = MY_SRes_HRESULT_FROM_WRes(wres); | |
} | |
#endif | |
if (res == SZ_OK) | |
res = p->readRes; | |
if (res == SZ_OK) | |
res = p->mtProgress.res; | |
#ifndef MTCODER__USE_WRITE_THREAD | |
if (res == SZ_OK) | |
res = p->writeRes; | |
#endif | |
if (res != SZ_OK) | |
MtCoder_Free(p); | |
return res; | |
} | |
#endif |