-
Notifications
You must be signed in to change notification settings - Fork 89
5.常见问题回答
ydf0509 edited this page Apr 20, 2021
·
1 revision
答:与rq相比,rq只是基于redis一种中间件键,并且连最基本的并发方式和并发数量都没法指定
,更不用说包括控频 等一系列辅助控制功能,功能比celery差太远了,也和此框架不能比较。
这个框架的最开始实现绝对没有受到celery框架的半点启发。
这是从无数个无限重复复制粘贴扣字的使用redis队列的py文件中提取出来的框架。
原来有无数个这样的脚本。以下为伪代码演示,实际代码大概就是这个意思。
while 1:
msg = redis_client.lpop('queue_namex')
funcx(msg)
原来有很多个这样的一些列脚本,无限次地把操作redis写到业务流了。
排除无限重复复制粘贴不说,这样除了redis分布式以外,缺少了并发、
控频、断点接续运行、定时、指定时间不运行、
消费确认、重试指定次数、重新入队、超时杀死、计算消费次数速度、预估消费时间、
函数运行日志记录、任务过滤、任务过期丢弃等数十种功能。
所以这个没有借鉴celery,只是使用oop转化公式提取了可复用流程的骨架模板,然后慢慢增加各种辅助控制功能和中间件。
这个和celery一样都属于万能分布式函数调度框架,函数控制功能比rq丰富。比celery的优点见以下解答。
答: 为了直接表达框架的意思。
答: 支持python内置Queue对象作为当前解释器下的消息队列。
支持sqlite3作为本机持久化消息队列。
支持pika包实现的使用rabbitmq作为分布式消息队列。
支持rabbitpy包实现的使用rabbitmq作为分布式消息队列。
支持amqpstorm包实现的使用rabbitmq作为分布式消息队列。
支持redis中间件作为分布式消息队列。(不支持消费确认,例如把消息取出来了,但函数还在运行中没有运行完成,
突然关闭程序或断网断电,会造成部分任务丢失【设置的并发数量越大,丢失数量越惨】,所以推荐mq)
支持mongodb中间件作为分布式消息队列。
答: python内置Queue对象作为当前解释器下的消息队列,
优势:不需要安装任何中间件,消息存储在解释器的变量内存中,调度没有io,传输速度损耗极小,
和直接一个函数直接调用另一个函数差不多快。
劣势:只支持同一个解释器下的任务队列共享,不支持单独启动的两个脚本共享消息任务。
没有持久化,解释器退出消息就消失,代码不能中断。 没有分布式。
persistqueue sqlite3作为本机持久化消息队列,
优势: 不需要安装中间件。可以实现本机消息持久化。支持多个进程或多个不同次启动那个的脚本共享任务。
劣势: 只能单机共享任务。 无法多台机器共享任务。分布式能力有限。
redis 作为消息队列,
优势: 真分布式,多个脚本和多态机器可以共享任务。
劣势: 需要安装redis。 是基于redis的list数组结构实现的,不是真正的ampq消息队列,
不支持消费确认,所以你不能在程序运行中反复随意将脚本启动和停止或者反复断电断网,
这样会丢失相当一部分正在运行的消息任务,断点接续能力弱一些。
mongo 作为消息队列:
优势: 真分布式。是使用mongo col里面的的一行一行的doc表示一个消息队列里面的一个个任务。支持消费确认。
劣势: mongo本身没有消息队列的功能,是使用三方包mongo-queue模拟的消息队列。性能没有专用消息队列好。
rabbitmq 作为消息队列:
优势: 真正的消息队列。支持分布式。支持消费确认,支持随意关闭程序和断网断电。连金融支付系统都用这个,可靠性极高。完美。
劣势: 大多数人没安装这个中间件,需要学习怎么安装rabbitmq,或者docker安装rabbitmq。
kafka 作为消息队列:
优势:性能好吞吐量大,中间件扩展好。kafka的分组消费设计得很好,
还可以随时重置偏移量,不需要重复发布旧消息。
劣势:和传统mq设计很大的不同,使用偏移量来作为断点接续依据,需要消费哪些任务不是原子性的,
偏移量太提前导致部分消息重复消费,太靠后导致丢失部分消息。性能好需要付出可靠性的代价。
高并发数量时候,主题的分区如果设置少,确认消费实现难,框架中采用自动commit的方式。
要求可靠性 一致性高还是用rabbitmq好。
nsq 作为消息队列:
优势:部署安装很简单,比rabittmq和kafka安装简单。性能不错。
劣势:用户很少,比rabbitmq用户少了几百倍,导致资料也很少,需要看nsq包的源码来调用一些冷门功能。
redis 可确认消费方式,作为消息队列,
优势: 真分布式,多个脚本和多态机器可以共享任务。某个消费者突然关闭或者宕机,不会造成内存中正在运行的任务丢失。
劣势: 需要安装redis。
sqlalchemy 作为消息队列,
优势: 真分布式,一次性支持了5种关系型数据库每个队列由一张表来模拟,每个任物是表中的一行;
比直接弹出元素的队列,方便统计消费情况。
劣势: 由于是数据库模拟的消息队列,比原生的mq和redis kafka等直接将任物新增到队尾的方式,性能差很多。
所有中间件总结评分:
总体来说首选rabbitmq,所以这也是不指定broker_kind参数时候的默认的方式。
答: 1) 如5.4所写,新增了python内置 queue队列和 基于本机的持久化消息队列。不需要安装中间件,即可使用。
2) 性能比celery框架提高一丝丝。
3) 公有方法,需要被用户调用的方法或函数一律都没有使用元编程,不需要在消费函数上加app.task这样的装饰器。
例如不 add.delay(1,2)这样发布任务。 不使用字符串来import 一个模块。
过多的元编程过于动态,不仅会降低性能,还会让ide无法补全提示,动态一时爽,重构火葬场不是没原因的。
4) 全部公有方法或函数都能在pycharm智能能提示补全参数名称和参数类型。
一切为了调用时候方便而不是为了实现时候简略,例如get_consumer函数和AbstractConsumer的入参完全重复了,
本来实现的时候可以使用*args **kwargs来省略入参,
但这样会造成ide不能补全提示,此框架一切写法只为给调用者带来使用上的方便。不学celery让用户不知道传什么参数。
如果拼错了参数,pycharm会显红,大大降低了用户调用出错概率。
5)不使用命令行启动,在cmd打那么长的一串命令,容易打错字母。并且让用户不知道如何正确的使用celery命令,不友好。
此框架是直接python xx.py 就启动了。
6)框架不依赖任何固定的目录结构,无结构100%自由,想把使用框架写在哪里就写在哪里,写在10层级的深层文件夹下都可以。
脚本可以四处移动改名。celery想要做到这样,要做额外的处理。
7)使用此框架比celery更简单10倍,如例子所示。使用此框架代码绝对比使用celery少几十行。
8)消息中间件里面存放的消息任务很小,简单任务 比celery的消息小了50倍。 消息中间件存放的只是函数的参数,
辅助参数由consumer自己控制。消息越小,中间件性能压力越小。
9)由于消息中间件里面没有存放其他与python 和项目配置有关的信息,这是真正的跨语言的函数调度框架。
java人员也可以直接使用java的redis类rabbitmq类,发送json参数到中间件,由python消费。
celery里面的那种参数,高达几十项,和项目配置混合了,java人员绝对拼凑不出来这种格式的消息结构。
10)celery有应该中心化的celery app实例,函数注册成任务,添加装饰器时候先要导入app,然后@app.task,
同时celery启动app时候,调度函数就需要知道函数在哪里,所以celery app所在的py文件也是需要导入消费函数的,否则会
celery.exceptions.NotRegistered报错
这样以来就发生了务必蛋疼的互相导入的情况,a要导入b,b要导入a,这问题太令人窘迫了,通常解决这种情况是让其中一个模块后导入,
这样就能解决互相导入的问题了。celery的做法是,使用imports一个列表,列表的每一项是消费函数所在的模块的字符串表示形式,
例如 如果消费函数f1在项目的a文件夹下的b文件夹下的c.py中,消费函数与f2在项目的d文件夹的e.py文件中,
为了解决互相导入问题,celery app中需要配置 imports = ["a.b.c",'d.e'],这种import在pycharm下容易打错字,
例如scrapy和django的中间件注册方式,也是使用的这种类似的字符串表示导入路径,每添加一个函数,只要不在之前的模块中,就要这么写,
不然不写improt的话,那是调度不了消费函数的。此框架原先没有装饰器方式,来加的装饰器方式与celery的用法大不相同,
因为没有一个叫做app类似概念的东西,不需要相互导入,启动也是任意文件夹下的任意脚本都可以,自然不需要写什么imports = ['a.b.c']
11)简单利于团队推广,不需要看复杂的celry 那样的5000页英文文档。
对于不规则文件夹项目的clery使用时如何的麻烦,可以参考 celery_demo项目 https://github.com/ydf0509/celery_demo。
答:框架默认使用json。并且不提供序列化方式选择,有且只能用json序列化。json消息可读性很强,远超其他序列化方式。
默认使用json来序列化和反序列化消息。所以推送的消息必须是简单的,不要把一个自定义类型的对象作为消费函数的入参,
json键的值必须是简单类型,例如 数字 字符串 数组 字典这种。不可以是不可被json序列化的python自定义类型的对象。
用json序列化已经满足所有场景了,picke序列化更强,但仍然有一些自定义类型的对象的实例属性由于是一个不可被序列化
的东西,picke解决不了,这种东西例如self.r = Redis(),而redis对象又包括threding.Lock类型的属性 ,不可以被pike序列化
就算能序列化的对象也是要用一串很长的东西来。
用pike来序列化复杂嵌套属性类型的对象,不仅会导致中间件要存储很大的东西传输效率会降低,在编码和解码也会消耗更多的cpu。如果框架支持了pike序列化,会让使用者养成不好的粗暴习惯。
想消费函数传redis对象作为入参,这种完全可以使用json来解决,例如指定ip 和端口,在消费函数内部来使用redis。所以用json一定可以满足一切传参场景。
如果json不能满足你的消费任务的序列化,那不是框架的问题,一定是你代码设计的问题。所以没有预留不同种类序列化方式的扩展,
也不打算准备加入其他序列化方式。
答:使用的是定时发布任务,那么就能定时消费任务了。导入fsdf_background_scheduler然后添加定时发布任务。
FsdfBackgroundScheduler继承自 apscheduler 的 BackgroundScheduler,定时方式可以百度 apscheduler
import datetime
from function_scheduling_distributed_framework import task_deco, BrokerEnum, fsdf_background_scheduler, timing_publish_deco
@task_deco('queue_test_666', broker_kind=BrokerEnum.LOCAL_PYTHON_QUEUE)
def consume_func(x, y):
print(f'{x} + {y} = {x + y}')
if __name__ == '__main__':
fsdf_background_scheduler.add_job(timing_publish_deco(consume_func), 'interval', id='3_second_job', seconds=3, kwargs={"x": 5, "y": 6}) # 每隔3秒发布一次任务,自然就能每隔3秒消费一次任务了。
fsdf_background_scheduler.add_job(timing_publish_deco(consume_func), 'date', run_date=datetime.datetime(2020, 7, 24, 13, 53, 6), args=(5, 6,)) # 定时,只执行一次
fsdf_background_scheduler.add_timing_publish_job(consume_func, 'cron', day_of_week='*', hour=14, minute=51, second=20, args=(5, 6,)) # 定时,每天的11点32分20秒都执行一次。
# 启动定时
fsdf_background_scheduler.start()
# 启动消费
consume_func.consume()
问的是consuming_function的值能不能是一个类或者一个实例方法。
答:一切对类的调用最后都是体现在对方法的调用。这个问题莫名其妙。
celery rq huery 框架都是针对函数。
调度函数而不是类是因为:
1)类实例化时候构造方法要传参,类的公有方法也要传参,这样就不确定要把中间件里面的参数哪些传给构造方法哪些传给普通方法了。
见5.8
2) 这种分布式一般要求是幂等的,传啥参数有固定的结果,函数是无依赖状态的。类是封装的带有状态,方法依赖了对象的实例属性。
3) 比如例子的add方法是一个是实例方法,看起来好像传个y的值就可以,实际是add要接受两个入参,一个是self,一个是y。如果把self推到消息队列,那就不好玩了。
对象的序列化浪费磁盘空间,浪费网速传输大体积消息,浪费cpu 序列化和反序列化。所以此框架的入参已近说明了,
仅仅支持能够被json序列化的东西,像普通的自定义类型的对象就不能被json序列化了。
celery也是这样的,演示的例子也是用函数(也可以是静态方法),而不是类或者实例方法,这不是刻意要和celery一样,原因已经说了,自己好好体会好好想想原因吧。
框架如何调用你代码里面的类。
假设你的代码是:
class A():
def __init__(x):
self.x = x
def add(self,y):
print( self.x + y)
那么你需要再写一个函数
def your_task(x,y):
return A(x).add(y)
然后把这个your_task函数传给框架就可以了。所以此框架和你在项目里面写类不是冲突的,本人是100%推崇oop编程,非常鲜明的反对极端面向过程编程写代码。
答:基本原理如下
def add(a,b):
print(a + b)
从消息中间件里面取出参数{"a":1,"b":2}
然后使用 add(**{"a":1,"b":2}),就是这样运行函数的。
答:分布式 、并发、 控频、断点接续运行、定时、指定时间不运行、
消费确认、重试指定次数、重新入队、超时杀死、计算消费次数速度、预估消费时间、
函数运行日志记录、任务过滤、任务过期丢弃等数十种功能。
只需要其中的某一种功能就可以使用这。即使不用分布式,也可以使用python内置queue对象。
这就是给函数添加几十项控制的超级装饰器。是快速写代码的生产力保障。
适合一切耗时的函数,不管是cpu密集型 还是io密集型。
不适合的场景主要是:
比如你的函数非常简单,仅仅只需要1微妙 几十纳秒就能完成运行,比如做两数之和,print一下hello,这种就不是分需要使用这种框架了,
如果没有解耦的需求,直接调用这样的简单函数她不香吗,还加个消息队列在中间,那是多此一举。
答:先写自己的函数(类)来实现业务逻辑需求,不需要思考怎么导入框架。
写好函数后把 函数和队列名字绑定传给消费框架就可以了。一行代码就能启动分布式消费。
RedisConsmer('queue_name',consuming_function=your_function).start_consuming_message()
所以即使你不想用这个框架了,你写的your_function函数代码并没有作废。
所以不管是引入这个框架 、废弃使用这个框架、 换成celery框架,你项目的99%行 的业务代码都还是有用的,并没有成为废物。
答: 需要学习真oop和36种设计模式。唯有oop编程思想和设计模式,才能持续设计开发出新的好用的包甚至框架。
如果有不信这句话的,你觉得可以使用纯函数编程,使用0个类来实现这样的框架。
如果完全不理会设计模式,实现threding gevent evenlet 3种并发模式,加上10种中间件类型,实现分布式消费流程,
需要反复复制粘贴扣字30次。代码绝对比你这个多。例如基于nsq消息队列实现任务队列框架,加空格只用了80行。
如果完全反对oop,需要多复制好几千行来实现。
例如没听说设计模式的人,在写完rabbitmq版本后写redis版本,肯定十有八九是在rabbitmq版本写完后,把整个所有文件夹,
全盘复制粘贴,然后在里面扣字母修改,把有关rabbitmq操作的全部扣字眼修改成redis。如果某个逻辑需要修改,
要在两个地方都修改,更别说这是10几种中间件,改一次逻辑需要修改10几次。
我接手维护得老项目很多,这种写法的编程思维的是特别常见的,主要是从来没听说设计模式4个字造成的,
在我没主动学习设计模式之前,我也肯定会是这么写代码的。
只要按照36种设计模式里面的oop4步转化公式思维写代码三个月,光就代码层面而言,写代码的速度、流畅度、可维护性
不会比三年经验的老程序员差,顶多是老程序员的数据库 中间件种类掌握的多一点而已,这个有机会接触只要花时间就能追赶上,
但是编程思维层次,如果没觉悟到,可不是那么容易转变的,包括有些科班大学学过java的也没这种意识,
非科班的只要牢牢抓住把设计模式 oop思维放在第一重要位置,写出来的代码就会比科班好,
不能光学 if else 字典 列表 的基本语法,以前我看python pdf资料时候,资料里面经常会有两章以上讲到类,
我非常头疼,一看到这里的章节,就直接跳过结束学习了,现在我也许只会特意去看这些章节,
然后看资料里面有没有把最本质的特点讲述好,从而让用户知道为什么要使用oop,而不是讲下类的语法,这样导致用户还是不会去使用的。
你来写完包括完成10种中间件和3种并发模式,并且预留消息中间件的扩展。
然后我们来和此框架 比较 实现框架难度上、 实现框架的代码行数上、 用户调用的难度上 这些方面。
如你所见,使用此框架为什么没有配置中间件的 账号 密码 端口号呢。只有运行任何一个导入了框架的脚本文件一次,就会自动生成一个配置文件
然后在配置文件中按需修改需要用到的配置就行。
@task_deco 和celery的 @app.task 装饰器区别很大,导致写代码方便简化容易很多。没有需要先实例化一个 Celery对象一般叫app变量,
然后任何脚本的消费函数都再需要导入这个app,然后@app.task,一点小区别,但造成的两种框架写法难易程度区别很大。
使用此框架,不需要固定的项目文件夹目录,任意多层级深层级文件夹不规则python文件名字下写函数都行,
celery 实际也可以不规则文件夹和文件名字来写任务函数,但是很难掌握,如果这么写的话,那么在任务注册时候会非常难,
一般demo演示文档都不会和你演示这种不规则文件夹和文件名字下写celery消费函数情况,因为如果演示这种情况会非常容易的劝退绝大部分小白。
但是如果不精通celery的任务注册导入机制同时又没严格按照死板固定的目录格式来写celery任务,
一定会出现令人头疼的 Task of kind 'tasks.add' is not registered, please make sure it's imported. 类似这种错误。
主要原因是celery 需要严格Celery类的实例化对象app变量,然后消费函数所在脚本必须import这个app,这还没完,
你必须在settings配置文件写 include imports 等配置,否则cmd 启动celery 后台命令时候,celery并不知情哪些文件脚本导入了 app这个变量,
当celery框架取出到相关的队列任务时候,就会报错找不到应该用哪个脚本下的哪个函数去运行取出的消息了。
你可能会想,为什么celery app 变量的脚本为什么不可以写导入消费函数的import声明呢,比如from dir1.dir2.pyfilename imprt add 了,
这样celery运行时候就能找到函数了是不是?那要动脑子想想,如果celery app主文件用了 from dir1.dir2.pyfilename import add,
同时消费函数 add 所在的脚本 dir1/dir2/pyfilename.py 又从celery app的猪脚本中导入app,然后把@app.task加到add函数上面 ,
那这就是出现了互相导入,a导入b,b导入a的问题了,脚本一启动就报错,正是因为这个互相导入的问题,
celery才需要从配置中写好 include imports autodiscover_tasks,从而实现一方延迟导入以解决互相导入。
此框架的装饰器不存在需要一个类似Celery app实例的东西,不会有这个变量将大大减少编程难度,消费函数写在任意深层级不规则文件下都行。
不规则文件夹层级和名称下的celery使用演示
https://github.com/ydf0509/celery_demo
可以看代码,当文件夹层级不规则和文件名称不规则时候,要使用celery绝非简单事情,如果你只看普通的celery入门文档,是绝对解决不了
这种情况下的celery如何正确使用。