迫于机器配置太低,用多进程多线程,一秒钟才处理几百条 uri ,于是想着来用异步协程来试下,看着文档写出了这样一个丑陋的代码,搞了几万条 uri 测试了下,好像也没啥问题,不打印结果到屏幕的话,一秒钟差不多可以处理 1000 条,大概有这么几个步骤:
我现在的困惑是:
耽误大佬周五下午一点点时间,帮忙瞅一眼,不胜感激!
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
from asyncio import Semaphore
from aiohttp import ClientSession
from itertools import islice
def get_lines_iterator(filename, n=1000):
with open(filename) as fp:
while True:
lines = list(islice(fp, n))
if lines:
yield lines
else:
break
async def delete_file(uri: str,
session: ClientSession,
sem: Semaphore) -> int:
headers = {'Authorization': 'xxxxxxxxxxx'}
url = api + uri
async with sem, session.delete(url, headers=headers) as response:
return uri, response.status
# 写法 1:
# async def main(uris):
# sem = Semaphore(100)
# async with ClientSession() as session:
# tasks = [delete_file(uri, session, sem) for uri in uris]
# await asyncio.gather(*tasks)
# 写法 2:
async def main(uris):
sem = Semaphore(100)
async with ClientSession() as session:
async with asyncio.TaskGroup() as group:
result = [group.create_task(delete_file(
uri, session, sem)) for uri in uris]
return result
if __name__ == '__main__':
for lines in get_lines_iterator("uris.txt"):
uris = [uri.strip() for uri in lines]
result = asyncio.run(main(uris))
for x in result:
print(x.result())
1
fzzff 2023-07-28 15:56:48 +08:00 1
以下是对代码进行优化的建议:
1. 使用异步文件读取:可以使用`aiofiles`库来实现异步文件读取,从而避免阻塞事件循环。这将使得文件的读取操作也能并发进行,提高效率。 2. 使用异步上下文管理器:`aiohttp`支持异步上下文管理器,你可以使用`async with`语法来创建`ClientSession`,这样会更加简洁,而且会在任务完成后自动关闭会话。 3. 使用`asyncio.as_completed`:在并发执行任务时,可以使用`asyncio.as_completed`来获取已完成的任务,而不是等待所有任务都完成再处理结果。这样可以更早地得到一部分结果,并在需要时立即处理。 4. 异常处理:对于异步代码,异常处理十分重要。可以在任务执行时捕获异常,并记录错误信息,以便后续分析和处理。 下面是优化后的代码: ```python import asyncio import aiofiles from aiohttp import ClientSession async def delete_file(session: ClientSession, sem: asyncio.Semaphore, uri: str): headers = {'Authorization': 'xxxxxxxxxxx'} url = api + uri try: async with sem: async with session.delete(url, headers=headers) as response: return uri, response.status except Exception as e: # 处理异常,比如记录错误日志 print(f"Error occurred while processing {uri}: {str(e)}") return uri, None async def main(uris): sem = asyncio.Semaphore(100) async with ClientSession() as session: tasks = [delete_file(session, sem, uri) for uri in uris] for future in asyncio.as_completed(tasks): uri, status = await future if status is not None: print(f"{uri}: {status}") else: print(f"{uri}: Error") async def read_uris(filename): async with aiofiles.open(filename, mode='r') as fp: async for line in fp: yield line.strip() if __name__ == '__main__': asyncio.run(main(read_uris("uris.txt"))) ``` 在优化后的代码中,我们使用`aiofiles`库来异步读取文件,并使用`async for`来逐行获取 URI 。同时,我们使用`asyncio.as_completed`来处理已完成的任务,这样在某些任务执行较慢时,可以更早地输出结果,提高实时性。另外,我们在`delete_file`函数中增加了异常处理,确保在出现异常时不会导致整个任务中断。 |
2
fzzff 2023-07-28 16:01:37 +08:00
以上是 chatgpt 给出的代码优化建议, 另外个人建议你把请求库替换成 httpx
|
3
zong400 2023-07-28 16:03:11 +08:00
api 端只能并发 100 ?那你代码优化得再好也没用啊,多部署几个在不同 ip 的服务器吧
|
4
wuwukai007 2023-07-28 16:05:50 +08:00
api 并发 100 ,线程 100 并发不是绰绰有余吗?
|
5
fzzff 2023-07-28 16:14:01 +08:00 1
asyncio.run(main(uris))无论如何不应该放到循环里, 每次 asyncio.run()会创建一个新的事件循环, 你这个代码跑完创建了一堆事件循环同时运行性能必然受损, 而且你的 result 接收到的也不是完整的结果, 如果你想并发运行一组任务应该用 asyncio.gather
|
6
ohayoo OP @zong400 没有,api 端没有说限制 100 ,只是我自己觉得太高了,估计会 ban 我,所以先填个 100 试一试
|
10
zzl22100048 2023-07-28 17:03:46 +08:00 1
|
11
fzinfz 2023-07-28 17:12:49 +08:00 via iPhone 1
|
12
Maerd 2023-07-28 17:17:59 +08:00 1
最好的方案是使用 asyncio.Queue 来实现协程队列,不要把 asyncio.run 放到循环里,最好是从循环中往队列中塞 url ,然后使用 asyncio.create_task 来控制并发协程数量,从协程中不断读取队列内容实现并发,Semaphore 的实现是较为丑陋且性能低下的,可以看出你的代码风格还是受到了往进程池线程池硬塞内容风格的影响(我同事也喜欢这么写)
|
13
ClericPy 2023-07-28 20:33:45 +08:00 1
1. 每次读 1000 行,避免内存占用过多
1000 行内存太小了, 给你个简单的换算公式: 1 万条 URL 大约 1MB 内存 如果 api 响应较慢, 你会很多时候在等最后少数几条结束才能开始下一轮. 不如像他们说的丢异步队列, 然后开 100 个 task 去消费直到 Queue 空 2. 利用 Semaphore 来控制并发数量为 100 ,避免 API 端把我给 ban 了 100 并发实际上很大, 一般 nginx 上带点限流的, 一秒超过 2 个请求就给你 ban 了 其次还要考虑目标网站承载能力, 否则只会导致 dos 然后失败率骤增. 不要敲打服务器 是爬虫第一课 如果这服务器是自己家的... 千万行, 考虑改造下批量任务, 甚至这种请求量的就不像 HTTP 场景 3. 复用 session 没啥问题, 早期 aiohttp 还有 bug 不关 Session 一大堆事, 你分段来做也还好 4. asyncio.run 与 get_event_loop 建议找个 IDE 点 goto definition 看看源码, python 很多内置库源码都值得学习一下. 一般情况下 asyncio.run 只运行一次, 其他各种逻辑放到 main 函数里就好了. 启动多次也没啥事 挺简单个爬虫为啥那么纠结, 写个十几万行啥问题都没了. 你都用上 TaskGroup 了, 看看官方文档吧 PS: 不是大佬, 辨别乱 Q... |
15
ohayoo OP 感谢以上大佬的指教,我结合各位提到的点 再去琢磨琢磨
|
16
julyclyde 2023-07-29 21:11:05 +08:00
没看明白为什么还有 uris=[ ] 这一行
你输入文件里每行有多个 uri 吗? |
17
ohayoo OP @Maerd #12 特别感谢大佬的指导,用 threading 的时候知道要用 queue ,用协程却不知道应该去看看 asyncio 的 Queue ,我的我的,感谢大佬点醒我
|
18
zyxbcde 2023-08-02 13:04:02 +08:00 via iPhone
都是些伪需求啊
文件那么点完全没必要分开读。 也没必要卡信号量,aiohttp 在创建 session 时候有个参数可以限制最大连接数。 另外我个人不太喜欢用上下文管理 session ,都是自己手动关闭。 |