Celery+Django 如何动态修改已被接收或已在队列中的任务?

2022-11-21 16:45:02 +08:00
 Raul7

项目遇到两种场景:

队列配置了默认任务优先级,我想在任务被接收或者在队列中,动态修改任务的优先级,有什么比较好的方法吗?

希望各位大佬不吝赐教。

2509 次点击
所在节点    Python
8 条回复
crysislinux
2022-11-21 16:49:23 +08:00
把任务处理做成幂等的,然后要改优先级就同样的参数但是更高的优先级再发一次。
Raul7
2022-11-21 16:51:05 +08:00
@crysislinux 再发一次的话,之前的那个任务要清掉吗?应该不会覆盖掉吧?队列就需要执行两个任务了
westoy
2022-11-21 17:01:10 +08:00
不能修改
revoke 之前那个 task id, 再重新创建一个,celery 运行到的时候会判断状态
如果你本身要处理的数据会保存状态, 那也不用取消了, 执行的时候判断一下就行了
celery 的优先级比较玄学, 也只有基于 rabbitmq 才支持
Raul7
2022-11-21 17:20:29 +08:00
@westoy 现在新版本的 celery ,Redis 也支持的
crysislinux
2022-11-21 17:24:27 +08:00
@Raul7 不会覆盖,但是后面允许的那个处理的时候你的代码能判断出来处理过了就可以跳过
akaHenry
2022-11-22 12:28:31 +08:00
应该没有什么队列产品, 满足你这奇怪的需求. 不只是 celery.

队列, 只是管道, 是无状态的.

你想改 task 状态, 是业务需求. 需要自己单独设计方案.

给个思路:

1. task 数据包内, 定义 task_type 字段.
2. redis, 根据 task_type, 动态调整优先级 level 值.
3. 定义第一级 MQ 管道 + task worker, 此 worker 主要做调度器. 收到 task 时, 根据 task_type, 查询 redis, 判断实时优先级, 进而决定:
- 立即执行
- 转发到另一个队列中, 等待其他 task worker 处理.
- 忽略丢弃
4. 次一级的 MQ + task worker, 无脑执行.


celery, 可以是个管道链. 其他消息队列, 都可按照此思路设计业务.
akaHenry
2022-11-22 12:35:28 +08:00
另外不要滥用 celery, 这东西, 本身是个调度器, 做定时任务用还行.

常规的 MQ 需求, 踢开 celery, 直接写业务代码, 操作 rabbitmq/kafka 就好.

当然, 如果是维护的不值钱代码, 当我没说.
kelvin_fly
2022-12-07 15:34:44 +08:00
队列是无状态的管道。已经进入后,不能再做修改。这应该是个业务上的问题,需要记录队里的 ID ,然后在任务分配那,和 worker 里做业务联动处理 i

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/896844

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX