环境 pika 0.10 RabbitMQ 3.5.4, Erlang 18.0
生产者
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
queue_name="route_test"
channel.exchange_declare(exchange='logs')
channel.basic_qos(prefetch_count=1)
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
i = 0
while i < 10:
channel.basic_publish(exchange='logs',
routing_key=queue_name,
body=message,
properties=pika.BasicProperties(
delivery_mode=2
))
i += 1
print(" [x] Sent %r" % message)
connection.close()
消费者
def on_start():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
queue_name="route_test"
channel.exchange_declare(exchange='logs')
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='logs',
queue=queue_name)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
return channel
def callback(ch, method, properties, body):
print(" [x] %r" % body)
time.sleep(3)
while True:
try:
channel = on_start()
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
except:
print "connect error"
我已经在 channel 中设置了channel.basic_qos(prefetch_count=1)
,但是我如果先执行消费者,然后再执行生产者,就会一次性的把所有的消息都扔给消费者,这样,我如果手动再启动一个消费者,就无法获得还没有被执行的消息了。
请问,这是为什么,或者怎么样能够达到我想要的效果,即每次消费者都只获得一个消息,剩下的都保存在消息队列里面。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.