diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c index 1fd7dd85..47433de8 100644 --- a/src/liblzma/common/stream_decoder_mt.c +++ b/src/liblzma/common/stream_decoder_mt.c @@ -300,12 +300,25 @@ struct lzma_stream_coder { /// Stream Padding is a multiple of four bytes. bool concatenated; + /// When decoding concatenated Streams, this is true as long as we /// are decoding the first Stream. This is needed to avoid misleading /// LZMA_FORMAT_ERROR in case the later Streams don't have valid magic /// bytes. bool first_stream; + /// This is used to track if the previous call to stream_decode_mt() + /// had output space (*out_pos < out_size) and managed to fill the + /// output buffer (*out_pos == out_size). This may be set to true + /// in read_output_and_wait(). This is read and then reset to false + /// at the beginning of stream_decode_mt(). + /// + /// This is needed to support applications that call lzma_code() in + /// such a way that more input is provided only when lzma_code() + /// didn't fill the output buffer completely. Basically, this makes + /// it easier to convert such applications from single-threaded + /// decoder to multi-threaded decoder. + bool out_was_filled; /// Write position in buffer[] and position in Stream Padding size_t pos; @@ -656,6 +669,7 @@ read_output_and_wait(struct lzma_stream_coder *coder, do { // Get as much output from the queue as is possible // without blocking. + const size_t out_start = *out_pos; do { ret = lzma_outq_read(&coder->outq, allocator, out, out_pos, out_size, @@ -683,6 +697,14 @@ read_output_and_wait(struct lzma_stream_coder *coder, if (ret != LZMA_OK) break; + // If the output buffer is now full but it wasn't full + // when this function was called, set out_was_filled. + // This way the next call to stream_decode_mt() knows + // that some output was produced and no output space + // remained in the previous call to stream_decode_mt(). + if (*out_pos == out_size && *out_pos != out_start) + coder->out_was_filled = true; + // Check if any thread has indicated an error. if (coder->thread_error != LZMA_OK) { if (coder->pending_error == LZMA_OK) @@ -949,11 +971,39 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, { struct lzma_stream_coder *coder = coder_ptr; - const size_t in_start = *in_pos; - mythread_condtime wait_abs; bool has_blocked = false; + // Determine if in SEQ_BLOCK_HEADER and SEQ_BLOCK_THR_RUN we should + // tell read_output_and_wait() to wait until it can fill the output + // buffer (or a timeout occurs). Two conditions must be met: + // + // (1) If the caller provided no new input. The reason for this + // can be, for example, the end of the file or that there is + // a pause in the input stream and more input is available + // a little later. In this situation we should wait for output + // because otherwise we would end up in a busy-waiting loop where + // we make no progress and the application just calls us again + // without providing any new input. This would then result in + // LZMA_BUF_ERROR even though more output would be available + // once the worker threads decode more data. + // + // (2) Even if (1) is true, we will not wait if the previous call to + // this function managed to produce some output and the output + // buffer became full. This is for compatibility with applications + // that call lzma_code() in such a way that new input is provided + // only when the output buffer didn't become full. Without this + // trick such applications would have bad performance (bad + // parallelization due to decoder not getting input fast enough). + // + // NOTE: Such loops might require that timeout is disabled (0) + // if they assume that output-not-full implies that all input has + // been consumed. If and only if timeout is enabled, we may return + // when output isn't full *and* not all input has been consumed. + const bool waiting_allowed = *in_pos == in_size + && !coder->out_was_filled; + coder->out_was_filled = false; + while (true) switch (coder->sequence) { case SEQ_STREAM_HEADER: { @@ -1030,11 +1080,11 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, // without a delay. // // On the other hand, if lzma_code() was called with - // an empty input buffer (in_start == in_size), treat - // it specially: try to fill the output buffer even - // if it requires waiting for the worker threads to - // provide output (timeout, if specified, can still - // cause us to return). + // an empty input buffer(*), treat it specially: try + // to fill the output buffer even if it requires + // waiting for the worker threads to provide output + // (timeout, if specified, can still cause us to + // return). // // - This way the application will be able to get all // data that can be decoded from the input provided @@ -1049,11 +1099,15 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, // anything and will return LZMA_OK immediately // (coder->timeout is completely ignored). // + // (*) See the comment at the beginning of this + // function how waiting_allowed is determined + // and why there is an exception to the rule + // of "called with an empty input buffer". assert(*in_pos == in_size); return_if_error(read_output_and_wait(coder, allocator, out, out_pos, out_size, - NULL, in_start == in_size, + NULL, waiting_allowed, &wait_abs, &has_blocked)); if (coder->pending_error != LZMA_OK) { @@ -1403,10 +1457,11 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, // Read output from the output queue. Just like in // SEQ_BLOCK_HEADER, we wait to fill the output buffer - // only if lzma_code() was called without providing any input. + // only if waiting_allowed was set to true in the beginning + // of this function (see the comment there). return_if_error(read_output_and_wait(coder, allocator, out, out_pos, out_size, - NULL, in_start == in_size, + NULL, waiting_allowed, &wait_abs, &has_blocked)); if (coder->pending_error != LZMA_OK) { @@ -1823,7 +1878,9 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, coder->tell_any_check = (options->flags & LZMA_TELL_ANY_CHECK) != 0; coder->ignore_check = (options->flags & LZMA_IGNORE_CHECK) != 0; coder->concatenated = (options->flags & LZMA_CONCATENATED) != 0; + coder->first_stream = true; + coder->out_was_filled = false; coder->pos = 0; coder->threads_max = options->threads;