如何实现一个实时转发 Stream 流的接口

2023-03-08 17:26:57 +08:00
 WindyXu

想写一个实时转发 Stream 流内容的接口,不怎么懂 stream 流相关的知识

之前的接口是

async def index(path: str, request: Request):
    data = await request.json()
    resp = c.send_request(path, data)
    headers = dict(resp.headers)

    async def generate():
        for chunk in resp.iter_lines():
            if chunk:
                yield chunk + b'\n\n'
    return StreamingResponse(generate(), headers=headers)

问题是,这个会等接受完全部的 Stream 流信息,才会转发,想实现的效果是实时转发 stream 流信息,具体一点就是,接收到一行或者指定大小的信息块,就转发出去。

想法是将 resp = c.send_request(path, data)这块改造成异步的,然后实时转发。

目前的代码是

async def index(path: str, request: Request):
    data = await request.json()
    async with httpx.AsyncClient() as client:
        async with client.stream('POST', path, json=data) as response:
            resp_headers = dict(response.headers)
            async def generate():
                async for line in response.aiter_lines():
                    if line:
                        yield line.encode('utf-8') + b'\n\n'
            return StreamingResponse(generate(), headers=resp_headers)

但是会报:

httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed.

不知道怎么改,求赐教

1643 次点击
所在节点    Python
5 条回复
yuanxing008
2023-03-08 17:45:14 +08:00
这个为什么还要走代码。直接用 Nginx 转出去不就好了么 你是要有业务逻辑处理?
noparking188
2023-03-08 17:50:30 +08:00
有过类似需求,需要对下载过程中的 stream 数据进行处理,搜到篇文章 [How to stream Microsoft SQL Server to S3 using BCP on linux]( https://dstan.medium.com/streaming-microsoft-sql-server-to-s3-using-bcp-35241967d2e0),原理就是借助 named pipe 来对不支持 pipe 的下载工具截取 stream 进行处理。

基于这个原理我写了个迁移 SQL Server 到 S3 的小工具: https://github.com/zhiweio/StreamXfer

楼主可以参考下,读于你描述的需求,需要一个进程实时写 stream 到 named pipe ,另一个进程实时读 stream 写到其它地方,类似:
1. mkfifo /tmp/fifo
2. wget 'https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2021-financial-year-provisional/Download-data/annual-enterprise-survey-2021-financial-year-provisional-csv.csv' -o /tmp/fifo &
3. cat /tmp/fifo | aws s3 cp - s3://bucket/xxx

希望能够有帮助,如果可以的话欢迎给我一个 star ,谢谢 :)
liprais
2023-03-08 17:51:52 +08:00
找个用 splice api 的完事
009694
2023-03-08 18:55:38 +08:00
不要用 async with 。这样 return 出去的时候连接就已经关了 。用 background 去处理 httpx 未关闭的连接
ruanimal
2023-03-09 09:47:50 +08:00
`data = await request.json()` 这里其实就取了全部数据了, 不存在实时转发吧

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/922321

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX