celery 框架消费者 从 rabbitmq 中获取消息,因为单条消息大小超过 40m 而产生的 Connection reset by peer 该怎么解决

2020-12-08 23:45:35 +08:00
 stoopuak197
环境
linux ( centos 7 )
python3.8.6
- celery == 5.0.1
- rabbitmq == 3.8.9


问题:
我有一个使用了 celery 框架的应用,work2(消费者) 需要从 rabbitmq 中获取由 work1 (生产者)生产的数据。
但是现在出现了一个错误。由于业务需求生产者生产的数据大小剧增,导致其单条消息大小超过 40m 或者更大 。
但是部署在外网的 rabbitmq 服务器出口流量只有大约 600~700kb/s,我通过排查怀疑产生错误的原因可能是 work2 从 rabbitmq 读取消息时由于 rabbitmq 服务器出口流量限制,使 work2 不能在短时间内获取到完整的消息从而导致 socket.timeout 从而引发的错误。

为了验证我的猜想我在 rabbitmq 服务器运行了以下测试脚本由于没有了网络限制它能够很好的获取到队列中的消息

====================================
import pika

auth = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=auth, heartbeat=0))
channel = connection.channel()

channel.queue_declare(queue='b', durable=True)

def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
exit()

channel.basic_consume(on_message_callback=callback,
queue='b',
auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

====================================


我尝试阅读了 transport.py 的源码 并在 connection.py 第 525 行中修改了 timeout 参数的数值,发现问题还是不能解决
基于我 socket 编程基础烂到极致,可能产生错误的原因并不是我上面描述的那么回事。
1512 次点击
所在节点    Python
5 条回复
stoopuak197
2020-12-08 23:48:21 +08:00
由于发文限制存在的一些报错如下
while not self.blocking_read(timeout):
File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 527, in blocking_read
frame = self.transport.read_frame()
File "/usr/local/lib/python3.8/dist-packages/amqp/transport.py", line 286, in read_frame
payload = read(size)
File "/usr/local/lib/python3.8/dist-packages/amqp/transport.py", line 457, in _read
s = recv(n - len(rbuf))
ConnectionResetError: [Errno 104] Connection reset by peer
err1y
2020-12-09 08:33:17 +08:00
如果你能控制生产者端的话,生产者在产生数据的时候大数据放到其他存储里面,队列里面保存个数据地址,消费者再去通过地址获取数据
stoopuak197
2020-12-09 09:10:49 +08:00
@err1y 非常感谢,这是个非常棒的解决方法! 但是我还是想知道碰上这种问题该怎么解决
knightdf
2020-12-09 10:08:52 +08:00
两边的 timeout 都得改吧,然后得改 read timeout
stoopuak197
2020-12-09 19:57:21 +08:00
@knightdf 两边的 timeout 指的是? read timeout 我改过但是依旧无效

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/733533

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX