import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=)) channel = connection.channel() channel.queue_declare(queue=) def callback(ch, method, properties, body): % body) import time time.sleep(10) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=, no_ack=False) ) channel.start_consuming()
消费者 2、durable 消息不丢失需要改两处地方
pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=)) channel = connection.channel() # make message persistent channel.queue_declare(queue=, durable=True) channel.basic_publish(exchange='', routing_key=, body=, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) ) connection.close()
生产者
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=)) channel = connection.channel() # make message persistent channel.queue_declare(queue=, durable=True) def callback(ch, method, properties, body): % body) import time time.sleep(10) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue=, no_ack=False) ) channel.start_consuming()
消费者 3、消息获取顺序默认情况下,消费者拿消息队列里的数据是按平均分配,例如:消费者1 拿队列中 奇数 序列的任务,消费者2 拿队列中 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列,这个性能较高的机器拿的任务就多
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host=)) channel = connection.channel() # make message persistent channel.queue_declare(queue=) def callback(ch, method, properties, body): % body) import time time.sleep(10) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue=, no_ack=False) ) channel.start_consuming()
消费者 4、发布订阅发布订阅和简单的消息队列区别在于,发布订阅者会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ 实现发布订阅时,会为每一个订阅者创建一个队列,而发布者发布消息的时候,会将消息放置在所有相关的队列中。
exchange type = fanout