Skip to content

1.0.3 框架支持的函数调度并发模式种类

ydf0509 edited this page Apr 20, 2021 · 1 revision
1、threading 多线程,使用自定义的可缩小、节制开启新线程的自定义线程池,不是直接用官方内置concurrent.futures.ThreadpoolExecutor
   此线程池非常智能,配合qps参数,任何场景可以无脑开500线程,真正的做到智能扩张,智能自动缩小。
   这线程池是智能线程池,由于非常好用,为这个线程池做了独立的pypi包,可以单独用于没有使用此框架的项目。
2、gevent    需要在运行起点的脚本首行打 gevent 猴子补丁。
3、eventlet  需要在运行起点的脚本首行打 eventlet 猴子补丁。
4、asyncio  async异步,主要是针对消费函数已经定义成了   async def fun(x)  这种情况,这种情况不能直接使用多线程,
   因为执行  fun(1)  后得到的并不是所想象的函数最终结果,而是得到的一个协程对象,所以针对已经定义成异步函数了的,需要使用此种并发模式。
   框架不鼓励用户定义异步函数,你就用同步的直观方式思维定义函数就行了,其余的并发调度交给框架就行了。
5、开启多进程启动多个consumer,此模式是 多进程  + 上面4种的其中一种并发方式,充分利用多核和充分利用io,用法如下。可以实现 多进程 叠加 协程并发。
# 这种是多进程方式,一次编写能够兼容win和linux的运行。
from function_scheduling_distributed_framework import task_deco, BrokerEnum, ConcurrentModeEnum, run_consumer_with_multi_process
import os

@task_deco('test_multi_process_queue',broker_kind=BrokerEnum.REDIS_ACK_ABLE,
           concurrent_mode=ConcurrentModeEnum.THREADING,)
def fff(x):
    print(x * 10,os.getpid())

if __name__ == '__main__':
    # run_many_consumer_with_multi_process([consumer_init_params_include_consuming_function(fff)],process_num=6)
    run_consumer_with_multi_process(fff,6)