环境
linux ( centos 7 )
python3.8.6
- celery == 5.0.1
- rabbitmq == 3.8.9
问题:
我有一个使用了 celery 框架的应用,work2(消费者) 需要从 rabbitmq 中获取由 work1 (生产者)生产的数据。
但是现在出现了一个错误。由于业务需求生产者生产的数据大小剧增,导致其单条消息大小超过 40m 或者更大 。
但是部署在外网的 rabbitmq 服务器出口流量只有大约 600~700kb/s,我通过排查怀疑产生错误的原因可能是 work2 从 rabbitmq 读取消息时由于 rabbitmq 服务器出口流量限制,使 work2 不能在短时间内获取到完整的消息从而导致 socket.timeout 从而引发的错误。
为了验证我的猜想我在 rabbitmq 服务器运行了以下测试脚本由于没有了网络限制它能够很好的获取到队列中的消息
====================================
import pika
auth = pika.PlainCredentials('test', 'test')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=auth, heartbeat=0))
channel = connection.channel()
channel.queue_declare(queue='b', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
exit()
channel.basic_consume(on_message_callback=callback,
queue='b',
auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
====================================
我尝试阅读了
transport.py 的源码 并在
connection.py 第 525 行中修改了 timeout 参数的数值,发现问题还是不能解决
基于我 socket 编程基础烂到极致,可能产生错误的原因并不是我上面描述的那么回事。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
https://www.v2ex.com/t/733533
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.