/////////////////////////////////////////////////////////////////////////////// // /// \file process.c /// \brief Compresses or uncompresses a file // // Copyright (C) 2007 Lasse Collin // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; either // version 2.1 of the License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Lesser General Public License for more details. // /////////////////////////////////////////////////////////////////////////////// #include "private.h" typedef struct { lzma_stream strm; void *options; file_pair *pair; /// We don't need this for *anything* but seems that at least with /// glibc pthread_create() doesn't allow NULL. pthread_t thread; bool in_use; } thread_data; /// Number of available threads static size_t free_threads; /// Thread-specific data static thread_data *threads; static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; /// Attributes of new coder threads. They are created in detached state. /// Coder threads signal to the service thread themselves when they are done. static pthread_attr_t thread_attr; ////////// // Init // ////////// extern void process_init(void) { threads = malloc(sizeof(thread_data) * opt_threads); if (threads == NULL) { out_of_memory(); my_exit(ERROR); } for (size_t i = 0; i < opt_threads; ++i) threads[i] = (thread_data){ .strm = LZMA_STREAM_INIT_VAR, .options = NULL, .pair = NULL, .in_use = false, }; if (pthread_attr_init(&thread_attr) || pthread_attr_setdetachstate( &thread_attr, PTHREAD_CREATE_DETACHED)) { out_of_memory(); my_exit(ERROR); } free_threads = opt_threads; return; } ////////////////////////// // Thread-specific data // ////////////////////////// static thread_data * get_thread_data(void) { pthread_mutex_lock(&mutex); while (free_threads == 0) { pthread_cond_wait(&cond, &mutex); if (user_abort) { pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); return NULL; } } thread_data *t = threads; while (t->in_use) ++t; t->in_use = true; --free_threads; pthread_mutex_unlock(&mutex); return t; } static void release_thread_data(thread_data *t) { pthread_mutex_lock(&mutex); t->in_use = false; ++free_threads; pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); return; } static int create_thread(void *(*func)(thread_data *t), thread_data *t) { if (opt_threads == 1) { func(t); } else { const int err = pthread_create(&t->thread, &thread_attr, (void *(*)(void *))(func), t); if (err) { errmsg(V_ERROR, _("Cannot create a thread: %s"), strerror(err)); user_abort = 1; return -1; } } return 0; } ///////////////////////// // One thread per file // ///////////////////////// static int single_init(thread_data *t) { lzma_ret ret; if (opt_mode == MODE_COMPRESS) { if (opt_header == HEADER_ALONE) { ret = lzma_alone_encoder(&t->strm, opt_filters[0].options); } else { ret = lzma_stream_encoder(&t->strm, opt_filters, opt_check); } } else { // TODO Restrict file format if requested on the command line. ret = lzma_auto_decoder(&t->strm); } if (ret != LZMA_OK) { if (ret == LZMA_MEM_ERROR) out_of_memory(); else internal_error(); return -1; } return 0; } static lzma_ret single_skip_padding(thread_data *t, uint8_t *in_buf) { // Handle decoding of concatenated Streams. There can be arbitrary // number of nul-byte padding between the Streams, which must be // ignored. // // NOTE: Concatenating LZMA_Alone files works only if at least // one of lc, lp, and pb is non-zero. Using the concatenation // on LZMA_Alone files is strongly discouraged. while (true) { while (t->strm.avail_in > 0) { if (*t->strm.next_in != '\0') return LZMA_OK; ++t->strm.next_in; --t->strm.avail_in; } if (t->pair->src_eof) return LZMA_STREAM_END; t->strm.next_in = in_buf; t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ); if (t->strm.avail_in == SIZE_MAX) return LZMA_DATA_ERROR; } } static void * single(thread_data *t) { if (single_init(t)) { io_close(t->pair, false); release_thread_data(t); return NULL; } uint8_t in_buf[BUFSIZ]; uint8_t out_buf[BUFSIZ]; lzma_action action = LZMA_RUN; lzma_ret ret; bool success = false; t->strm.avail_in = 0; while (!user_abort) { if (t->strm.avail_in == 0 && !t->pair->src_eof) { t->strm.next_in = in_buf; t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ); if (t->strm.avail_in == SIZE_MAX) break; else if (t->pair->src_eof && opt_mode == MODE_COMPRESS) action = LZMA_FINISH; } t->strm.next_out = out_buf; t->strm.avail_out = BUFSIZ; ret = lzma_code(&t->strm, action); if (opt_mode != MODE_TEST) if (io_write(t->pair, out_buf, BUFSIZ - t->strm.avail_out)) break; if (ret != LZMA_OK) { if (ret == LZMA_STREAM_END) { if (opt_mode == MODE_COMPRESS) { assert(t->pair->src_eof); success = true; break; } // Support decoding concatenated .lzma files. ret = single_skip_padding(t, in_buf); if (ret == LZMA_STREAM_END) { assert(t->pair->src_eof); success = true; break; } if (ret == LZMA_OK && !single_init(t)) continue; break; } else { errmsg(V_ERROR, "%s: %s", t->pair->src_name, str_strm_error(ret)); break; } } } io_close(t->pair, success); release_thread_data(t); return NULL; } /////////////////////////////// // Multiple threads per file // /////////////////////////////// // TODO // I'm not sure what would the best way to implement this. Here's one // possible way: // - Reader thread would read the input data and control the coders threads. // - Every coder thread is associated with input and output buffer pools. // The input buffer pool is filled by reader thread, and the output buffer // pool is emptied by the writer thread. // - Writer thread writes the output data of the oldest living coder thread. // // The per-file thread started by the application's main thread is used as // the reader thread. In the beginning, it starts the writer thread and the // first coder thread. The coder thread would be left waiting for input from // the reader thread, and the writer thread would be waiting for input from // the coder thread. // // The reader thread reads the input data into a ring buffer, whose size // depends on the value returned by lzma_chunk_size(). If the ring buffer // gets full, the buffer is marked "to be finished", which indicates to // the coder thread that no more input is coming. Then a new coder thread // would be started. // // TODO /* typedef struct { /// Buffers uint8_t (*buffers)[BUFSIZ]; /// Number of buffers size_t buffer_count; /// buffers[read_pos] is the buffer currently being read. Once finish /// is true and read_pos == write_pos, end of input has been reached. size_t read_pos; /// buffers[write_pos] is the buffer into which data is currently /// being written. size_t write_pos; /// This variable matters only when read_pos == write_pos && finish. /// In that case, this variable will contain the size of the /// buffers[read_pos]. size_t last_size; /// True once no more data is being written to the buffer. When this /// is set, the last_size variable must have been set too. bool finish; /// Mutex to protect access to the variables in this structure pthread_mutex_t mutex; /// Condition to indicate when another thread can continue pthread_cond_t cond; } mem_pool; static foo multi_reader(thread_data *t) { bool done = false; do { const size_t size = io_read(t->pair, m->buffers + m->write_pos, BUFSIZ); if (size == SIZE_MAX) { // TODO } else if (t->pair->src_eof) { m->last_size = size; } pthread_mutex_lock(&m->mutex); if (++m->write_pos == m->buffer_count) m->write_pos = 0; if (m->write_pos == m->read_pos || t->pair->src_eof) m->finish = true; pthread_cond_signal(&m->cond); pthread_mutex_unlock(&m->mutex); } while (!m->finish); return done ? 0 : -1; } static foo multi_code() { lzma_action = LZMA_RUN; while (true) { pthread_mutex_lock(&m->mutex); while (m->read_pos == m->write_pos && !m->finish) pthread_cond_wait(&m->cond, &m->mutex); pthread_mutex_unlock(&m->mutex); if (m->finish) { t->strm.avail_in = m->last_size; if (opt_mode == MODE_COMPRESS) action = LZMA_FINISH; } else { t->strm.avail_in = BUFSIZ; } t->strm.next_in = m->buffers + m->read_pos; const lzma_ret ret = lzma_code(&t->strm, action); } } */ /////////////////////// // Starting new file // /////////////////////// extern void process_file(const char *filename) { thread_data *t = get_thread_data(); if (t == NULL) return; // User abort // If this fails, it shows appropriate error messages too. t->pair = io_open(filename); if (t->pair == NULL) { release_thread_data(t); return; } // TODO Currently only one-thread-per-file mode is implemented. if (create_thread(&single, t)) { io_close(t->pair, false); release_thread_data(t); } return; }