V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
cyberpoint
V2EX  ›  Python

接上文, rq worker 里面又创建新的 worker 需要怎么处理

  •  
  •   cyberpoint · 2022-09-10 13:46:10 +08:00 · 1528 次点击
    这是一个创建于 797 天前的主题,其中的信息可能已经有所发展或是发生改变。

    感谢大家在前文 请教下各位 Gunicorn...提供的帮助,前段时间有些忙,没有一一回复大家。在这里感谢各位了,中秋快乐。
    听取了大家的意见,由于 celery 对于我来说过于复杂,所以使用 rq 来做。
    叨扰一下新的问题:

    1. 长耗时任务分为两个步骤
    2. 步骤 1 执行完,才执行步骤 2
    3. 步骤 2 内部需要拆分为多个 worker 并行执行。

    目前遇到的问题是

    1. step_two 里面的 worker 变成串行了。
    2. 要怎么汇总 step_two 的运行结果
    3. 如何 close 访问 /task 后创建的任务

    以下代码使用 gunicorn 默认模式运行

    from datetime import datetime
    from redis import Redis
    import time
    import rq_dashboard
    from flask import Flask, request
    from rq import Queue, Worker
    from rq.command import send_stop_job_command
    
    redis = Redis(host='127.0.0.1', port=6379)
    queue = Queue(connection=redis, name='yixianghuitong')
    
    app = Flask(__name__)
    app.config['RQ_DASHBOARD_REDIS_URL'] = 'redis://localhost:6379'
    app.config.from_object(rq_dashboard.default_settings)
    app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")
    
    @app.route('/task')
    def add_task():
        user = request.args.get('user')
        if user:
            print('执行步骤 1')
            d = datetime.now().strftime('%Y%m%d%H%M%S')
            name = f"{user}_{d}"
            queue.enqueue(
                step_one,
                args=(name,)
            )
            worker = Worker([queue], connection=redis, name=name)
            worker.work(burst=True)
    
            print('执行步骤 2')
            step_two(name)
            # TODO: 汇总上面的结果返回给网页端
            return f"Task {name} complete"
        return 'No value for n'
    
    def step_one(name):
        delay = 5
        print(f"Running {name}, Simulating {delay} second delay")
    
        time.sleep(delay)
    
        f = open(f"step_one_{name}.txt", 'a')
        d = datetime.now().strftime('%Y%m%d-%H%M%S')
        f.write(d)
        print(f"Task {name} complete")
    
        return name
    
    def step_two(name):
        for i in range(2):
            worker_name = f"{name}_{i}"
            queue.enqueue(step_two_fn, worker_name)
            worker = Worker([queue], connection=redis, name=name)
            worker.work(burst=True)
    
    
    def step_two_fn(name):
        delay = 5
        print(f"Running {name}, Simulating {delay} second delay")
    
        time.sleep(delay)
    
        f = open(f"step_two_fn_{name}.txt", 'a')
        d = datetime.now().strftime('%Y%m%d-%H%M%S')
        f.write(d)
        print(f"Task {name} complete")
    
        return name
    
    
    @app.route('/close')
    def close():
        # list_jobs = queue.get_jobs()
        # print(list_jobs)
        id = request.args.get('id')
        # job = queue.fetch_job(id)
        send_stop_job_command(redis, id)
        # for j in list_jobs:
        #     print(11, j.get_id())
        #     j.cancel()
        return 'len(list_jobs)'
    
    
    
    第 1 条附言  ·  2022-09-11 10:41:50 +08:00
    暂时改成 multiprocessing 来实现了
    4 条回复    2022-09-13 13:24:54 +08:00
    cyberpoint
        1
    cyberpoint  
    OP
       2022-09-10 13:48:49 +08:00
    目前项目使用 gunicorn 默认模式启用。
    cyberpoint
        2
    cyberpoint  
    OP
       2022-09-10 16:37:33 +08:00
    可[有偿解决]( https://www.v2ex.com/t/879114)
    noparking188
        3
    noparking188  
       2022-09-10 23:26:35 +08:00
    长耗时指多久,分钟级别还是小时级别?
    这个可以参考下 https://github.com/RandyDeng/rq-docker-supervisor
    supervisor 比较适合托管后台任务
    如果并行任务是 CPU 密集型

    换种思路,也许可以考虑 lambda function
    runningman
        4
    runningman  
       2022-09-13 13:24:54 +08:00
    可以加我,zhtsuc
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1743 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 16:53 · PVG 00:53 · LAX 08:53 · JFK 11:53
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.