Help: problem with a Python async multi-worker UPYUN image downloader

2017-09-03 21:10:06 +08:00
 asuraa

Code

import asyncio
import base64
import os
import urllib.parse  # import the submodule explicitly; "import urllib" alone does not expose urllib.parse

import aiohttp

# -----------------------
# -----------------------
bucket = 'xxx'
username = 'xxx'
password = 'xxxxxx'
hostname = "xxxxxx"
base_save_path = 'f:'
# -----------------------

headers = {}
auth = base64.b64encode(f'{username}:{password}'.encode('utf-8'))
# decode the base64 bytes; str(auth) would produce "b'...'" and corrupt the header
headers['Authorization'] = 'Basic ' + auth.decode('utf-8')
headers['User-Agent'] = "UPYUN_DOWNLOAD_SCRIPT"
headers['x-list-limit'] = '300'

thread_sleep = 1


def is_dic(url):
    """Decide whether a key is a directory, based on whether it has a file extension."""
    url = url.replace('http://v0.api.upyun.com/', '')
    # no '.' in the key -> treat it as a directory
    return len(url.split('.')) == 1


class Crawler:
    def __init__(self, init_key, hostname, max_tasks=10, pic_task=50):
        """Initialize the crawler."""
        self.loop = asyncio.get_event_loop()
        self.max_tries = 4  # retries per request
        self.max_tasks = max_tasks  # number of API worker tasks
        self.key_queue = asyncio.Queue(loop=self.loop)  # API (listing) queue
        self.pic_queue = asyncio.Queue(loop=self.loop)  # image queue
        self.session = aiohttp.ClientSession(loop=self.loop)  # async HTTP session for the API
        self.pic_session = aiohttp.ClientSession(loop=self.loop)  # async HTTP session for images
        # seed the API queue with the directory to crawl
        self.key_queue.put_nowait({'key': init_key, 'x-list-iter': None, 'hostname': hostname})
        self.pic_task = pic_task  # number of image worker tasks (the API is rate-limited; plain HTTP downloads are not)

    def close(self):
        """回收 http session"""
        self.session.close()
        self.pic_session.close()

    async def work(self):
        """接口请求队列消费者"""
        try:
            while True:
                url = await self.key_queue.get()
                # print('key queue size:', self.key_queue.qsize())
                await self.handle(url)
                self.key_queue.task_done()
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            pass

    async def work_pic(self):
        """图片请求队列消费者"""
        try:
            while True:
                url = await self.pic_queue.get()
                await self.handle_pic(url)
                self.pic_queue.task_done()
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            pass

    async def handle_pic(self, key):
        """Download one image (or, if the key turns out to be a directory, list it)."""
        url = key['key'] if key['key'].startswith('/') else '/' + key['key']
        url = urllib.parse.quote(url.encode('utf-8'))

        pic_url = key['hostname'] + url + '!s400'

        tries = 0
        while tries < self.max_tries:
            try:
                response = await self.pic_session.get(pic_url)
                break
            except aiohttp.ClientError:
                pass
            tries += 1
        else:
            # every attempt failed; bail out so `response` below is never unbound
            return
        try:
            if is_dic(url):
                # print('image worker - directory: {}'.format(url))
                content = await response.text()
                # fall back to the end-of-list marker when the header is missing;
                # str(None) never matches the marker, so the key would be re-queued forever
                iter_header = response.headers.get('x-upyun-list-iter') or 'g2gCZAAEbmV4dGQAA2VvZg'

                list_json_param = content + "`" + str(response.status) + "`" + str(iter_header)
                self.do_file(self.get_list(list_json_param), key['key'], key['hostname'])
            else:
                # print('image worker - file: {}'.format(key['save_path']))
                with open(key['save_path'], 'wb') as f:
                    f.write(await response.read())
        finally:
            await response.release()

    async def handle(self, key):
        """Handle one API (listing) request."""
        url = '/' + bucket + (key['key'] if key['key'].startswith('/') else '/' + key['key'])
        url = urllib.parse.quote(url.encode('utf-8'))

        # copy the shared headers so concurrent tasks don't overwrite each other's X-List-Iter
        req_headers = dict(headers)
        if key['x-list-iter'] is not None and key['x-list-iter'] != 'g2gCZAAEbmV4dGQAA2VvZg':
            req_headers['X-List-Iter'] = key['x-list-iter']

        tries = 0
        while tries < self.max_tries:
            try:
                response = await self.session.get("http://v0.api.upyun.com" + url, headers=req_headers)
                break
            except aiohttp.ClientError:
                pass
            tries += 1
        else:
            # every attempt failed
            return
        try:
            if is_dic(url):
                # print('API worker - directory: {}'.format(url))
                content = await response.text()
                iter_header = response.headers.get('x-upyun-list-iter') or 'g2gCZAAEbmV4dGQAA2VvZg'

                list_json_param = content + "`" + str(response.status) + "`" + str(iter_header)
                self.do_file(self.get_list(list_json_param), key['key'], key['hostname'])
            else:
                # print('API worker - file: {}'.format(key['save_path']))
                with open(key['save_path'], 'wb') as f:
                    f.write(await response.read())
        finally:
            await response.release()

    def get_list(self, content):
        """Parse 'body`status`iter' into a list of file dicts plus [status, iter]."""
        if content:
            content = content.split("`")
            items = content[0].split('\n')
            # each listing line is 'name\ttype\tsize\ttime'
            content = [dict(zip(['name', 'type', 'size', 'time'], x.split('\t'))) for x in items] + \
                      content[1].split() + content[2].split()
            return content
        else:
            return None

    def do_file(self, list_json, key, hostname):
        """Consume one page of listing results: queue subdirectories and images."""
        for i in list_json[:-2]:
            if not i['name']:
                continue
            new_key = key + i['name'] if key == '/' else key + '/' + i['name']
            try:
                if i['type'] == 'F':
                    # 'F' means folder: queue it for listing
                    self.key_queue.put_nowait({'key': new_key, 'x-list-iter': None, 'hostname': hostname})
                else:
                    # make sure the target directory exists under base_save_path,
                    # so it matches the save_path built below
                    dir_path = base_save_path + '/' + bucket + key
                    try:
                        if not os.path.exists(dir_path):
                            os.makedirs(dir_path)
                    except OSError as e:
                        print('error creating directory: ' + str(e))
                    save_path = base_save_path + '/' + bucket + new_key
                    if not os.path.isfile(save_path):
                        print(f'queueing image: {new_key}')
                        self.pic_queue.put_nowait(
                            {'key': new_key, 'save_path': save_path, 'x-list-iter': None, 'hostname': hostname})
                    else:
                        print(f'file already exists: {save_path}')
            except Exception as e:
                print('download error!: ' + str(e))
                with open('download_err.txt', 'a') as f:
                    f.write(new_key + '\n')
        # anything other than the end-of-list marker means more pages: re-queue with the iter token
        if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg':
            self.key_queue.put_nowait({'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname})

    async def run(self):
        """Spawn the worker tasks and run until both queues are drained."""
        workers = [asyncio.Task(self.work(), loop=self.loop)
                   for _ in range(self.max_tasks)]

        workers_pic = [asyncio.Task(self.work_pic(), loop=self.loop)
                       for _ in range(self.pic_task)]

        await self.key_queue.join()
        await self.pic_queue.join()

        # cancel every worker task once both queues are empty
        workers.extend(workers_pic)
        for w in workers:
            w.cancel()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    crawler = Crawler('/', hostname, max_tasks=5, pic_task=150)
    loop.run_until_complete(crawler.run())

    crawler.close()

    loop.close()

That's the code.

Questions

  1. The script runs fine at first, but after a long run (roughly 5 hours or more) it hangs, and I'm stumped. My guess is a queue problem, but I have no way to verify it (see the sketch below). Why does the script freeze after running for a certain amount of time?
  2. The purpose of this script is to download every image on UPYUN and save it locally. The volume is huge: about 10 TB, roughly 300 million images. Our machine's download bandwidth is about 300 Mbps, i.e. a download speed of roughly 30 MB/s. We have contacted UPYUN several times, and they say this is the only way to download; copying onto a mailed hard drive is not an option, even though we are both in Hangzhou. Is there any other trick for downloading all the images quickly?
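
One way to verify the queue guess (a sketch, not part of the script above; `monitor` is a hypothetical helper that only needs access to the Crawler instance):

```python
import asyncio

async def monitor(crawler, interval=60):
    """Periodically print both queue sizes; if the numbers stop moving
    while the script is silent, the workers are stuck, not the queues."""
    while True:
        print('key_queue:', crawler.key_queue.qsize(),
              'pic_queue:', crawler.pic_queue.qsize())
        await asyncio.sleep(interval)
```

Running it as one extra task next to the workers (e.g. `asyncio.Task(monitor(crawler), loop=loop)`) would show whether the hang is an empty-but-unjoined queue or workers blocked mid-request.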
4507 views
Node: Python
33 replies
asuraa
2017-09-03 21:12:28 +08:00
asuraa
2017-09-03 21:16:17 +08:00
Hmm, nobody has an answer to such a hard question?
asuraa
2017-09-03 21:19:26 +08:00
Could it be caused by saving the files?
asuraa
2017-09-03 21:19:39 +08:00
Waiting online... this is urgent.
asuraa
2017-09-03 21:20:02 +08:00
asuraa
2017-09-03 21:23:04 +08:00
Correction:
```python
def is_dic(url):
    """Decide whether a key is a directory, based on whether it has a file extension."""
    url = url.replace('http://v0.api.upyun.com/', '')
    if len(url.split('.')) > 1:
        return True
    else:
        return False
```
asuraa
2017-09-03 21:29:02 +08:00
Oops... I misread. The code in the original post is correct after all.
Lax
2017-09-03 21:35:24 +08:00
While the script is running, use strace to see where it is stuck:

strace -p <pid>
asuraa
2017-09-03 21:36:14 +08:00
@Lax Is there anything like that on Windows?
asuraa
2017-09-03 21:36:54 +08:00
@Lax The script runs on Windows... because the drive has to be NTFS-formatted so it can be sent over to another department.
Lax
2017-09-03 21:40:34 +08:00
Another point worth suspecting: none of your file operations get closed, so you may be exhausting the open-files limit.
Compare these three values:
user limit: ulimit -n
process limit: cat /proc/<pid>/limits
actual usage: ls /proc/<pid>/fd | wc -l
Lax
2017-09-03 21:41:11 +08:00
Never mind what I just said...
Lax
2017-09-03 21:42:13 +08:00
Cygwin / MinGW and the like may have strace.
asuraa
2017-09-03 21:43:51 +08:00
@Lax with statements are used here, which should guarantee that the opened file handles are closed once each with block finishes. So it shouldn't be that.
mengskysama
2017-09-03 21:53:26 +08:00
Try adding a timeout. Waiting for TCP's own timeout mechanism to trigger takes ages.
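
A minimal sketch of what that could look like (a hypothetical `fetch_with_timeout` helper, not from the script above; the 30-second deadline and retry count are arbitrary):

```python
import asyncio
import aiohttp

async def fetch_with_timeout(session, url, timeout=30, max_tries=4):
    """Retry wrapper: a dead connection raises asyncio.TimeoutError
    instead of parking the worker task forever."""
    for _ in range(max_tries):
        try:
            return await asyncio.wait_for(session.get(url), timeout)
        except (asyncio.TimeoutError, aiohttp.ClientError):
            continue
    return None  # all tries failed; the caller should skip this key
```

The retry loops in `handle()` / `handle_pic()` could call this instead of `session.get(...)` directly; newer aiohttp versions also accept a timeout argument, but `asyncio.wait_for` works with any awaitable.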
asuraa
2017-09-03 21:55:42 +08:00
@mengskysama You mean adding it on every HTTP request?
asuraa
2017-09-03 21:58:30 +08:00
@mengskysama But it's asynchronous; it should just wait for the request to complete, shouldn't it?
mengskysama
2017-09-03 21:58:52 +08:00
asuraa
2017-09-03 21:59:56 +08:00
@mengskysama For a single task, it will simply keep waiting.
mengskysama
2017-09-03 22:01:56 +08:00
@luodaoyi One possibility is that all the tasks in the pool are blocked,
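
One way to check that from inside the process (a sketch; `asyncio.Task.all_tasks` is the spelling for Python 3.6 and earlier, renamed to `asyncio.all_tasks` in 3.7+):

```python
import asyncio

def dump_tasks(loop):
    """Print where every pending task is currently suspended."""
    for task in asyncio.Task.all_tasks(loop):  # asyncio.all_tasks() on 3.7+
        if not task.done():
            task.print_stack()
```

Calling this from a periodic coroutine when the script looks stuck would show whether all the image workers are parked inside `session.get`.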

https://www.v2ex.com/t/387891