最近花了我几个月的业余时间,对EQueue做了一个重大的改造,消息持久化采用本地写文件的方式。到现在为止,总算完成了,所以第一时间写文章分享给大家这段时间我所积累的一些成果。
昨天,我写过一篇关于EQueue 2.0性能测试结果的文章,有兴趣的可以看看。
文章地址:
为什么要改为文件存储? SQL Server的问题之前EQueue的消息持久化是采用SQL Server的。一开始我觉得没什么问题,采用的是异步定时批量持久化,使用SqlBulkCopy的方法,这个方法测试下来,批量插入消息的性能还不错,就决定使用了。一开始我并没有在使用到EQueue后做集成的性能测试。在功能上确实没什么问题了。而且使用DB持久化也有很多好处,比如消息查询很简单,DB天生支持各种方式的查询。删除消息也非常简单,一条DELETE语句即可。所以功能实现比较顺利。但后来当我对EQueue做性能测试时,发现一些问题。当数据库服务器和Broker本身部署在不同的服务器上时,持久化消息也会走网卡,消耗带宽,影响消息的发送和消费的TPS。而如果数据库服务器部署在Broker同一台服务器上,则因为SQLServer本身也会消耗CPU以及内存,也会影响Broker的消息发送和消费的TPS。另外SqlBulkCopy的速度,再本身机器正在接收大量的发送消息和拉取消息的请求时,会不太稳定。经过一些测试,发现整个EQueue Broker的性能不太理想。然后又想想,Broker服务器有有一个硬件一直没有好好利用起来,那就是硬盘。假设我们的消息是持久化到本地硬盘的,顺序写文件,就应该能解决SQL Server的问题了。所以,开始调研如何实现文件持久化消息的方案了。
消息缓存在托管内存的GC的问题之前消息存储在SQL Server,如果消费者每次读取消息时,总是从数据库去读取,那对数据库就是不断的写入和读取,性能不太理想。所以当初的思路是,尽量把最近可能要被消费的消息缓存在本地内存中。当初的做法是设计了一个很大的ConcurrentDictionary<long, Message>,这个字典就是存放了所有可能会被消费的消息。如果要消费的消息当前不在这个字典里,就批量从DB拉取一批出来消费。这个设计可以尽可能的避免读取DB的情况。但是带来了另一个问题。就是我们对这个字典在高并发不断的写入和读取。且这个字典里缓存的消息又很多,到到达几百上千万时,GC的压力过大,导致很多线程都会被阻塞。严重影响Broker的TPS。
所以,基于上面的两个主要原因,我想到了两个思路来解决:1)采用写文件的方式来持久化消息;2)使用非托管内存来缓存将要被消费的消息;下面我们来看看这两个设计的一些关键问题的设计思路。
文件存储的关键问题设计 心路背景之前一直无法驾驭写文件的设计。因为精细化的将数据写入文件,并能要精确的读取想要的数据,实在没什么经验。之前虽然也知道阿里的RocketMQ的消息持久化也是采用顺序写文件的方式的,但是看了代码,发现设计很复杂,一下子也比较难懂。尝试看了多次也无法完全理解。所以一直无法掌握这种方式。有一天不经意间想到之前看过的EventStore这个开源项目中,也有写文件的设计。这个项目是CQRS架构之父greg young所主导的开源项目,是一个专门为ES(Event Sourcing)设计模式中提供保存事件流支持的事件流存储系统。于是下定决心专研其源码,看C#代码肯定还是比Java容易,呵呵。经过一段时间的摸索之后,基本学到了它是如何写文件以及如何读文件的。了解了很多设计思路。然后,在看懂了EventStore的文件存储设计之后,再去看RocketMQ的文件持久化的设计,发现惊人的相似。原来看不懂的代码现在也能看懂了,因为思路差不多的。所以,这给我开始动手提供了很大的信心。经过自己的一些准备(文件读写的性能验证)和设计思路整理后,终于开始动手了。
如何写消息到文件?其实说出来也很简单。之前一直以为写文件就是一个消息一行呗。这样当我们要找哪个消息时,只需要知道行号即可。确实,理论上这样也挺好。但上面这两个开源项目都不是这样做的,而是都是采用更精细化的直接写二进制的方式。搞清楚写入的格式之后,还要考虑一个文件写不下的时候怎么办?因为一个文件总是有大小的,比如1G,那超过1G后,必然要创建新的文件,然后把消息写入新的文件。所以,我们就又有了Chunk的概念。一个Chunk就是一个文件,假设我们现在实现了一个FileMessageStore,表示对文件持久化的封装,那这个FileMessageStore肯定维护了一堆的Chunk。然后我们也很容易想到一点,就是Chunk有3种状态:1)New,表示刚创建的Chunk,这种Chunk我们可以写入新消息进去;2)Completed,已写入完成的Chunk,这种Chunk是只读的;3)OnGoing的Chunk,就是当FileMessageStore初始化时,要从磁盘的某个chunk的目录下加载所有的Chunk文件,那不难理解,最后一个文件之前的Chunk文件应该都是Completed的;最后一个Chunk文件可能写入了一半,就是之前没完全用完的。所以,本质上New和Ongoing的Chunk其实是一样的,只是初始化的方式不同。
至此,我们知道了写文件的两个关键思路:1)按二进制写;2)拆分为Chunk文件,且每个Chunk文件有状态;按二进制写主要的思路是,假如我们当前要写入的消息的二进制数组大小为100个字节,也就是说消息的长度为100,那我们可以先把消息的长度写入文件,再接着写入消息本身。这样我们读取消息时,只要知道了写入消息长度时的那个Position,就能先读取到消息的长度,然后就能知道接下来要读取多少字节为消息内容。从而能正确读取消息出来。
另外再分享一点,EventStore中,写入一个事件到文件中时,还会在写入消息内容后再写入这个消息的长度到文件里。也就是说,写入一个数据到文件时,会在头尾都写入该数据的长度。这样做的好处是什么呢?就是当我们想从后往前读数据时,也能方便的做到,因为每个数据的前后都记录了该数据的长度。这点应该不难理解吧?而EventStore是一个面向流的存储系统,我们对事件流确实可能从前往后读,也可能是从后往前读。另外这个设计还有一个好处,就是起到了校验数据合法性的目的。当我们根据长度读取数据后,再数据之后再读取一个长度,如果这两个长度一致,那数据应该就没问题的。在RocketMQ中,是通过CRC校验的方式来保证读取的数据没有问题。我个人还是比较喜欢EventStore的做法。所以EQueue里现在写入数据就是这样做的。
上面我介绍了一种写入不定长数据到文件的设计思路,这种设计是为了解决写入消息到文件的情况,因为消息的长度是不定的。在EQueue中,我们还有一另一种写文件的场景。就是队列信息的持久化。EQueue的架构是一个Topic下有多个Queue,每个Queue里有很多消息,消费者负载均衡是通过给消费者分配均匀数量的Queue的方式来达到的。这样我们只要确保写入Queue的消息是均匀的,那每个Consumer消费到的消息数就是均匀的。那一个Queue里记录的是什么呢?就是一个消息和其在队列的位置的对应关系。假设消息写入在文件的物理位置为10000,然后这个消息在Queue里的索引是100,那这个队列就会把这两个位置对应起来。这样当我们要消费这个Queue中索引为100的消息时,就能找到这个消息在文件中的物理位置为10000,就能根据这个位置找到消息的内容了。如果是托管内存,我们只需要弄一个Dictionary,key是消息在队列中的Offset,value是消息在文件中的物理Offset即可。这样我们有了这个dict,就能轻松建立起对应关系了。但上面我说过,这种巨大的dict是要占用内存的,会有GC的问题。所以更好的办法是,把这个对应关系也写入文件,那怎么做呢?这时就又需要更精细化的设计了。想到了其实也很简单,这个设计我是从RocketMQ中学到的。就是我们设计一种固定长度的结构体,这个结构体里就存放一个数据,就是消息在文件的物理位置(为了后面好表达,我命名为MessagePosition),一个Long值,一个Long的长度是8个字节。也就是说,这个文件中,每个写入的数据的长度都是8个字节。假设我们一个文件要保存100W个MessagePosition。那这个文件的长度就是100W * 8这么多字节,大概为7.8MB。那么这样做有什么好处呢?好处就是,假如我们现在要消费这个Queue里的第一个消息,那这个消息的MessagePosition在这个文件中的位置0,第二个消息在这个文件中的位置是8,第三个就是16,以此类推,第N 个消息就是(N-1) * 8。也就是说,我们无须显式的把消息在队列中的位置信息也写入到文件,而是通过这样的固定算法,就能精确的算出Queue中某个消息的MessagePosition是写入在文件的哪个位置。然后拿到了MessagePosition之后,就能从Message的Chunk文件中读取到这个消息了。