/* MtDec.c -- Multi-thread Decoder | |
2018-07-04 : Igor Pavlov : Public domain */ | |
#include "Precomp.h"

#include <string.h>
// #define SHOW_DEBUG_INFO | |
// #include <stdio.h> | |
#ifdef SHOW_DEBUG_INFO | |
#include <stdio.h> | |
#endif | |
#ifdef SHOW_DEBUG_INFO | |
#define PRF(x) x | |
#else | |
#define PRF(x) | |
#endif | |
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d)) | |
#include "MtDec.h" | |
#ifndef _7ZIP_ST | |
/* Prepares a progress tracker for a new run: both byte counters are zeroed,
   the stored result is reset to SZ_OK and the callback pointer is attached.
   The critical section (p->cs) is not touched here. */
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  p->totalInSize = 0;
  p->totalOutSize = 0;
  p->res = SZ_OK;
  p->progress = progress;
}
/* Reports the current totals to the progress callback without taking the
   critical section. Once an error has been stored it is returned as is and
   the callback is not invoked again. */
SRes MtProgress_Progress_ST(CMtProgress *p)
{
  if (p->res != SZ_OK || !p->progress)
    return p->res;

  if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
    p->res = SZ_ERROR_PROGRESS;

  return p->res;
}
/* Adds (inSize, outSize) to the running totals and reports the new totals to
   the progress callback, all under p->cs.
   Returns the stored result code: SZ_ERROR_PROGRESS if the callback failed,
   otherwise whatever was stored before. */
SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
{
  SRes status;

  CriticalSection_Enter(&p->cs);

  p->totalInSize += inSize;
  p->totalOutSize += outSize;

  /* The callback is skipped once an error has been recorded. */
  if (p->res == SZ_OK && p->progress
      && ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
    p->res = SZ_ERROR_PROGRESS;

  status = p->res;

  CriticalSection_Leave(&p->cs);
  return status;
}
/* Returns the stored result code, read while holding p->cs. */
SRes MtProgress_GetError(CMtProgress *p)
{
  SRes snapshot;

  CriticalSection_Enter(&p->cs);
  snapshot = p->res;
  CriticalSection_Leave(&p->cs);

  return snapshot;
}
/* Stores `res` as the tracker's result, but only if no error has been stored
   yet (the first error wins). The update is done under p->cs. */
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  if (p->res == SZ_OK)
  {
    p->res = res;
  }
  CriticalSection_Leave(&p->cs);
}
#define RINOK_THREAD(x) RINOK(x) | |
/* Leaves the event in the non-signaled state: an existing event is reset,
   a missing one is created as a non-signaled auto-reset event.
   Returns the WRes of whichever call was made. */
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
{
  return Event_IsCreated(p)
      ? Event_Reset(p)
      : AutoResetEvent_CreateNotSignaled(p);
}
/* Header placed at the start of every input buffer allocation.
   Buffers form a singly linked list through `next`; the payload begins
   sizeof(CMtDecBufLink) bytes after the start of the allocation. */
typedef struct
{
  void *next;     /* next buffer in the chain, or NULL for the last one */
  void *pad[3];   /* unused; presumably rounds the header up to 4 pointers for payload alignment - TODO confirm */
} CMtDecBufLink;
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink) | |
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET) | |
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp); | |
/* Makes sure both per-thread events exist and are non-signaled.
   canWrite is prepared first, then canRead; the first failing WRes is
   returned, otherwise SZ_OK (0). */
static WRes MtDecThread_CreateEvents(CMtDecThread *t)
{
  WRes wres;

  wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
  if (wres != 0)
    return wres;

  wres = ArEvent_OptCreate_And_Reset(&t->canRead);
  if (wres != 0)
    return wres;

  return SZ_OK;
}
/* Prepares the events of thread slot `t` and starts its worker thread if it
   has not been created yet. An already created thread is reused as is.
   Returns SZ_OK on success, or the WRes failure converted with
   MY_SRes_HRESULT_FROM_WRes. */
