V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
abersheeran
V2EX  ›  Python

对 coroutine.__await__() 调用 next 为什么会导致重复进入 Future.__await__()

  •  
  •   abersheeran ·
    abersheeran · 6 天前 · 1161 次点击
    import types
    from typing import Any, Awaitable
    
    
    @types.coroutine
    def _async_yield(obj):
        return (yield obj)
    
    
    async def gather(*aws: Awaitable[Any], return_exceptions: bool = False):
        result_array: list[Any] = [None for _ in range(len(aws))]
        done, pending = [], [*aws]
        while pending:
            for i, awaitable in enumerate(aws):
                if awaitable in done:
                    continue
                try:
                    next(awaitable.__await__())
                    await _async_yield(None)
                except StopIteration as exc:
                    result_array[i] = exc.value
                    done.append(awaitable)
                    pending.remove(awaitable)
                except BaseException as exc:
                    if not return_exceptions:
                        raise
                    result_array[i] = exc
                    done.append(awaitable)
                    pending.remove(awaitable)
        return result_array
    
    
    async def test_wait():
        async def a():
            return 1
    
        async def b():
            import asyncio
    
            await asyncio.sleep(0)
            return 2
    
        async def c():
            import asyncio
    
            await asyncio.sleep(1)
            return 3
    
        return await gather(a(), b(), c(), asyncio.create_task(c()))
    
    
    if __name__ == "__main__":
        import asyncio
    
        print(asyncio.run(test_wait()))
    

    报错所处的位置:

        # asyncio/futures.py line 281
        def __await__(self):
            if not self.done():
                self._asyncio_future_blocking = True
                yield self  # This tells Task to wait for completion.
            if not self.done():
                raise RuntimeError("await wasn't used with future")
            return self.result()  # May raise too.
    
    第 1 条附言  ·  2 天前
    在 asyncio/tasks.py 的 Task.__step 找到了答案。此贴完结。
    veoco
        1
    veoco  
       6 天前
    不能在同步 for 里调用异步代码
    ruanimal
        2
    ruanimal  
       6 天前
    因为 Python 是无栈协程?
    Kobayashi
        3
    Kobayashi  
       6 天前   ❤️ 2
    原因大概可以解释。看样子 awaitable.__await__() 返回了 future. 而 Future.__iter__ = Future.__await__. 而 Future result 未被设置时,Future.__await__() 返回自己。

    从你自定义的 gather() 逻辑推测,你想绕过 asyncio 的任务运行控制,自己严格控制过个 Task 交替运行?比如任务 A, B, C 分别拆分为步骤 A1, A2, A3, B1, B2 ...,你想确保 A1, B1, C1, A2, ... 顺序?

    建议先把 asyncio 源码读完理解 loop 如何全局调控任务、Task 封装 coroutine 起什么作用,以及 Future 又是什么,之后回过头来想这个事情。

    异步就是在等一个任务的时候,去做另外一个任务。asyncio 事件循环默认不实现 A1, B1, C1 有序完全没有问题。
    假设实现了这样的机制,如果 A1 运行需要等待一个新任务 X ,而 B1, C1 要等待 A1 。现在事件循环中所有任务都在等 X ,这还算异步吗?
    另外,这里强调的是事件循环默认行为不能这么做,不然可能引起阻塞。但 asyncio 确实提供了任务间依赖的机制:Event, Lock, Condition ... 其原理都是 Future ,而 Future 本质上就是一个信号,任务 X 开始时返回一个信号给 A ,A 拿到信号后把自己后续步骤执行作为信号处理函数挂上去,任务 X 完成时触发信号,运行跳回 A1 。

    总之,要自定义实现任务有序,1 )局部加入步骤有序,要么用 Event, Condition, Lock, Semaphore ,或者直接使用其更底层 Future 信号机制在步骤间建立机制。2 )全局的话不行,但作为一个基础包,asyncio 不能让事件循环处理任务步骤时有序,这样就不是异步了。自己玩玩不发包怎么搞都行。
    Sinksky
        4
    Sinksky  
       6 天前
    因为 asyncio.sleep 会跑到 asyncio 的事件循环里面?
    abersheeran
        5
    abersheeran  
    OP
       5 天前
    @Kobayashi loop 是怎么感知到 Future 已经可以读结果的?是通过 isfuture 判断类型再去读状态吗?我没找到这部分代码。
    Kobayashi
        6
    Kobayashi  
       5 天前 via Android
    @abersheeran 不感知,loop 不直接处理 Future 。
    异步里协程不是主动运行,而是把自己交给 loop ,loop 负责调控所有待运行任务列表,它不管协程返回什么值。

    Task 是对于下边协程的封装,Task._step() 调用协程的每一步,并对每一步的返回值做出响应。假设有任务 A ,它在调用协程时需要等待任务 X ,任务 X 先返回 future 给 A ,TaskA._step() 判断拿到了 future ,则把后续运行动作(还是 ._step())注为 future 的回调。等 X 完成后,它会设置 future 值,触发回调调用回到 A 。
    一个任务等待另一个任务不经过 loop ,就是利用信号挂起后续执行。
    abersheeran
        7
    abersheeran  
    OP
       5 天前
    @Kobayashi 不感知是怎么结束 await 等待的?
    关于   ·   帮助文档   ·   API   ·   FAQ   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   1400 人在线   最高记录 5497   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 18:15 · PVG 02:15 · LAX 11:15 · JFK 14:15
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.