liblzma: Add lzma_filters_update() support to the multi-threaded encoder.

A tiny downside of this is that now a 1-4 tiny allocations are made
for every Block because each worker thread needs its own copy of
the filter chain.
This commit is contained in:
Lasse Collin 2022-11-24 16:25:10 +02:00
parent 17ac51e689
commit 93439cfafe
2 changed files with 106 additions and 20 deletions

View File

@ -226,21 +226,27 @@ extern LZMA_API(lzma_ret) lzma_raw_decoder(
/** /**
* \brief Update the filter chain in the encoder * \brief Update the filter chain in the encoder
* *
* This function is for advanced users only. This function has two slightly * This function may be called after lzma_code() has returned LZMA_STREAM_END
* different purposes: * when LZMA_FULL_BARRIER, LZMA_FULL_FLUSH, or LZMA_SYNC_FLUSH was used:
* *
* - After LZMA_FULL_FLUSH when using Stream encoder: Set a new filter * - After LZMA_FULL_BARRIER or LZMA_FULL_FLUSH: Single-threaded .xz Stream
* chain, which will be used starting from the next Block. * encoder (lzma_stream_encoder()) and (since liblzma 5.4.0) multi-threaded
* Stream encoder (lzma_stream_encoder_mt()) allow setting a new filter
* chain to be used for the next Block(s).
* *
* - After LZMA_SYNC_FLUSH using Raw, Block, or Stream encoder: Change * - After LZMA_SYNC_FLUSH: Raw encoder (lzma_raw_encoder()),
* the filter-specific options in the middle of encoding. The actual * Block encocder (lzma_block_encoder()), and single-threaded .xz Stream
* filters in the chain (Filter IDs) cannot be changed. In the future, * encoder (lzma_stream_encoder()) allow changing certain filter-specific
* it might become possible to change the filter options without * options in the middle of encoding. The actual filters in the chain
* using LZMA_SYNC_FLUSH. * (Filter IDs) must not be changed! Currently only the lc, lp, and pb
* options of LZMA2 (not LZMA1) can be changed this way.
* *
* While rarely useful, this function may be called also when no data has * - In the future some filters might allow changing some of their options
* been compressed yet. In that case, this function will behave as if * without any barrier or flushing but currently such filters don't exist.
* LZMA_FULL_FLUSH (Stream encoder) or LZMA_SYNC_FLUSH (Raw or Block *
* This function may also be called when no data has been compressed yet
* although this is rarely useful. In that case, this function will behave
* as if LZMA_FULL_FLUSH (Stream encoders) or LZMA_SYNC_FLUSH (Raw or Block
* encoder) had been used right before calling this function. * encoder) had been used right before calling this function.
* *
* \return - LZMA_OK * \return - LZMA_OK

View File

@ -85,6 +85,11 @@ struct worker_thread_s {
/// Compression options for this Block /// Compression options for this Block
lzma_block block_options; lzma_block block_options;
/// Filter chain for this thread. By copying the filters array
/// to each thread it is possible to change the filter chain
/// between Blocks using lzma_filters_update().
lzma_filter filters[LZMA_FILTERS_MAX + 1];
/// Next structure in the stack of free worker threads. /// Next structure in the stack of free worker threads.
worker_thread *next; worker_thread *next;
@ -109,9 +114,22 @@ struct lzma_stream_coder_s {
/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier. /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
size_t block_size; size_t block_size;
/// The filter chain currently in use /// The filter chain to use for the next Block.
/// This can be updated using lzma_filters_update()
/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
lzma_filter filters[LZMA_FILTERS_MAX + 1]; lzma_filter filters[LZMA_FILTERS_MAX + 1];
/// A copy of filters[] will be put here when attempting to get
/// a new worker thread. This will be copied to a worker thread
/// when a thread becomes free and then this cache is marked as
/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
/// the filter options from filters[] would get uselessly copied
/// multiple times (allocated and freed) when waiting for a new free
/// worker thread.
///
/// This is freed if filters[] is updated via lzma_filters_update().
lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];
/// Index to hold sizes of the Blocks /// Index to hold sizes of the Blocks
lzma_index *index; lzma_index *index;
@ -210,10 +228,7 @@ worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
.check = thr->coder->stream_flags.check, .check = thr->coder->stream_flags.check,
.compressed_size = thr->outbuf->allocated, .compressed_size = thr->outbuf->allocated,
.uncompressed_size = thr->coder->block_size, .uncompressed_size = thr->coder->block_size,
.filters = thr->filters,
// TODO: To allow changing the filter chain, the filters
// array must be copied to each worker_thread.
.filters = thr->coder->filters,
}; };
// Calculate maximum size of the Block Header. This amount is // Calculate maximum size of the Block Header. This amount is
@ -415,6 +430,8 @@ worker_start(void *thr_ptr)
} }
// Exiting, free the resources. // Exiting, free the resources.
lzma_filters_free(thr->filters, thr->allocator);
mythread_mutex_destroy(&thr->mutex); mythread_mutex_destroy(&thr->mutex);
mythread_cond_destroy(&thr->cond); mythread_cond_destroy(&thr->cond);
@ -498,6 +515,7 @@ initialize_new_thread(lzma_stream_coder *coder,
thr->progress_in = 0; thr->progress_in = 0;
thr->progress_out = 0; thr->progress_out = 0;
thr->block_encoder = LZMA_NEXT_CODER_INIT; thr->block_encoder = LZMA_NEXT_CODER_INIT;
thr->filters[0].id = LZMA_VLI_UNKNOWN;
if (mythread_create(&thr->thread_id, &worker_start, thr)) if (mythread_create(&thr->thread_id, &worker_start, thr))
goto error_thread; goto error_thread;
@ -532,6 +550,13 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator, return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
coder->outbuf_alloc_size)); coder->outbuf_alloc_size));
// Make a thread-specific copy of the filter chain. Put it in
// the cache array first so that if we cannot get a new thread yet,
// the allocation is ready when we try again.
if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
return_if_error(lzma_filters_copy(
coder->filters, coder->filters_cache, allocator));
// If there is a free structure on the stack, use it. // If there is a free structure on the stack, use it.
mythread_sync(coder->mutex) { mythread_sync(coder->mutex) {
if (coder->threads_free != NULL) { if (coder->threads_free != NULL) {
@ -555,6 +580,15 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
coder->thr->state = THR_RUN; coder->thr->state = THR_RUN;
coder->thr->in_size = 0; coder->thr->in_size = 0;
coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL); coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
// Free the old thread-specific filter options and replace
// them with the already-allocated new options from
// coder->filters_cache[]. Then mark the cache as empty.
lzma_filters_free(coder->thr->filters, allocator);
memcpy(coder->thr->filters, coder->filters_cache,
sizeof(coder->filters_cache));
coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
mythread_cond_signal(&coder->thr->cond); mythread_cond_signal(&coder->thr->cond);
} }
@ -867,6 +901,7 @@ stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
lzma_outq_end(&coder->outq, allocator); lzma_outq_end(&coder->outq, allocator);
lzma_filters_free(coder->filters, allocator); lzma_filters_free(coder->filters, allocator);
lzma_filters_free(coder->filters_cache, allocator);
lzma_next_end(&coder->index_encoder, allocator); lzma_next_end(&coder->index_encoder, allocator);
lzma_index_end(coder->index, allocator); lzma_index_end(coder->index, allocator);
@ -879,6 +914,45 @@ stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
} }
static lzma_ret
stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
const lzma_filter *filters,
const lzma_filter *reversed_filters
lzma_attribute((__unused__)))
{
lzma_stream_coder *coder = coder_ptr;
// Applications shouldn't attempt to change the options when
// we are already encoding the Index or Stream Footer.
if (coder->sequence > SEQ_BLOCK)
return LZMA_PROG_ERROR;
// For now the threaded encoder doesn't support changing
// the options in the middle of a Block.
if (coder->thr != NULL)
return LZMA_PROG_ERROR;
// Check if the filter chain seems mostly valid. See the comment
// in stream_encoder_mt_init().
if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
return LZMA_OPTIONS_ERROR;
// Make a copy to a temporary buffer first. This way the encoder
// state stays unchanged if an error occurs in lzma_filters_copy().
lzma_filter temp[LZMA_FILTERS_MAX + 1];
return_if_error(lzma_filters_copy(filters, temp, allocator));
// Free the options of the old chain as well as the cache.
lzma_filters_free(coder->filters, allocator);
lzma_filters_free(coder->filters_cache, allocator);
// Copy the new filter chain in place.
memcpy(coder->filters, temp, sizeof(temp));
return LZMA_OK;
}
/// Options handling for lzma_stream_encoder_mt_init() and /// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage() /// lzma_stream_encoder_mt_memusage()
static lzma_ret static lzma_ret
@ -977,7 +1051,9 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Validate the filter chain so that we can give an error in this // Validate the filter chain so that we can give an error in this
// function instead of delaying it to the first call to lzma_code(). // function instead of delaying it to the first call to lzma_code().
// The memory usage calculation verifies the filter chain as // The memory usage calculation verifies the filter chain as
// a side effect so we take advantage of that. // a side effect so we take advantage of that. It's not a perfect
// check though as raw encoder allows LZMA1 too but such problems
// will be caught eventually with Block Header encoder.
if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
return LZMA_OPTIONS_ERROR; return LZMA_OPTIONS_ERROR;
@ -1017,9 +1093,10 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->code = &stream_encode_mt; next->code = &stream_encode_mt;
next->end = &stream_encoder_mt_end; next->end = &stream_encoder_mt_end;
next->get_progress = &get_progress; next->get_progress = &get_progress;
// next->update = &stream_encoder_mt_update; next->update = &stream_encoder_mt_update;
coder->filters[0].id = LZMA_VLI_UNKNOWN; coder->filters[0].id = LZMA_VLI_UNKNOWN;
coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
coder->index_encoder = LZMA_NEXT_CODER_INIT; coder->index_encoder = LZMA_NEXT_CODER_INIT;
coder->index = NULL; coder->index = NULL;
memzero(&coder->outq, sizeof(coder->outq)); memzero(&coder->outq, sizeof(coder->outq));
@ -1066,8 +1143,11 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Timeout // Timeout
coder->timeout = options->timeout; coder->timeout = options->timeout;
// Free the old filter chain and copy the new one. // Free the old filter chain and the cache.
lzma_filters_free(coder->filters, allocator); lzma_filters_free(coder->filters, allocator);
lzma_filters_free(coder->filters_cache, allocator);
// Copy the new filter chain.
return_if_error(lzma_filters_copy( return_if_error(lzma_filters_copy(
filters, coder->filters, allocator)); filters, coder->filters, allocator));