Metadata-Version: 2.1
Name: tqueue
Version: 0.0.16
Summary: Threading Queue
Project-URL: Homepage, https://github.com/haiz/tqueue
Project-URL: Documentation, https://github.com/haiz/tqueue/blob/main/README.md
Project-URL: Changelog, https://github.com/haiz/tqueue/blob/main/CHANGELOG.md
Project-URL: Examples, https://github.com/haiz/tqueue/blob/main/examples
Project-URL: Source, https://github.com/haiz/tqueue
Author-email: Hai Cao <cthai83@gmail.com>
License: MIT License
License-File: LICENSE.txt
Keywords: queue,threading
Classifier: Development Status :: 4 - Beta
Classifier: Environment :: Web Environment
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.7
Provides-Extra: test
Requires-Dist: pytest>=6.0; extra == 'test'
Description-Content-Type: text/markdown

# tqueue package

This library lets you run your tasks in multiple threads easily.

This is helpful when you have a lot of data to process.

Assume that you have a large list of items to process. You write a producer that puts items into a queue one by one.

Workers get data from the queue and then process it. Putting data into the queue should be quicker than processing it.
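The producer/worker idea can be illustrated with only the standard library (`queue.Queue` plus `threading`). This is a sketch of the general pattern, not tqueue's internals; `process`, `consumer`, and `run` are hypothetical names.

```python
import queue
import threading

NUM_WORKERS = 4
_STOP = object()  # sentinel that tells a worker thread to exit


def process(item):
    # Stand-in for the slow per-item work
    return item * 2


def consumer(q, results, lock):
    # Each worker thread pulls items until it receives the sentinel
    while True:
        item = q.get()
        if item is _STOP:
            break
        value = process(item)
        with lock:
            results.append(value)


def run(items):
    # Bounded, so a fast producer waits instead of filling memory
    q = queue.Queue(maxsize=100)
    results = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=consumer, args=(q, results, lock))
        for _ in range(NUM_WORKERS)
    ]
    for t in threads:
        t.start()
    # Producer: put items into the queue one by one
    for item in items:
        q.put(item)
    # One sentinel per thread so that every worker stops
    for _ in threads:
        q.put(_STOP)
    for t in threads:
        t.join()
    return results


print(sorted(run(range(10))))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Results arrive in whatever order the threads finish, which is why the output is sorted before printing.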

### Installation

```bash
pip install tqueue
```


### Usage
1. Import library
```python
from tqueue import ThreadingQueue
```
2. Create a worker
- Create a worker function that takes the data as its first parameter
- The worker can be a normal function or a coroutine function
- The worker is called in child threads

```python
# A plain function worker
def worker(data):
    pass


# A coroutine function worker
async def worker2(data):
    pass
```
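Because a worker may be either kind of function, a child thread has to call it differently depending on its type. One plausible way is sketched below with the standard library: plain functions are called directly, while coroutine functions are run on a fresh event loop via `asyncio.run`. `call_worker` and `run_in_thread` are hypothetical names; tqueue's own mechanism may differ (for example, it might keep one event loop per thread).

```python
import asyncio
import inspect
import threading


def call_worker(worker, data):
    """Hypothetical dispatcher: run a plain or coroutine worker from a thread."""
    if inspect.iscoroutinefunction(worker):
        # A coroutine needs an event loop; asyncio.run creates (and closes)
        # a fresh loop in whichever thread calls it
        return asyncio.run(worker(data))
    # A plain function is simply called
    return worker(data)


def worker(data):
    return data + 1


async def worker2(data):
    await asyncio.sleep(0)  # stand-in for awaiting I/O
    return data * 10


results = {}


def run_in_thread(name, fn, data):
    results[name] = call_worker(fn, data)


threads = [
    threading.Thread(target=run_in_thread, args=("sync", worker, 1)),
    threading.Thread(target=run_in_thread, args=("async", worker2, 1)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results["sync"], results["async"])  # 2 10
```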

3. Set up threading for a producer
Apply threading to a producer:
- a. Set the number of threads and the worker
- b. Put data into the queue

- You can use ThreadingQueue as a context manager

```python
def producer():
    # Start the queue with 40 threads running `worker`
    with ThreadingQueue(40, worker) as tq:
        for data in range(100):  # replace with your real data source
            tq.put(data)
```

- You can also use it asynchronously with `async with`

```python
async def producer():
    # Start the queue with 40 threads running `worker`
    async with ThreadingQueue(40, worker) as tq:
        for data in range(100):  # replace with your real data source
            await tq.put(data)
```
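To make the synchronous context-manager form concrete, here is a minimal standard-library sketch of a class with the same shape: threads start when the `with` block is entered, and in this sketch leaving the block waits until every queued item has been handled. `MiniThreadingQueue` is hypothetical; it is not tqueue's implementation and omits logging, retries, and per-thread params.

```python
import queue
import threading


class MiniThreadingQueue:
    """Hypothetical stand-in illustrating the context-manager pattern."""

    _STOP = object()  # sentinel telling a thread to exit

    def __init__(self, num_threads, worker):
        self.num_threads = num_threads
        self.worker = worker
        self._q = queue.Queue()
        self._threads = []

    def _run(self):
        # Thread body: process items until the sentinel arrives
        while True:
            item = self._q.get()
            if item is self._STOP:
                return
            self.worker(item)

    def put(self, data):
        self._q.put(data)

    def __enter__(self):
        # Start the worker threads when the block is entered
        for _ in range(self.num_threads):
            t = threading.Thread(target=self._run, daemon=True)
            t.start()
            self._threads.append(t)
        return self

    def __exit__(self, exc_type, exc, tb):
        # Sentinels queue up behind all data, so every item is handled first
        for _ in self._threads:
            self._q.put(self._STOP)
        for t in self._threads:
            t.join()
        return False  # do not swallow exceptions raised in the block


seen = []
lock = threading.Lock()


def collect(data):
    with lock:
        seen.append(data)


with MiniThreadingQueue(4, collect) as tq:
    for i in range(20):
        tq.put(i)

print(sorted(seen) == list(range(20)))  # True
```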

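4. Run the producer

A synchronous producer is called directly, `producer()`. An async producer is awaited inside a coroutine:
```python
await producer()
```
or started from synchronous code:
```python
import asyncio

asyncio.run(producer())
```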
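For the `async with` / `await tq.put(...)` form, a hypothetical async counterpart is sketched below. It offloads the potentially blocking `queue.Queue.put` and `Thread.join` calls to the event loop's default executor (`loop.run_in_executor`) so the loop is never blocked. `MiniAsyncThreadingQueue` is an assumption for illustration, not tqueue's code.

```python
import asyncio
import queue
import threading


class MiniAsyncThreadingQueue:
    """Hypothetical async counterpart, for illustration only."""

    _STOP = object()

    def __init__(self, num_threads, worker, maxsize=100):
        self.num_threads = num_threads
        self.worker = worker
        self._q = queue.Queue(maxsize=maxsize)  # bounded: put() may block
        self._threads = []

    def _run(self):
        while True:
            item = self._q.get()
            if item is self._STOP:
                return
            self.worker(item)

    async def put(self, data):
        # queue.Queue.put blocks while the queue is full, so run it in the
        # default executor instead of blocking the event loop
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._q.put, data)

    async def __aenter__(self):
        for _ in range(self.num_threads):
            t = threading.Thread(target=self._run, daemon=True)
            t.start()
            self._threads.append(t)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        loop = asyncio.get_running_loop()
        for _ in self._threads:
            await loop.run_in_executor(None, self._q.put, self._STOP)
        for t in self._threads:
            await loop.run_in_executor(None, t.join)
        return False


seen = []
lock = threading.Lock()


def collect(data):
    with lock:
        seen.append(data)


async def producer():
    async with MiniAsyncThreadingQueue(4, collect) as tq:
        for i in range(20):
            await tq.put(i)


asyncio.run(producer())
print(sorted(seen) == list(range(20)))  # True
```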


### Note
1. You can pass extra keyword params to all workers running in threads via `worker_params`.
2. Apart from the number of threads and the worker, you can set `log_dir` to write logs to files.
3. `worker_params_builder` is an optional function that generates parameters for each worker thread.
4. `on_thread_close` is an optional function that is called when a thread finishes; it is helpful for cleanup such as closing a database connection.
5. Any other keyword params are passed to the worker.
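Notes 3 and 4 describe per-thread setup and teardown. The lifecycle they imply (build params when a thread starts, hand them to every worker call made by that thread, release them when the thread ends) can be sketched with the standard library. `run_with_thread_params`, `builder`, `close`, and `work` are hypothetical; tqueue's actual handling, including retries and logging, may differ. As in the example below, the close hook receives the builder's params as keyword arguments.

```python
import queue
import threading

_STOP = object()


def run_with_thread_params(items, num_threads, worker, params_builder,
                           on_close, extra_params=None):
    """Hypothetical sketch: build resources once per thread, pass them (plus
    shared extra params) to every worker call, release them at thread end."""
    q = queue.Queue()
    extra_params = extra_params or {}

    def thread_main():
        params = params_builder()  # e.g. one DB connection per thread
        try:
            while True:
                item = q.get()
                if item is _STOP:
                    break
                worker(item, **params, **extra_params)
        finally:
            on_close(**params)  # runs even if the worker raised

    threads = [threading.Thread(target=thread_main) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    for _ in threads:
        q.put(_STOP)
    for t in threads:
        t.join()


# Usage with a stand-in "connection" instead of a real database
lock = threading.Lock()
opened, closed, handled = [], [], []


def builder():
    conn = {"thread": threading.get_ident()}
    with lock:
        opened.append(conn)
    return {"connection": conn}


def close(connection):
    with lock:
        closed.append(connection)


def work(item, connection, uid):
    with lock:
        handled.append((item, uid))


run_with_thread_params(range(30), 3, work, builder, close, extra_params={"uid": 123})
print(len(opened), len(closed), len(handled))  # 3 3 30
```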

### Example

```python
import json
import pymysql

from tqueue import ThreadingQueue


NUM_OF_THREADS = 40


def get_db_connection():
    return pymysql.connect(host='localhost',
                           user='root',
                           password='123456',
                           database='example',
                           cursorclass=pymysql.cursors.DictCursor)


# Build params for the worker; they persist for the lifetime of the thread.
# This function is called when a new thread starts or on retry.
def worker_params_builder():
    # Each thread uses its own db connection
    conn = get_db_connection()
    conn.autocommit(True)
    cursor = conn.cursor()
    return {"cursor": cursor, "connection": conn}


# Release resources (close the database connection, ...)
# This function is called when the thread ends
def on_close_thread(cursor, connection):
    cursor.close()
    connection.close()


def worker(image_info, cursor, uid: int, **kwargs):
    # Update image info in the database.
    # Other per-thread params (e.g. `connection`) are absorbed by **kwargs.
    sql = "UPDATE images SET width = %s, height = %s, uid = %s WHERE id = %s"
    cursor.execute(sql, (image_info["width"], image_info["height"], uid, image_info["id"]))


def producer(source_file: str):
    with ThreadingQueue(
        NUM_OF_THREADS, worker,
        log_dir="logs/update-images",
        worker_params_builder=worker_params_builder,
        on_close_thread=on_close_thread,
        params={"uid": 123},
        retry_count=1
    ) as tq:
        with open(source_file, 'r') as f:
            for line in f:
                # A blank line is "\n" (truthy), so strip before checking
                line = line.strip()
                if not line:
                    continue
                data = json.loads(line)
                tq.put(data)


if __name__ == "__main__":
    producer("images.jsonl")
```
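The example reads a JSON Lines file. Each line read from a file keeps its trailing `\n`, so a blank line is `"\n"`, which is truthy; stripping before the emptiness check is what actually skips it. A small reusable helper could look like the sketch below; `iter_jsonl` is a hypothetical name, not part of tqueue.

```python
import json
import os
import tempfile


def iter_jsonl(path):
    """Yield one parsed object per non-blank line of a JSON Lines file."""
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()  # a blank line is "\n", which is truthy
            if not line:
                continue
            yield json.loads(line)


# Usage: write a small file with a blank line in the middle, then read it back
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as tmp:
    tmp.write('{"id": 1, "width": 640, "height": 480}\n\n'
              '{"id": 2, "width": 800, "height": 600}\n')
    path = tmp.name

records = list(iter_jsonl(path))
os.remove(path)
print([r["id"] for r in records])  # [1, 2]
```

Inside `producer`, the read loop could then become `for data in iter_jsonl(source_file): tq.put(data)`.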

### Development

#### Build project

1. Update the version number in the file `src/tqueue/__version__.py`
2. Update the changelog
3. Build and publish the changes

```bash
python3 -m build
python3 -m twine upload dist/*
```

## Release Information

### Fixed

* Logging to a file no longer raises an exception


---

[Full changelog](https://github.com/haiz/tqueue/blob/main/CHANGELOG.md)
