Python coroutine tasks stuck, not progressing

2020-05-27 09:25:23 +08:00
 dawnzhu

While downloading images with Python coroutines, the number of in-flight coroutine tasks eventually gets stuck at 97 and it just keeps looping. I can't figure out where the problem is. Does anyone know what's going on? It has been bothering me for quite a while.

This is the output from a run; it stays stuck with the task count at 80:

is the queue empty.... 80
is the queue empty.... 80
.
.
.

is the queue empty.... 80
is the queue empty.... 80

Here is the code:

from lxml import etree
import os
import pandas as pd
import asyncio
import aiohttp
from random import randint
import cchardet
import aiofiles
import logging


class sikupicture_Spider(object):
    def __init__(self):
        # self.seens_url = []
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.PriorityQueue()
        self._workers = 0  # current number of in-flight tasks
        self._max_workers = 150  # maximum number of concurrent tasks
        self.overtime = {}  # {url: times} failure count per URL
        self.overtime_threshold = 4
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
        }
        self.list_content = []

    async def init_url(self):
        info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
        for ite in info.itertuples():
            await self.queue.put((randint(1, 5), getattr(ite, 'url')))

    async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if status_code == 403:
                    print("url-403", url)
                    if url in self.overtime:
                        self.overtime[url] += 1
                        if self.overtime[url] > self.overtime_threshold:
                            pass
                        await self.queue.put((randint(1, 5), url))
                    else:
                        self.overtime[url] = 1
                        await self.queue.put((randint(1, 5), url))
                    status_code = 0
                    html = None
                if binary:
                    text = await resp.read()
                    encoding = cchardet.detect(text)
                    html = text.encode(encoding, errors='ignore')
                else:
                    html = await resp.text()

        except TimeoutError:
            print("url-overtime", url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(img_url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if binary:
                    html = await resp.read()
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", img_url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                else:
                    await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html

    def parse_source(self, source):
        try:
            response_1 = etree.HTML(source)
        except Exception as err:
            logging.error(f'parse error:{err}')
            url = ""
        else:
            img_url = response_1.xpath("//a[@href='javascript:;']/@supsrc")[0] if len(
                response_1.xpath("//a[@href='javascript:;']/@supsrc")[0]) else ""
        return img_url

    async def process(self, session, url, timeout):
        status, source = await self.fetch(session, url, timeout)
        file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
        if status == 200:
            img_url = self.parse_source(source)
            img_status, img_source = await self.download_img(session, img_url, timeout, url)
            if img_status == 200:
                async with aiofiles.open("F:\\dawnzhu\\picture\\"+file_name+".jpg", "wb") as f:
                    await f.write(img_source)
            self._workers -= 1
            print("任务完成", self._workers, "url_status", status, "img_status", img_status)
        else:
            self._workers -= 1
            print("任务完成", self._workers, "url_status", status,)

    async def loop_crawl(self):
        await self.init_url()
        timeout = aiohttp.ClientTimeout(total=20)
        conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True)
        session = aiohttp.ClientSession(connector=conn, timeout=timeout)
        while True:
            if self._workers >= self._max_workers:
                print("work 的判断")
                await asyncio.sleep(5)
                continue
            if self.queue.empty():
                print("队列是否为空....", self._workers)
                await asyncio.sleep(5)
                if self._workers == 0:
                    break
                continue
            _, url = await self.queue.get()
            asyncio.ensure_future(self.process(session, url, timeout))
            self._workers += 1
            print("队列剩余数量", self.queue.qsize(), self._workers)
        await session.close()

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            self.loop.close()

if __name__ == '__main__':
    sp = sikupicture_Spider()
    sp.run()
6 replies
itskingname
2020-05-27 09:37:02 +08:00
Try bumping the limit parameter in conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True) up to 500.
dawnzhu
2020-05-27 09:41:11 +08:00
@itskingname Thanks, I'll give it a try. What would cause this, though? Is the concurrency too low?
dawnzhu
2020-05-27 09:48:23 +08:00
@itskingname That doesn't work.
Vegetable
2020-05-27 09:53:42 +08:00
fetch only catches timeouts; other exceptions can still propagate upward, and an unhandled exception inside a coroutine doesn't terminate the program, it just prints a message like
Task exception was never retrieved
and moves on.
process doesn't catch exceptions at all, so once one is raised, _workers is never decremented properly and the while loop can never exit.
That's the only cause I can see so far. Your code is fun to read, very neatly laid out, but a lot of it is quite low-level, e.g. managing the worker count by hand instead of using a Semaphore, and using aiohttp instead of httpx.
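A rough sketch of that fix (untested, and assuming the rest of the class stays as posted): move the _workers decrement into a finally block so that an exception raised while fetching, parsing, or writing the file can no longer leave the counter stuck:

    async def process(self, session, url, timeout):
        try:
            status, source = await self.fetch(session, url, timeout)
            file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
            if status == 200:
                img_url = self.parse_source(source)
                img_status, img_source = await self.download_img(session, img_url, timeout, url)
                if img_status == 200:
                    async with aiofiles.open("F:\\dawnzhu\\picture\\" + file_name + ".jpg", "wb") as f:
                        await f.write(img_source)
        except Exception as err:
            # log and swallow anything unexpected so the task still finishes cleanly
            logging.error(f"process error for {url}: {err}")
        finally:
            # always release the slot, whether the task succeeded or failed
            self._workers -= 1
            print("task done", self._workers)

The manual _workers bookkeeping could likewise be replaced with an asyncio.Semaphore that is acquired before each task is scheduled and released in the same finally block.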
dawnzhu
2020-05-27 09:57:38 +08:00
@Vegetable Got it, that should be the problem. Thanks.
ruanimal
2020-05-27 14:15:50 +08:00
@Vegetable Probably a C programmer by background.
