public Chunk GetChunkFor(long dataPosition) { var chunkNum = (int)(dataPosition / _config.GetChunkDataSize()); return GetChunk(chunkNum); } public Chunk GetChunk(int chunkNum) { if (_chunks.ContainsKey(chunkNum)) { return _chunks[chunkNum]; } return null; }
代码很简单,就不多讲了。拿到了Chunk对象后,我们就可以把dataPosition传给Chunk,然后Chunk内部把这个全局的dataPosition转换为本地的一个位置,就能准确的定位到这个数据在当前Chunk文件的实际位置了。将全局位置转换为本地的位置的算法也很简单直接:
public int GetLocalDataPosition(long globalDataPosition) { if (globalDataPosition < ChunkDataStartPosition || globalDataPosition > ChunkDataEndPosition) { , globalDataPosition, ChunkDataStartPosition, ChunkDataEndPosition)); } return (int)(globalDataPosition - ChunkDataStartPosition); }
只需要把这个全局的位置减去当前Chunk的数据开始位置,就能知道这个全局位置相对于当前Chunk的本地位置了。
好了,上面介绍了消息如何写入的主要思路以及如何读取数据的思路。
另外一点还想提一下,就是关于刷盘的策略。一般我们写数据到文件后,是需要调用文件流的Flush方法的,确保数据最终刷入到了磁盘上。否则数据就还是在缓冲区里。当然,我们需要注意到,即便调用了Flush方法,数据可能也还没真正逻辑到磁盘,而只是在操作系统内部的缓冲区里。这个我们就无法控制了,我们能做到的是调用了Flush方法即可。那当我们每次写入一个数据到文件都要调用Flush方法的话,无疑性能是低下的,所以就有了所谓的异步刷盘的设计。就是我们写入消息后不立即调用Flush方法,而是采用一个独立的线程,定时调用Flush方法来实现刷盘。目前EQueue支持同步刷盘和异步刷盘,开发者可以自己配置决定采用哪一种。异步刷盘的间隔默认是100ms。当我们在追求高吞吐量时,应该考虑异步刷盘,但要求数据可靠性更高但对吞吐量可以低一点时,则可以使用同步刷盘。如果又要高吞吐又要数据高可靠,那就只有一个办法了,呵呵。就是多增加一些Broker机器,通过集群来弥补单台Broker写入数据的瓶颈。
如何从文件读取消息?假设我们现在要从一个文件读取数据,且是多线程并发的读取,要怎么设计?一个办法是,每次读取时,创建文件流,然后创建StreamReader,然后读取文件,读取完成后释放StreamReader并关闭文件流。但每次要读取文件的一个数据都要这样做的话性能不是太好,因为我们反复的创建这样的对象。所以,这里我们可以使用对象池的概念。就是Chunk内部,预先创建好一些Reader,当需要读文件时,获取一个可用的Reader,读取完成后,再把Reader归还到对象池里。基于这个思路,我设计了一个简单的对象池:
private readonly ConcurrentQueue<ReaderWorkItem> _readerWorkItemQueue = new ConcurrentQueue<ReaderWorkItem>(); private void InitializeReaderWorkItems() { for (var i = 0; i < _chunkConfig.ChunkReaderCount; i++) { _readerWorkItemQueue.Enqueue(CreateReaderWorkItem()); } _isReadersInitialized = true; } private ReaderWorkItem CreateReaderWorkItem() { var stream = default(Stream); if (_isMemoryChunk) { stream = new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength); } else { stream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None); } return new ReaderWorkItem(stream, new BinaryReader(stream)); } private ReaderWorkItem GetReaderWorkItem() { ReaderWorkItem readerWorkItem; while (!_readerWorkItemQueue.TryDequeue(out readerWorkItem)) { Thread.Sleep(1); } return readerWorkItem; } private void ReturnReaderWorkItem(ReaderWorkItem readerWorkItem) { _readerWorkItemQueue.Enqueue(readerWorkItem); }
当一个Chunk初始化时,我们预先初始化好固定数量(可配置)的Reader对象,并把这些对象放入一个ConcurrentQueue里(对象池的作用),然后要读取数据时,从从ConcurrentQueue里拿一个可用的Reader即可,如果当前并发太高拿不到怎么办,就等待直到拿到为止,目前我是等待1ms后继续尝试拿,直到最后拿到为止。然后ReturnReaderWorkItem就是数据读取完之后归还Reader到对象池。就是不是很简单哦。这样的设计,可以避免不断的创建文件流和Reader对象,可以避免GC的副作用。
Broker重启时如何做?大家知道,当Broker重启时,我们是需要扫描磁盘上Chunk目录下的所有Chunk文件的。那怎么扫描呢?上面其实我也简单提到过。首先,我们可以对每个Chunk文件的文件名的命名定义一个规则,第一个Chunk文件的文件名比如为:message-chunk-000000000,第二个为:message-chunk-000000001,以此类推。那我们扫描时,只要先把所有的文件名获取到,然后对文件名升序排序。那最后一个文件之前的文件肯定都是写入完全了的,即上面我说的Completed状态的,而最后一个文件是还没有写入完的,还可以接着写。所以我们初始化时,只需要先初始化最后一个之前的所有Chunk文件,最后再初始化最后一个文件即可。这里我所说的初始化不是要把整个Chunk文件的内容都加载到内存,而是只是读取这个文件的ChunkHeader的信息维护在内存即可。有了Header信息,我们就可以为后续的数据读取提供位置计算了。所以,整个加载过程是很快的,读取100个Chunk文件的ChunkHeader也不过一两秒的时间,完全不影响Broker的启动时间。对于初始化Completed的Chunk比较简单,只需要读取ChunkHeader信息即可。但是初始化最后一个文件比较麻烦,因为我们还要知道这个文件当前写入到哪里了?从而我们可以从这个位置的下一个位置接着往下写。那怎么知道这个文件当前写入到哪里了呢?其实比较复杂。有很多技术,我看到RocketMQ和EventStore这两个开源项目中都采用了Checkpoint的技术。就是当我们每次写入一个数据到文件后,都会更新一下Checkpoint,即表示当前写入到这个文件的哪里了。然后这个Checkpoint值我们也是定时异步保存到某个独立的小文件里,这个文件里只保存了这个Checkpoint。这样的设计有一个问题,就是假如数据写入了,但由于Checkpoint的保存不是实时的,所以理论上会出现Checkpint值会小于实际文件写入的位置的情况。一般我们忽略这种情况即可,即可能会存在初始化时,下次写入可能会覆盖一定的之前已经写入的数据,因为Checkpoint可能是稍微老一点的。