diff --git a/src/xz/args.c b/src/xz/args.c index 9a4f82be..d71a23c4 100644 --- a/src/xz/args.c +++ b/src/xz/args.c @@ -140,6 +140,7 @@ parse_real(args_info *args, int argc, char **argv) OPT_NO_ADJUST, OPT_INFO_MEMORY, OPT_ROBOT, + OPT_FLUSH_TIMEOUT, }; static const char short_opts[] @@ -176,6 +177,7 @@ parse_real(args_info *args, int argc, char **argv) { "memory", required_argument, NULL, 'M' }, // Old alias { "no-adjust", no_argument, NULL, OPT_NO_ADJUST }, { "threads", required_argument, NULL, 'T' }, + { "flush-timeout", required_argument, NULL, OPT_FLUSH_TIMEOUT }, { "extreme", no_argument, NULL, 'e' }, { "fast", no_argument, NULL, '0' }, @@ -483,6 +485,11 @@ parse_real(args_info *args, int argc, char **argv) opt_auto_adjust = false; break; + case OPT_FLUSH_TIMEOUT: + opt_flush_timeout = str_to_uint64("flush-timeout", + optarg, 0, UINT64_MAX); + break; + default: message_try_help(); tuklib_exit(E_ERROR, E_ERROR, false); diff --git a/src/xz/coder.c b/src/xz/coder.c index d29e40f4..5d422d60 100644 --- a/src/xz/coder.c +++ b/src/xz/coder.c @@ -586,6 +586,9 @@ coder_normal(file_pair *pair) if (block_remaining == 0) action = LZMA_FULL_FLUSH; } + + if (action == LZMA_RUN && flush_needed) + action = LZMA_SYNC_FLUSH; } // Let liblzma do the actual work. @@ -601,21 +604,42 @@ coder_normal(file_pair *pair) strm.avail_out = IO_BUFFER_SIZE; } - if (ret == LZMA_STREAM_END && action == LZMA_FULL_FLUSH) { - // Start a new Block. - action = LZMA_RUN; + if (ret == LZMA_STREAM_END && (action == LZMA_SYNC_FLUSH + || action == LZMA_FULL_FLUSH)) { + // Flushing completed. Write the pending data out + // immediatelly so that the reading side can + // decompress everything compressed so far. Do this + // also with LZMA_FULL_FLUSH because if it is combined + // with timed LZMA_SYNC_FLUSH the same flushing + // timer can be used. + if (io_write(pair, &out_buf, IO_BUFFER_SIZE + - strm.avail_out)) + break; - if (opt_block_list == NULL) { - block_remaining = opt_block_size; - } else { - // FIXME: Make it work together with - // --block-size. - if (opt_block_list[list_pos + 1] != 0) - ++list_pos; + strm.next_out = out_buf.u8; + strm.avail_out = IO_BUFFER_SIZE; - block_remaining = opt_block_list[list_pos]; + if (action == LZMA_FULL_FLUSH) { + if (opt_block_list == NULL) { + block_remaining = opt_block_size; + } else { + // FIXME: Make it work together with + // --block-size. + if (opt_block_list[list_pos + 1] != 0) + ++list_pos; + + block_remaining + = opt_block_list[list_pos]; + } } + // Set the time of the most recent flushing. + mytime_set_flush_time(); + + // Start a new Block after LZMA_FULL_FLUSH or continue + // the same block after LZMA_SYNC_FLUSH. + action = LZMA_RUN; + } else if (ret != LZMA_OK) { // Determine if the return value indicates that we // won't continue coding. diff --git a/src/xz/file_io.c b/src/xz/file_io.c index 921f79d1..fb8d64bd 100644 --- a/src/xz/file_io.c +++ b/src/xz/file_io.c @@ -38,6 +38,13 @@ static bool warn_fchown; #endif +typedef enum { + IO_WAIT_MORE, // Reading or writing is possible. + IO_WAIT_ERROR, // Error or user_abort + IO_WAIT_TIMEOUT, // poll() timed out +} io_wait_ret; + + /// If true, try to create sparse files when decompressing. static bool try_sparse = true; @@ -130,8 +137,8 @@ io_no_sparse(void) /// pops up again. There are pselect() (POSIX-1.2001) and ppoll() (not in /// POSIX) but neither is portable enough in 2013. The self-pipe trick is /// old and very portable. -static bool -io_wait(file_pair *pair, bool is_reading) +static io_wait_ret +io_wait(file_pair *pair, int timeout, bool is_reading) { struct pollfd pfd[2]; @@ -147,10 +154,10 @@ io_wait(file_pair *pair, bool is_reading) pfd[1].events = POLLIN; while (true) { - const int ret = poll(pfd, 2, -1); + const int ret = poll(pfd, 2, timeout); if (user_abort) - return true; + return IO_WAIT_ERROR; if (ret == -1) { if (errno == EINTR || errno == EAGAIN) @@ -160,10 +167,17 @@ io_wait(file_pair *pair, bool is_reading) is_reading ? pair->src_name : pair->dest_name, strerror(errno)); + return IO_WAIT_ERROR; + } + + if (ret == 0) { + assert(opt_flush_timeout != 0); + flush_needed = true; + return IO_WAIT_TIMEOUT; } if (pfd[0].revents != 0) - return false; + return IO_WAIT_MORE; } } #endif @@ -583,10 +597,10 @@ io_open_src_real(file_pair *pair) // will work when open() is used with O_NONBLOCK. if (!S_ISREG(pair->src_st.st_mode)) { signals_unblock(); - const bool ret = io_wait(pair, true); + const io_wait_ret ret = io_wait(pair, -1, true); signals_block(); - if (ret) + if (ret != IO_WAIT_MORE) goto error; } #endif @@ -1001,10 +1015,22 @@ io_read(file_pair *pair, io_buf *buf_union, size_t size) #ifndef TUKLIB_DOSLIKE if (errno == EAGAIN || errno == EWOULDBLOCK) { - if (!io_wait(pair, true)) + const io_wait_ret ret = io_wait(pair, + mytime_get_flush_timeout(), + true); + switch (ret) { + case IO_WAIT_MORE: continue; - return SIZE_MAX; + case IO_WAIT_ERROR: + return SIZE_MAX; + + case IO_WAIT_TIMEOUT: + return size - left; + + default: + message_bug(); + } } #endif @@ -1077,7 +1103,7 @@ io_write_buf(file_pair *pair, const uint8_t *buf, size_t size) #ifndef TUKLIB_DOSLIKE if (errno == EAGAIN || errno == EWOULDBLOCK) { - if (!io_wait(pair, false)) + if (io_wait(pair, -1, false) == IO_WAIT_MORE) continue; return true;