static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
{
  WRes wres = MtDecThread_CreateEvents(t);
  // wres = 17; // for test

  if (wres != 0)
    return MY_SRes_HRESULT_FROM_WRes(wres);

  if (Thread_WasCreated(&t->thread))
    return SZ_OK;

  wres = Thread_Create(&t->thread, ThreadFunc, t);
  if (wres != 0)
    return MY_SRes_HRESULT_FROM_WRes(wres);

  return SZ_OK;
}
/* Releases the whole chain of input buffers owned by thread slot `t`.
   The list head is detached (t->inBuf = NULL) before any node is freed.
   Does nothing when the slot has no buffers. */
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  CMtDecBufLink *node = (CMtDecBufLink *)t->inBuf;

  if (!node)
    return;

  t->inBuf = NULL;

  while (node)
  {
    /* read the successor before the node's memory is released */
    CMtDecBufLink *following = (CMtDecBufLink *)node->next;
    ISzAlloc_Free(t->mtDec->alloc, node);
    node = following;
  }
}
/* Shuts down one thread slot: if the worker thread exists, both of its events
   are signaled so that a waiting worker wakes up, then the thread is waited
   for and its handle closed. The two events are closed in every case. */
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  if (Thread_WasCreated(&t->thread))
  {
    /* Signaling canWrite could be skipped: in normal cases no thread is
       waiting on canWrite at this point. */
    Event_Set(&t->canWrite);
    Event_Set(&t->canRead);

    Thread_Wait(&t->thread);
    Thread_Close(&t->thread);
  }

  Event_Close(&t->canRead);
  Event_Close(&t->canWrite);
}
/* Closes every one of the MTDEC__THREADS_MAX thread slots, in index order. */
static void MtDec_CloseThreads(CMtDec *p)
{
  CMtDecThread *slot = p->threads;
  CMtDecThread *const end = p->threads + MTDEC__THREADS_MAX;

  for (; slot != end; slot++)
  {
    MtDecThread_CloseThread(slot);
  }
}
/* Tears down one thread slot completely: the worker thread and its events are
   closed first, and only then are the slot's input buffers released. */
static void MtDecThread_Destruct(CMtDecThread *t)
{
  MtDecThread_CloseThread(t);

  MtDecThread_FreeInBufs(t);
}
/* Reads from `stream` until the requested number of bytes has been read, the
   stream reports 0 bytes, or a read fails.
   In:  *processedSize = number of bytes wanted.
   Out: *processedSize = number of bytes actually stored in `data`
        (it is updated even when an error is returned).
   Returns SZ_OK, or the first non-zero result of ISeqInStream_Read. */
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
  size_t remaining = *processedSize;

  *processedSize = 0;

  for (; remaining != 0;)
  {
    size_t chunk = remaining;
    const SRes readRes = ISeqInStream_Read(stream, data, &chunk);

    /* account for the bytes before looking at the result code */
    *processedSize += chunk;
    data += chunk;
    remaining -= chunk;

    if (readRes != 0)
      return readRes;
    if (chunk == 0)
      break;
  }

  return SZ_OK;
}
/* Under the progress critical section: returns the stored progress result and
   sets *wasInterrupted when an interrupt has been requested at an index
   strictly below `interruptIndex`. */
static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
  CMtProgress *prog = &p->mtProgress;
  SRes status;

  CriticalSection_Enter(&prog->cs);

  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  status = prog->res;

  CriticalSection_Leave(&prog->cs);
  return status;
}
/* Combined progress update and interrupt query, done in one critical section:
   - adds (inSize, outSize) to the totals and calls the progress callback
     (skipped once an error is stored or when no callback is attached);
   - sets *wasInterrupted when an interrupt has been requested at an index
     strictly below `interruptIndex`;
   - returns the stored progress result. */
static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
  CMtProgress *prog = &p->mtProgress;
  SRes status;

  CriticalSection_Enter(&prog->cs);

  prog->totalInSize += inSize;
  prog->totalOutSize += outSize;

  if (prog->res == SZ_OK && prog->progress
      && ICompressProgress_Progress(prog->progress, prog->totalInSize, prog->totalOutSize) != SZ_OK)
    prog->res = SZ_ERROR_PROGRESS;

  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  status = prog->res;

  CriticalSection_Leave(&prog->cs);
  return status;
}
/* Requests an interrupt at `interruptIndex`. The smallest index requested so
   far is kept: a request only takes effect when none is pending yet or when
   it is lower than the pending one. Done under the progress critical section. */
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);

  if (p->needInterrupt && interruptIndex >= p->interruptIndex)
  {
    /* an interrupt at the same or a lower index is already pending */
  }
  else
  {
    p->interruptIndex = interruptIndex;
    p->needInterrupt = True;
  }

  CriticalSection_Leave(&p->mtProgress.cs);
}
/* Returns a pointer to the payload area of the cross block, allocating the
   block (link header + p->inBufSize bytes) on first use.
   Returns NULL if the allocation fails; p->crossBlock then stays NULL. */
