不死心,再来问一遍关于 Python 的 asyncio 问题

2019-07-22 17:42:42 +08:00
 waibunleung

有两段代码,是关于 asyncio 的。
代码段一:

import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    print('before await')
    await worker_1()
    print('awaited worker_1')
    await worker_2()
    print('awaited worker_2')

%time asyncio.run(main())

########## 输出 ##########

before await
worker_1 start
worker_1 done
awaited worker_1
worker_2 start
worker_2 done
awaited worker_2
Wall time: 3 s

代码段二:

import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')

%time asyncio.run(main())

########## 输出 ##########

before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2
Wall time: 2.01 s

问题:代码段一里面的协程(coroutine)换成代码段二的任务(task)后,为什么执行顺序就变了?这个过程中发生了什么事情?

说说我的猜想:
发现调用 asyncio.run(main()) 或者 [asyncio.gather()->asyncio.get_event_loop()->loop.run_until_complete()]都会将一个 coroutine 转化成 task/future 再放 event loop 里面去, 交由 event loop 去管理这些 task/future。代码段一只将 main()这个 coroutine 封装成了 task 加入到 event loop 中,所以整个 event loop 中只有一个 task 在走,在这个 task 中代码是顺序执行的,所以最后呈现出同步执行的结果;
但是代码段二调用了两次 asyncio.create_task(),这个方法会将一个 coroutine 转换成一个 task 并且放到 event loop 中,所以整个 event loop 其实有三个 task ( main,task1,task2 ),之后程序就交给 event loop 来调度,执行顺序就变不同了。 这个假设目前来看好像能解释得通

最后,希望各位能指点一下~

6495 次点击
所在节点    Python
90 条回复
waibunleung
2019-07-22 19:54:02 +08:00
@ilucio 不知道你是看了源码还是看了程序输出得出来的结论,我也不能确定你说的就是对的
wwqgtxx
2019-07-22 20:52:15 +08:00
讲道理,我在你上一个问题的#14 和#16 已经分析过 task 在 init 是怎么把自己放进 eventloop 了,核心点还是在于 call_soon

另外你提到的 pycharm 跳转到 pyi 文件的问题,那个 pyi 只是用来做类型标记的,并没有具体的实现,如果你想看到实现可以临时去 pycharm 的用户目录把包含 pyi 的目录暂时移走就能进入真正的.py 文件了。
上面提到了有一部分 eventloop 会涉及到 c 代码,其实那大部分是涉及到怎么 pull 一个 fd 的,并不影响你阅读了解 asyncio 模块的工作原理。对于一个 SelectorEventLoop 是可以纯 python 代码实现的( python3.5 的就是纯 py 代码实现),c 代码只是为了优化性能,大部分情况下源码中还有有纯 python 的替代实现,你可以仔细看看
waibunleung
2019-07-22 21:17:42 +08:00
@wwqgtxx 说实话,发现了更核心点的代码应该是在 call_soon 中传进去的 self.__step 这个函数中,有一段:
try:
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
super().set_exception(futures.CancelledError())
else:
super().set_result(exc.value)

里面的 coro.send(None)是启动协程的关键,#29 有提到了
另外你之前的答案我也有看,但是一下子没理解下来,现在打算重新梳理一下,无论如何,十分感谢你对我的帮助~
wwqgtxx
2019-07-22 22:51:04 +08:00
@waibunleung 关于__step 这个地方可能看一下 Python 的生成器那块会有更深的了解,最初的 asyncio 就是用生成器完成的,只是在后续版本中才逐步把 coroutines 和 generators 分离了,后续版本还有了异步生成器的概念,这些内容你可以看一下相关 PEP 文档,那里有解释为什么会采用这样的设计。
https://www.python.org/dev/peps/pep-0492/
https://www.python.org/dev/peps/pep-0525/
renmu
2019-07-23 07:47:47 +08:00
遇到 await 就阻塞了,只有等 await 那个变量执行完才会继续往下运行
congeec
2019-07-23 07:59:15 +08:00
上次没跟你讲清楚我觉得很失败

你还是拿起编辑器抄一个协程的实现吧,也就几百行,本质状态机
waibunleung
2019-07-23 09:44:42 +08:00
@congeec 不要有这样的想法呀,可能只是我反应比较慢一点。其实大家对我的帮助很多了,真的谢谢你们
lieh222
2019-07-23 09:48:41 +08:00
我个人的理解
代码一
event_loop 里面只有一个任务 main,CPU 执行代码顺序就是
main()
main: print('before await')
main: await worker_1()
worker_1()
worker_1: await asyncio.sleep(1)到这里的时候有 IO 事件,让出 CPU 给 event_loop,但是 event_loop 没有其他的任务,所说 CPU 会空置等待 asyncio.sleep(1)完成再切换到 worker_1 中继续执行,接下来都是这样,所以这整个过程是同步执行的

代码二
一开始有一个 main 任务
main()
main: task1 = asyncio.create_task(worker_1())
main: task2 = asyncio.create_task(worker_2())
main: print('before await') 这里 main 并没有让出 CPU,所以先打印 before await
main: await task1 这时 CPU 直接切换到了 task1,接下来
worker_1: print('worker_1 start')
worker_1: await asyncio.sleep(1)直到这里出现 IO 事件才会把 CPU 让出给 event_loop,event_loop 中 main 和 task1 都是 await 状态,CPU 切换到 task2 任务
worker_2: print('worker_2 start')
worker_2: await asyncio.sleep(2)出现 IO 时间切回 event_loop,三个任务都在 await,所以 CPU 会空置 1S 等待 asyncio.sleep(1),然后
worker_1: print('worker_1 done')
main: print('awaited worker_1')
main: await task2 等待 asyncio.sleep(2)结束
worker_2: print('worker_2 done')
main: print('awaited worker_2')
rocketman13
2019-07-23 10:50:19 +08:00
@lieh222 代码二你把 await task1 注释了再跑一次就会发现自己理解错了,await 不是你所理解的 cpu 切换或者启动任务,是主进程等待任务完成
waibunleung
2019-07-23 11:23:49 +08:00
@wwqgtxx
@dbow
@guokeke

讲真的,我试着整理了一下过程:

create_task->tasks.Task()的__init__->self._loop.call_soon[self.__step 函数作为参数传进去,里面的 result = coro.send(None)是启动协程的关键]
->_call_soon[self._ready.append(handle),这表示准备执行的任务队列]
在事件循环里:
base_events.py 里 run_forever->_run_once[handle = self._ready.popleft()取出一个任务(task/future),然后 handle._run()执行任务]

像之前说的 create_task 在将一个 coroutine 转化成 task 之后,将自己放进去了 event loop 中准备在下一轮循环中执行,那问题是是什么时候会进入下一轮循环的呢?

是不是这样的过程:
asyncio.run(main())之后,首先将 main()这个协程 coroutine 转化成 task 并加入 event loop,事件循环的第一轮执行 main 这个 task,执行期间将在 main 中创建的
两个 task(worker_1,worker_2)加入到 event loop 中,运行到 await task1 后让出控制权并检查事件循环里有没有其他任务,发现有刚刚新添加的两个任务,就转而去执行其他任务,
在其他任务中遇到了 await asyncio.sleep()再跳出来去执行另外的任务....直到所有任务执行完毕。

但如果是这样的话,好像只是在一轮循环里面就执行完了呀...总感觉哪里不对,是我理解错了吗?
wwqgtxx
2019-07-23 11:37:38 +08:00
@waibunleung 作为事件循环,当你 await task1 把控制权交还的时候,这一轮循环已经结束,随后立即开始下一轮的事件循环,检查有没有其他任务这已经是下一轮循环干的事了
wwqgtxx
2019-07-23 11:45:01 +08:00
@rocketman13 根据 python 的协程实现原理,只有在一个 coroutine 主动调用 await 或者 return 的时候才会发生 task 切换,虽然这里的 await task1 的目的是为了等待 task1 结束,但如果你在主 task 中永远不 await 任何一个 future 或者 return 的话(比如跑个死循环),其他 task 是永远不会被执行的。这点和线程可以剥夺式调度完全不同。( ps: python3.6 开始是可以检测一个 coroutine 执行了太长时间而发出警告的,但仍然不可主动剥夺)
wwqgtxx
2019-07-23 11:52:42 +08:00
@waibunleung 有个建议,你可以尝试自己实现一下如何在单线程下完成“多任务”,基本上就是楼上所说有限状态机,很多单片机程序就是一个大的 while 循环套几组 switch,一组 switch 就是一个 task,每个 switch 的 flag 是一个标志量,内部执行到需要等待的时候就把 flag 值改成下一个代码块,随后主动退出,去让下一组 task 执行,这样实现“伪并发”。当然由于单片机的程序单一,硬件资源极为有限(很多单片机 ram 只有 1kb 甚至更小),这里的 task 是写死的。而 python 中相当于每次 while 循环的时候从队列头取出一组 switch(task)来执行,执行完后再进行下一轮 while (还有些 io 相关的操作暂时省略)
waibunleung
2019-07-23 13:42:12 +08:00
@wwqgtxx 十分感谢!!
lieh222
2019-07-23 13:45:53 +08:00
@rocketman13 测试了一下,确实没有切换或启动任务的意思
j0hnj
2019-07-23 14:38:06 +08:00
赞楼主刨根问底的精神,有点像我当初为了搞清楚 asyncio 工作原理莽着源码看的样子。当看到 Task.__step 之后就明白整个链条是怎么串起来的了,有种醍醐灌顶的感觉,后来用 asyncio 写异步就得心应手了。
wwqgtxx
2019-07-23 15:24:31 +08:00
@j0hnj 我自己学习的时候是用 yield 和 yield from 手写了一个协程的实现(在 py3.4 的时候,那是 asyncio 还未加入标准库),不过理解的根源还是写“多任务”的单片机程序😂
waibunleung
2019-07-23 17:03:16 +08:00
@j0hnj 我看到 Task.__step 的时候也是跟你一样的感觉啊!!
waibunleung
2019-07-23 17:08:07 +08:00
@wwqgtxx
@j0hnj
其实最近提问的主题都被不少人收藏了,可能大家都会对这个感兴趣但是没人问出来并且刨进去,所以真的很感谢大家的帮忙,在这里的解答不仅是帮助到了我,还能照顾到其他收藏了或者点击进来的人,所以才希望有多点有建设性的评论,现在真的是莽着来看源码的,希望看着看着就没那么莽了吧。

借楼感谢上面这么多的评论的帮助!
zzth370
2019-07-23 22:45:57 +08:00
async def main():
task1 = asyncio.create_task(worker_1())
task2 = asyncio.create_task(worker_2())
print('before await')
await asyncio.sleep(2)
print('awaited worker_1')
await asyncio.sleep(1)
print('awaited worker_2')
结果:
before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2


我把 main 里面的稍微改了下,得到跟你一样的结果,我感觉是 create_task 时就已经把两个任务添加到协程的任务列表里面去了,然后后面遇到阻塞就切换,然后就会得到跟 1 不一样的结果

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/585164

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX