Scrapy 异步问题求助

340 天前
 kekeones

使用 Scrapy 爬取内容,使用 Pipeline 将处理后内容 POST 到远程。

使用request是同步的会有阻塞。

使用scrapy.FormRequest受限于全局DOWNLOAD_DELAY限制,每次 POST 都会有延迟。

使用 treq ,不确定如何获取内容

async def process_post(self, url, data):
    req = treq.post(url, data=data)
    res = await deferred_to_future(req)
    return res

如何获取 res 的内容,打印是<treq.response._Response 200 'text/html; charset=UTF-8' unknown size>

1316 次点击
所在节点    Python
7 条回复
encro
340 天前
不看源码?
看 treq.response._Response 的源码啊!!!
kekeones
340 天前
@encro 谢谢,搞定了,原来我缺少了编码参数
kekeones
338 天前
@encro 刚试了下,不对,res.content(),
打印出来的是:
<Deferred at 0x7fb3587834d0 current result: b'{"state":true}'>
ayugesheng
329 天前
@kekeones 既然都 async 了,推荐直接 aiomysql + aiohttp ,给出一个 aiohttp 的 pipeline 示例:

import asyncio

import aiohttp
from scrapy.utils.defer import deferred_from_coro


class DemoPipeline:
def __init__(self) -> None:
# 一些参数初始化
pass

def open_spider(self, spider):
# 这里可以写入一些非 async 的预备操作,把比如默认参数设置和日志配置等
return deferred_from_coro(self._open_spider(spider))

async def _open_spider(self, spider):
# 这里一般是连接池,async 连接等预备操作
await asyncio.sleep(0.1)

async def process_item(self, item, spider):
# 这里可以使用一些 async 存储库来实现存储逻辑
...
# 看你想 post 到 data 还是 form
# post_data = json.dumps('{"content": "test"}')
post_data = {"content": "test"}
async with aiohttp.ClientSession() as session:
async with session.post(
"http://httpbin.org/post", data=post_data
) as additional_response:
# 获取响应内容
additional_data = await additional_response.text()
print("additional_data:", additional_data)
return item

async def _close_spider(self):
# 这里一般是 async 连接或连接池关闭逻辑
await asyncio.sleep(0.1)

def close_spider(self, spider):
return deferred_from_coro(self._close_spider())

注意:
使用以上代码时,需要在 settings.py 中或者 custom_settings 中配置 "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
ayugesheng
329 天前
@kekeones 推荐直接 aiomysql + aiohttp ,给出一个 aiohttp 的 pipeline 示例:

```
import asyncio

import aiohttp
from scrapy.utils.defer import deferred_from_coro


class DemoPipeline:
def __init__(self) -> None:
# 一些参数初始化
pass

def open_spider(self, spider):
# 这里可以写入一些非 async 的预备操作,把比如默认参数设置和日志配置等
return deferred_from_coro(self._open_spider(spider))

async def _open_spider(self, spider):
# 这里一般是连接池,async 连接等预备操作
await asyncio.sleep(0.1)

async def process_item(self, item, spider):
# 这里可以使用一些 async 存储库来实现存储逻辑
...
# 看你想 post 到 data 还是 form
# post_data = json.dumps('{"content": "test"}')
post_data = {"content": "test"}
async with aiohttp.ClientSession() as session:
async with session.post(
"http://httpbin.org/post", data=post_data
) as additional_response:
# 获取响应内容
additional_data = await additional_response.text()
print("additional_data:", additional_data)
return item

async def _close_spider(self):
# 这里一般是 async 连接或连接池关闭逻辑
await asyncio.sleep(0.1)

def close_spider(self, spider):
return deferred_from_coro(self._close_spider())
```

注意:
使用以上代码时,需要在 settings.py 中或者 custom_settings 中配置 "TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor"

以上代码乱了,无语,重发一次。
ayugesheng
329 天前
不好意思,v2ex 是不支持 markdown 吗,不怎么在论坛发东西。
kekeones
327 天前
好的,谢谢了哈,后面用了 treq 来处理了。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1000901

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX