接上文, rq worker 里面又创建新的 worker 需要怎么处理

2022-09-10 13:46:10 +08:00
 cyberpoint

感谢大家在前文 请教下各位 Gunicorn...提供的帮助,前段时间有些忙,没有一一回复大家。在这里感谢各位了,中秋快乐。
听取了大家的意见,由于 celery 对于我来说过于复杂,所以使用 rq 来做。
叨扰一下新的问题:

  1. 长耗时任务分为两个步骤
  2. 步骤 1 执行完,才执行步骤 2
  3. 步骤 2 内部需要拆分为多个 worker 并行执行。

目前遇到的问题是

  1. step_two 里面的 worker 变成串行了。
  2. 要怎么汇总 step_two 的运行结果
  3. 如何 close 访问 /task 后创建的任务

以下代码使用 gunicorn 默认模式运行

from datetime import datetime
from redis import Redis
import time
import rq_dashboard
from flask import Flask, request
from rq import Queue, Worker
from rq.command import send_stop_job_command

redis = Redis(host='127.0.0.1', port=6379)
queue = Queue(connection=redis, name='yixianghuitong')

app = Flask(__name__)
app.config['RQ_DASHBOARD_REDIS_URL'] = 'redis://localhost:6379'
app.config.from_object(rq_dashboard.default_settings)
app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")

@app.route('/task')
def add_task():
    user = request.args.get('user')
    if user:
        print('执行步骤 1')
        d = datetime.now().strftime('%Y%m%d%H%M%S')
        name = f"{user}_{d}"
        queue.enqueue(
            step_one,
            args=(name,)
        )
        worker = Worker([queue], connection=redis, name=name)
        worker.work(burst=True)

        print('执行步骤 2')
        step_two(name)
        # TODO: 汇总上面的结果返回给网页端
        return f"Task {name} complete"
    return 'No value for n'

def step_one(name):
    delay = 5
    print(f"Running {name}, Simulating {delay} second delay")

    time.sleep(delay)

    f = open(f"step_one_{name}.txt", 'a')
    d = datetime.now().strftime('%Y%m%d-%H%M%S')
    f.write(d)
    print(f"Task {name} complete")

    return name

def step_two(name):
    for i in range(2):
        worker_name = f"{name}_{i}"
        queue.enqueue(step_two_fn, worker_name)
        worker = Worker([queue], connection=redis, name=name)
        worker.work(burst=True)


def step_two_fn(name):
    delay = 5
    print(f"Running {name}, Simulating {delay} second delay")

    time.sleep(delay)

    f = open(f"step_two_fn_{name}.txt", 'a')
    d = datetime.now().strftime('%Y%m%d-%H%M%S')
    f.write(d)
    print(f"Task {name} complete")

    return name


@app.route('/close')
def close():
    # list_jobs = queue.get_jobs()
    # print(list_jobs)
    id = request.args.get('id')
    # job = queue.fetch_job(id)
    send_stop_job_command(redis, id)
    # for j in list_jobs:
    #     print(11, j.get_id())
    #     j.cancel()
    return 'len(list_jobs)'


1404 次点击
所在节点    Python
4 条回复
cyberpoint
2022-09-10 13:48:49 +08:00
目前项目使用 gunicorn 默认模式启用。
cyberpoint
2022-09-10 16:37:33 +08:00
可[有偿解决]( https://www.v2ex.com/t/879114)
noparking188
2022-09-10 23:26:35 +08:00
长耗时指多久,分钟级别还是小时级别?
这个可以参考下 https://github.com/RandyDeng/rq-docker-supervisor
supervisor 比较适合托管后台任务
如果并行任务是 CPU 密集型

换种思路,也许可以考虑 lambda function
runningman
2022-09-13 13:24:54 +08:00
可以加我,zhtsuc

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/879108

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX