这里,你也可以使用多个 CapSubscribe[""] 来同时订阅多个不同的消息 :
[CapSubscribe("xxx.services.bar")] [CapSubscribe("xxx.services.foo")] public void BarAndFooMessageProcessor() { }其中,xxx.services.bar 为订阅的消息名称,内部实现上,这个名称在不同的消息队列具有不同的代表。 在 Kafka 中,这个名称即为 Topic Name。 在RabbitMQ 中,为 RouteKey。
RabbitMQ 中的 RouteKey 支持绑定键表达式写法,有两种主要的绑定键:
*(星号)可以代替一个单词.
# (井号) 可以代替0个或多个单词.
比如在下面这个图中(P为发送者,X为RabbitMQ中的Exchange,C为消费者,Q为队列)
在这个示例中,我们将发送一条关于动物描述的消息,也就是说 Name(routeKey) 字段中的内容包含 3 个单词。第一个单词是描述速度的(celerity),第二个单词是描述颜色的(colour),第三个是描述哪种动物的(species),它们组合起来类似:“..”。
然后在使用 CapSubscribe 绑定的时候,Q1绑定为 CapSubscribe["*.orange.*"], Q2 绑定为 CapSubscribe["*.*.rabbit"] 和 [CapSubscribe["lazy.#]。
那么,当发送一个名为 "quick.orange.rabbit" 消息的时候,这两个队列将会同时收到该消息。同样名为 lazy.orange.elephant的消息也会被同时收到。另外,名为 "quick.orange.fox" 的消息将仅会被发送到Q1队列,名为 "lazy.brown.fox" 的消息仅会被发送到Q2。"lazy.pink.rabbit" 仅会被发送到Q2一次,即使它被绑定了2次。"quick.brown.fox" 没有匹配到任何绑定的队列,所以它将会被丢弃。
另外一种情况,如果你违反约定,比如使用 4个单词进行组合,例如 "quick.orange.male.rabbit",那么它将匹配不到任何的队列,消息将会被丢弃。
但是,假如你的消息名为 "lazy.orange.male.rabbit",那么他们将会被发送到Q2,因为 #(井号)可以匹配 0 或者多个单词。
在 CAP 中,我们把每一个拥有 CapSubscribe[]标记的方法叫做订阅者,你可以把订阅者进行分组。
组(Group),是订阅者的一个集合,每一组可以有一个或者多个消费者,但是一个订阅者只能属于某一个组。同一个组内的订阅者订阅的消息只能被消费一次。
如果你在订阅的时候没有指定组,CAP会将订阅者设置到一个默认的组 cap.default.group。
以下是使用组进行订阅的示例:
[CapSubscribe("xxx.services.foo", Group = "moduleA")] () { }
2.2.1 例外情况这里有几种情况可能需要知道:
① 消息发布的时候订阅方还未启动
Kafka:
当 Kafka 中,发布的消息存储于持久化的日志文件中,所以消息不会丢失,当订阅者所在的程序启动的时候会消费掉这些消息。
RabbitMQ:
在 RabbitMQ 中,应用程序首次启动会创建具有持久化的 Exchange 和 Queue,CAP 会针对每一个订阅者Group会新建一个消费者队列,由于首次启动时候订阅者未启动的所以是没有队列的,消息无法进行持久化,这个时候生产者发的消息会丢失。
针对RabbitMQ的消息丢失的问题,有两种解决方式:
i. 部署应用程序之前,在RabbitMQ中手动创建具有durable特性的Exchange和Queue,默认情况他们的名字分别是(cap.default.topic, cap.default.group)。
ii. 提前运行一遍所有实例,让Exchange和Queue初始化。
我们建议采用第 ii 种方案,因为很容易做到。
② 消息没有任何订阅者
如果你发送了一条个没有被任何订阅者订阅的消息,那么此消息将会被丢弃。
3、配置Cap 使用 Microsoft.Extensions.DependencyInjection 进行配置的注入,你也可以依赖于 DI 从json文件中读取配置。
3.1 Cap Options你可以使用如下方式来配置 CAP 中的一些配置项,例如
services.AddCap(capOptions => { capOptions.FailedCallback = //... });
CapOptions 提供了一下配置项:
NAME DESCRIPTION TYPE DEFAULT
PollingDelay 处理消息的线程默认轮询等待时间(秒) int 15 秒
QueueProcessorCount 启动队列中消息的处理器个数 int 2
FailedMessageWaitingInterval 轮询失败消息的间隔(秒) int 180 秒
FailedCallback 执行失败消息时的回调函数,详情见下文 Action NULL
CapOptions 提供了 FailedCallback 为处理失败的消息时的回调函数。当消息多次发送失败后,CAP会将消息状态标记为Failed,CAP有一个专门的处理者用来处理这种失败的消息,针对失败的消息会重新放入到队列中发送到MQ,在这之前如果FailedCallback具有值,那么将首先调用此回调函数来告诉客户端。
FailedCallback 的类型为 Action<MessageType,string,string>,第一个参数为消息类型(发送的还是接收到),第二个参数为消息的名称(name),第三个参数为消息的内容(content)。
3.2 RabbitMQ OptionsCAP 采用的是针对 CapOptions 进行扩展来实现RabbitMQ的配置功能,所以针对 RabbitMQ 的配置用法如下:
services.AddCap(capOptions => { capOptions.UseRabbitMQ(rabbitMQOption=>{ // rabbitmq options. }); });
RabbitMQOptions 提供了有关RabbitMQ相关的配置:
NAME DESCRIPTION TYPE DEFAULT
HostName 宿主地址 string localhost
UserName 用户名 string guest
Password 密码 string guest
VirtualHost 虚拟主机 string /
Port 端口号 int -1
TopicExchangeName CAP默认Exchange名称 string cap.default.topic
RequestedConnectionTimeout RabbitMQ连接超时时间 int 30,000 毫秒
SocketReadTimeout RabbitMQ消息读取超时时间 int 30,000 毫秒
SocketWriteTimeout RabbitMQ消息写入超时时间 int 30,000 毫秒
QueueMessageExpires 队列中消息自动删除时间 int (10天) 毫秒
3.3 Kafka OptionsCAP 采用的是针对 CapOptions 进行扩展来实现 Kafka 的配置功能,所以针对 Kafka 的配置用法如下: