pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=)) channel = connection.channel() channel.exchange_declare(exchange=, type=) message = channel.basic_publish(exchange=, routing_key='', body=message) % message) connection.close()
发布者
pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=)) channel = connection.channel() channel.exchange_declare(exchange=, type=) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=, queue=queue_name) ) def callback(ch, method, properties, body): % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
订阅者 5、关键字发送第4步实例中,发送消息必须明确指定某个队列并向其中发送消息,当然,RabbitMQ 还支持根据关键字发送(队列绑定关键字),发送者将消息发送到 exchange,exchange 根据关键字 判定应该将数据发送至指定队列。
exchange type = direct
pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=)) channel = connection.channel() channel.exchange_declare(exchange=, type=) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write(% sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange=, queue=queue_name, routing_key=severity) ) def callback(ch, method, properties, body): % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
消费者
pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=)) channel = connection.channel() channel.exchange_declare(exchange=, type=) severity = sys.argv[1] message = channel.basic_publish(exchange=, routing_key=severity, body=message) % (severity, message)) connection.close()
生产者 6、模糊匹配
exchange type = topic
在 topic 类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到 exchange,exchange 将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
匹配基本规则及示例:
发送者路由值 队列中 * -- 不匹配 suoning
pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=)) channel = connection.channel() channel.exchange_declare(exchange=, type=) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write(% sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=, queue=queue_name, routing_key=binding_key) ) def callback(ch, method, properties, body): % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
消费者