关于 asyncio 执行 IO 密集型操作的不解

2021-11-20 22:17:36 +08:00
 firejoke

有一个读文件然后写数据库的操作,想尝试使用协程。
使用协程的:

async def parse_text(file_path: Path, context_qs: [asyncio.Queue]):
    ql = len(context_qs)
    i = 0
    # 每一个 Queue 放 step 个数据就切换下一个
    step = 2
    with open(file_path, encoding="utf8") as f:
        for text in f:
            if i // step == ql:
                i = 0
            context_q = context_qs[i // step]
            context = {}
            text = re.findall(r"\d+", text)
            if text:
                context = {"解析然后组装成 dict"}
                await context_q.put(context)
                # 这里如果不 join ,会一直在这个 for 循环里不出去
                await context_q.join()
                i = i + 1
        else:
            await context_q.put("结束标记")
            return


async def write_db(context_q: asyncio.Queue, model: ModelBase):
    async with AsyncSession() as session:
        while 1:
            context = await context_q.get()
            if context["结束标记"] == "end":
                return
            info, obj = None, None
            try:
                if context["info"]:
                    info = await session.execute(
                        select(InfoModel).filter(
                            InfoModel.attr == context["info"]
                        )
                    )
                    info = info.scalars().one_or_none()
                    if not info:
                        info = InfoModel(attr=context["info"])
                        session.add(info)
                if context["header"]:
                    obj = await session.execute(
                        select(model).filter(
                            model.header == context["header"]
                        ).options(selectinload(getattr(model, "info")))
                    )
                    obj = obj.scalars().one_or_none()
                    if not obj:
                        obj = model(header=context["header"])
                        session.add(obj)
                if obj or info:
                    if info not in obj.info:
                        obj.info.append(info)
                        session.add(obj)
                    await session.commit()
            except Exception as e:
                await session.rollback()
                raise e
            else:
                context_q.task_done()


async def main():
	# 每个读取文件并解析的方法对应 c_q_count 个写数据库的方法
    c_q_count = 3
    a_context_qs = [asyncio.Queue() for i in range(c_q_count)]
    b_context_qs = [asyncio.Queue() for i in range(c_q_count)]
    tasks = [
        asyncio.create_task(
            parse_text(Path("a.txt"), a_context_qs)
        ),
        asyncio.create_task(
            parse_text(Path("b.txt"), b_context_qs)
        ),
    ]
    for i in range(c_q_count):
        tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel)))
        tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel)))
    await asyncio.gather(*tasks)



if __name__ == '__main__':
    asyncio.run(main(), debug=settings.DEBUG)

不使用协程的:

def sync_read_file():
    af = Path("a.txt").open(encoding="utf8")
    bf = Path("b.txt").open(encoding="utf8")
    with Session() as session:
        while 1:
            if af:
                try:
                    text = af.readline()
                    context = parse_text(text)
                    sync_write_db(session, context, AModel)
                except IOError:
                    af.close()
                    af = None
            if bf:
                try:
                    text = bf.readline()
                    context = parse_text(text)
                    sync_write_db(session, context, BModel)
                except IOError:
                    bf.close()
                    bf = None
            if not af and not bf:
                return


def sync_write_db(session, context, model):
    info, obj = None, None
    try:
        if context["info"]:
            info = session.execute(
                select(Info).filter(
                    Info.attr == context["info"]
                )
            )
            info = info.scalars().one_or_none()
            if not info:
                info = Info(attr=context["info"])
                session.add(info)
        if context["header"]:
            obj = session.execute(
                select(model).filter(model.info == context["info"]))
            obj = obj.scalars().one_or_none()
            if not obj:
                obj = model(info=context["info"])
                session.add(obj)
        if obj or info:
            if info not in obj.info:
                obj.info.append(info)
                session.add(obj)
            session.commit()
    except Exception as e:
        session.rollback()
        raise e


if __name__ == '__main__':
    sync_read_file()

这个协程的方法,每秒每个表可以写 400 多行,改为同步单线程的还是每秒写 400 多行。
不知道是我协程的用法有问题?还是说有别的什么原因?

4163 次点击
所在节点    Python
36 条回复
jenlors
2021-11-20 23:11:37 +08:00
试试 aiofiles 之类的,你的文件 IO 还是同步的
Trim21
2021-11-20 23:15:05 +08:00
with open(file_path, encoding="utf8") as f:
for text in f:

