Metadata-Version: 2.1
Name: onecat_task_queue
Version: 1.0.0
Summary: 简单异步任务队列。版本[1.0.0]更新内容：又是造轮子系列咯，Python上有很多成熟完善的异步任务队列框架可以用，比如Celery，或者RQ，不过这些都不自带消息队列服务，都需要使用Redis、RabbitMQ之类的消息队列才行，我用到小项目中又不需要附带这么多东西，于是自己动手来实现咯。
Home-page: https://gitee.com/deali/OneCat-TaskQueue
Author: DealiAxy
Author-email: dealiaxy@gmail.com
License: UNKNOWN
Description: 又是造轮子系列咯，Python上有很多成熟完善的异步任务队列框架可以用，比如Celery，或者RQ，不过这些都不自带消息队列服务，都需要使用Redis、RabbitMQ之类的消息队列才行，我用到小项目中又不需要附带这么多东西，于是自己动手来实现咯。
        
        ## 思路
        1. 将需要异步执行的任务添加到队列
        2. 自动从队列中取出任务，创建新线程执行
        3. 保存任务的执行结果和输出
        4. 任务完成，调用回调函数，处理返回的数据
        5. 使用输出重定向处理任务的输出
        
        下面分析一下几个关键部分的代码实现
        
        ## 优先级队列
        Python标准库已经有实现好的优先级队列了，但是当在相同优先级下传入同样的对象时，他会自动比较这些对象，不过我们的Task类没有实现相关的运算符重载，所以无法比较。
        
        要解决这个问题，要不就在Task里面实现运算符重载，要不就是自己实现一个优先级队列，我选择后者。
        
        ```python
        class PriorityQueue:
            """
            自己实现的优先级队列，使用了Python的堆
            """
            def __init__(self):
                self._index = 0
                self.queue = []
        
            def push(self, priority, val):
                heapq.heappush(self.queue, (priority, self._index, val))
                self._index += 1
        
            def pop(self):
                return heapq.heappop(self.queue)[-1]
        
            @property
            def empty(self):
                return len(self.queue) == 0
        ```
        
        ## 输出重定向
        其实这个挺坑的了，毕竟是异步任务队列，难免涉及到线程的竞争问题，又没办法单独控制每个任务的输出，不过我还是做了，小项目的话还是可以用的（其实就是懒得移植一下已有代码）
        
        代码如下，同样是使用队列来实现输出缓冲区，大小可以自己调整，默认支持输出到控制台、文件或者返回列表。
        
        一般重定向输出的话需要自己实现可以输出处理函数，直接给Redirection类的custom 属性赋值即可。
        
        ```python
        class Redirection:
            def __init__(self, buffer_size=512):
                self.buffer = Queue(maxsize=512)
                self._console = sys.stdout
                # 自定义的输出端
                self.custom = None
        
            def write(self, output_stream):
                # 加入缓冲区队列
                self.buffer.put(output_stream)
        
            def to_console(self):
                sys.stdout = self._console
                # 出列
                while not self.buffer.empty():
                    print(self.buffer.get())
        
            def to_file(self, file_path):
                with open(file_path, 'w+') as f:
                    sys.stdout = f
                    while not self.buffer.empty():
                        print(self.buffer.get())
                    f.close()
        
            def to_custom(self):
                while not self.buffer.empty():
                    self.custom(self.buffer.get())
        
            def to_list(self):
                data = []
                while not self.buffer.empty():
                    data.append(self.buffer.get())
                return data
        
            def flush(self):
                self.buffer.empty()
        
            def reset(self):
                sys.stdout = self._console
        ```
        
        ## Task类
        没什么好说的，定义任务类，Task，代码如下：
        
        ```python
        class Task:
            def __init__(self, func, callback=None, priority=Priority.MIDDLE, args=(), kwargs={}):
                """
                Args:
                    func: 需要执行的函数
                    callback:  执行完的回调函数
                    priority: 优先级
                    *args:
                    **kwargs:
                """
                self._id = uuid.uuid4().hex
                self.function = func
                self.callback = callback
                self.priority = priority
                self.args = args
                self.kwargs = kwargs
                # 任务运行过程的输出，stdout的输出
                self._outputs: Redirection = None
        
            @property
            def id(self):
                return self._id
        
            @property
            def outputs(self) -> Redirection:
                return self._outputs
        
            @outputs.setter
            def outputs(self, value: Redirection):
                self._outputs = value
        
            def run(self):
                try:
                    if self.callback:
                        # 回调函数原型 callback(task_obj, result)
                        result = self.callback(self, self.function(*self.args, **self.kwargs))
                    else:
                        result = self.function(*self.args, **self.kwargs)
                    return result
                except Exception as e:
                    if self.callback:
                        result = self.callback(self, e)
                    else:
                        result = e
                    return result
        ```
        
        使用起来很简单，如下：
        
        ```python
        def fun1(num1, num2):
            print(f'num1={num1}')
            print(f'num2={num2}')
            return num1 + num2
        
        Task(
            func=fun1,
            callback=lambda task, result: print(f'task result: {result}'),
        )
        
        # 也可以写成这样
        Task(
            func=lambda num1, num2: num1 + num2,
            callback=lambda task, result: print(f'task result: {result}'),
            args=[2, 3]
        )
        ```
        
        ## 任务队列
        最后是实现任务队列，也很简单，根据任务的优先级进行调度即可。
        
        ```python
        class TaskQueue:
            """
            基于线程的异步任务队列
            todo 下次做一个基于进程的队列，充分利用多核CPU性能
            """
        
            def __init__(self, output_redirect=False):
                self.queue = PriorityQueue()
                self.output_redirect = output_redirect
                self._redirect_objs = {}
                self._results = {}
        
            def put(self, task: Task):
                """
                将task加入任务列表
                Args:
                    task:
                Returns:返回task id
                """
                self.queue.push(task.priority, task)
                return task.id
        
            def get(self):
                return self.queue.pop()
        
            def run(self):
                while not self.queue.empty:
                    task = self.get()
                    # 开启新线程
                    t = threading.Thread(target=self._task_wrapper, name=task.id, args=[task])
                    self._log(f'Start thread {task.id}')
                    t.start()
        
            def get_output(self, task_id: str) -> Redirection:
                return self._redirect_objs.get(task_id, None)
        
            def get_result(self, task_id: str):
                return self._results.get(task_id, None)
        
            @staticmethod
            def _log(msg: str):
                """日志输出接口，可以替换为日志组件"""
                print(f'[TaskQueue] {msg}')
        
            def _task_wrapper(self, task: Task):
                if self.output_redirect:
                    if task.id in self._redirect_objs:
                        redirect_obj = self._redirect_objs[task.id]
                    else:
                        redirect_obj = Redirection(2048)
                        self._redirect_objs[task.id] = redirect_obj
                    # 重定向输出
                    sys.stdout = redirect_obj
                    task.outputs = redirect_obj
                    result = task.run()
                    # 恢复默认输出
                    redirect_obj.reset()
                    self._log(f'Task finished. {task.id}')
                else:
                    result = task.run()
        
                # 保存结果
                self._results[task.id] = result
        ```
        
        整个项目代码就在这了，很简单，也有很多不足的地方，不过小项目用用勉强还可以吧~
        
        过几天发布到pip，有需要的同学直接pip安装就可以使用，请关注博客更新。
        
        
        ## Demo
        附上使用任务队列的demo代码：
        ```python
        if __name__ == '__main__':
            task_queue = TaskQueue(output_redirect=True)
        
        
            def fun1():
                time.sleep(2)
                return 1
        
        
            def fun2():
                time.sleep(3)
                return 2
        
        
            task_queue.put(Task(
                func=fun1,
                callback=lambda task, result: print(f'task1 result: {result}'),
            ))
            task_queue.put(Task(
                func=fun2,
                callback=lambda task, result: print(f'task2 result: {result}'),
            ))
        
        
            def custom_output(msg):
                print(f'[custom] {msg}')
        
        
            def fun3(num1, num2):
                print(f'num1={num1}')
                print(f'num2={num2}')
                return num1 + num2
        
        
            def callback(task_obj, result):
                print(f'task3 result={result}')
                output = task_obj.outputs
                output.custom = custom_output
                output.reset()
                output.to_custom()
        
        
            task_queue.put(Task(func=fun3, callback=callback, args=[2, 3]))
        
            print('task queue run')
            task_queue.run()
            print('do other things...')
        
            for i in range(0, 100):
                print(i * i)
        ```
        
        ## 参考资料
        - 运算符重载 [https://blog.csdn.net/goodlixueyong/article/details/52589979](https://blog.csdn.net/goodlixueyong/article/details/52589979)
        - [https://www.open-open.com/code/view/1433410658322](https://www.open-open.com/code/view/1433410658322)
        - Celery异步任务队列 [https://www.cnblogs.com/StitchSun/p/8552488.html](https://www.cnblogs.com/StitchSun/p/8552488.html)
        - https://zhuanlan.zhihu.com/p/37637660
        
        
        ## 欢迎交流
        我整理了一系列的技术文章和资料，在公众号「程序设计实验室」后台回复 linux、flutter、c#、netcore、android、kotlin、java、python 等可获取相关技术文章和资料，同时有任何问题都可以在公众号后台留言~
        
        
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3.6
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Description-Content-Type: text/markdown
