通过上面的分析,我们知道了,Producer发送一个消息到Broker时,Broker会写两次磁盘。一次是现将消息本身写入磁盘(Message Chunk里),另一次是将消息的写入位置写入到磁盘(Queue Chunk里)。细心的朋友可能会问,假如我第一次写入成功,但第二次写入时失败,比如正好机器断电或者当前Broker服务器正好出啥问题 了,没有写入成功。那怎么办呢?这个没有什么大的影响。因为首先这种情况会被认为是消息发送失败。所以Producer还会重新发送该消息,然后Broker收到消息后还会再做一次这两个写入操作。也就是说,第一次写入的消息内容永远也不会用到了,因为那个写入位置永远也不会在Queue Chunk里有记录。
下面的代码展示了写消息到文件的核心代码:
//消息写文件需要加锁,确保顺序写文件 MessageStoreResult result = null; lock (_syncObj) { var queueOffset = queue.NextOffset; var messageRecord = _messageStore.StoreMessage(queueId, queueOffset, message); queue.AddMessage(messageRecord.LogPosition, message.Tag); queue.IncrementNextOffset(); result = new MessageStoreResult(messageRecord.MessageId, message.Code, message.Topic, queueId, queueOffset, message.Tag); }
StoreMessage方法内部实现:
public MessageLogRecord StoreMessage(int queueId, long queueOffset, Message message) { var record = new MessageLogRecord( message.Topic, message.Code, message.Body, queueId, queueOffset, message.CreatedTime, DateTime.Now, message.Tag); _chunkWriter.Write(record); return record; }
queue.AddMessage方法的内部实现:
public void AddMessage(long messagePosition, string messageTag) { _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, messageTag.GetHashcode2())); }
ChunkWriter的内部实现:
public long Write(ILogRecord record) { lock (_lockObj) { if (_isClosed) { ); } (_currentChunk.IsCompleted) { _currentChunk = _chunkManager.AddNewChunk(); } result = _currentChunk.TryAppend(record); (!result.Success) { //结束当前文件 _currentChunk.Complete(); //新建新的文件 _currentChunk = _chunkManager.AddNewChunk(); //再尝试写入新的文件 result = _currentChunk.TryAppend(record); (!result.Success) { ); } } (_chunkManager.Config.SyncFlush) { _currentChunk.Flush(); } result.Position; } }
当然,我上面为了简化问题的复杂度。所以没有引入关于如何根据某个全局的MessagePosition找到其在哪个Message Chunk的问题。这个其实也很好做,我们首先固定好每个Message Chunk文件的大小。比如大小为256MB,然后我们为每个Chunk文件设计一个ChunkHeader,每个Chunk文件总是先把这个ChunkHeader写入文件,这个Header里记录了这个文件的起始位置和结束位置,以及文件的大小。这样我们根据某个MessagePosition计算其在哪个Chunk文件时,只需要把这个MessagePositon对Chunk的大小做取摸操作即可。根据数据的位置找其在哪个Chunk的代码看起来如下面这样这样: