HTML5技术

C#分布式消息队列 EQueue 2.0 发布啦 - netfocus(2)

字号+ 作者:H5之家 来源:H5之家 2015-11-02 19:00 我要评论( )

通过上面的分析,我们知道了,Producer发送一个消息到Broker时,Broker会写两次磁盘。一次是现将消息本身写入磁盘(Message Chunk里),另一次是将消息的写入位置写入到磁盘(Queue Chunk里)。细心的朋友可能会问

通过上面的分析,我们知道了,Producer发送一个消息到Broker时,Broker会写两次磁盘。一次是现将消息本身写入磁盘(Message Chunk里),另一次是将消息的写入位置写入到磁盘(Queue Chunk里)。细心的朋友可能会问,假如我第一次写入成功,但第二次写入时失败,比如正好机器断电或者当前Broker服务器正好出啥问题 了,没有写入成功。那怎么办呢?这个没有什么大的影响。因为首先这种情况会被认为是消息发送失败。所以Producer还会重新发送该消息,然后Broker收到消息后还会再做一次这两个写入操作。也就是说,第一次写入的消息内容永远也不会用到了,因为那个写入位置永远也不会在Queue Chunk里有记录。

下面的代码展示了写消息到文件的核心代码:

//消息写文件需要加锁,确保顺序写文件 MessageStoreResult result = null; lock (_syncObj) { var queueOffset = queue.NextOffset; var messageRecord = _messageStore.StoreMessage(queueId, queueOffset, message); queue.AddMessage(messageRecord.LogPosition, message.Tag); queue.IncrementNextOffset(); result = new MessageStoreResult(messageRecord.MessageId, message.Code, message.Topic, queueId, queueOffset, message.Tag); }

StoreMessage方法内部实现:

public MessageLogRecord StoreMessage(int queueId, long queueOffset, Message message) { var record = new MessageLogRecord( message.Topic, message.Code, message.Body, queueId, queueOffset, message.CreatedTime, DateTime.Now, message.Tag); _chunkWriter.Write(record); return record; }

queue.AddMessage方法的内部实现:

public void AddMessage(long messagePosition, string messageTag) { _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, messageTag.GetHashcode2())); }

ChunkWriter的内部实现:

public long Write(ILogRecord record) { lock (_lockObj) { if (_isClosed) { ); } (_currentChunk.IsCompleted) { _currentChunk = _chunkManager.AddNewChunk(); } result = _currentChunk.TryAppend(record); (!result.Success) { //结束当前文件 _currentChunk.Complete(); //新建新的文件 _currentChunk = _chunkManager.AddNewChunk(); //再尝试写入新的文件 result = _currentChunk.TryAppend(record); (!result.Success) { ); } } (_chunkManager.Config.SyncFlush) { _currentChunk.Flush(); } result.Position; } }

当然,我上面为了简化问题的复杂度。所以没有引入关于如何根据某个全局的MessagePosition找到其在哪个Message Chunk的问题。这个其实也很好做,我们首先固定好每个Message Chunk文件的大小。比如大小为256MB,然后我们为每个Chunk文件设计一个ChunkHeader,每个Chunk文件总是先把这个ChunkHeader写入文件,这个Header里记录了这个文件的起始位置和结束位置,以及文件的大小。这样我们根据某个MessagePosition计算其在哪个Chunk文件时,只需要把这个MessagePositon对Chunk的大小做取摸操作即可。根据数据的位置找其在哪个Chunk的代码看起来如下面这样这样:

 

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

相关文章
  • 运用google-protobuf的IM消息应用开发(前端篇) - 子慕大诗人

    运用google-protobuf的IM消息应用开发(前端篇) - 子慕大诗人

    2017-03-30 18:00

  • Session分布式共享 = Session + Redis + Nginx - 傲翼飞寒

    Session分布式共享 = Session + Redis + Nginx - 傲翼飞寒

    2017-03-10 16:00

  • 【G】开源的分布式部署解决方案(一) - 开篇 - 寻找和谐

    【G】开源的分布式部署解决方案(一) - 开篇 - 寻找和谐

    2017-01-24 14:01

  • ASP.Net MVC4+Memcached+CodeFirst实现分布式缓存 - 奔跑吧!小郭

    ASP.Net MVC4+Memcached+CodeFirst实现分布式缓存 - 奔跑吧!小郭

    2017-01-01 13:00

网友点评
m