private static void Test3(Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { Timer = new Job.JobTimer { Interval = 3000 }, BufferAlert = 10 }; DirectiveProduce producer = new DirectiveProduce { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); NumIncreaser increaser = new NumIncreaser { Min = 2000, Max = 60000 }; NumLower lower = new NumLower { Min = 5, Max = 1000 }; foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.AutoJobTimer { Interval = 1000, Increaser = increaser, Lower = lower } }); } channel.Init(); channel.Start(); while (producer.Run()) { } }
这个例子和前面例子区别很小,就是把JobTimer换成AutoJobTimer,AutoJobTimer有两个属性,Increaser是周期变长(减速齿轮),Lower是周期变短(加速齿轮)
Increaser是在2秒和1分钟之间变动
Lower是在5毫秒到1秒钟之间变动
也就是说,这样设置把消费者实际(一般情况下轮训)的周期从1秒拉长为1分钟,节省了98%的轮训cpu
但是却在高并发的情况下加速99%
字面上算起来就是这样,具体效果我们来测试一下
8、变速齿轮运行结果
A:以上例子是并发1000个消息,由于控制台一页已经显示不下了,直好把最前和最后各截图一张
B:前后13秒1000个任务执行完毕,对比前面没加变速齿轮的是3秒执行了19个任务,初算下来加速92%(这个测试会受到采集数据的精度及每次测试的偶发情况稍后不同)
以上都是进程内的消息队列测试,分布式的消息队列更高大上,能不能也支持很好呢?
当然可以,前面展示的很清楚,消息队列由生产者、消费者、消息频道(排队频道和订阅频道)三大块组成,且每一块都相互独立,要接入分布式按此模式即可,很多情况下只需要定义其中一块甚至只是其中一块的一部分
三、MSMQ(微软消息队列)的例子
1、先看生产者测试代码
public static void Test() { Fang.MSMQ.Queue<int> queue = new Fang.MSMQ.Queue<int>(); queue.Init(); Test1(queue); } private static void Test1(IEntityQueue<int> queue) { QueueChannel<int> channel = new QueueChannel<int>() { QueueBuffer = queue }; Produce<int> producer = new Produce<int>() { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); channel.Init(); channel.Start(); while (producer.Run()) { } }
注:以上代码和前面的代码还是很相似,区别如下:
A:这个代码中只有频道和生产者,消息频道和生产者的类和以前的一样(使用MSMQ不需要定义新的消息频道和生产者)
B:消息频道指定了使用MSMQ的队列(封装MSMQ实现框架的IEntityQueue接口即可)
面向接口可扩展框架嘛,认得是接口并不是具体实现
2、再看消费者测试代码
public static void Test() { Fang.MSMQ.Queue<int> queue = new Fang.MSMQ.Queue<int>(); queue.Init(); Soldier[] soldiers = new Soldier[] { new Soldier { Name = "士兵1" } , new Soldier { Name = "士兵2" } , new Soldier { Name = "士兵3" } , new Soldier { Name = "士兵4" } , new Soldier { Name = "士兵5" } , new Soldier { Name = "士兵6" } , new Soldier { Name = "士兵7" } , new Soldier { Name = "士兵8" } , new Soldier { Name = "士兵9" } }; Test1(queue, soldiers); }
private static void Test1(IEntityQueue<int> queue, Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { QueueBuffer = queue }; foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.JobTimer { Interval = 1000 } }); } channel.Init(); channel.Start(); }
以上生产者和消费在不同的控制台程序中测试,模拟两台计算机
A:这个代码中只有频道和消费者,消息频道和生产者的类和以前的一样(使用MSMQ也不需要定义新的消费者)
消息频道定义和生产者测试程序完全一样,其实是两台计算机(或者两个进程),写程序的时候就像是一台一样
B:消息频道指定了使用MSMQ的队列(封装MSMQ实现框架的IEntityQueue接口即可)
3、分别执行结果如下
生产者和消费者都在随时待命
4、输入几个命令玩玩
生产者和消费者的执行时间差在2秒左右,含我操作输入时间,算我输入1秒左右,生产者和消费者时间差在1秒左右
注:不要纠结生产者的第一个提示信息时间,因第一个提示信息后我并没有立即输入,等了一会导致较大时间差
5、为了说明真的是走的MSMQ,我们先把消费者关掉,看系统的消息队列列表
注:以上是我又生产的4个消息
注:以上是MSMQ消息队列截图
注:以上是最后一条消息的正文截图
可以看到,这里使用的xml序列化的方式,有人可能会说,xml不好,性能不好等云云,其实对于使用哪种序列化也是面向接口的,完全可以配置
5、用Json发个消息的代码