Byte *MtDec_GetCrossBuff(CMtDec *p)
{
  if (!p->crossBlock)
  {
    Byte *fresh = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
    if (!fresh)
      return NULL;
    p->crossBlock = fresh;
  }

  return MTDEC__DATA_PTR_FROM_LINK(p->crossBlock);
}
/* | |
ThreadFunc2() returns: | |
0 - in all normal cases (even for stream error or memory allocation error) | |
(!= 0) - WRes error returned by a system threading function
*/ | |
// #define MTDEC_ProgessStep (1 << 22) | |
#define MTDEC_ProgessStep (1 << 0) | |
/*
  Worker loop executed by every decoder thread slot.

  Each iteration handles one block and goes through these phases:
    1. wait for t->canRead, take the next block index;
    2. READ/PARSE: fill the thread's chain of input buffers (from the cross
       block or from p->inStream) and feed each buffer to the Parse callback
       until the callback reports the end of the block, overflow, or the
       input ends;
    3. optionally start one more thread and signal the next thread's canRead;
    4. CODE: run the PreCode/Code callbacks over the buffered input;
    5. wait for t->canWrite, call the Write callback, then signal either the
       next thread's canWrite or thread 0 (restart / exit).

  Returns 0 in all normal cases; a non-zero WRes is returned only when an
  event wait/set call fails (via RINOK_THREAD).
*/
static WRes ThreadFunc2(CMtDecThread *t)
{
  CMtDec *p = t->mtDec;

  PRF_STR_INT("ThreadFunc2", t->index);

  // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);

  for (;;)
  {
    SRes res, codeRes;
    BoolInt wasInterrupted, isAllocError, overflow, finish;
    SRes threadingErrorSRes;
    BoolInt needCode, needWrite, needContinue;

    size_t inDataSize_Start;
    UInt64 inDataSize;
    // UInt64 inDataSize_Full;

    UInt64 blockIndex;

    UInt64 inPrev = 0;
    UInt64 outPrev = 0;
    UInt64 inCodePos;
    UInt64 outCodePos;

    Byte *afterEndData = NULL;
    size_t afterEndData_Size = 0;

    BoolInt canCreateNewThread = False;
    // CMtDecCallbackInfo parse;
    CMtDecThread *nextThread;

    PRF_STR_INT("Event_Wait(&t->canRead)", t->index);

    RINOK_THREAD(Event_Wait(&t->canRead));
    if (p->exitThread)
      return 0;

    PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);

    // if (t->index == 3) return 19; // for test

    blockIndex = p->blockIndex++;

    // PRF(printf("\ncanRead\n"))

    res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);

    finish = p->readWasFinished;
    needCode = False;
    needWrite = False;
    isAllocError = False;
    overflow = False;

    inDataSize_Start = 0;
    inDataSize = 0;
    // inDataSize_Full = 0;

    // ---------- READ / PARSE ----------

    if (res == SZ_OK && !wasInterrupted)
    {
      // if (p->inStream)
      {
        CMtDecBufLink *prev = NULL;
        CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
        /* bytes still pending in the cross block */
        size_t crossSize = p->crossEnd - p->crossStart;

        /* size_t must not be passed to %d directly */
        PRF(printf("\ncrossSize = %d\n", (unsigned)crossSize));

        for (;;)
        {
          if (!link)
          {
            /* no reusable buffer in the chain: allocate and append a new one */
            link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
            if (!link)
            {
              finish = True;
              // p->allocError_for_Read_BlockIndex = blockIndex;
              isAllocError = True;
              break;
            }
            link->next = NULL;
            if (prev)
            {
              // static unsigned g_num = 0;
              // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
              prev->next = link;
            }
            else
              t->inBuf = (void *)link;
          }

          {
            Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
            Byte *parseData = data;
            size_t size;

            if (crossSize != 0)
            {
              /* parse directly from the cross block; the parsed part is
                 copied into `data` after the Parse call */
              inDataSize = crossSize;
              // inDataSize_Full = inDataSize;
              inDataSize_Start = crossSize;
              size = crossSize;
              parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
              PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
                  (int)p->crossStart, (int)p->crossEnd, (int)finish));
            }
            else
            {
              size = p->inBufSize;

              res = FullRead(p->inStream, data, &size);

              // size = 10; // test

              inDataSize += size;
              // inDataSize_Full = inDataSize;
              if (!prev)
                inDataSize_Start = size;

              p->readProcessed += size;
              /* a short read marks the end of the input stream */
              finish = (size != p->inBufSize);
              if (finish)
                p->readWasFinished = True;

              // res = E_INVALIDARG; // test

              if (res != SZ_OK)
              {
                // PRF(printf("\nRead error = %d\n", res))
                // we want to decode all data before error
                p->readRes = res;
                // p->readError_BlockIndex = blockIndex;
                p->readWasFinished = True;
                finish = True;
                res = SZ_OK;
                // break;
              }

              if (inDataSize - inPrev >= MTDEC_ProgessStep)
              {
                res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
                if (res != SZ_OK || wasInterrupted)
                  break;
                inPrev = inDataSize;
              }
            }

            {
              CMtDecCallbackInfo parse;

              parse.startCall = (prev == NULL);
              parse.src = parseData;
              parse.srcSize = size;
              parse.srcFinished = finish;
              parse.canCreateNewThread = True;

              // PRF(printf("\nParse size = %d\n", (unsigned)size))

              p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);

              needWrite = True;
              canCreateNewThread = parse.canCreateNewThread;

              // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);

              if (
                  // parseRes != SZ_OK ||
                  // inDataSize - (size - parse.srcSize) > p->inBlockMax
                  // ||
                  parse.state == MTDEC_PARSE_OVERFLOW
                  // || wasInterrupted
                  )
              {
                // Overflow or Parse error - switch from MT decoding to ST decoding
                finish = True;
                overflow = True;

                {
                  PRF(printf("\n Overflow"));
                  // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
                  PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
                }

                if (crossSize != 0)
                  memcpy(data, parseData, size);
                p->crossStart = 0;
                p->crossEnd = 0;
                break;
              }

              if (crossSize != 0)
              {
                memcpy(data, parseData, parse.srcSize);
                p->crossStart += parse.srcSize;
              }

              if (parse.state != MTDEC_PARSE_CONTINUE || finish)
              {
                // we don't need to parse in current thread anymore

                if (parse.state == MTDEC_PARSE_END)
                  finish = True;

                needCode = True;
                // p->crossFinished = finish;

                if (parse.srcSize == size)
                {
                  // full parsed - no cross transfer
                  p->crossStart = 0;
                  p->crossEnd = 0;
                  break;
                }

                if (parse.state == MTDEC_PARSE_END)
                {
                  p->crossStart = 0;
                  p->crossEnd = 0;

                  if (crossSize != 0)
                    memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
                  afterEndData_Size = size - parse.srcSize;
                  afterEndData = parseData + parse.srcSize;

                  // we reduce data size to required bytes (parsed only)
                  inDataSize -= (size - parse.srcSize);
                  if (!prev)
                    inDataSize_Start = parse.srcSize;
                  break;
                }

                {
                  // partial parsed - need cross transfer
                  if (crossSize != 0)
                    inDataSize = parse.srcSize; // it's only parsed now
                  else
                  {
                    // partial parsed - is not in initial cross block - we need to copy new data to cross block
                    Byte *cr = MtDec_GetCrossBuff(p);
                    if (!cr)
                    {
                      {
                        PRF(printf("\ncross alloc error error\n"));
                        // res = SZ_ERROR_MEM;
                        finish = True;
                        // p->allocError_for_Read_BlockIndex = blockIndex;
                        isAllocError = True;
                        break;
                      }
                    }

                    {
                      size_t crSize = size - parse.srcSize;
                      inDataSize -= crSize;
                      p->crossEnd = crSize;
                      p->crossStart = 0;
                      memcpy(cr, parseData + parse.srcSize, crSize);
                    }
                  }

                  // inDataSize_Full = inDataSize;
                  if (!prev)
                    inDataSize_Start = parse.srcSize; // it's partial size (parsed only)

                  finish = False;
                  break;
                }
              }

              /* MTDEC_PARSE_CONTINUE must consume the whole buffer */
              if (parse.srcSize != size)
              {
                res = SZ_ERROR_FAIL;
                PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
                break;
              }
            }
          }

          prev = link;
          link = link->next;

          if (crossSize != 0)
          {
            /* the cross block has been fully consumed; continue with stream reads */
            crossSize = 0;
            p->crossStart = 0;
            p->crossEnd = 0;
          }
        }
      }

      if (res == SZ_OK)
        res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
    }

    codeRes = SZ_OK;

    if (res == SZ_OK && needCode && !wasInterrupted)
    {
      codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
      if (codeRes != SZ_OK)
      {
        needCode = False;
        finish = True;
        // SZ_ERROR_MEM is expected error here.
        //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
        //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
      }
    }

    if (res != SZ_OK || wasInterrupted)
      finish = True;

    nextThread = NULL;
    threadingErrorSRes = SZ_OK;

    if (!finish)
    {
      if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
      {
        SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
        if (res2 == SZ_OK)
        {
          // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
          p->numStartedThreads++;
        }
        else
        {
          PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
          if (p->numStartedThreads == 1)
          {
            // if only one thread is possible, we leave multi-threading code
            finish = True;
            needCode = False;
            threadingErrorSRes = res2;
          }
          else
            p->numStartedThreads_Limit = p->numStartedThreads;
        }
      }

      if (!finish)
      {
        unsigned nextIndex = t->index + 1;
        nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
        RINOK_THREAD(Event_Set(&nextThread->canRead))
        // We have started executing for new iteration (with next thread)
        // And that next thread now is responsible for possible exit from decoding (threading_code)
      }
    }

    // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
    // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
    // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
    //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
    //   - otherwise we stop decoding and exit from ThreadFunc2()

    // Don't change (finish) variable in the further code

    // ---------- CODE ----------

    inPrev = 0;
    outPrev = 0;
    inCodePos = 0;
    outCodePos = 0;

    if (res == SZ_OK && needCode && codeRes == SZ_OK)
    {
      BoolInt isStartBlock = True;
      CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;

      for (;;)
      {
        size_t inSize;
        int stop;

        /* the first buffer holds inDataSize_Start bytes; later buffers are
           full except possibly the last one */
        if (isStartBlock)
          inSize = inDataSize_Start;
        else
        {
          UInt64 rem = inDataSize - inCodePos;
          inSize = p->inBufSize;
          if (inSize > rem)
            inSize = (size_t)rem;
        }

        inCodePos += inSize;
        stop = True;

        codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
            (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
            (inCodePos == inDataSize), // srcFinished
            &inCodePos, &outCodePos, &stop);

        if (codeRes != SZ_OK)
        {
          PRF(printf("\nCode Interrupt error = %x\n", codeRes));
          // we interrupt only later blocks
          MtDec_Interrupt(p, blockIndex);
          break;
        }

        if (stop || inCodePos == inDataSize)
          break;

        {
          const UInt64 inDelta = inCodePos - inPrev;
          const UInt64 outDelta = outCodePos - outPrev;
          if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
          {
            // Sleep(1);
            res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
            if (res != SZ_OK || wasInterrupted)
              break;
            inPrev = inCodePos;
            outPrev = outCodePos;
          }
        }

        link = link->next;
        isStartBlock = False;
      }
    }

    // ---------- WRITE ----------

    RINOK_THREAD(Event_Wait(&t->canWrite));

    {
      BoolInt isErrorMode = False;
      BoolInt canRecode = True;
      BoolInt needWriteToStream = needWrite;

      if (p->exitThread) return 0; // it's never executed in normal cases

      if (p->wasInterrupted)
        wasInterrupted = True;
      else
      {
        /* this block is the first one to publish an error condition */
        if (codeRes != SZ_OK) // || !needCode // check it !!!
        {
          p->wasInterrupted = True;
          p->codeRes = codeRes;
          if (codeRes == SZ_ERROR_MEM)
            isAllocError = True;
        }

        if (threadingErrorSRes)
        {
          p->wasInterrupted = True;
          p->threadingErrorSRes = threadingErrorSRes;
          needWriteToStream = False;
        }
        if (isAllocError)
        {
          p->wasInterrupted = True;
          p->isAllocError = True;
          needWriteToStream = False;
        }
        if (overflow)
        {
          p->wasInterrupted = True;
          p->overflow = True;
          needWriteToStream = False;
        }
      }

      if (needCode)
      {
        if (wasInterrupted)
        {
          inCodePos = 0;
          outCodePos = 0;
        }
        {
          const UInt64 inDelta = inCodePos - inPrev;
          const UInt64 outDelta = outCodePos - outPrev;
          // if (inDelta != 0 || outDelta != 0)
          res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
        }
      }

      needContinue = (!finish);

      // if (res == SZ_OK && needWrite && !wasInterrupted)
      if (needWrite)
      {
        // p->inProcessed += inCodePos;

        res = p->mtCallback->Write(p->mtCallbackObject, t->index,
            res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
            afterEndData, afterEndData_Size,
            &needContinue,
            &canRecode);

        // res= E_INVALIDARG; // for test

        PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
        PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));

        if (res != SZ_OK)
        {
          PRF(printf("\nWrite error = %d\n", res));
          isErrorMode = True;
          p->wasInterrupted = True;
        }
        if (res != SZ_OK
            || (!needContinue && !finish))
        {
          PRF(printf("\nWrite Interrupt error = %x\n", res));
          MtDec_Interrupt(p, blockIndex);
        }
      }

      /* keep this thread's buffered input registered as "filled" so that it
         can be handed out again through MtDec_Read() */
      if (canRecode)
      if (!needCode
          || res != SZ_OK
          || p->wasInterrupted
          || codeRes != SZ_OK
          || wasInterrupted
          || p->numFilledThreads != 0
          || isErrorMode)
      {
        if (p->numFilledThreads == 0)
          p->filledThreadStart = t->index;
        if (inDataSize != 0 || !finish)
        {
          t->inDataSize_Start = inDataSize_Start;
          t->inDataSize = inDataSize;
          p->numFilledThreads++;
        }
        PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
        PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
      }

      if (!finish)
      {
        RINOK_THREAD(Event_Set(&nextThread->canWrite));
      }
      else
      {
        if (needContinue)
        {
          // we restore decoding with new iteration
          RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
        }
        else
        {
          // we exit from decoding
          if (t->index == 0)
            return SZ_OK;
          p->exitThread = True;
        }
        RINOK_THREAD(Event_Set(&p->threads[0].canRead));
      }
    }
  }
}
#ifdef _WIN32 | |
#define USE_ALLOCA | |
#endif | |
#ifdef USE_ALLOCA | |
#ifdef _WIN32 | |
#include <malloc.h> | |
#else | |
#include <stdlib.h> | |
#endif | |
#endif | |
/* Runs the worker loop for one thread slot and handles its result.
   - Loop returned 0: the shared p->exitThreadWRes is returned.
   - Loop returned a threading error: the error is recorded (first one wins),
     exit is requested, thread 0 is woken through both of its events, the
     error is stored in the progress tracker, and the error is returned. */
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
{
  CMtDecThread *t = (CMtDecThread *)pp;
  // fprintf(stdout, "\n%d = %p\n", t->index, &t);
  const WRes loopRes = ThreadFunc2(t);
  CMtDec *p = t->mtDec;

  if (loopRes != 0)
  {
    // it's unexpected situation for some threading function error
    if (p->exitThreadWRes == 0)
      p->exitThreadWRes = loopRes;
    PRF(printf("\nthread exit error = %d\n", loopRes));
    p->exitThread = True;
    Event_Set(&p->threads[0].canRead);
    Event_Set(&p->threads[0].canWrite);
    MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(loopRes));
    return loopRes;
  }

  return p->exitThreadWRes;
}
/* Thread entry point. When USE_ALLOCA is defined it first reserves
   (t->index * 128) bytes on the stack and stores the pointer in t->allocaPtr,
   then runs ThreadFunc1().
   NOTE(review): the stack reservation presumably shifts each thread's stack
   frames by a different offset - confirm the intent. */
