序
好像,今天已经是 2 月 28 号了。
听说,29、30、31 号放假。
据说,有图,有真相。
目录
简介
环境搭建
示例一:简单的 Hello World
示例二:发布/订阅模式
尝试发现 - 新物种 EasyNetQ
简介
RabbitMQ:一个消息系统,基于 AMQP 系统协议,由 Erlang 语言开发。
优点:健壮、使用简单、开源和支持各种流行的语言(如 Python、java、.NET)等。
MQ(Message Queue):消息队列的简称,是一种应用程序之间的通信机制。
作用:将部分无需立即回调获取结果,并且耗时的操作,使用异步处理的方式提高服务器的吞吐量及性能。如:日志记录。
图:简单的通信方式,及加入 MQ 后的变化
A 端:生产者将消息写(插)入队列;
MQ(队列) :中间件,消息的载体;
B 端:消费者从队列读(取)出消息。
MQ 特点:消费者 - 生产者模型的一种表现形式。
环境搭建
1.官网下载安装包: ;
2.安装时会提示你下载 Erlang 语言环境;
3.启动安装完的服务:RabbitMQ;
4.在 cmd 中指向 sbin 目录,并输入以下命令,才能打开 WEB 管理界面:
rabbitmq-plugins enable rabbitmq_management
5.默认 url::15672/#/
示例一:简单的 Hello World
P(Producer):生产者,意味着发送;
Queue:队列,本质上是一个无限的缓冲区,可以储存尽可能多的信息;
C(Consumer):消费者,等待并接收消息。
【备注】生产者和消费者不需要驻留在同一台服务器上。
Producer.cs
Producer 2 { Send() 4 { }; (var connection = factory.CreateConnection()) 9 { (var channel = connection.CreateModel()) 12 { channel.QueueDeclare(queue: , durable: exclusive: autoDelete: arguments: null); message = ; 21 var body = Encoding.UTF8.GetBytes(message); routingKey: , basicProperties: null, 26 body: body); 27 } 28 } 29 } 30 }
【备注】队列名如果已存在,将不会重复创建。假设队列已存在,修改 channel.QueueDeclare() 方法内的参数后启动会出现异常。
【备注】消息内容是一个字节数组。
Consumer.cs
1 class Consumer 2 { Receive() 4 { }; (var connection = factory.CreateConnection()) 8 { 9 using (var channel = connection.CreateModel()) 10 { , 12 durable: false, 13 exclusive: false, 14 autoDelete: false, 15 arguments: null); consumer = new EventingBasicConsumer(channel); 19 consumer.Received += (model, ea) => 20 { message = Encoding.UTF8.GetString(body); , message); 24 }; channel.BasicConsume(queue: , noAck: consumer: consumer); 30 31 Console.Read(); 32 } 33 } 34 } 35 }
【疑问】在消费者的类里面为什么会再次声明队列(channel.QueueDeclare())呢?-- 因为接收方可能会在发送方启动前启动,这是出于保险起见。
示例二:发布/订阅模式
1.Exchange 交换机和 Exchange Type 交换类型
RabbitMQ 消息传递模型的核心思想是,生产者不会直接将消息发给队列。
这里我们将引入新的名词 Exchange(交换机)。交换机传递消息的类型也有很多种:direct, topic, headers(不常用) 和 fanout,我们称之为交换类型。
图:Direct
图:Fanout
图:Topic
--上述 3 张图来源:?id=52262850
这里,创建一个名为 “logs” 的交换机,它的类型为广播类型(fanout:可以将收到的所有消息,广播给所有已知的队列)。
channel.ExchangeDeclare(exchange: , //交换机名 type: ); //交换类型
2.临时队列
作为消费者,我们有时候只需要一些新的(或者空的)队列,此时,更好的方式就是让它自动生成一个随机名字的队列;其次,当队列连接中断时会选择自动删除对应的消费者。
创建一个非持久,有排他性和自动删除特性的队列(无参时)。
var queueName = channel.QueueDeclare().QueueName;
3.Binding 绑定
【疑问】有了 Exchange 和 channel,这时,还需要什么东西呢?-- 我们要创建 Exchange 和 channel 关系的桥梁,这个桥梁称之为 Binding(绑定)。
channel.QueueBind(queue: queueName, exchange: , routingKey: "");