不得不吐槽一下 Python 的任务队列,异步支持太差了

2022-04-27 09:47:49 +08:00
 LeeReamond

今天带着学习的目的花了半天时间看了一下 python 广泛使用的任务队列 celery ,看完感觉不少地方挺奇怪。我们部门任务因为削峰一直用的是自己写的任务队列,所以说实话其实一直也没用过 rabbitmq 这些。

网上相关信息很少不提了,stackoverflow 能找到几个问题基本都是好多年前的。

这些都是小问题,我感觉其他几个需要吐槽的地方

1 、2202 年了,asyncio 居然不是被原生支持的。

追踪最新任务 issue ,https://github.com/celery/celery/issues/6552,显然异步支持不在当前官方列表里,甚至不在官方发布计划中,可以说是遥遥无期。社区当然是有一些曲折迂回的方法的,比如我看到 so 上有人说用async_to_sync做奇怪的封装

from asgiref.sync import sync_to_async

def task_to_async(task):
    async def wrapper(*args, **kwargs):
        delay = 0.1
        async_result = await sync_to_async(task.delay)(*args, **kwargs)
        while not async_result.ready():
            await asyncio.sleep(delay)
            delay = min(delay * 1.5, 2)  
        return async_result.get()
    return wrapper

只能缓缓打出一个问号?且不提这 while not ready: sleep & try ,看了一下依赖的第三方库里实现是基于一个最大线程数为 1 的线程池维护事件循环,这又带来更多衍生问题,比如我如何在 task 间共享连接状态,比如 mysql 连接池?如果我要在异步任务里使用多线程又该如何管理?总之是问题多多。

再者不提消费者不能异步消费,生产者也不能异步生产,添加任务到任务队列的过程是同步的,也许这带来的延迟非常短暂,但是也许设计者认为该延迟应该忽略?看了看 celery 基于的通信模块是基于 socket 自己搞了个 selector ,自行维护的事件循环没有享受到任何现有生态的好处,纯 py 编写无法享受社区 libuv 版的好处,性能和可靠性都让人质疑。。总之这一个设计让用户代码整个又被拖回同步宇宙,也是问题多多。

2 、生产者不支持任务完成回调

对于一些常见的短任务的需求(处理时间小于 1s ),短的任务就不需要加入任务队列了吗?我觉得显然不是的,但是同样的,段任务的处理规范是否应该是返回一个任务状态,再由前端轮询执行结果?我感觉也未必,毕竟任务很短。所以我们部门网关调用时可以选择用异步通知,await 等待执行结果。而 celery 目前看起来网关想获取任务成功状态的话只能轮询,或者是在 worker 那边定义任务完成后再向队列加入一个子任务,你可以在子任务里用自定义的方式给网关一个回调。。。

8660 次点击
所在节点    Python
76 条回复
gjquoiai
2022-04-28 14:19:22 +08:00
@est #59 现在是 LGPL 了
neoblackcap
2022-04-28 17:15:23 +08:00
@LeeReamond 是全部都可以,gevent 有比较好的支持,asyncio 没有。但理论上你都可以简单写一个 worker 类来支持。几十行代码的事情,全网网友的痛点就过了。
既然 gevent 都能做,asyncio 的支持显然不是什么问题。问题绝大多数来自于跟其他类型的 worker 结合不好。
有 IO loop 的 worker 是不能做 CPU-bound 的任务,所以此类 worker 的使用有限制,你是遇到不能扩展还是什么情况?
neoblackcap
2022-04-28 17:20:01 +08:00
@est celery 不是还不错,是社区比较久。资料相对较多。说它是玩具过了,但是说它写得多么好,我觉得也不是。
毕竟我看过里面的代码,简直让人觉得头大。
不过它很早就写了,现在要搞一个跟它一样多功能的,怕是要下一番功夫。至于你说的失败重试不可靠是怎么一会事能说说吗?
根据我的使用经验,它的失败重试还是比较靠谱的,前提是你的消息后端一定要基于 rabbitmq 。其他 backend 有些问题,比如 redis 。这事 celery 的社区跟 redis 有过直接的联系,不知道修得如何。
neoblackcap
2022-04-28 17:41:36 +08:00
@LeeReamond 不是 celery 开发者说不支持就是不行。他们不支持不代表你不能写。主要是你想要实现到什么程度,要花多大力气。你在对应任务里面开一个 io loop ,自己等待这样行不能,算不算支持?
还是说要 celery 支持对 coroutine 类任务的调度?还是说社区要做到封装好,使用者感知不到,可以随便用一个装饰器把函数直接转变为 task 才算支持?
你都没有说具体的需求,那怎么给你解决方案?
est
2022-04-28 17:51:01 +08:00
> 它的失败重试还是比较靠谱的,前提是你的消息后端一定要基于 rabbitmq

这就是我最大的疑惑了。都有 rabbitmq 了,还需要 celery 干啥?就为了额外做了一个序列化+日志操作+函数里用装饰符标记为任务的 helper ? LZ 所有的问题,如果基于裸 rabbitmq 自己 pub/sub ,恐怕压根都不会成为问题。
neoblackcap
2022-04-28 18:39:21 +08:00
@est 不就是为了这些?什么都裸写,新写的架构做好一点都比 celery 来得强。那个代码看到头疼
abersheeran
2022-04-28 19:49:48 +08:00
@est 确实,werkzeug 设计实在是……我自己写了一个 https://github.com/abersheeran/baize 真正轻量,没有多余共功能又刚好能直接用的程度。
LeeReamond
2022-04-28 20:19:08 +08:00
@neoblackcap 我觉得你有什么方案大可以发出来,不必要说我没有明确需求,我的需求很明确,在 aio 框架下使用消息队列,你的代码能解决到什么程度这是由你决定的,你可以发出来让大家评断。我之所以让你发代码,是因为整个楼里还是有不少网友提出了建设性意见,而你的回复中直接将网友们评价为“很多人也不看源码,也不看文档所以才有问题”,并且你觉得这些问题很简单就可以解决。我觉得既然对你来说这并不需要高昂成本,比起楼里回复很多字,不如直接用你所描述的几行代码解决问题来得实在。
neoblackcap
2022-04-28 23:44:48 +08:00
est
2022-04-28 23:48:56 +08:00
@abersheeran 赞一个~
LeeReamond
2022-04-29 04:19:40 +08:00
@neoblackcap 如果你确实追踪了楼内讨论的话,你的代码没有解决上文提出的两个问题,

首先是消费方,按照你的逻辑该消费者会在各个 worker 子进程创建独立的事件循环并执行,

1 、我提出的如何在 task 间共享状态,这是使用异步很基本的需求,如果我不希望每次协程调用都执行一次创建和销毁后端连接池的话。
2 、你目前所谓的解决方式是,在单个 worker 进程内创建、执行、结束事件循环,然后在开启下一个循环。所以一个事件循环的意义是仅为一个协程服务,并不能并发调度协程任务,所以使用协程的意义在哪里?

其次像楼上已经有人提到的,你的生产者依然在执行同步逻辑请求任务,所以他们如何被事件循环管理?如何获取任务完成的回调?我觉得在 2022 年使用异步网关不是什么罕见需求。
neoblackcap
2022-04-29 09:33:02 +08:00
@LeeReamond

1. 我的代码只能说解决 celery 能不能用 asyncio 的问题。

2. 既然你调研过 celery ,你就会明白你所说的并不是什么不可能的事情,就是需要额外提供一个基于 asyncio 的并发池,可以通过继承 celery.concurrency.base.BasePool ,并实现对应的接口。既然 gevent 能做到,这显然是可以做到的。但是你所说的共享连接池,等资源共享就未必。celery 是分布式的,worker 可以分布在多台机器上,你的需求本身就跟它的设计大方向矛盾。

3. 我理解你想要的回调应该是 rpc 式的回调,而不是在 worker 里面调用你的回调接口。你的回调是需要生产者配合的,哪怕现在 Python 绝大多数的人还是在使用基于 wsgi 的 web 框架。支持 rpc 式的回调,基本上就得在框架上面动手,不改支持不好。

4. 如果你所说的异步生产应该是指这个生产的动作会被你调用者的 IO loop 所管控的话,那么就是跟上面有着一样的问题,那么应该在很长一段时间内 celery 也不会改,这个同步异步需求可以使用线程池绕过。

5. 你的需求很好,但是这不是 celery 能解决或者解决好。这不是 celery 的问题,任何一个开源工具都没有说要对某一个人的需求负责。你如果觉得需求重要,有通用性的,那么你可以提交你的解决方案,或者提思路,又或者提供资金。我觉得退一万步讲,哪怕 celery 是个垃圾它也没有强制你使用。你完全可以选择其他方案,而不是发一个贴将它批判一番。自由软件是它已经提供源代码给你,授权你使用,修改的权利。一些个人需求,不代表就是社区的需求。
abersheeran
2022-04-29 12:03:41 +08:00
@est /(ㄒoㄒ)/~~你甚至都不愿意点个 Star 凑个整
LeeReamond
2022-04-29 12:31:40 +08:00
@neoblackcap 关于复用问题,我觉得进程内资源复用和分布式任务并不矛盾。程序在允许范围内最大限度复用和节约资源是并非不值得提倡,无论如何我们必须承认的是任务队列的使用需求者中相当一部分隐含着高并发需求,如果任务本身只需要单线程执行,那么也没有使用任务队列的必要。在并发需求下不复用连接资源,能承担多少负载能力呢,我很质疑。

确实如同你所说的,开源框架并没有强制要求我使用,所以没有解决我需求的责任。我发帖的目的很单纯,如同我在帖子一开始说了,部门目前使用的任务队列是自己实现的,我是带着学习参考的目的看一下所谓的 py 队列的行业标准是如何做的,然后发现被广泛传播和使用的框架不能解决一些最基础的问题,如同我在上文和本段已经描述的多种场合,将同步逻辑接管于线程池可以绕过事件循环 100%的问题,但显然这不是我们需要的方案,也不应该是框架设计和发展的方向。不过,无论如何谢谢你的回复讨论。
opengo
2022-04-29 17:37:11 +08:00
asyncio 中的 loop 不能跨线程,或许可以实现一个代理类,按当前线程 ID 返回对应 loop ,这样虽然每个线程都创建了一个 loop ,但是每个线程都共享这一个 loop ,不会频繁创建和销毁,

类似:
class A:
pool: {
Thread ID: Loop
}
281x1h2ez12
2022-05-03 07:33:12 +08:00
@est 17 年左右在学校里的时候,有门课指定用 flask 写后端。当时刚刚接触 python ,感觉 flask 也挺好用,后来就不写网站了,也不了解 flask 发生了啥。当时 Django 也很出名,不过大家说没 flask 简单。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/849494

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX