Python 异步队列问题

2020-05-11 15:48:48 +08:00
 fangwenxue
    async def run_task(self, tasks, task_handler):
        consumer = asyncio.ensure_future(self.consume(self.task_queue))

        await self.produce(tasks, task_handler, self.task_queue)
        await self.task_queue.join()

        consumer.cancel()

    async def produce(self, tasks, task_handler, queue):
        n = len(tasks)
        for i, task in enumerate(tasks):
            item = (i + 1, n, task, task_handler)
            await queue.put(item)

    async def consume(self, queue):
        async with ClientSession() as session:
            while 1:
                item = await queue.get()
                index = item[0]
                count = item[1]
                handler = item[3]
                rs = await handler(item[2], i=index, n=count, session=session)
                print('consuming {}/{} {}...'.format(index, count, rs))
                queue.task_done()
                # 只要加了这句程序就挂起了
                await self.produce(rs, self.detail_handler, self.result_queue)
                
loop = asyncio.get_event_loop()
loop.run_until_complete(self.run_task(url_list, self.page_handler))
loop.close()

打印

consuming 1/5 ...
consuming 2/5 ...
await self.produce(rs, self.detail_handler, self.result_queue)
641 次点击
所在节点    问与答
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/670583

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX