尝试用 aiohttp 写爬虫,但这么写不知道该怎么停止循环?

2018-10-25 20:31:07 +08:00
 zhijiansha

使用 aiohttp 试着写了一个爬虫,但是发现可能会出现 一级页面还在抓取的时候,由于队列为空,直接退出的情况。不知该如何去做这个判断?另外不知以下代码这么写是否有其他的问题??

# coding:utf-8

import asyncio

import aiohttp


class Task(object):
    def __init__(self, info, priority):
        self.priority = priority
        self.info = info

    def __lt__(self, other):
        return self.priority < other.priority


class Spider(object):

    def __init__(self, loop=None):
        self.loop = loop
        conn = aiohttp.TCPConnector(limit=3)
        self.session = aiohttp.ClientSession(loop=loop, connector=conn)
        self.queue = asyncio.PriorityQueue()

    def start(self, page):
        task_info = {'callback': self.parse_1, 'page': page}
        return task_info

    async def set_item(self, task):
        pass

    async def fetch(self, task):
        await asyncio.sleep(2)
        task['callback'](task['page'])

    async def worker(self):
        while True:
            next_task = await self.queue.get()
            if next_task.info.get('type') == 'item':
                asyncio.ensure_future(self.set_item(next_task.info))
            else:
                asyncio.ensure_future(self.fetch(next_task.info))
            self.queue.task_done()

            # if self.queue.empty():
            #     await asyncio.sleep(1)
            #     if self.queue.empty():
            #         break

    def run(self):
        for page in range(1, 10):
            self.queue.put_nowait(Task(self.start(page), 0))

        self.loop.run_until_complete(self.worker())

    def close(self):
        if not self.session.closed:
            if self.session._connector_owner:
                self.session._connector.close()
            self.session._connector = None

    def parse_1(self, meta):
        print('parse_1-----', meta)
        for page in range(20, 30):
            task = {'callback': self.parse_2, 'page': page}
            self.queue.put_nowait(Task(task, 1))

    def parse_2(self, meta):
        print('parse2----', meta)
        for page in range(30, 40):
            task = {'callback': self.parse_3, 'page': page}
            self.queue.put_nowait(Task(task, 0))

    def parse_3(self, meta):
        print('parse3----', meta)


loop = asyncio.get_event_loop()
sp = Spider(loop=loop)
sp.run()
sp.close()

2759 次点击
所在节点    Python
16 条回复
jimmyczm
2018-10-25 20:37:22 +08:00
我看别人写的是先把队列放到 redis 里,对 redis 进行判断从而进行开始或终止
zhijiansha
2018-10-25 21:07:43 +08:00
@jimmyczm 感觉用 redis 应该也会有这个问题。。。除非是标记一下任务状态
AlisaDestiny
2018-10-25 23:45:37 +08:00
woker 函数里别 while True;弄一个标记,如果为 True 继续,为 False 就停止,
zhijiansha
2018-10-26 00:03:33 +08:00
@AlisaDestiny 但爬取何时完成,无法预料,也就无法给设置为 False 啊
binux
2018-10-26 00:51:01 +08:00
加个 work in progress 标记
so1n
2018-10-26 01:03:02 +08:00
手机看的好难受,没看完,你试试 work 那里使用 wait_for 替换 ensure_future
Yourshell
2018-10-26 09:02:40 +08:00
判断任务队列用 join 而不是 empty
zhijiansha
2018-10-26 10:22:17 +08:00
@binux 请教一下这个标记的动作应该放在哪里执行??
@so1n 替换后无法执行。。。

@Yourshell 呃,判断队列为空不是 empty 么?
Yourshell
2018-10-26 10:35:53 +08:00
@zhijiansha empty 是判断队列是否为空,join 是阻塞至所有任务完成,也就是调用 task_done。你用 empty 判断队列为空,只是所有的任务都被 get 了,不代表已经完成了。你可以看看官方的例子 https://docs.python.org/3/library/asyncio-queue.html
binux
2018-10-26 11:08:09 +08:00
@zhijiansha #8 任何 worker 没处理完之前都是 WIP 啊,除了要判断队列是否为空,还要判断是否有任务是 WIP
zhijiansha
2018-10-26 23:05:16 +08:00
@Yourshell #9 谢回复!代码中是在 while 中从队列获取任务直接注册到事件循环,然后就执行了 task_done。这么如果用 join 去判断的话应该也是一样的,我尝试用链接中的例子去改写上面的爬虫代码,但是好像也是行不通,会一直阻塞。不知何故。

@binux #10 谢回复!判断任务状态,在官方文档中找到了 all_tasks 和 current_task 这两个方法,但是好像不好使,即使任务全部完成也不为 None,导致判断失败。。
zhijiansha
2018-10-29 11:37:33 +08:00
```
async def worker(self):
"""
任务调度
:return:
"""
while True:
if not self.queue.empty():
next_data = await self.queue.get()
task_id = uuid.uuid1()
self.task_running_list.append(task_id)
if isinstance(next_data, Item):
asyncio.create_task(self.set_item(task_id, next_data.info))
else:
asyncio.create_task(self.fetch(task_id, next_data.info))
self.queue.task_done()
else:
await asyncio.sleep(0)
if self.queue.empty() and not self.task_running_list:
break
```
根据 @binux 的提示,这么处理可解决该问题。
binux
2018-10-29 11:41:02 +08:00
@zhijiansha #12 处理完要把 task_id 删掉啊
zhijiansha
2018-10-29 12:32:55 +08:00
@binux 嗯嗯,删除操作是在 fetch 方法里面执行的
a65420321a
2018-10-30 16:11:57 +08:00
怎么贴的代码?
Harlaus
2018-11-14 16:06:23 +08:00
建议 aio+mq

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/501174

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX