我最近在尝试使用 FastAPI + Prefect 2(2.8.3),目的是使用 FastAPI 构建的 RESTful API 触发 Prefect 构建的 Workflow 。 代码如下: main.py
from fastapi import FastAPI
from test_workflow import test
app = FastAPI(
title="Data Services",
description="REST API for data services",
version="0.1.0",
)
@app.get("/")
async def root():
return {"message": "Data Services"}
@app.get("/test")
def trigger():
test()
return "completed"
from prefect import flow, task
import time
@task
def sleep1():
print("sleep1")
time.sleep(5) //模拟需要一定运行时间的计算流
return 1
@flow()
def test():
task_1 = sleep1.submit()
x = task_1.result()
print(x)
需首先运行下列命令开启服务:
$ prefect server start
$ uvicorn main:app --reload
如果尝试 GET /test ,会得到如下错误:
RuntimeError: <asyncio.locks.Event object at 0x7f0de4d19bd0 [unset]> is bound to a different event loop
如果 FastAPI 中定义为异步函数async def trigger():
, 则顺利运行不会报错,但问题是这样就丧失了并行性,FastAPI 一次只能处理一个 /test 请求,显然不符合需求。
想了很多办法都没法解决,有没有懂这方面的帮忙看看?
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.