我 celery 版本为 v3.1.25 ,django 和 celery 的启动项如下:
celery -A proj-test worker -n workerA.%h --concurrency=2
celery -A proj-test worker -n workerB.%h --concurrency=2
python manage.py celeryd -B
python manage.py celery beat
python manage.py runserver 0.0.0.0:8000
这样是可以运行的,不过好像由于我在 celery 的 task 里面,使用了 workflow 的 chord/group 进行 subtask 分发,导致 subtask 过多时,因为 concurrency 有限,beat 任务( beat_function )会因此堵塞结束不掉。
大概代码如下,下面至少有两个 chord:
@task
def beat_function():
xxfunc1.delay()
xxfunc2.delay()
time.sleep(5)
chord((test1.s(x) for x in xs) , test2.s())
chord((test3.s(y) for y in ys) , test4.s())
让它不会占用其他 subtask 运行的 worker 容量,这样就算 beat 所在的 queue 在阻塞,等到 subtask 运行完毕,这边 queue 也会结束相应的 beat 任务。
我是这样弄得,queue 配置如下:
CELERY_QUEUES = {
"celery_beat": {
"exchange": "celery_beat",
"exchange_type": "direct",
"routing_key": "celery_beat"
},
}
尝试过的 beat 运行方式如下两种
python manage.py celeryd -B -Q celery_beat
也尝试了下面的运行法子:
python manage.py celery worker -B -Q celery_beat
不过始终不能把 beat 任务运行起来,每次一加 queue 似乎 beat 就失效了。
尝试过同时单独运行,也失败:
python manage.py celery worker --beat
python manage.py celery worker -Q celery_beat
我不能每次手动结束 beat 任务,确实结束了能解决某一次死锁,那些 subtask 能运行完。我曾尝试设置任务超时,单个 chord 的时可用,该 chord 可以运行完结,不过后面还有 chord 分发其他子任务就 gg 了,也就是说只能运行完第一个 chord,比如我上面举的例子。
我想要实现的是,单个 queue 指定的那些 worker,运行所有的 beat 任务,现在的情况是所有的 worker 都会去抢 beat 任务,结果可能导致了部分 beat 任务阻塞。
1
akmonde OP Woc,没人给点建议么,屌大的大佬们呢,都换上女装去泡吧了么?!
|
2
YaphetYin 2018-09-29 22:23:22 +08:00
beat 单独起啊,celery -A xxx beat
|
3
akmonde OP @YaphetYin 单独起了,会出现我上述的死锁问题,所以才想把 beat 任务单独给一个 queue。
但是结果好像不尽如人意,不知道我 router 和 queue 是否配置有问,还是压根不能这样做。 |