Make the progress indicator smooth in threaded mode.

This adds lzma_get_progress() to liblzma and takes advantage
of it in xz.

lzma_get_progress() collects progress information from
the thread-specific structures so that fairly accurate
progress information is available to applications. Adding
a new function seemed to be a better way than making the
information directly available in lzma_stream (like total_in
and total_out are) because collecting the information requires
locking mutexes. It's waste of time to do it more often than
the up to date information is actually needed by an application.
This commit is contained in:
Lasse Collin 2012-12-14 20:13:32 +02:00
parent 2ebbb994e3
commit e7b424d267
6 changed files with 129 additions and 13 deletions

View File

@ -456,7 +456,8 @@ typedef struct lzma_internal_s lzma_internal;
*
* Application may modify the values of total_in and total_out as it wants.
* They are updated by liblzma to match the amount of data read and
* written, but aren't used for anything else.
* written but aren't used for anything else except as a possible return
* values from lzma_get_progress().
*/
typedef struct {
const uint8_t *next_in; /**< Pointer to the next input byte. */
@ -556,6 +557,25 @@ extern LZMA_API(lzma_ret) lzma_code(lzma_stream *strm, lzma_action action)
extern LZMA_API(void) lzma_end(lzma_stream *strm) lzma_nothrow;
/**
* \brief Get progress information
*
* In single-threaded mode, applications can get progress information from
* strm->total_in and strm->total_out. In multi-threaded mode this is less
* useful because a significant amount of both input and output data gets
* buffered internally by liblzma. This makes total_in and total_out give
* misleading information and also makes the progress indicator updates
* non-smooth.
*
* This function gives realistic progress information also in multi-threaded
* mode by taking into account the progress made by each thread. In
* single-threaded mode *progress_in and *progress_out are set to
* strm->total_in and strm->total_out, respectively.
*/
extern LZMA_API(void) lzma_get_progress(lzma_stream *strm,
uint64_t *progress_in, uint64_t *progress_out) lzma_nothrow;
/**
* \brief Get the memory usage of decoder filter chain
*

View File

@ -328,6 +328,22 @@ lzma_end(lzma_stream *strm)
}
extern LZMA_API(void)
lzma_get_progress(lzma_stream *strm,
uint64_t *progress_in, uint64_t *progress_out)
{
if (strm->internal->next.get_progress != NULL) {
strm->internal->next.get_progress(strm->internal->next.coder,
progress_in, progress_out);
} else {
*progress_in = strm->total_in;
*progress_out = strm->total_out;
}
return;
}
extern LZMA_API(lzma_check)
lzma_get_check(const lzma_stream *strm)
{

View File

@ -155,6 +155,11 @@ struct lzma_next_coder_s {
/// lzma_next_coder.coder.
lzma_end_function end;
/// Pointer to a function to get progress information. If this is NULL,
/// lzma_stream.total_in and .total_out are used instead.
void (*get_progress)(lzma_coder *coder,
uint64_t *progress_in, uint64_t *progress_out);
/// Pointer to function to return the type of the integrity check.
/// Most coders won't support this.
lzma_check (*get_check)(const lzma_coder *coder);
@ -180,6 +185,7 @@ struct lzma_next_coder_s {
.id = LZMA_VLI_UNKNOWN, \
.code = NULL, \
.end = NULL, \
.get_progress = NULL, \
.get_check = NULL, \
.memconfig = NULL, \
.update = NULL, \

View File

@ -71,6 +71,12 @@ struct worker_thread_s {
/// allocator before calling lzma_end().
const lzma_allocator *allocator;
/// Amount of uncompressed data that has already been compressed.
uint64_t progress_in;
/// Amount of compressed data that is ready.
uint64_t progress_out;
/// Block encoder
lzma_next_coder block_encoder;
@ -157,6 +163,16 @@ struct lzma_coder_s {
/// the new input from the application.
worker_thread *thr;
/// Amount of uncompressed data in Blocks that have already
/// been finished.
uint64_t progress_in;
/// Amount of compressed data in Stream Header + Blocks that
/// have already been finished.
uint64_t progress_out;
pthread_mutex_t mutex;
mythread_cond cond;
};
@ -183,6 +199,9 @@ worker_error(worker_thread *thr, lzma_ret ret)
static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
assert(thr->progress_in == 0);
assert(thr->progress_out == 0);
// Set the Block options.
thr->block_options = (lzma_block){
.version = 0,
@ -221,17 +240,22 @@ worker_encode(worker_thread *thr, worker_state state)
do {
mythread_sync(thr->mutex) {
// Store in_pos and out_pos into *thr so that
// an application may read them via
// lzma_get_progress() to get progress information.
//
// NOTE: These aren't updated when the encoding
// finishes. Instead, the final values are taken
// later from thr->outbuf.
thr->progress_in = in_pos;
thr->progress_out = thr->outbuf->size;
while (in_size == thr->in_size
&& thr->state == THR_RUN)
pthread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
// TODO? Store in_pos and out_pos into *thr here
// so that the application may read them via
// some currently non-existing function to get
// progress information.
}
// Return if we were asked to stop or exit.
@ -329,6 +353,13 @@ worker_start(void *thr_ptr)
// no errors occurred.
thr->outbuf->finished = state == THR_FINISH;
// Update the main progress info.
thr->coder->progress_in
+= thr->outbuf->uncompressed_size;
thr->coder->progress_out += thr->outbuf->size;
thr->progress_in = 0;
thr->progress_out = 0;
// Return this thread to the stack of free threads.
thr->next = thr->coder->threads_free;
thr->coder->threads_free = thr;
@ -417,6 +448,8 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
thr->state = THR_IDLE;
thr->allocator = allocator;
thr->coder = coder;
thr->progress_in = 0;
thr->progress_out = 0;
thr->block_encoder = LZMA_NEXT_CODER_INIT;
if (mythread_create(&thr->thread_id, &worker_start, thr))
@ -695,6 +728,13 @@ stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
&coder->index_encoder, allocator,
coder->index));
coder->sequence = SEQ_INDEX;
// Update the progress info to take the Index and
// Stream Footer into account. Those are very fast to encode
// so in terms of progress information they can be thought
// to be ready to be copied out.
coder->progress_out += lzma_index_size(coder->index)
+ LZMA_STREAM_HEADER_SIZE;
}
// Fall through
@ -810,6 +850,28 @@ get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
}
static void
get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
{
// Lock coder->mutex to prevent finishing threads from moving their
// progress info from the worker_thread structure to lzma_coder.
mythread_sync(coder->mutex) {
*progress_in = coder->progress_in;
*progress_out = coder->progress_out;
for (size_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
*progress_in += coder->threads[i].progress_in;
*progress_out += coder->threads[i]
.progress_out;
}
}
}
return;
}
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
const lzma_mt *options)
@ -865,6 +927,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->code = &stream_encode_mt;
next->end = &stream_encoder_mt_end;
next->get_progress = &get_progress;
// next->update = &stream_encoder_mt_update;
next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
@ -941,6 +1004,10 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->coder->header_pos = 0;
// Progress info
next->coder->progress_in = 0;
next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;
return LZMA_OK;
}

View File

@ -97,6 +97,7 @@ global:
XZ_5.1.2alpha {
global:
lzma_get_progress;
lzma_stream_encoder_mt;
lzma_stream_encoder_mt_memusage;

View File

@ -526,20 +526,26 @@ progress_elapsed(void)
}
/// Get information about position in the stream. This is currently simple,
/// but it will become more complicated once we have multithreading support.
/// Get how much uncompressed and compressed data has been processed.
static void
progress_pos(uint64_t *in_pos,
uint64_t *compressed_pos, uint64_t *uncompressed_pos)
{
*in_pos = progress_strm->total_in;
uint64_t out_pos;
lzma_get_progress(progress_strm, in_pos, &out_pos);
// It cannot have processed more input than it has been given.
assert(*in_pos <= progress_strm->total_in);
// It cannot have produced more output than it claims to have ready.
assert(out_pos >= progress_strm->total_out);
if (opt_mode == MODE_COMPRESS) {
*compressed_pos = progress_strm->total_out;
*uncompressed_pos = progress_strm->total_in;
*compressed_pos = out_pos;
*uncompressed_pos = *in_pos;
} else {
*compressed_pos = progress_strm->total_in;
*uncompressed_pos = progress_strm->total_out;
*compressed_pos = *in_pos;
*uncompressed_pos = out_pos;
}
return;