static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
  CMtDecThread *t = (CMtDecThread *)pp;

  // fprintf(stderr, "\n%d = %p - before", t->index, &t);
  #ifdef USE_ALLOCA
  t->allocaPtr = alloca(t->index * 128);
  #endif

  return ThreadFunc1(pp);
}
/* Releases memory that is not needed for reading back buffered input:
   - the cross block, when it holds no pending bytes;
   - the input buffers of every thread slot that lies outside the window of
     p->numFilledThreads slots starting at p->filledThreadStart (the window
     wraps around at p->numStartedThreads).
   Returns non-zero when buffered data remains to be read, i.e. some thread
   is still filled or the cross block still holds pending bytes. */
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned i;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    CMtDecThread *slot = &p->threads[i];

    if (i > p->numStartedThreads)
    {
      MtDecThread_FreeInBufs(slot);
      continue;
    }

    /* distance of slot i from the start of the filled window */
    if (i >= p->filledThreadStart)
    {
      if (p->numFilledThreads <= i - p->filledThreadStart)
        MtDecThread_FreeInBufs(slot);
    }
    else
    {
      if (p->numFilledThreads <= i + p->numStartedThreads - p->filledThreadStart)
        MtDecThread_FreeInBufs(slot);
    }
  }

  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
}
/* Hands out the buffered input one chunk per call.

   On entry, a non-zero *inLim means the chunk returned by the previous call
   has been consumed, so the head buffer of the current filled thread is
   freed first. On return, *inLim is the size of the returned chunk.

   Order of the data: the buffers of the filled threads (starting at
   p->filledThreadStart, wrapping at p->numStartedThreads), then the pending
   part of the cross block. When nothing is left, *inLim is set to 0, the
   cross block is freed and NULL is returned. */
const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
{
  if (p->numFilledThreads != 0)
  {
    CMtDecThread *t = &p->threads[p->filledThreadStart];
    BoolInt threadHasData = True;

    if (*inLim != 0)
    {
      /* drop the buffer that was returned by the previous call */
      CMtDecBufLink *consumed = (CMtDecBufLink *)t->inBuf;
      void *following = consumed->next;
      ISzAlloc_Free(p->alloc, consumed);
      t->inBuf = following;

      if (t->inDataSize == 0)
      {
        /* this thread is exhausted: release it and move to the next one */
        MtDecThread_FreeInBufs(t);
        if (--p->numFilledThreads == 0)
          threadHasData = False;
        else
        {
          if (++p->filledThreadStart == p->numStartedThreads)
            p->filledThreadStart = 0;
          t = &p->threads[p->filledThreadStart];
        }
      }
    }

    if (threadHasData)
    {
      size_t lim = t->inDataSize_Start;

      if (lim == 0)
      {
        /* not the first buffer: a full buffer, or whatever is left */
        const UInt64 rem = t->inDataSize;
        lim = p->inBufSize;
        if (lim > rem)
          lim = (size_t)rem;
      }
      else
        t->inDataSize_Start = 0;

      t->inDataSize -= lim;
      *inLim = lim;
      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
    }
  }

  {
    const size_t pending = p->crossEnd - p->crossStart;

    if (pending != 0)
    {
      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
      *inLim = pending;
      p->crossStart = 0;
      p->crossEnd = 0;
      return data;
    }
  }

  *inLim = 0;
  if (p->crossBlock)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
  return NULL;
}
/* Puts a CMtDec object into its initial state: default input buffer size of
   256 KiB (1 << 18), no streams/callbacks/allocator attached, no cross block,
   and every thread slot bound to `p` with its index, no buffers, and
   constructed (not yet created) events and thread. Also initializes the
   progress critical section. */
void MtDec_Construct(CMtDec *p)
{
  unsigned i;

  /* configuration defaults */
  p->inBufSize = (size_t)1 << 18;
  p->numThreadsMax = 0;

  /* external objects are attached by the caller later */
  p->inStream = NULL;
  p->progress = NULL;
  p->alloc = NULL;
  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  // p->inData = NULL;
  // p->inDataSize = 0;

  /* buffer bookkeeping */
  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;
  p->numFilledThreads = 0;
  p->allocatedBufsSize = 0;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    CMtDecThread *slot = &p->threads[i];

    slot->mtDec = p;
    slot->index = i;
    slot->inBuf = NULL;

    Event_Construct(&slot->canRead);
    Event_Construct(&slot->canWrite);
    Thread_Construct(&slot->thread);
  }

  // Event_Construct(&p->finishedEvent);

  CriticalSection_Init(&p->mtProgress.cs);
}
/* Releases everything owned by `p`: sets the exit flag, destructs every
   thread slot (thread, events, input buffers) and frees the cross block. */
static void MtDec_Free(CMtDec *p)
{
  unsigned slotIndex;

  /* must be set before the threads are woken up and joined */
  p->exitThread = True;

  for (slotIndex = 0; slotIndex < MTDEC__THREADS_MAX; slotIndex++)
  {
    MtDecThread_Destruct(&p->threads[slotIndex]);
  }

  // Event_Close(&p->finishedEvent);

  if (p->crossBlock)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
}
/* Destroys a CMtDec object: frees its threads and buffers first, then deletes
   the progress critical section. */
void MtDec_Destruct(CMtDec *p)
{
  MtDec_Free(p);

  CriticalSection_Delete(&p->mtProgress.cs);
}
/* Runs one multi-threaded decoding pass.

   Resets the per-run state, drops cached buffers if p->inBufSize changed
   since they were allocated, then runs thread slot 0 synchronously in the
   calling thread (ThreadFunc is called directly; further threads are started
   from inside the worker loop).

   On return p->needContinue is True only when an allocation error, a
   threading error or an overflow was recorded and no worker returned a
   non-zero WRes; in that case SZ_OK is returned.
   NOTE(review): the caller is presumably expected to continue in
   single-thread mode then - confirm.
   Otherwise the threading result converted to SRes is returned (SZ_OK when
   there was no threading failure). */
SRes MtDec_Code(CMtDec *p)
{
  unsigned i;

  p->inProcessed = 0;

  p->blockIndex = 1; // it must be larger than not_defined index (0)
  p->isAllocError = False;
  p->overflow = False;
  p->threadingErrorSRes = SZ_OK;

  p->needContinue = True;

  p->readWasFinished = False;
  p->needInterrupt = False;
  p->interruptIndex = (UInt64)(Int64)-1;

  p->readProcessed = 0;
  p->readRes = SZ_OK;
  p->codeRes = SZ_OK;
  p->wasInterrupted = False;

  p->crossStart = 0;
  p->crossEnd = 0;

  p->filledThreadStart = 0;
  p->numFilledThreads = 0;

  {
    unsigned numThreads = p->numThreadsMax;
    if (numThreads > MTDEC__THREADS_MAX)
      numThreads = MTDEC__THREADS_MAX;
    p->numStartedThreads_Limit = numThreads;
    p->numStartedThreads = 0;
  }

  /* buffers allocated for another buffer size cannot be reused */
  if (p->inBufSize != p->allocatedBufsSize)
  {
    for (i = 0; i < MTDEC__THREADS_MAX; i++)
    {
      CMtDecThread *t = &p->threads[i];
      if (t->inBuf)
        MtDecThread_FreeInBufs(t);
    }
    if (p->crossBlock)
    {
      ISzAlloc_Free(p->alloc, p->crossBlock);
      p->crossBlock = NULL;
    }

    p->allocatedBufsSize = p->inBufSize;
  }

  MtProgress_Init(&p->mtProgress, p->progress);

  // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
  p->exitThread = False;
  p->exitThreadWRes = 0;

  {
    WRes wres;
    /* holds the SRes produced by MY_SRes_HRESULT_FROM_WRes; it is stored in
       p->threadingErrorSRes and returned as SRes, so it must not be a WRes */
    SRes sres;
    CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];

    // wres = MtDecThread_CreateAndStart(nextThread);
    wres = MtDecThread_CreateEvents(nextThread);
    if (wres == 0)
      wres = Event_Set(&nextThread->canWrite);
    if (wres == 0)
      wres = Event_Set(&nextThread->canRead);
    if (wres == 0)
    {
      /* slot 0 runs in the calling thread */
      wres = ThreadFunc(nextThread);
      if (wres != 0)
      {
        p->needContinue = False;
        MtDec_CloseThreads(p);
      }
    }

    // wres = 17; // for test
    // wres = Event_Wait(&p->finishedEvent);

    sres = MY_SRes_HRESULT_FROM_WRes(wres);

    if (sres != 0)
      p->threadingErrorSRes = sres;

    /* needContinue survives only for alloc error / threading error / overflow */
    if (!(p->isAllocError
        || p->threadingErrorSRes != SZ_OK
        || p->overflow))
      p->needContinue = False;

    if (p->needContinue)
      return SZ_OK;

    return sres;
  }
}
#endif |