async def run_task(self, tasks, task_handler):
    """Queue every task for the consumer and wait until all are processed.

    A consumer task is started on ``self.task_queue``; ``tasks`` are then
    enqueued through ``self.produce`` and the queue is joined.

    Args:
        tasks: Sized collection of work items, forwarded to ``self.produce``.
        task_handler: Callable stored alongside each queued item; the
            consumer awaits it for every item.

    Raises:
        Exception: Whatever the consumer or the producer raised. A dead
            consumer is reported instead of leaving ``join()`` waiting
            forever for ``task_done()`` calls that can no longer happen.
        RuntimeError: If the consumer returned before the queue was drained.
    """
    consumer = asyncio.ensure_future(self.consume(self.task_queue))

    async def feed_and_drain():
        await self.produce(tasks, task_handler, self.task_queue)
        await self.task_queue.join()

    feeder = asyncio.ensure_future(feed_and_drain())
    try:
        # Race the two tasks: the consumer loops forever, so it only ever
        # finishes first when it has failed (or was cancelled).
        done, _ = await asyncio.wait(
            {feeder, consumer}, return_when=asyncio.FIRST_COMPLETED
        )
        if consumer in done:
            consumer.result()  # re-raises the consumer's failure, if any
        if feeder not in done:
            raise RuntimeError(
                "consumer exited before the task queue was drained"
            )
        feeder.result()  # surfaces an exception raised while producing
    finally:
        feeder.cancel()
        consumer.cancel()
        # Wait for the cancellations to be processed so the consumer can
        # leave its ``async with`` block before the event loop is closed.
        await asyncio.gather(feeder, consumer, return_exceptions=True)
async def produce(self, tasks, task_handler, queue):
    """Enqueue one work item per task.

    Each item placed on ``queue`` is the tuple
    ``(position, total, task, task_handler)``, where ``position`` counts
    from 1 and ``total`` is ``len(tasks)``.

    Args:
        tasks: Sized iterable of tasks (``len()`` is taken up front).
        task_handler: Value stored as the last element of every item.
        queue: Queue whose ``put`` is awaited for each item.
    """
    total = len(tasks)
    position = 0
    for task in tasks:
        position += 1
        await queue.put((position, total, task, task_handler))
async def consume(self, queue):
    """Process items from ``queue`` forever, sharing one ``ClientSession``.

    Each item is expected to be the 4-tuple built by ``produce``:
    ``(index, count, task, handler)``. The handler is awaited as
    ``handler(task, i=index, n=count, session=session)`` and its result is
    then passed to ``self.produce`` together with ``self.detail_handler``
    and ``self.result_queue``.

    The loop never returns on its own; the task running it is expected to
    be cancelled by its owner.

    Args:
        queue: Queue to read items from; ``task_done()`` is called once per
            item taken, even when handling that item fails.
    """
    async with ClientSession() as session:
        while True:
            item = await queue.get()
            try:
                index, count, task, handler = item
                rs = await handler(task, i=index, n=count, session=session)
                print('consuming {}/{} {}...'.format(index, count, rs))
            finally:
                # Balance every get() with a task_done(), even on failure,
                # so the queue's unfinished-task count stays accurate.
                queue.task_done()
            # Original author's note: as soon as this line is added, the
            # program hangs.
            # NOTE(review): if self.result_queue is bounded and nothing
            # reads from it, this await would block forever and the
            # remaining items in `queue` would never be marked done, so
            # queue.join() would never return -- confirm how result_queue
            # is created and drained.
            await self.produce(rs, self.detail_handler, self.result_queue)
# Drive the whole pipeline to completion on the default event loop.
# NOTE(review): `self` and `url_list` are defined outside this excerpt.
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(self.run_task(url_list, self.page_handler))
finally:
    # Close the loop even when run_task raises, so it is not leaked.
    loop.close()
打印输出(只处理了前两项,之后程序挂起):
consuming 1/5 ...
consuming 2/5 ...
await self.produce(rs, self.detail_handler, self.result_queue)