这个协程例子中, consumers 是怎么被执行的?

2020-11-23 16:45:52 +08:00
 killva4624
import asyncio


# 消费者
async def consumer(n, q):
    print("consumer {}: 进入商店".format(n))
    while True:
        item = await q.get()
        print("consumer {}: 购买产品 {}".format(n, item))
        await asyncio.sleep(0.15)
        q.task_done()
    print("consumer {}: ending".format(n))


# 生产者 A
async def producer_a(q, num_workers):
    print("生产者 A: 开始生产")
    for i in range(num_workers * 2):
        await q.put("A " + str(i))
        print("生产者 A: 上架产品 A {}".format(i))
        await asyncio.sleep(0.01)


# 生产者 B
async def producer_b(q, num_workers):
    print("生产者 B: 开始生产")
    for i in range(num_workers * 2):
        await q.put("B " + str(i))
        print("生产者 B: 上架产品 B {}".format(i))
        await asyncio.sleep(0.02)


# 任务调度
async def main(num_consumers, num_workers):
    q = asyncio.Queue(maxsize=num_consumers)
    print("初始化生产者")
    prod_a = [asyncio.create_task(producer_a(q, num_workers))]
    prod_b = [asyncio.create_task(producer_b(q, num_workers))]
    print("初始化消费者")
    consumers = [asyncio.create_task(consumer(i, q)) for i in range(num_consumers)]
    print("asyncio 调度开始")
    await asyncio.gather(*prod_a, *prod_b)
    print("生产者 All: ending")

    await q.join()
    print("消费者 All: ending")

    for c in consumers:
        c.cancel()


# main(消费者数量, 生产者数量)
asyncio.run(main(3, 2))

输出结果如下:

初始化生产者
初始化消费者
asyncio 调度开始
生产者 A: 开始生产
生产者 A: 上架产品 A 0
生产者 B: 开始生产
生产者 B: 上架产品 B 0
consumer 0: 进入商店
consumer 0: 购买产品 A 0
consumer 1: 进入商店
consumer 1: 购买产品 B 0
consumer 2: 进入商店
生产者 A: 上架产品 A 1
consumer 2: 购买产品 A 1
生产者 B: 上架产品 B 1
生产者 A: 上架产品 A 2
生产者 A: 上架产品 A 3
consumer 0: 购买产品 B 1
consumer 1: 购买产品 A 2
生产者 B: 上架产品 B 2
consumer 2: 购买产品 A 3
生产者 B: 上架产品 B 3
生产者 All: ending
consumer 0: 购买产品 B 2
consumer 1: 购买产品 B 3
消费者 All: ending

我理解 await asyncio.gather(*prod_a, *prod_b) 执行了 prod_aprod_b ,但 consumers 是怎么被触发的呢?

1642 次点击
所在节点    Python
3 条回复
mercurylanded
2020-11-23 16:55:48 +08:00
create_task 就触发了 gather 是等待结束
killva4624
2020-11-23 17:24:10 +08:00
@mercurylanded 谢谢,重新去看了官方文档和源码。
fasionchan
2020-11-24 08:28:44 +08:00
create_task 的时候,协程被创建,但还没被调度执行;
await gather 的时候,程序执行权被转移到 asyncio 内部的事件循环;
这时,先前创建的协程如果达到执行条件,将被事件循环调度执行;
直到 gather 等待的协程,也就是 producer 执行完毕,程序执行权回到当前代码;
await q.join 的时候,程序执行权又被转移到事件循环,comsumer 继续执行;
直到队列 q 为空,这时 consumer 肯定已经完成所有数据消费;

可以自己动手写一个极简的协程库加深对协程运行原理的理解,只需 100 来行代码,即可一举拿下协程、事件循环、IO 多路复用等核心概念:

https://www.imooc.com/read/76/article/1935

我只看 asyncio 文档时,总有一种似懂非懂的感觉;自己折腾过一遍后就清晰了,各种概念对号入座。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/728390

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX