zstdmt: job table correctly cleaned after synchronous ZSTDMT_compress()
diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
index 3bf585d..d78257b 100644
--- a/lib/compress/zstdmt_compress.c
+++ b/lib/compress/zstdmt_compress.c
@@ -559,6 +559,7 @@
DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
+ mtctx->jobs[jobID].cSize = 0;
DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff);
mtctx->jobs[jobID].srcBuff = g_nullBuffer;
@@ -816,6 +817,7 @@
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[chunkID].dstBuff);
} }
mtctx->jobs[chunkID].dstBuff = g_nullBuffer;
+ mtctx->jobs[chunkID].cSize = 0;
dstPos += cSize ;
}
} /* for (chunkID=0; chunkID<nbChunks; chunkID++) */
@@ -1158,11 +1160,13 @@
mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */
mtctx->jobs[wJobID].frameChecksumNeeded = 0;
}
- if (cSize > 0) { /* compression is ongoing or completed */
+ if (cSize > 0) { /* compression is ongoing or completed */
size_t const toWrite = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
- DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u)",
- (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize);
+ DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",
+ (U32)toWrite, mtctx->doneJobID, (U32)srcConsumed, (U32)mtctx->jobs[wJobID].srcSize, (U32)cSize);
+ assert(mtctx->doneJobID < mtctx->nextJobID);
assert(cSize >= mtctx->jobs[wJobID].dstFlushed);
+ assert(mtctx->jobs[wJobID].dstBuff.start != NULL);
memcpy((char*)output->dst + output->pos, (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed, toWrite);
output->pos += toWrite;
mtctx->jobs[wJobID].dstFlushed += toWrite; /* can write : this value is only used by mtctx */
@@ -1204,7 +1208,8 @@
{
size_t const newJobThreshold = mtctx->inBuff.prefixSize + mtctx->targetSectionSize;
unsigned forwardInputProgress = 0;
- DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u)", (U32)endOp);
+ DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)",
+ (U32)endOp, (U32)(input->size - input->pos));
assert(output->pos <= output->size);
assert(input->pos <= input->size);