这两行都是阻塞的
firejoke
2021-11-20 23:33:33 +08:00
@long2ice #1
@Trim21 #2
文件这里只是读取,然后放进队列里,这也会导致阻塞吗?
Trim21
2021-11-20 23:36:52 +08:00
@firejoke #3 同步 io 会阻塞掉整个事件循环。
firejoke
2021-11-20 23:47:33 +08:00
@Trim21 #4 所以,也会导致如果不主动用队列的 join 阻塞住,就不会跳到其他 await 的地方?
Nitroethane
2021-11-20 23:48:52 +08:00
@firejoke 如果读取文件速度比较慢,而且文件比较大的话影响应该比较明显
firejoke
2021-11-20 23:50:19 +08:00
@Nitroethane #6 每行数据小于 1kb ,而且是用的 for ,这里相当于一个生成器
Trim21
2021-11-20 23:50:53 +08:00
@firejoke #5 不是,你的代码中仅仅会阻塞在 open 和 for text in f 这两行。在等待这两行底层的同步 io 完成的时间里是不会运行其他 task 的。
firejoke
2021-11-21 00:19:02 +08:00
@Trim21 #8 我改成了 asyncfiles ,然后把队列的 join 去掉了,这次成功跳到了其他 await 的位置,确实如你所说,感谢!
但测试发现,虽然没了 io 的阻塞,但写入速度还是没太大变化,他每读一行,切到其他 task ,和我之前没读一行,join 住,就执行流程来说,是不是没差?
Trim21
2021-11-21 00:51:20 +08:00
我没仔细看完整的代码,只是看到一开始就有同步阻塞的问题就回复了。
locoz
2021-11-21 02:50:56 +08:00
目测是正则导致的阻塞...有一说一你这种情况不太适合用 asyncio ,或者说不太适合没有包上隐式多进程的 asyncio ,毕竟不是纯粹的 IO 操作。然后文件操作方面 aiofiles 实际背后也是靠线程池跑的,这一点需要注意一下,有时候可能会导致踩坑。
documentzhangx66
2021-11-21 03:29:02 +08:00
先监视一下设备性能极限。
iostat -x -m -d 1
LeeReamond
2021-11-21 04:07:18 +08:00
大概看了一眼楼上说的应该没问题,并非所有类型的任务都能通过异步加速,你要做好心理准备。另外 aiofiles 的实现其实很丑陋。。楼上说是线程池跑的,我有点忘记具体情况了,只记得以前读源码的印象是很丑陋。。
Contextualist
2021-11-21 08:17:31 +08:00
看上去没有明显的问题,不过对于任何为了改进性能的重写建议还是先 profile 一下,看看瓶颈到底出在哪个调用上。

然后异步文件 IO 不是为了提升性能(降低平均延迟)的,而是为了降低尾延迟的,参见: https://trio.readthedocs.io/en/stable/reference-io.html#background-why-is-async-file-i-o-useful-the-answer-may-surprise-you
2i2Re2PLMaDnghL
2021-11-21 08:47:44 +08:00
(我会尝试先把所有信息读进内存然后 timeit 数据库部分,看瓶颈是不是文件
lesismal
2021-11-21 11:37:53 +08:00
不是说给函数加上异步就是一切都异步了:
1. 异步的函数 A
2. A 内部调用 B C D ,B C D 有任意同步阻塞的行为,A 也一样跟着阻塞

py 的性能痛点远不只是 asyncio 就能解决的了的,how about trying golang -_-
firejoke
2021-11-21 12:26:21 +08:00
@locoz #11 我也感觉似乎没发挥出 asyncio 的优势,每一条数据都不超过 1kb ,所以可能除了数据库操作稍微耗时长一点,其他地方等待的很少,所以和单线程的性能差不多?另外请教一下,“没有包上隐式多进程” 具体是指什么呢?
firejoke
2021-11-21 12:30:43 +08:00
@documentzhangx66 #12 设备性能应该没问题,12 核 24 线程,64G 内存,磁盘读取速度也没有跑满,IO 读写也不是特别高。
firejoke
2021-11-21 12:31:57 +08:00
@LeeReamond #13 嗯,我昨天也想了一下,如果每一步阻塞住的操作实际上都很快,那 asyncio 其实发挥不出切换等待的优势。
locoz
2021-11-21 12:37:05 +08:00
@firejoke #17 建议用调试工具或者排除法看看具体是哪里拖慢了,单看代码和前面的讨论我感觉是正则部分导致的。

前面没讲清楚,“包上隐式多进程的 asyncio”指的是把多进程和协程结合,开一堆子进程然后每个子进程一个 eventloop ,因为之前有看到过一个专门的库把这部分操作给隐式处理了,使用起来两三行搞定,不需要自己写进程管理部分。然后一些框架其实也会隐式地做这种结合处理来提高效率。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/816841

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX