但是从文件读取时,可能会遇到一个问题。就是我们刚写入到文件的数据可能无法立即读取到。因为写入的数据没有立即刷盘,所以无法通过Reader读取到。所以,我们不能仅通过判断当前写入的位置来判断当前是否还有数据可以被读取,而是考虑当前的最后一次刷盘的位置。理论上只能读取刷盘之前的数据。但即便这样设计了,在如果当前硬盘不是SSD的情况下,好像也会出现读不到数据的问题。偶尔会报错,有朋友在测试时已经遇到了这样的问题。那怎么办呢?我想了一个办法。因为这种情况归根接地还是因为我们逻辑上认为已经写入到文件的数据由于未及时刷盘或者操作系统本身的内部缓存的问题,导致数据未能及时写入磁盘。出现这种情况一定是最近的一些数据。所以我们如果能够把比如最近写入的10000(可配置)个数据都缓存在本地托管内存中,然后读取时先看本地缓存的托管内存中有没有这个位置的数据,如果有,就不需要读文件了。这样就能很好的解决这个问题了。那怎么确保我们只缓存了最新的10000个数据且不会超出10000个呢?答案是环形队列。这个名字听起来很高大上,其实就是一个数组,数组的长度为10000,然后我们在写入数据时,我们肯定知道这个数据在文件中的位置的,我们可以把这个位置(一个long值)对10000取摸,就能知道该把这个数据缓存在这个数组的哪个位置了。通过这个设计确保缓存的数据不会超过10000个,且确保一定只缓存最新的数据,如果新的数据保存到数组的某个下标时,该下标已经存在以前已经保存过的数据了,就自动覆盖掉即可。由于这个数组的长度不是很长,所以每什么GC的问题。
但是光这样还不够,我们这个数组中的每个元素至少要记录这个元素对应的数据在文件中的位置。这个是为了我们在从数组中获取到数据后,进一步校验这个数据是否是我想要的那个位置的数据。这点大家应该可以理解的吧。下面这段代码展示了如何从环形数组中读取想要的数据:
if (_cacheItems != null) { var index = dataPosition % _chunkConfig.ChunkLocalCacheSize; var cacheItem = _cacheItems[index]; if (cacheItem != null && cacheItem.RecordPosition == dataPosition) { var record = readRecordFunc(cacheItem.RecordBuffer); if (record == null) { throw new ChunkReadException( , dataPosition, this)); } if (_chunkConfig.EnableChunkReadStatistic) { _chunkStatisticService.AddCachedReadCount(ChunkHeader.ChunkNumber); } return record; } }
_cacheItems是当前Chunk内的一个环形数组,然后假如当前我们要读取的数据的位置是dataPosition,那我们只需要先对环形数据的长度取摸,得到一个下标,即上面代码中的index。然后就能从数组中拿到对应的数据了,然后如果这个数据存在,就进一步判断这个数据dataPosition是否和要求的dataPosition,如果一致,我们就能确定这个数据确实是我们想要的数据了。就可以返回了。
所以,通过上面的两种缓存(非托管内存+托管内存环形数组)的设计,我们可以确保几乎不用再从文件读取消息了。那什么时候还是会从文件读取呢?就是在1)内存不够用了;2)当前要读取的数据不是最近的10000个;这两个前提下,才会从文件读取。一般我们线上服务器,肯定会保证内存是可用的。EQueue现在有两个内存使用的水位。一个是当物理内存使用到多少百分比(默认值为40%)时,开始清理已经不再活跃的Chunk文件的非托管内存Chunk;那什么是不活跃呢?就是在最近5s内没有发生过读写的Chunk。这个设计我觉得是非常有效的,因为假如一个Chunk有5s没有发生过读写,那一般肯定是没有消费者在消费它了。另一个水位是指,最多EQueue Broker最多使用物理内存的多少百分比(默认值为75%),这个应该好理解。这个水位是为了保证EQueue不会把所有物理内存都吃光,是为了确保服务器不会因为内存耗尽而宕机或导致服务不可用。
那什么时候会出现大量使用服务器内存的情况呢?我们可以推导出来的。正常情况下,消息写入第一个Chunk,我们就在读取第一个Chunk;写入第二个Chunk我们也会跟着读取第二个Chunk;假设当前写入到了第10个Chunk,那理论上前面的9个Chunk之前缓存的非托管内存都可以释放了。因为肯定超过5s没有发生读写了。但是假如现在消费者有很多,且每个消费者的消费进度都不同,有些很快,有些很慢,当所有的消费者的消费进度正好覆盖到所有的Chunk文件时,就意味着每个Chunk文件都在发生读取。也就是说,每个Chunk都是活跃的。那此时就无法释放任何一个Chunk的非托管内存了。这样就会导致占用大量非托管内存了。但由于75%的水位的设计,Broker内存的使用是不会超过物理内存75%的。在创建新的Chunk或者尝试缓存一个Completed的Chunk时,总是会判断当前使用的物理内存是否已经超过75%,如果已经超过,就不会分配对应的非托管内存了。
如何删除消息?删除消息的设计比较简单。主要的思路是,当我们的消息已经被所有的消费者都消费过了,且满足我们的删除策略了,就可以删除了。RocketMQ删除消息的策略比较粗暴,没有考虑消息是否经被消费,而是直接到了一定的时间就删除了,比如最多只保留2天。这个是RocketMQ的设计。EQueue中,会确保消息一定是被所有的消费者都消费了才会考虑删除。然后目前我设计的删除策略有两种:
实际上,应该可能还有一些需求希望能把两个策略合起来考虑的。这个目前我没有做,我觉得这两种应该够了。如果大家想做,可以自己扩展的。