我 celery 版本为 v3.1.25 ,django 和 celery 的启动项如下:
celery -A proj-test worker -n workerA.%h --concurrency=2
celery -A proj-test worker -n workerB.%h --concurrency=2
python manage.py celeryd -B
python manage.py celery beat
python manage.py runserver 0.0.0.0:8000
这样是可以运行的,不过好像由于我在 celery 的 task 里面,使用了 workflow 的 chord/group 进行 subtask 分发,导致 subtask 过多时,因为 concurrency 有限,beat 任务( beat_function )会因此堵塞结束不掉。
大概代码如下,下面至少有两个 chord:
@task
def beat_function():
xxfunc1.delay()
xxfunc2.delay()
time.sleep(5)
chord((test1.s(x) for x in xs) , test2.s())
chord((test3.s(y) for y in ys) , test4.s())
让它不会占用其他 subtask 运行的 worker 容量,这样就算 beat 所在的 queue 在阻塞,等到 subtask 运行完毕,这边 queue 也会结束相应的 beat 任务。
我是这样弄得,queue 配置如下:
CELERY_QUEUES = {
"celery_beat": {
"exchange": "celery_beat",
"exchange_type": "direct",
"routing_key": "celery_beat"
},
}
尝试过的 beat 运行方式如下两种
python manage.py celeryd -B -Q celery_beat
也尝试了下面的运行法子:
python manage.py celery worker -B -Q celery_beat
不过始终不能把 beat 任务运行起来,每次一加 queue 似乎 beat 就失效了。
尝试过同时单独运行,也失败:
python manage.py celery worker --beat
python manage.py celery worker -Q celery_beat
我不能每次手动结束 beat 任务,确实结束了能解决某一次死锁,那些 subtask 能运行完。我曾尝试设置任务超时,单个 chord 的时可用,该 chord 可以运行完结,不过后面还有 chord 分发其他子任务就 gg 了,也就是说只能运行完第一个 chord,比如我上面举的例子。
我想要实现的是,单个 queue 指定的那些 worker,运行所有的 beat 任务,现在的情况是所有的 worker 都会去抢 beat 任务,结果可能导致了部分 beat 任务阻塞。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.