这是一个被我修改过的是生产 /消费者模型,由于需要消费者返回数据,所以不得已用了两个 Queue,第二个用于把结果写回去,最后在 main 函数里取出返回,下面的代码在我的 python3.7 和 python3.8 环境都可以正常运行,先上代码,请大家过目。
import asyncio
import pandas as pd
import random
import re
async def crawler(u):
"""
模拟爬虫返回结果
:param u: fake url
:return:
"""
i = int(re.search(r"\d+", u).group(0))
await asyncio.sleep(random.random())
return {'url': u, 'result': i}
async def worker(qin, qout, w):
"""
消费者异步函数
:param qin: Queue1,用于生产者写入和消费者读出
:param qout: Queue2,用于让消费者回写结果
:param w: worker id
:return:
"""
while True:
if qin.empty():
break
u = await qin.get()
print(f"Worker-{w} crawling {u}")
resp = await crawler(u)
await qout.put(resp)
qout.task_done()
async def generate_url(url, qin):
"""
生产者异步函数
:param url:
:param qin: Queue1,写出生产者产出的(这里为传入的) url
:return:
"""
await qin.put(url)
print(f"Queue size = {qin.qsize()}")
# qin.task_done()
async def main(qmax=20):
q_in = asyncio.Queue(qmax)
q_out = asyncio.Queue()
urls = [f"url{i}" for i in range(100)]
producers = [asyncio.create_task(generate_url(u, q_in)) for u in urls]
# producers = [await q_in.put(u) for u in urls]
consumers = [asyncio.create_task(worker(q_in, q_out, i)) for i in range(1, qmax // 2 + 1)]
await asyncio.gather(*consumers)
await asyncio.gather(*producers)
# await q_in.join()
await q_out.join()
for c in consumers:
c.cancel()
return [await q_out.get() for _ in range(q_out.qsize())]
if __name__ == "__main__":
result = asyncio.run(main(30))
df = pd.DataFrame(result).set_index('url')
目前有三个问题,
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.