blob: 9864cc87412410fd4072b04300b223f7ac94c3b7 [file] [log] [blame]
/* MtDec.h -- Multi-thread Decoder
2018-07-04 : Igor Pavlov : Public domain */
#ifndef __MT_DEC_H
#define __MT_DEC_H
#include "7zTypes.h"
#ifndef _7ZIP_ST
#include "Threads.h"
#ifndef _7ZIP_ST
#ifndef _7ZIP_ST
typedef struct
ICompressProgress *progress;
SRes res;
UInt64 totalInSize;
UInt64 totalOutSize;
CCriticalSection cs;
} CMtProgress;
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress);
SRes MtProgress_Progress_ST(CMtProgress *p);
SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize);
SRes MtProgress_GetError(CMtProgress *p);
void MtProgress_SetError(CMtProgress *p, SRes res);
struct _CMtDec;
typedef struct
struct _CMtDec *mtDec;
unsigned index;
void *inBuf;
size_t inDataSize_Start; // size of input data in start block
UInt64 inDataSize; // total size of input data in all blocks
CThread thread;
CAutoResetEvent canRead;
CAutoResetEvent canWrite;
void *allocaPtr;
} CMtDecThread;
void MtDecThread_FreeInBufs(CMtDecThread *t);
typedef enum
MTDEC_PARSE_CONTINUE, // continue this block with more input data
MTDEC_PARSE_OVERFLOW, // MT buffers overflow, need switch to single-thread
MTDEC_PARSE_NEW, // new block
MTDEC_PARSE_END // end of block threading. But we still can return to threading after Write(&needContinue)
} EMtDecParseState;
typedef struct
// in
int startCall;
const Byte *src;
size_t srcSize;
// in : (srcSize == 0) is allowed
// out : it's allowed to return less that actually was used ?
int srcFinished;
// out
EMtDecParseState state;
BoolInt canCreateNewThread;
UInt64 outPos; // check it (size_t)
} CMtDecCallbackInfo;
typedef struct
void (*Parse)(void *p, unsigned coderIndex, CMtDecCallbackInfo *ci);
// PreCode() and Code():
// (SRes_return_result != SZ_OK) means stop decoding, no need another blocks
SRes (*PreCode)(void *p, unsigned coderIndex);
SRes (*Code)(void *p, unsigned coderIndex,
const Byte *src, size_t srcSize, int srcFinished,
UInt64 *inCodePos, UInt64 *outCodePos, int *stop);
// stop - means stop another Code calls
/* Write() must be called, if Parse() was called
set (needWrite) if
&& (was not interrupted by progress)
&& (was not interrupted in previous block)
if (*needContinue), decoder still need to continue decoding with new iteration,
even after MTDEC_PARSE_END
if (*canRecode), we didn't flush current block data, so we still can decode current block later.
SRes (*Write)(void *p, unsigned coderIndex,
BoolInt needWriteToStream,
const Byte *src, size_t srcSize,
// int srcFinished,
BoolInt *needContinue,
BoolInt *canRecode);
} IMtDecCallback;
typedef struct _CMtDec
/* input variables */
size_t inBufSize; /* size of input block */
unsigned numThreadsMax;
// size_t inBlockMax;
unsigned numThreadsMax_2;
ISeqInStream *inStream;
// const Byte *inData;
// size_t inDataSize;
ICompressProgress *progress;
ISzAllocPtr alloc;
IMtDecCallback *mtCallback;
void *mtCallbackObject;
/* internal variables */
size_t allocatedBufsSize;
BoolInt exitThread;
WRes exitThreadWRes;
UInt64 blockIndex;
BoolInt isAllocError;
BoolInt overflow;
SRes threadingErrorSRes;
BoolInt needContinue;
// CAutoResetEvent finishedEvent;
SRes readRes;
SRes codeRes;
BoolInt wasInterrupted;
unsigned numStartedThreads_Limit;
unsigned numStartedThreads;
Byte *crossBlock;
size_t crossStart;
size_t crossEnd;
UInt64 readProcessed;
BoolInt readWasFinished;
UInt64 inProcessed;
unsigned filledThreadStart;
unsigned numFilledThreads;
#ifndef _7ZIP_ST
BoolInt needInterrupt;
UInt64 interruptIndex;
CMtProgress mtProgress;
CMtDecThread threads[MTDEC__THREADS_MAX];
} CMtDec;
void MtDec_Construct(CMtDec *p);
void MtDec_Destruct(CMtDec *p);
MtDec_Code() returns:
SZ_OK - in most cases
MY_SRes_HRESULT_FROM_WRes(WRes_error) - in case of unexpected error in threading function
SRes MtDec_Code(CMtDec *p);
Byte *MtDec_GetCrossBuff(CMtDec *p);
int MtDec_PrepareRead(CMtDec *p);
const Byte *MtDec_Read(CMtDec *p, size_t *inLim);