前言
很多同学想对CAP的机制以及用法等想有一个详细的了解,所以花了将近两周时间写了这份中文的CAP文档,对 CAP 还不知道的同学可以先看一下这篇文章。
本文档为 CAP 文献(Wiki),本文献同时提供中文和英文版本,英文版本目前还在翻译中,会放到Github Wiki 中。
目录 1、Getting Started 1.1 介绍CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,她具有轻量级,高性能,易使用等特点。
目前 CAP 使用的是 .NET Standard 1.6 的标准进行开发,目前最新预览版本已经支持 .NET Standard 2.0.
1.2 应用场景CAP 的应用场景主要有以下两个:
分布式事务是在分布式系统中不可避免的一个硬性需求,而目前的分布式事务的解决方案也无外乎就那么几种,在了解 CAP 的分布式事务方案前,可以阅读以下 这篇文章。
CAP 没有采用两阶段提交(2PC)这种事务机制,而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫做 异步确保。
CAP 实现了 EventBus 中的发布/订阅,它具有 EventBus 的所有功能。也就是说你可以像使用 EventBus 一样来使用 CAP,另外 CAP 的 EventBus 是具有高可用性的,这是什么意思呢?
CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化,这样可以保证 EventBus 发出的消息是可靠的,当消息队列出现宕机或者连接失败的情况时,消息也不会丢失。
1.3 Quick Start使用一下命令来引用CAP的NuGet包:
PM> Install-Package DotNetCore.CAP根据使用的不同类型的消息队列,来引入不同的扩展包:
PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.Kafka根据使用的不同类型的数据库,来引入不同的扩展包:
PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql在 ASP.NET Core 程序中,你可以在 Startup.cs 文件 ConfigureServices() 中配置 CAP 使用到的服务:
(IServiceCollection services) { services.AddDbContext<AppDbContext>(); services.AddCap(x => { // If your SqlServer is using EF for data operations, you need to add the following configuration: // Notice: You don't need to config x.UseSqlServer(""") again! x.UseEntityFramework<AppDbContext>(); // If you are using Dapper,you need to add the config: x.UseSqlServer("Your ConnectionStrings"); // If your Message Queue is using RabbitMQ you need to add the config: x.UseRabbitMQ("localhost"); // If your Message Queue is using Kafka you need to add the config: x.UseKafka("localhost"); }); }
在 Configure() 中配置启动 CAP :
public void Configure(IApplicationBuilder app) { app.UseCap(); } 2、API接口CAP 的 API 接口只有一个,就是 ICapPublisher 接口,你可以从 DI 容器中获取到该接口的实例进行调用。
2.1 发布/发送你可以使用 ICapPublisher 接口中的 Publish<T> 或者 PublishAsync<T> 方法来发送消息:
public class PublishController : Controller { private readonly ICapPublisher _publisher; (ICapPublisher publisher) { _publisher = publisher; } [Route("~/checkAccount")] public async Task<IActionResult> PublishMessage() { await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); return Ok(); } }
下面是PublishAsync这个接口的签名:
PublishAsync<T>(string name,T object)
默认情况下,在调用此方法的时候 CAP 将在内部创建事务,然后将消息写入到 Cap.Published 这个消息表。
2.1.1 事务事务在 CAP 具有重要作用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程中,如果不使用事务,我们是没有办法保证我们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功的发送到了消息队列,但是业务代码确执行失败。
这里的失败原因可能是多种多样的,比如连接异常,网络故障等等。
只有业务代码和CAP的Publish代码必须在同一个事务中,才能够保证业务代码和消息代码同时成功或者失败。
以下是两种使用事务进行Publish的代码:
using (var transaction = dbContext.Database.BeginTransaction()) { await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); // 你的业务代码。 transaction.Commit(); }
你的业务代码可以位于 Publish 之前或者之后,只需要保证在同一个事务。
当CAP检测到 Publish 是在EF事务区域内的时候,将使用当前的事务上下文进行消息的存储。
其中,发送的内容会序列化为Json存储到消息表中。
var connString = "数据库连接字符串"; using (var connection = new MySqlConnection(connString)) { connection.Open(); using (var transaction = connection.BeginTransaction()) { await _publisher.PublishAsync("xxx.services.bar", new Person { Name = "Foo", Age = 11 }, connection, transaction); // 你的业务代码。 transaction.Commit(); } }
在 Dapper 中,由于不能获取到事务上下文,所以需要用户手动的传递事务上下文到CAP中。
2.2 订阅/消费注意:消息端在方法实现的过程中需要实现幂等性。
使用 CapSubscribeAttribute 来订阅 CAP 发布出去的消息。
[CapSubscribe("xxx.services.bar")] public void BarMessageProcessor() { }