请教这段 Python 协程代码还能如何优化?

2019-12-18 15:48:49 +08:00
 princelai

这是一个被我修改过的是生产 /消费者模型,由于需要消费者返回数据,所以不得已用了两个 Queue,第二个用于把结果写回去,最后在 main 函数里取出返回,下面的代码在我的 python3.7 和 python3.8 环境都可以正常运行,先上代码,请大家过目。

import asyncio
import pandas as pd
import random
import re


async def crawler(u):
    """
    模拟爬虫返回结果
    :param u: fake url
    :return:
    """
    i = int(re.search(r"\d+", u).group(0))
    await asyncio.sleep(random.random())
    return {'url': u, 'result': i}


async def worker(qin, qout, w):
    """
    消费者异步函数
    :param qin: Queue1,用于生产者写入和消费者读出
    :param qout: Queue2,用于让消费者回写结果
    :param w: worker id
    :return:
    """
    while True:
        if qin.empty():
            break
        u = await qin.get()
        print(f"Worker-{w} crawling {u}")
        resp = await crawler(u)
        await qout.put(resp)
        qout.task_done()


async def generate_url(url, qin):
    """
    生产者异步函数
    :param url:
    :param qin: Queue1,写出生产者产出的(这里为传入的) url
    :return:
    """
    await qin.put(url)
    print(f"Queue size = {qin.qsize()}")
    # qin.task_done()


async def main(qmax=20):
    q_in = asyncio.Queue(qmax)
    q_out = asyncio.Queue()
    urls = [f"url{i}" for i in range(100)]

    producers = [asyncio.create_task(generate_url(u, q_in)) for u in urls]
    # producers = [await q_in.put(u) for u in urls]
    consumers = [asyncio.create_task(worker(q_in, q_out, i)) for i in range(1, qmax // 2 + 1)]
    await asyncio.gather(*consumers)
    await asyncio.gather(*producers)
    # await q_in.join()
    await q_out.join()
    for c in consumers:
        c.cancel()
    return [await q_out.get() for _ in range(q_out.qsize())]


if __name__ == "__main__":
    result = asyncio.run(main(30))
    df = pd.DataFrame(result).set_index('url')

目前有三个问题,

  1. 被注释掉的 qin.join,qin.task_done 是需要?还是让第二个 queue 阻塞就好了?
  2. main 函数中的 producers 是我写的异步列表推导式,想用于替代 generate_url 函数,但是不能正常运行
  3. 是否还有更优雅的实现方式?
5091 次点击
所在节点    Python
19 条回复
princelai
2019-12-18 17:09:45 +08:00
没有大佬来指点下吗?
GoLand
2019-12-18 17:34:22 +08:00
queue 是多余的。直接:

urls = [f"url{i}" for i in range(100)]
tasks = [crawler(url) for url in urls]
results = await asyncio.gather(*tasks)

就可以。
princelai
2019-12-18 17:43:34 +08:00
@GoLand #2 谢谢,不过你说这个我知道可以,我这么设计的目的我忘说了,因为 url 是本地生成的,所以会很快,如果一次性把 url 全部创建为 task,那么 gather 后会一次性创建非常多的链接链接目标网站,我怕网站受不了,也怕自己 IP 被封,所以才不得已使用生产 /消费者,用输入的 Queue 的最大容量限制爬取速度。
ClericPy
2019-12-18 17:53:42 +08:00
限频可以用 semphore 和 sleep, Queue 这么用有点怪
princelai
2019-12-18 18:00:53 +08:00
@ClericPy #4 那就如 2 楼的代码,不需要用 Queue,请问信号量应该在哪里写啊?
superrichman
2019-12-18 18:20:41 +08:00
@princelai 搜一下 aiohttp 和 async with semaphore 就知道怎么写了
ClericPy
2019-12-18 18:29:55 +08:00
@princelai #5 信号量可以粗暴的理解成并发锁, 类似 golang 里 channel 的那个数字, 也就是同时并发的个数, 只有申请到的人才有资格执行, 其他人等待, 楼上已经给你看了
你那个 producers 是想用 async for 么
xiaozizayang
2019-12-18 21:40:21 +08:00
楼主,不知道我写的异步爬虫框架能不能帮到你:

https://github.com/howie6879/ruia

楼上说的信号量也支持~
gwy15
2019-12-18 22:48:59 +08:00
楼主你要的这个功能能抽象出来啊,没必要写这么复杂。

我写的库(不推荐,当时没发现成熟第三方库)

https://github.com/gwy15/async_pool

调用方式:
```
with Pool(4) as pool:
results = pool.map(fetch, urls)
```

更好更全的第三方库:

https://github.com/h2non/paco

调用方式:
```
responses = await paco.map(fetch, urls, limit=3)
```
princelai
2019-12-19 11:39:24 +08:00
@gwy15 #9 感谢,这个库试了下,写出来很简洁,就是可能是我的 py 版本太高,在 pycharm 里有错误提示,但是稍微修改下可以正常运行。

```python
import asyncio
import random
import re

import paco


async def crawler(u):
i = int(re.search(r"\d+", u).group(0))
await asyncio.sleep(random.random() * 3)
print(f"crawled {u}")
return i


async def main():
urls = [f"url{i}" for i in range(100)]
gather = await paco.map(crawler, urls, limit=20)
return gather


if __name__ == "__main__":
result = asyncio.run(main())

```
princelai
2019-12-19 11:41:06 +08:00
@ClericPy #7
@superrichman #6

感谢二位,用信号量的代码写出来了,比原来好很多

```python
import asyncio
import random
import re


async def crawler(u, sem):
async with sem:
i = int(re.search(r"\d+", u).group(0))
await asyncio.sleep(random.random() * 5)
print(f"crawled {u}")
return i


async def main():
sem = asyncio.Semaphore(20)
urls = [f"url{i}" for i in range(100)]
tasks = [crawler(u, sem) for u in urls]
gather = await asyncio.gather(*tasks)
return gather


if __name__ == "__main__":
result = asyncio.run(main())

```
princelai
2019-12-19 11:41:51 +08:00
不支持 markdown 吗,格式全乱了
yedashuai
2019-12-19 12:59:15 +08:00
@princelai queue 的使用是不是也有一点好处,可以限制内存的使用量,如果数据量很大,所有的 tasks 都存在 list 里
sxd96
2019-12-20 23:59:20 +08:00
@yedashuai 这些都相当于是 generator 吧?不会把所有数据都直接放进内存的吧
sxd96
2019-12-21 10:01:40 +08:00
@princelai 想问下如果爬虫 async 的话,requests 支持嘛?好像是要换用 httpx 或者 aiohttp ?这俩哪个比较好用?
princelai
2019-12-21 19:49:04 +08:00
@sxd96 如果所有都并发开始了在那就不是生成器,就已经在内存中运行了,我一般都用官方的 aiohttp,没用过另一个
sxd96
2019-12-21 21:17:59 +08:00
@princelai 哦哦是这样啊。那我的需求如果是从数据库里拿 url 出来给 crawler,也就是说那边 coroutine 在跑,然后生产者在产生新的 url,是不是还是得用 asyncio.Queue ?
princelai
2019-12-21 22:37:29 +08:00
@sxd96 应该是,生产者 aiomysql 取数据存入 queue,我这么写主要是因为我的 URL 是本地生成但又想控制速度,和你的不一样
sxd96
2019-12-22 11:41:26 +08:00
@princelai 明白了,感谢啦。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/630207

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX