目前需要从一个 FTP 服务器下载 3 万多个小文件,之前用 multiprocessing 总是下一部分之后就停了,所以尝试用异步加快下载:
class pdb:
def __init__(self):
self.ids = []
self.dl_id = []
self.err_id = []
async def download_file(self, session, url):
try:
with async_timeout.timeout(10):
async with session.get(url) as remotefile:
if remotefile.status == 200:
data = await remotefile.read()
return {"error": "", "data": data}
else:
return {"error": remotefile.status, "data": ""}
except Exception as e:
return {"error": e, "data": ""}
async def unzip(self, session, work_queue):
while not work_queue.empty():
queue_url = await work_queue.get()
print(queue_url)
data = await self.download_file(session, queue_url)
id = queue_url[-11:-7]
ID = id.upper()
if not data["error"]:
saved_pdb = os.path.join("./pdb", ID, f'{ID}.pdb')
if ID not in self.dl_id:
self.dl_id.append(ID)
with open(f"{id}.ent.gz", 'wb') as f:
f.write(data["data"].read())
with gzip.open(f"{id}.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile:
shutil.copyfileobj(inFile, outFile)
os.remove(f"{id}.ent.gz")
else:
self.err_id.append(ID)
def download_queue(self, urls):
loop = asyncio.get_event_loop()
q = asyncio.Queue(loop=loop)
[q.put_nowait(url) for url in urls]
con = aiohttp.TCPConnector(limit=10)
with aiohttp.ClientSession(loop=loop, connector=con) as session:
tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
if __name__ == "__main__":
x = pdb()
urls = ['ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/nf/pdb4nfn.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ny/pdb4nyj.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/mn/pdb2mnz.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/ra/pdb4ra4.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/x5/pdb4x5w.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/dm/pdb2dmq.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/n7/pdb2n7r.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/om/pdb2omv.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/oy/pdb3oy8.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/fe/pdb3fej.ent.gz', 'ftp://ftp.wwpdb.org/pub/pdb/data/structures/divided/pdb/hw/pdb2hw9.ent.gz']
x.download_queue(urls)
报错信息如下:
Traceback (most recent call last):
File "test.py", line 111, in <module>
x.download_queue(urls)
File "test.py", line 99, in download_queue
loop.run_until_complete(asyncio.gather(*tasks))
File "/home/yz/miniconda3/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
return future.result()
File "test.py", line 73, in unzip
data = await self.download_file(session, queue_url)
File "test.py", line 65, in download_file
return {"error": remotefile.status, "data": ""}
File "/home/yz/miniconda3/lib/python3.6/site-packages/async_timeout/init.py", line 46, in exit
raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError
请大家帮忙看看。谢谢!
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.