Test2(Soldier[] soldiers) { QueueChannel<int> channel = new QueueChannel<int>() { Timer = new Job.JobTimer { Interval = 3000 }, BufferAlert = 10 }; DirectiveProduce producer = new DirectiveProduce { Instance = new DirectiveAction() { } }; channel.AddProducer(producer); foreach (var item in soldiers) { channel.AddConsumer(new QueueConsumeService<int>() { Instance = item, Timer = new Job.JobTimer { Interval = 1000 } }); } channel.Init(); channel.Start(); while (producer.Run()) { } }
和前面代码区别很小,只是给队列频道(QueueChannel)加了一个时间调度(Timer),另外增加了一个属性BufferAlert(值为10,表示如果队列的长度超过这个值就报警(可能触发减少生产者增加消费者策略))
很明显,现在除了生产者和消费要消耗cpu外,队列频道也要消耗cpu;有人说你这样能更快鬼才信
哈哈,这里面“水”就深了。队列频道的调度不是白加的,会定时检查队列的情况
如果队列在增加,它会尝试减少生产者增加消费者,甚至停掉所有生产者
如果队列在较少,它会尝试减少消费者增加生产者,甚至停掉所有消费者
什么个意思呢,cpu也是资源,优化需要从开源和节流两方面下手,你一味增加逻辑线程物理线程不够用了只有害处没有好处;这个时候停掉一些其他的逻辑线程来释放物理线程就比增加逻辑线程更加有效
4、继续看测试结果
01:09:44.553 请输入指令:1 I say 1 01:09:46.631 士兵7向左转 01:09:47.662 请输入指令:4 I say 4 01:09:48.647 士兵3立正 01:09:50.100 请输入指令:2 I say 2 01:10:58.621 士兵9向右转 01:11:00.277 请输入指令:1 I say 1 01:11:04.701 士兵9向左转 01:11:05.967 请输入指令:9 I say 9 01:11:14.842 士兵9未知指令(9)
A:前两条还是9个消费者“抢着”执行
B:后面三条就都是最后一个消费(士兵9)一个执行了,另外8个消费者被“优化”掉了
有的人可能看不明白了,既然9个消费者你要“优化”掉8个,你还不如只定义一个消费者
哈哈别急,另外8个消费者还在呢,随时待命呢,我们来个模拟短暂并发
5、先看看模拟代码
public class DirectiveProduce : Produce<int> { public override object SendMessage(Data.IEntityAdd<int> channel, int message) { if (message < 10) return base.SendMessage(channel, message); for (int i = message; i > 0; i--) { base.SendMessage(channel, i % 8); } return message; } }
我们对生产者做了改动,如果传了的消息>=10,我们就把这个消息拆分为n份来发送,这样必然导致消息堆积
6、检测一下消息堆积后的效果
01:20:48.089 请输入指令:2 I say 2 01:23:19.149 士兵9向右转 01:23:20.899 请输入指令:4 I say 4 01:23:21.180 士兵9立正 01:23:22.914 请输入指令:19 I say 19 01:23:36.337 士兵9向后转 01:23:37.353 士兵9向右转 01:23:38.291 请输入指令:01:23:38.353 士兵9向左转 01:23:38.431 士兵6向左看齐 01:23:38.431 士兵7稍息 01:23:38.431 士兵2向右转 01:23:38.431 士兵8看前看 01:23:38.431 士兵4向右看齐 01:23:38.431 士兵3立正 01:23:38.431 士兵1向左转 01:23:38.431 士兵5向后转 01:23:39.369 士兵9看前看 01:23:39.447 士兵4向左看齐 01:23:39.447 士兵1向右看齐 01:23:39.447 士兵5向右转 01:23:39.447 士兵7稍息 01:23:39.447 士兵8立正 01:23:39.447 士兵6向后转 01:23:39.447 士兵3向左转 5 I say 5 01:25:47.964 士兵9稍息 01:25:49.230 请输入指令:6 I say 6 01:25:49.980 士兵9向右看齐
A:程序启动我故意等了一会再输入,就是等把消费者线程优化后再测试
B:前面两个指令是“士兵9”执行的毫无疑问
C:后面我批量模拟了19个指令,其他消费者陆续都出来了
消费者的周期是1秒,19个指令9个消费者全开启了,3秒就全部执行完了,效果是不是杠杠的
D:后来我又等了一会,再输入两个指令又都只有“士兵9”在执行,也就是说执行批处理后,其他8个消费者又陆续被“优化”掉了
这个是优化消费者,如果是生产者也是时间调取也会被优化,效果是一样,这里就不举例了
PS:这里我还是发现一个问题,时间调度(Timer)的问题,如果时间调度周期设置太短,执行任务是更加及时,但是cpu耗费太多,设置太长效率又太低,而并发总是在不经意发生,虽然可以动态增加生产者和消费者,但是生产者和消费的线程还是可能浪费严重。
这个我深有感触,我原来开发消息队列初期这个周期是写死,有的业务任务是每天甚至是每周、每月执行一次,周期设太小浪费cpu;有的业务偶发每秒10多个甚至几十个,每秒一个调度都远不够用
为了这个问题,我修改了多次调度周期的值,都无法满足各种需求,而且消息队列还时常成为cpu和内存的杀手,偶发cpu满载、内存泄露、进程假死,一段悲催的时光
后来,我”发明“了一个东西,我给它取名为”双向变速齿轮“,这个是我加班坐滚梯去餐厅吃饭时想到的。你看着滚梯在慢速运行,你一踩上去电梯就加速了,你下来后不久,电梯又变慢了。我就尝试在自己的消息队列中实现这样的逻辑。等这个逻辑成熟了,每种任务都基本上相安无事了,结果是So Perfect。
7、变速齿轮消息队列例子代码