liblzma: Make lzma_outq usable for threaded decompression too.

Before this commit all output queue buffers were allocated as
a single big allocation. Now each buffer is allocated separately
when needed. Used buffers are cached to avoid reallocation
overhead but the cache will keep only one buffer size at a time.
This should make things work OK in the decompression where most
of the time the buffer sizes will be the same but with some less
common files the buffer sizes may vary.

While this should work fine, it's still a bit preliminary
and may even get reverted if it turns out to be useless for
decompression.
This commit is contained in:
Lasse Collin 2021-01-09 21:14:36 +02:00
parent a35a69d693
commit f7fa309e1f
3 changed files with 302 additions and 158 deletions

View File

@ -13,84 +13,100 @@
#include "outqueue.h"
/// This is to ease integer overflow checking: We may allocate up to
/// 2 * LZMA_THREADS_MAX buffers and we need some extra memory for other
/// data structures (that's the second /2).
#define BUF_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX / 2 / 2)
static lzma_ret
get_options(uint64_t *bufs_alloc_size, uint32_t *bufs_count,
uint64_t buf_size_max, uint32_t threads)
{
if (threads > LZMA_THREADS_MAX || buf_size_max > BUF_SIZE_MAX)
return LZMA_OPTIONS_ERROR;
// The number of buffers is twice the number of threads.
// This wastes RAM but keeps the threads busy when buffers
// finish out of order.
//
// NOTE: If this is changed, update BUF_SIZE_MAX too.
*bufs_count = threads * 2;
*bufs_alloc_size = *bufs_count * buf_size_max;
return LZMA_OK;
}
/// Get the maximum number of buffers that may be allocated based
/// on the number of threads. For now this is twice the number of threads.
/// It's a compromise between RAM usage and keeping the worker threads busy
/// when buffers finish out of order.
#define GET_BUFS_LIMIT(threads) (2 * (threads))
extern uint64_t
lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
{
uint64_t bufs_alloc_size;
uint32_t bufs_count;
// This is to ease integer overflow checking: We may allocate up to
// GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra
// memory for other data structures too (that's the /2).
//
// lzma_outq_prealloc_buf() will still accept bigger buffers than this.
const uint64_t limit
= UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2;
if (get_options(&bufs_alloc_size, &bufs_count, buf_size_max, threads)
!= LZMA_OK)
if (threads > LZMA_THREADS_MAX || buf_size_max > limit)
return UINT64_MAX;
return sizeof(lzma_outq) + bufs_count * sizeof(lzma_outbuf)
+ bufs_alloc_size;
return GET_BUFS_LIMIT(threads) * (sizeof(lzma_outbuf) + buf_size_max);
}
static void
move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator)
{
assert(outq->head != NULL);
assert(outq->tail != NULL);
assert(outq->bufs_in_use > 0);
--outq->bufs_in_use;
lzma_outbuf *buf = outq->head;
outq->head = buf->next;
if (outq->head == NULL)
outq->tail = NULL;
if (outq->cache != NULL && outq->cache->allocated != buf->allocated)
lzma_outq_clear_cache(outq, allocator);
buf->next = outq->cache;
outq->cache = buf;
return;
}
static void
free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator)
{
assert(outq->cache != NULL);
lzma_outbuf *buf = outq->cache;
outq->cache = buf->next;
--outq->bufs_allocated;
outq->memusage -= sizeof(*buf) + buf->allocated;
lzma_free(buf, allocator);
return;
}
extern void
lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator)
{
while (outq->cache != NULL)
free_one_cached_buffer(outq, allocator);
return;
}
extern lzma_ret
lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
uint64_t buf_size_max, uint32_t threads)
uint32_t threads)
{
uint64_t bufs_alloc_size;
uint32_t bufs_count;
if (threads > LZMA_THREADS_MAX)
return LZMA_OPTIONS_ERROR;
// Set bufs_count and bufs_alloc_size.
return_if_error(get_options(&bufs_alloc_size, &bufs_count,
buf_size_max, threads));
const uint32_t bufs_limit = GET_BUFS_LIMIT(threads);
// Allocate memory if needed.
if (outq->buf_size_max != buf_size_max
|| outq->bufs_allocated != bufs_count) {
lzma_outq_end(outq, allocator);
// Clear head/tail.
while (outq->head != NULL)
move_head_to_cache(outq, allocator);
#if SIZE_MAX < UINT64_MAX
if (bufs_alloc_size > SIZE_MAX)
return LZMA_MEM_ERROR;
#endif
// If new buf_limit is lower than the old one, we may need to free
// a few cached buffers.
while (bufs_limit < outq->bufs_allocated)
free_one_cached_buffer(outq, allocator);
outq->bufs = lzma_alloc(bufs_count * sizeof(lzma_outbuf),
allocator);
outq->bufs_mem = lzma_alloc((size_t)(bufs_alloc_size),
allocator);
if (outq->bufs == NULL || outq->bufs_mem == NULL) {
lzma_outq_end(outq, allocator);
return LZMA_MEM_ERROR;
}
}
// Initialize the rest of the main structure. Initialization of
// outq->bufs[] is done when they are actually needed.
outq->buf_size_max = (size_t)(buf_size_max);
outq->bufs_allocated = bufs_count;
outq->bufs_pos = 0;
outq->bufs_used = 0;
outq->bufs_limit = bufs_limit;
outq->read_pos = 0;
return LZMA_OK;
@ -100,33 +116,76 @@ lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
extern void
lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
{
lzma_free(outq->bufs, allocator);
outq->bufs = NULL;
lzma_free(outq->bufs_mem, allocator);
outq->bufs_mem = NULL;
while (outq->head != NULL)
move_head_to_cache(outq, allocator);
lzma_outq_clear_cache(outq, allocator);
return;
}
extern lzma_outbuf *
lzma_outq_get_buf(lzma_outq *outq)
extern lzma_ret
lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator,
size_t size)
{
// Caller must have checked it with lzma_outq_has_buf().
assert(outq->bufs_used < outq->bufs_allocated);
assert(outq->bufs_in_use < outq->bufs_limit);
// Initialize the new buffer.
lzma_outbuf *buf = &outq->bufs[outq->bufs_pos];
buf->buf = outq->bufs_mem + outq->bufs_pos * outq->buf_size_max;
buf->size = 0;
// If there already is appropriately-sized buffer in the cache,
// we need to do nothing.
if (outq->cache != NULL && outq->cache->allocated == size)
return LZMA_OK;
if (size > SIZE_MAX - sizeof(lzma_outbuf))
return LZMA_MEM_ERROR;
// The cache may have buffers but their size is wrong.
lzma_outq_clear_cache(outq, allocator);
outq->cache = lzma_alloc(sizeof(lzma_outbuf) + size, allocator);
if (outq->cache == NULL)
return LZMA_MEM_ERROR;
outq->cache->next = NULL;
outq->cache->allocated = size;
++outq->bufs_allocated;
outq->memusage += sizeof(lzma_outbuf) + size;
return LZMA_OK;
}
extern lzma_outbuf *
lzma_outq_get_buf(lzma_outq *outq, void *worker)
{
// Caller must have used lzma_outq_prealloc_buf() to ensure these.
assert(outq->bufs_in_use < outq->bufs_limit);
assert(outq->bufs_in_use < outq->bufs_allocated);
assert(outq->cache != NULL);
lzma_outbuf *buf = outq->cache;
outq->cache = buf->next;
buf->next = NULL;
if (outq->tail != NULL) {
assert(outq->head != NULL);
outq->tail->next = buf;
} else {
assert(outq->head == NULL);
outq->head = buf;
}
outq->tail = buf;
buf->worker = worker;
buf->finished = false;
buf->pos = 0;
// Update the queue state.
if (++outq->bufs_pos == outq->bufs_allocated)
outq->bufs_pos = 0;
buf->unpadded_size = 0;
buf->uncompressed_size = 0;
++outq->bufs_used;
++outq->bufs_in_use;
return buf;
}
@ -135,50 +194,65 @@ lzma_outq_get_buf(lzma_outq *outq)
extern bool
lzma_outq_is_readable(const lzma_outq *outq)
{
uint32_t i = outq->bufs_pos - outq->bufs_used;
if (outq->bufs_pos < outq->bufs_used)
i += outq->bufs_allocated;
if (outq->head == NULL)
return false;
return outq->bufs[i].finished;
return outq->read_pos < outq->head->pos || outq->head->finished;
}
extern lzma_ret
lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out,
size_t *restrict out_pos, size_t out_size,
lzma_outq_read(lzma_outq *restrict outq,
const lzma_allocator *restrict allocator,
uint8_t *restrict out, size_t *restrict out_pos,
size_t out_size,
lzma_vli *restrict unpadded_size,
lzma_vli *restrict uncompressed_size)
{
// There must be at least one buffer from which to read.
if (outq->bufs_used == 0)
if (outq->bufs_in_use == 0)
return LZMA_OK;
// Get the buffer.
uint32_t i = outq->bufs_pos - outq->bufs_used;
if (outq->bufs_pos < outq->bufs_used)
i += outq->bufs_allocated;
lzma_outbuf *buf = &outq->bufs[i];
// If it isn't finished yet, we cannot read from it.
if (!buf->finished)
return LZMA_OK;
lzma_outbuf *buf = outq->head;
// Copy from the buffer to output.
lzma_bufcpy(buf->buf, &outq->read_pos, buf->size,
//
// FIXME? In threaded decoder it may be bad to do this copy while
// the mutex is being held.
lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos,
out, out_pos, out_size);
// Return if we didn't get all the data from the buffer.
if (outq->read_pos < buf->size)
if (!buf->finished || outq->read_pos < buf->pos)
return LZMA_OK;
// The buffer was finished. Tell the caller its size information.
*unpadded_size = buf->unpadded_size;
*uncompressed_size = buf->uncompressed_size;
if (unpadded_size != NULL)
*unpadded_size = buf->unpadded_size;
if (uncompressed_size != NULL)
*uncompressed_size = buf->uncompressed_size;
// Free this buffer for further use.
--outq->bufs_used;
move_head_to_cache(outq, allocator);
outq->read_pos = 0;
return LZMA_STREAM_END;
}
extern void
lzma_outq_enable_partial_output(lzma_outq *outq,
void (*enable_partial_output)(void *worker))
{
if (outq->head != NULL && !outq->head->finished
&& outq->head->worker != NULL) {
enable_partial_output(outq->head->worker);
// Set it to NULL since calling it twice is pointless.
outq->head->worker = NULL;
}
return;
}

View File

@ -14,16 +14,27 @@
/// Output buffer for a single thread
typedef struct {
/// Pointer to the output buffer of lzma_outq.buf_size_max bytes
uint8_t *buf;
typedef struct lzma_outbuf_s lzma_outbuf;
struct lzma_outbuf_s {
/// Pointer to the next buffer. This is used for the cached buffers.
/// The worker thread must not modify this.
lzma_outbuf *next;
/// Amount of data written to buf
size_t size;
/// This initialized by lzma_outq_get_buf() and
/// is used by lzma_outq_enable_partial_output().
/// The worker thread must not modify this.
void *worker;
/// Additional size information
lzma_vli unpadded_size;
lzma_vli uncompressed_size;
/// Amount of memory allocated for buf[].
/// The worker thread must not modify this.
size_t allocated;
/// Writing position in the worker thread or, in other words, the
/// amount of finished data written to buf[] which can be copied out
///
/// \note This is read by another thread and thus access
/// to this variable needs a mutex.
size_t pos;
/// True when no more data will be written into this buffer.
///
@ -31,32 +42,44 @@ typedef struct {
/// to this variable needs a mutex.
bool finished;
} lzma_outbuf;
/// Additional size information. lzma_outq_read() may read these
/// when "finished" is true.
lzma_vli unpadded_size;
lzma_vli uncompressed_size;
/// Buffer of "allocated" bytes
uint8_t buf[];
};
typedef struct {
/// Array of buffers that are used cyclically.
lzma_outbuf *bufs;
/// Linked list of buffers in use. The next output byte will be
/// read from the head and buffers for the next thread will be
/// appended to the tail. tail->next is always NULL.
lzma_outbuf *head;
lzma_outbuf *tail;
/// Memory allocated for all the buffers
uint8_t *bufs_mem;
/// Amount of buffer space available in each buffer
size_t buf_size_max;
/// Number of buffers allocated
uint32_t bufs_allocated;
/// Position in the bufs array. The next buffer to be taken
/// into use is bufs[bufs_pos].
uint32_t bufs_pos;
/// Number of buffers in use
uint32_t bufs_used;
/// Position in the buffer in lzma_outq_read()
/// Number of bytes read from head->buf[] in lzma_outq_read()
size_t read_pos;
/// Linked list of allocated buffers that aren't currently used.
/// This way buffers of similar size can be reused and don't
/// need to be reallocated every time. For simplicity, all
/// cached buffers in the list have the same allocated size.
lzma_outbuf *cache;
/// Total amount of memory allocated for buffers
uint64_t memusage;
/// Number of buffers in use in the head...tail list. If and only if
/// this is zero, the pointers head and tail above are NULL.
uint32_t bufs_in_use;
/// Number of buffers allocated (in use + cached)
uint32_t bufs_allocated;
/// Maximum allowed number of allocated buffers
uint32_t bufs_limit;
} lzma_outq;
@ -76,32 +99,50 @@ extern uint64_t lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads);
/// function knows that there are no previous
/// allocations to free.
/// \param allocator Pointer to allocator or NULL
/// \param buf_size_max Maximum amount of data that a single buffer
/// in the queue may need to store.
/// \param threads Number of buffers that may be in use
/// concurrently. Note that more than this number
/// of buffers will actually get allocated to
/// of buffers may actually get allocated to
/// improve performance when buffers finish
/// out of order.
/// out of order. The actual maximum number of
/// allocated buffers is derived from the number
/// of threads.
///
/// \return - LZMA_OK
/// - LZMA_MEM_ERROR
///
extern lzma_ret lzma_outq_init(
lzma_outq *outq, const lzma_allocator *allocator,
uint64_t buf_size_max, uint32_t threads);
extern lzma_ret lzma_outq_init(lzma_outq *outq,
const lzma_allocator *allocator, uint32_t threads);
/// \brief Free the memory associated with the output queue
extern void lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator);
/// \brief Free all cached buffers that consume memory but aren't in use
extern void lzma_outq_clear_cache(
lzma_outq *outq, const lzma_allocator *allocator);
/// \brief Preallocate a new buffer into cache
///
/// Splitting the buffer allocation into a separate function makes it
/// possible to ensure that way lzma_outq_get_buf() cannot fail.
/// If the preallocated buffer isn't actually used (for example, some
/// other error occurs), the caller has to do nothing as the buffer will
/// be used later or cleared from the cache when not needed.
///
/// \return LZMA_OK on success, LZMA_MEM_ERROR if allocation fails
///
extern lzma_ret lzma_outq_prealloc_buf(
lzma_outq *outq, const lzma_allocator *allocator, size_t size);
/// \brief Get a new buffer
///
/// lzma_outq_has_buf() must be used to check that there is a buffer
/// lzma_outq_prealloc_buf() must be used to ensure that there is a buffer
/// available before calling lzma_outq_get_buf().
///
extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq);
extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq, void *worker);
/// \brief Test if there is data ready to be read
@ -126,17 +167,32 @@ extern bool lzma_outq_is_readable(const lzma_outq *outq);
/// \return - LZMA: All OK. Either no data was available or the buffer
/// being read didn't become empty yet.
/// - LZMA_STREAM_END: The buffer being read was finished.
/// *unpadded_size and *uncompressed_size were set.
/// *unpadded_size and *uncompressed_size were set if they
/// were not NULL.
///
/// \note This reads lzma_outbuf.finished variables and thus call
/// to this function needs to be protected with a mutex.
/// \note This reads lzma_outbuf.finished and .pos variables and thus
/// calls to this function need to be protected with a mutex.
///
extern lzma_ret lzma_outq_read(lzma_outq *restrict outq,
const lzma_allocator *restrict allocator,
uint8_t *restrict out, size_t *restrict out_pos,
size_t out_size, lzma_vli *restrict unpadded_size,
lzma_vli *restrict uncompressed_size);
/// \brief Enable partial output from a worker thread
///
/// If the buffer at the head of the output queue isn't finished,
/// this will call enable_partial_output on the worker associated with
/// that output buffer.
///
/// \note This reads a lzma_outbuf.finished variable and thus
/// calls to this function need to be protected with a mutex.
///
extern void lzma_outq_enable_partial_output(lzma_outq *outq,
void (*enable_partial_output)(void *worker));
/// \brief Test if there is at least one buffer free
///
/// This must be used before getting a new buffer with lzma_outq_get_buf().
@ -144,7 +200,7 @@ extern lzma_ret lzma_outq_read(lzma_outq *restrict outq,
static inline bool
lzma_outq_has_buf(const lzma_outq *outq)
{
return outq->bufs_used < outq->bufs_allocated;
return outq->bufs_in_use < outq->bufs_limit;
}
@ -152,5 +208,5 @@ lzma_outq_has_buf(const lzma_outq *outq)
static inline bool
lzma_outq_is_empty(const lzma_outq *outq)
{
return outq->bufs_used == 0;
return outq->bufs_in_use == 0;
}

View File

@ -133,6 +133,9 @@ struct lzma_stream_coder_s {
/// Output buffer queue for compressed data
lzma_outq outq;
/// How much memory to allocate for each lzma_outbuf.buf
size_t outbuf_alloc_size;
/// Maximum wait time if cannot use all the input and cannot
/// fill the output buffer. This is in milliseconds.
@ -196,7 +199,7 @@ worker_error(worker_thread *thr, lzma_ret ret)
static worker_state
worker_encode(worker_thread *thr, worker_state state)
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
assert(thr->progress_in == 0);
assert(thr->progress_out == 0);
@ -205,7 +208,7 @@ worker_encode(worker_thread *thr, worker_state state)
thr->block_options = (lzma_block){
.version = 0,
.check = thr->coder->stream_flags.check,
.compressed_size = thr->coder->outq.buf_size_max,
.compressed_size = thr->outbuf->allocated,
.uncompressed_size = thr->coder->block_size,
// TODO: To allow changing the filter chain, the filters
@ -234,12 +237,12 @@ worker_encode(worker_thread *thr, worker_state state)
size_t in_pos = 0;
size_t in_size = 0;
thr->outbuf->size = thr->block_options.header_size;
const size_t out_size = thr->coder->outq.buf_size_max;
*out_pos = thr->block_options.header_size;
const size_t out_size = thr->outbuf->allocated;
do {
mythread_sync(thr->mutex) {
// Store in_pos and out_pos into *thr so that
// Store in_pos and *out_pos into *thr so that
// an application may read them via
// lzma_get_progress() to get progress information.
//
@ -247,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state)
// finishes. Instead, the final values are taken
// later from thr->outbuf.
thr->progress_in = in_pos;
thr->progress_out = thr->outbuf->size;
thr->progress_out = *out_pos;
while (in_size == thr->in_size
&& thr->state == THR_RUN)
@ -277,8 +280,8 @@ worker_encode(worker_thread *thr, worker_state state)
ret = thr->block_encoder.code(
thr->block_encoder.coder, thr->allocator,
thr->in, &in_pos, in_limit, thr->outbuf->buf,
&thr->outbuf->size, out_size, action);
} while (ret == LZMA_OK && thr->outbuf->size < out_size);
out_pos, out_size, action);
} while (ret == LZMA_OK && *out_pos < out_size);
switch (ret) {
case LZMA_STREAM_END:
@ -313,10 +316,10 @@ worker_encode(worker_thread *thr, worker_state state)
return state;
// Do the encoding. This takes care of the Block Header too.
thr->outbuf->size = 0;
*out_pos = 0;
ret = lzma_block_uncomp_encode(&thr->block_options,
thr->in, in_size, thr->outbuf->buf,
&thr->outbuf->size, out_size);
out_pos, out_size);
// It shouldn't fail.
if (ret != LZMA_OK) {
@ -367,11 +370,13 @@ worker_start(void *thr_ptr)
}
}
size_t out_pos = 0;
assert(state != THR_IDLE);
assert(state != THR_STOP);
if (state <= THR_FINISH)
state = worker_encode(thr, state);
state = worker_encode(thr, &out_pos, state);
if (state == THR_EXIT)
break;
@ -387,14 +392,17 @@ worker_start(void *thr_ptr)
}
mythread_sync(thr->coder->mutex) {
// Mark the output buffer as finished if
// no errors occurred.
thr->outbuf->finished = state == THR_FINISH;
// If no errors occurred, make the encoded data
// available to be copied out.
if (state == THR_FINISH) {
thr->outbuf->pos = out_pos;
thr->outbuf->finished = true;
}
// Update the main progress info.
thr->coder->progress_in
+= thr->outbuf->uncompressed_size;
thr->coder->progress_out += thr->outbuf->size;
thr->coder->progress_out += out_pos;
thr->progress_in = 0;
thr->progress_out = 0;
@ -519,6 +527,11 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
if (!lzma_outq_has_buf(&coder->outq))
return LZMA_OK;
// That's also true if we cannot allocate memory for the output
// buffer in the output queue.
return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
coder->outbuf_alloc_size));
// If there is a free structure on the stack, use it.
mythread_sync(coder->mutex) {
if (coder->threads_free != NULL) {
@ -541,7 +554,7 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
mythread_sync(coder->thr->mutex) {
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
mythread_cond_signal(&coder->thr->cond);
}
@ -704,7 +717,7 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
// Try to read compressed data to out[].
ret = lzma_outq_read(&coder->outq,
ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size,
&unpadded_size,
&uncompressed_size);
@ -951,7 +964,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
&block_size, &outbuf_size_max));
#if SIZE_MAX < UINT64_MAX
if (block_size > SIZE_MAX)
if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
return LZMA_MEM_ERROR;
#endif
@ -1012,6 +1025,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Basic initializations
coder->sequence = SEQ_STREAM_HEADER;
coder->block_size = (size_t)(block_size);
coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
coder->thread_error = LZMA_OK;
coder->thr = NULL;
@ -1041,7 +1055,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Output queue
return_if_error(lzma_outq_init(&coder->outq, allocator,
outbuf_size_max, options->threads));
options->threads));
// Timeout
coder->timeout = options->timeout;