Skip to content

4.celery和这个框架比,存储的内容差异

ydf0509 edited this page Apr 20, 2021 · 1 revision

4.1celery的

{
 "body": "W1szLCA2XSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d",
  "content-encoding":  "utf-8",
  "content-type":  "application/json",
  "headers":  {
   "lang":  "py",
    "task":  "test_task\u554a",
    "id":  "39198371-8e6a-4994-9f6b-0335fe2e9b92",
    "shadow":  null,
    "eta":  null,
    "expires":  null,
    "group":  null,
    "retries":  0,
    "timelimit":  [
     null,
      null
   ],
    "root_id":  "39198371-8e6a-4994-9f6b-0335fe2e9b92",
    "parent_id":  null,
    "argsrepr":  "(3, 6)",
    "kwargsrepr":  "{}",
    "origin":  "gen22848@FQ9H7TVDZLJ4RBT"
 },
  "properties":  {
   "correlation_id":  "39198371-8e6a-4994-9f6b-0335fe2e9b92",
    "reply_to":  "3ef38b98-1417-3f3d-995b-89e8e15849fa",
    "delivery_mode":  2,
    "delivery_info":  {
     "exchange":  "",
      "routing_key":  "test_a"
   },
    "priority":  0,
    "body_encoding":  "base64",
    "delivery_tag":  "59e39055-2086-4be8-a801-993061fee443"
 }
}

4.2 此框架的消息很短,就是一个字典,内容的键值对和函数入参一一对应。

额外控制参数如重试、超时kill,由代码决定, 不需要存到中间件里面去。例如函数运行超时大小在本地代码修改后,立即生效。

不由中间件里面的配置来决定。 (现在也加入了少量优先级控制参数,队列里面消息任务的配置将覆盖python代码的consumer配置,但还是可以人为构造出来,没有celery那么难人为构造)

{"a":3,"b":6}

这样做好处有:

1)修改代码时候,函数控制功能立即生效。举个例子原来是设置的函数运行时间超时10秒kill,现在想设置20秒超时kill,对于celery已经发布的任务,还是10秒, 此框架的方式立即生效。

2)celery的方式不是跨平台,如果发布方和消费方不在同一个git项目,不能直接调用。例如java人员或ai人员并不能直接调用你项目里面的add.delay(1,2)这样写来发布消息, 非常难以构造出来上面celery任务的那一长串东西,几乎不可能。

4.2 性能对比,celery比次框架推送慢5倍,消费比次框架慢10倍。

测试的消费基准函数为简单地求和函数,两个框架都推送和消费20000次。
都使用gevent并发模型,相同并发数量 相同的linux机器内网连接中间件下反复测试的。

假设框架包装和调度一个函数执行的代码需要消耗的cpu执行时间是t1,消费函数自身需要的cpu执行时间是t2,

如果消费函数本身非常简单,例如消费函数是def f(x): print(x)这种机器简单地,那么t1远大于t2,
此时celery的t1远大于此框架的t1,所以出现了10倍的性能差距。

如果消费函数非常复杂,执行逻辑需要消耗的cpu时间远大于调度和包装一个函数的cpu执行时间,即t2远大于t1,
此时相同的非常复杂的消费函数逻辑,两个框架的执行效率就不会出现有10倍的差距。

如果有人怀疑吹牛,可以试试十分简单地求和函数,不做limit_rate控频,使用redis中间件,测试执行2万次函数消耗的时间,
我指的是单进程,并发模式gevent evetlet你随意测试。我电脑cpu频率低一点,消费2万次时间是69秒,频率高一点的cpu,怎么也得30秒。
此框架单进程情况下执行2万次求和需要时间是4秒。
如果不信的人可以自己测试一下,如果测试后你的celery单进程(或在单核电脑),你使用任意gvent eventlet threading模式能在30秒内完成2万次求和,
我愿意转账2000元你,如果你测试后30秒不能完成2万次求和,你只需要转账1000元给我就可以。鄙视又怀疑又不愿意亲自测试的人。
不是celery不行,而是实现手段上包袱上有区别,如果你直接裸写 redis + lpush + brpop + Threadpoolexecutor,调度上面的函数,
你会发现你也可以暴击celery几十倍也可以秒杀此框架好多倍,复杂调用链路长的代码一般都会消耗更多的cpu,导致运行耗时变长。
celery的kombu性能很差,再加上celery性能也不行,造成celery慢很多,此框架也支持kombu操作中间件,再换成konbu操作redis而不是redispy操作redis时候,
光用这个kombu性能就会降低4倍了。

如果有人说,不要限定成不准使用多进程模式只能使用其他并发模式,说这样那就没意义了,
测试条件要对等,一定要坚持控制变量法,变化的只能是使用上面框架;具体开多少并发,并发模式选择进程还是gevent eventlet threading  中间件类型 中间件地址 这些要全部保持一致。
如果你非要吧celery开4个进程,那么此分布式函数调度框架也学样开4个进程,那么7秒钟就不止是消费2万次了,应该是8万次左右了。

4.2.1 celery 4.2.1版本的测试基准代码,消费

celery_app.config_from_object(Config2)


@celery_app.task(name='求和啊',)  
def add(a, b):
    print(f'计算 {a} + {b} 得到的结果是  {a + b}')
    return a + b



if __name__ == '__main__':
    celery_app.worker_main(
        argv=['worker', '--pool=gevent', '--concurrency=50', '-n', 'worker1@%h', '--loglevel=debug',
              '--queues=queue_add', '--detach',])

4.2.2 function_scheduling_distributed_framework测试基准代码,消费

def add(a, b):
    # if random.randint(4, 6) == 5:
    #     raise RandomError('演示随机出错')
    print(f'计算 {a} + {b} 得到的结果是  {a + b}')
    return a + b


# 把消费的函数名传给consuming_function,就这么简单。
consumer_add = get_consumer('queue_test569', consuming_function=add, max_retry_times=2,
                            qps=0, log_level=10, logger_prefix='zz平台消费',
                            concurrent_mode=2,  broker_kind=2, ) # 通过设置broker_kind,一键切换中间件为rabbitmq或redis等数十种中间件或包。


if __name__ == '__main__':
    consumer_add.start_consuming_message()