而我在设计时,希望能再严谨一点,取消Checkpoint的设计,而是采用在初始化Ongoing状态的Chunk文件时,从文件的头开始不断往下读,当最后无法往下读时,我们就知道这个文件我们当前写入到哪里了。那怎么知道无法往下读了呢?也就是说怎么确定后续的文件内容不是我们写入的?也很简单。对于不固定数据长度的Chunk来说,由于我们每次写入一个数据时都是同时在前后写入这个数据的长度;所以我们再初始化读取这个文件时,可以借助这一点来校验,但出现不符合这个规则的数据时,就认为后续不是正常的数据了。对于固定长度的Chunk来说,我们只要保证每次写入的数据的数据是非0了。而对于EQueue的场景,固定数据的Chunk里存储的都是消息在Message Chunk中的全局位置,一个Long值;但这个Long值我们正常是从0开始的,怎么办呢?很简单,我们写入MessagePosition时,总是加1即可。即假如当前的MessagePosition为0,那我们实际写入1,如果为100,则实际写入的值是101。这样我们就能确保这个固定长度的Chunk文件里每个数据都是非0的。然后我们在初始化这样的Chunk文件时,只要不断读取固定长度(8个字节)的数据,当出现读取到的数据为0时,就认为已经到头了,即后续的不是我们写入的数据了。然后我们就能知道接下来要从哪里开始读取了哦。
如何尽量避免读文件?上面我介绍了如何读文件的思路。我们也知道了,我们是在消费者要消费消息时,从文件读取消息的。但对从文件读取消息总是没有比从内存读取消息来的快。我们前面的设计都没有把内存好好利用起来。所以我们能否考虑把未来可能要消费的Chunk文件的内容直接缓存在内存呢?这样我们就可以避免对文件的读取了。肯定可以的。那怎么做呢?前面我提高多,曾经我们用托管内存中的ConcurrentDictionary<long, Message>这样的字典来缓存消息。我也提到这会带来垃圾回收而影响性能的问题。所以我们不能直接这样简单的设计。经过我的一些尝试,以及从EventStore中的源码中学到的,我们可以使用非托管内存来缓存Chunk文件。我们可以使用Marshal.AllocHGlobal来申请一块完整的非托管内存,然后再需要释放时,通过Marshal.FreeHGlobal来释放。然后,我们可以通过UnmanagedMemoryStream来访问这个非托管内存。这个是核心思路。那么怎样把一个Chunk文件缓存到非托管内存呢?很简单了,就是扫描这个文件的所有内容,把内容都写入内存即可。代码如下:
private void LoadFileChunkToMemory() { using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 8192, FileOptions.None)) { var cachedLength = (int)fileStream.Length; var cachedData = Marshal.AllocHGlobal(cachedLength); try { using (var unmanagedStream = new UnmanagedMemoryStream((byte*)cachedData, cachedLength, cachedLength, FileAccess.ReadWrite)) { fileStream.Seek(0, SeekOrigin.Begin); var buffer = new byte[65536]; int toRead = cachedLength; while (toRead > 0) { int read = fileStream.Read(buffer, 0, Math.Min(toRead, buffer.Length)); if (read == 0) { break; } toRead -= read; unmanagedStream.Write(buffer, 0, read); } } } catch { Marshal.FreeHGlobal(cachedData); throw; } _cachedData = cachedData; _cachedLength = cachedLength; } }
代码很简单,不用多解释了。需要注意的是,上面这个方法针对的是Completed状态的Chunk,即已经写入完成的Chunk的。已经写入完全的Chunk是只读的,不会再发生更改,所以我们可以随便缓存在内存中。
那对于新创建出来的Chunk文件呢?正常情况下,消费者来得及消费时,我们总是在不断的写入最新的Chunk文件,也在不断的从这个最新的Chunk文件读取消息。那我们怎么确保消费最新的消息时,也不需要从文件读取呢?也很简单,就是在新建一个Chunk文件时,如果内存足够,也同时创建一个一样大小的基于非托管内存的Chunk。然后我们再写入消息到文件Chunk成功后,再同时写入这个消息到非托管内存的Chunk。这样,我们在消费消息,读取消息时总是首先判断当前Chunk是否关联了一个非托管内存的Chunk,如果有,就优先从内存读取即可。如果没有才从文件Chunk读取。