V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
akmonde
V2EX  ›  Python

[V 币感谢在线等] celery [Soft time limit exceeded] 无法结束任务

  •  
  •   akmonde · 2018-10-09 13:55:21 +08:00 · 2870 次点击
    这是一个创建于 2266 天前的主题,其中的信息可能已经有所发展或是发生改变。

    在使用 soft_time_limit 的时候遇到了一个问题,debug 信息已经显示类似以下信息:

    Soft time limit (180s) exceeded 
    
    @celery.task(soft_time_limit=180)
    def test_error(para):
    	do_something...
    
    

    然后我去查了下 task id,还是处于 starting 状态。 多个任务出现这种情况,有的再过一段时间会变成 SUCCESS,还有的任务就干脆放弃治疗,一直卡在 starting 了。

    看了个解决方法,是在调用任务时 catch 错误 SoftTimeLimitExceeded:

    from celery.exceptions import SoftTimeLimitExceeded
    
    @app.task
    def mytask():
        try:
            return do_work()
        except SoftTimeLimitExceeded:
            cleanup_in_a_hurry()
    
    

    不过我的任务函数 test_error 是在 workflow 里的 chord 调用的,不太好 catch:

    chord( ( test_error.s(x) for x in y) , test_result.s() )().get()
    

    我原本还以为 soft_time_limit 算是 time_limit 的不报错版本,会自动 continue 啥的。test_error 这里如果直接使用 time_limit 的话,报错似乎会让 chord 直接失败。

    哪位大佬能给个好的解决办法?

    第 1 条附言  ·  2018-10-09 15:46:11 +08:00
    其实我意思就是 mytask 函数,或者 chord 所在主函数里,我尝试过 try except SoftTimeLimitExceeded,一直抓不到,然后随机遇到的某个僵死任务 Soft time limit 是能正常用的,debug 信息会显示:
    ```
    Soft time limit (60s) exceeded for xxxxx
    ```
    但是使用了以后该任务依旧 starting 状态,并没有 fail 或者 success.

    * 关键点在于,我想通过 catch SoftTimeLimitExceeded 去执行超时后的动作,比如 return 一个默认值,保证我 chord 不出错。
    4 条回复    2018-10-10 12:05:52 +08:00
    YaphetYin
        1
    YaphetYin  
       2018-10-10 00:52:07 +08:00
    不是很明白你的表述,是这样用的意思么?
    ```
    @task():
    def mytask():
    chord( ( test_error.s(x) for x in y) , test_result.s() )().get()
    ```
    akmonde
        2
    akmonde  
    OP
       2018-10-10 08:53:29 +08:00
    @YaphetYin
    您那种写法的话,错一个子任务 test_error,chord 就直接报错,不会返回任何内容了。
    这里的 mytask 案例网上找的,对应的应该是我这里的 test_error。
    昨儿我试了下,好像是我在 test_error 函数里,import 了其他耗时的模块造成的,但仍然不知道咋解决,这里因为不能 raise 错误,所以用不了 time_limit,SoftTimeLimitExceeded 在 test_error 函数(乃至主函数里)一直捕获不到。

    另外,我像下面这样是可以在 180s 的时候,得到 soft time sleep 的,然后任务变成 success 状态:
    ```
    @app.task(soft_time_limit=180)
    def test_error():
    time.sleep(500)

    ```
    YaphetYin
        3
    YaphetYin  
       2018-10-10 11:21:00 +08:00
    你可以把完整的例子贴上来,业务逻辑省略掉,保留你想要表达的主任务,try..catch.. 和子任务名,这样比较方便看调用结构,比如

    ```
    def main():
    slow_task.delay()

    @task()
    def slow_task():
    chord( ( test_error.s(x) for x in y) , test_result.s() )().get()

    @task()
    def test_error(a):
    try:
    sleep(a)
    catch SoftTimeLimitExceeded as e:
    pass

    @task()
    def test_result(a):
    print("all success")


    main()
    ```

    这样哪里调同步哪里调异步都看的很清晰,用语言描述的话比较难 get 到吧
    akmonde
        4
    akmonde  
    OP
       2018-10-10 12:05:52 +08:00
    @YaphetYin 说实话,我仔细对比了下,您贴的那段示例代码差不多就是我的调度结构了。
    SoftTimeLimitExceeded 是在 debug 信息里显示的,但是一直 catch 抓不到到,然后部分 test_error 在显示 SoftTimeLimitExceeded 后,超时了一段时间才 success,还有的就一直 starting。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   4843 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 01:15 · PVG 09:15 · LAX 17:15 · JFK 20:15
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.