diff --git a/Lib/test/test_free_threading/test_lzma.py b/Lib/test/test_free_threading/test_lzma.py new file mode 100644 index 00000000000000..b93718f4834f35 --- /dev/null +++ b/Lib/test/test_free_threading/test_lzma.py @@ -0,0 +1,56 @@ +import unittest + +from test.support import import_helper, threading_helper +from test.support.threading_helper import run_concurrently + +lzma = import_helper.import_module("lzma") +from lzma import LZMACompressor, LZMADecompressor + +from test.test_lzma import INPUT + + +NTHREADS = 10 + + +@threading_helper.requires_working_threading() +class TestLZMA(unittest.TestCase): + def test_compressor(self): + lzc = LZMACompressor() + + # First compress() outputs LZMA header + header = lzc.compress(INPUT) + self.assertGreater(len(header), 0) + + def worker(): + # it should return empty bytes as it buffers data internally + data = lzc.compress(INPUT) + self.assertEqual(data, b"") + + run_concurrently(worker_func=worker, nthreads=NTHREADS - 1) + full_compressed = header + lzc.flush() + decompressed = lzma.decompress(full_compressed) + # The decompressed data should be INPUT repeated NTHREADS times + self.assertEqual(decompressed, INPUT * NTHREADS) + + def test_decompressor(self): + chunk_size = 128 + chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)] + input_data = b"".join(chunks) + compressed = lzma.compress(input_data) + + lzd = LZMADecompressor() + output = [] + + def worker(): + data = lzd.decompress(compressed, chunk_size) + self.assertEqual(len(data), chunk_size) + output.append(data) + + run_concurrently(worker_func=worker, nthreads=NTHREADS) + self.assertEqual(len(output), NTHREADS) + # Verify the expected chunks (order doesn't matter due to append race) + self.assertEqual(set(output), set(chunks)) + + +if __name__ == "__main__": + unittest.main() diff --git a/Modules/_lzmamodule.c b/Modules/_lzmamodule.c index 6fc072f6d0a382..5876623399837b 100644 --- a/Modules/_lzmamodule.c +++ b/Modules/_lzmamodule.c @@ -72,13 +72,6 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer) } -#define ACQUIRE_LOCK(obj) do { \ - if (!PyThread_acquire_lock((obj)->lock, 0)) { \ - Py_BEGIN_ALLOW_THREADS \ - PyThread_acquire_lock((obj)->lock, 1); \ - Py_END_ALLOW_THREADS \ - } } while (0) -#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock) typedef struct { PyTypeObject *lzma_compressor_type; @@ -111,7 +104,7 @@ typedef struct { lzma_allocator alloc; lzma_stream lzs; int flushed; - PyThread_type_lock lock; + PyMutex mutex; } Compressor; typedef struct { @@ -124,7 +117,7 @@ typedef struct { char needs_input; uint8_t *input_buffer; size_t input_buffer_size; - PyThread_type_lock lock; + PyMutex mutex; } Decompressor; #define Compressor_CAST(op) ((Compressor *)(op)) @@ -617,14 +610,14 @@ _lzma_LZMACompressor_compress_impl(Compressor *self, Py_buffer *data) { PyObject *result = NULL; - ACQUIRE_LOCK(self); + PyMutex_Lock(&self->mutex); if (self->flushed) { PyErr_SetString(PyExc_ValueError, "Compressor has been flushed"); } else { result = compress(self, data->buf, data->len, LZMA_RUN); } - RELEASE_LOCK(self); + PyMutex_Unlock(&self->mutex); return result; } @@ -644,14 +637,14 @@ _lzma_LZMACompressor_flush_impl(Compressor *self) { PyObject *result = NULL; - ACQUIRE_LOCK(self); + PyMutex_Lock(&self->mutex); if (self->flushed) { PyErr_SetString(PyExc_ValueError, "Repeated call to flush()"); } else { self->flushed = 1; result = compress(self, NULL, 0, LZMA_FINISH); } - RELEASE_LOCK(self); + PyMutex_Unlock(&self->mutex); return result; } @@ -820,12 +813,7 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) self->alloc.free = PyLzma_Free; self->lzs.allocator = &self->alloc; - self->lock = PyThread_allocate_lock(); - if (self->lock == NULL) { - Py_DECREF(self); - PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock"); - return NULL; - } + self->mutex = (PyMutex){0}; self->flushed = 0; switch (format) { @@ -867,10 +855,8 @@ static void Compressor_dealloc(PyObject *op) { Compressor *self = Compressor_CAST(op); + assert(!PyMutex_IsLocked(&self->mutex)); lzma_end(&self->lzs); - if (self->lock != NULL) { - PyThread_free_lock(self->lock); - } PyTypeObject *tp = Py_TYPE(self); tp->tp_free(self); Py_DECREF(tp); @@ -1146,12 +1132,12 @@ _lzma_LZMADecompressor_decompress_impl(Decompressor *self, Py_buffer *data, { PyObject *result = NULL; - ACQUIRE_LOCK(self); + PyMutex_Lock(&self->mutex); if (self->eof) PyErr_SetString(PyExc_EOFError, "Already at end of stream"); else result = decompress(self, data->buf, data->len, max_length); - RELEASE_LOCK(self); + PyMutex_Unlock(&self->mutex); return result; } @@ -1244,12 +1230,7 @@ _lzma_LZMADecompressor_impl(PyTypeObject *type, int format, self->lzs.allocator = &self->alloc; self->lzs.next_in = NULL; - self->lock = PyThread_allocate_lock(); - if (self->lock == NULL) { - Py_DECREF(self); - PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock"); - return NULL; - } + self->mutex = (PyMutex){0}; self->check = LZMA_CHECK_UNKNOWN; self->needs_input = 1; @@ -1304,14 +1285,13 @@ static void Decompressor_dealloc(PyObject *op) { Decompressor *self = Decompressor_CAST(op); + assert(!PyMutex_IsLocked(&self->mutex)); + if(self->input_buffer != NULL) PyMem_Free(self->input_buffer); lzma_end(&self->lzs); Py_CLEAR(self->unused_data); - if (self->lock != NULL) { - PyThread_free_lock(self->lock); - } PyTypeObject *tp = Py_TYPE(self); tp->tp_free(self); Py_DECREF(tp);