Python asyncio 求助,要被搞疯了

2023-02-26 12:30:30 +08:00
 Drahcir

我最近在尝试使用 FastAPI + Prefect 2(2.8.3),目的是使用 FastAPI 构建的 RESTful API 触发 Prefect 构建的 Workflow 。 代码如下: main.py

from fastapi import FastAPI
from test_workflow import test

app = FastAPI(
    title="Data Services",
    description="REST API for data services",
    version="0.1.0",
)

@app.get("/")
async def root():
    return {"message": "Data Services"}

@app.get("/test")
def trigger():
    test()
    return "completed"

test_workflow.py

from prefect import flow, task
import time

@task
def sleep1():
    print("sleep1")
    time.sleep(5)  //模拟需要一定运行时间的计算流
    return 1
    
@flow()
def test():
    task_1 = sleep1.submit()
    x = task_1.result()
    print(x)

需首先运行下列命令开启服务:

$ prefect server start

$ uvicorn main:app --reload

如果尝试 GET /test ,会得到如下错误:

RuntimeError: <asyncio.locks.Event object at 0x7f0de4d19bd0 [unset]> is bound to a different event loop

如果 FastAPI 中定义为异步函数async def trigger():, 则顺利运行不会报错,但问题是这样就丧失了并行性,FastAPI 一次只能处理一个 /test 请求,显然不符合需求。

想了很多办法都没法解决,有没有懂这方面的帮忙看看?

3088 次点击
所在节点    Python
8 条回复
louistayd2
2023-02-26 13:06:09 +08:00
问问 ChatGPT:


这个错误通常发生在应用程序中存在多个事件循环时。根据你的描述,这可能是因为 FastAPI 和 Prefect 分别使用不同的事件循环,导致了该错误。

为了解决这个问题,你可以尝试使用一个事件循环来管理整个应用程序,这样就可以避免出现多个事件循环的情况。以下是一个可能的解决方案:

main.py 中创建一个事件循环对象并将其作为参数传递给 test() 函数,确保 FastAPI 和 Prefect 使用相同的事件循环。

```python
import asyncio

loop = asyncio.get_event_loop()

@app.get("/test")
def trigger():
loop.run_until_complete(test(loop))
return "completed"
```

test_workflow.py 中,将任务函数装饰器从 @task 改为 @task(run_on_executor=True),以确保它在一个新的线程中运行,从而不会与 FastAPI 使用的事件循环发生冲突。

```python
from prefect import flow, task
import time

@task(run_on_executor=True)
def sleep1():
print("sleep1")
time.sleep(5)
return 1

@flow
def test(loop):
with loop:
task_1 = sleep1.submit()
x = task_1.result()
print(x)
```

这样,通过将事件循环对象传递给 test() 函数并使用 @task(run_on_executor=True) 装饰器,你可以确保 FastAPI 和 Prefect 使用相同的事件循环,并且在运行任务时不会发生事件循环冲突。
NCZkevin
2023-02-26 13:19:16 +08:00
没用过 Prefect 2(2.8.3),出现这种情况大概率是因为在异步里使用了同步。 可以考虑把 test() 放到 loop.run_in_executor 去异步调用。
```
@app.get("/test")
async def trigger():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, test)
return "completed"
```
Drahcir
2023-02-26 13:40:22 +08:00
@louistayd2 虽然 AI 给的细节不对,比如 @task 并不存在 run_on_executor 这个参数。但是我感觉思路是对的,如果能让 Prefect 2 使用 FastAPI 的事件循环或许能解决这个问题。不过我不太了解 Prefect 2 源代码,暂时无解。
shuimugan
2023-02-26 13:48:45 +08:00
常见错误了,异步循环里别用同步的库,time.sleep 改 asyncio.sleep
Drahcir
2023-02-26 13:50:58 +08:00
@NCZkevin 你好,谢谢,用这种解决方案的话,确实 FastAPI 本身不会被阻塞。比如说如果我执行了 GET /test, 在 sleep 的同时,我可以继续访问 /docs 页面。
不过,问题是如果我想触发同一个请求的话,还是会按顺序执行。比如说,如果我同时执行两个 GET /test ,那么还是会一个接一个执行,不能并行。
实际上,我试了试 uvicorn 指定多个 worker ,发现即使是多进程下仍有这个问题。

这次,直接抛开 Prefect 测试:

```Python
def sleep():
print("sleep", time.time())
time.sleep(5)

@app.get("/test")
async def trigger():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, sleep)
return "completed"

```
Drahcir
2023-02-26 13:54:19 +08:00
@shuimugan 我用 time.sleep 只是为了模拟一个需要一定运行时间的 blocking 计算过程。这个计算过程是阻塞的,如果用 async def 就会在这段时间内无响应。
理论上,直接用 def 的话 FastAPI 会自动用多线程运行,但问题是出错 RuntimeError: <asyncio.locks.Event object at 0x7f0de4d19bd0 [unset]> is bound to a different event loop
NCZkevin
2023-02-26 14:03:07 +08:00
@Drahcir 我试了下,就用这个代码,用 ab 测试同时发 10 个请求看了下结果是并行的呀。
Drahcir
2023-02-26 14:25:33 +08:00
@NCZkevin 不好意思,是我的问题😓
我用 /docs 页面手动测试时有这个问题。不过我在命令行里面测试,发现的确是并行的。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/919246

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX