HTML5技术

Storm构建分布式实时处理应用初探 - Newland(5)

字号+ 作者:H5之家 来源:博客园 2016-04-24 17:00 我要评论( )

到此为止,我们已经完成了敏感信息实时监控的所有的Storm组件的开发。现在,我们来完成Storm的拓扑(Topology),由于拓扑(Topology)又分为本地拓扑和分布式拓扑,因此封装了一个工具类StormRunner(拓扑执行器)

  到此为止,我们已经完成了敏感信息实时监控的所有的Storm组件的开发。现在,我们来完成Storm的拓扑(Topology),由于拓扑(Topology)又分为本地拓扑和分布式拓扑,因此封装了一个工具类StormRunner(拓扑执行器),对应的代码如下:

/** * @filename:StormRunner.java * * Newland Co. Ltd. All rights reserved. * * @Description:拓扑执行器 * @author tangjie * @version 1.0 * */ package newlandframework.storm.topology; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; StormRunner { MILLIS_IN_SEC = 1000; runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds) throws InterruptedException { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, topology); Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC); cluster.killTopology(topologyName); cluster.shutdown(); } runTopologyRemotely(StormTopology topology, String topologyName, Config conf) throws AlreadyAliveException, InvalidTopologyException { StormSubmitter.submitTopology(topologyName, conf, topology); } }

  好了,现在我们把上面所有的Spouts/Bolts拼接成“拓扑”(Topology)结构,我们这里用的是分布式拓扑,来进行部署运行。具体的SensitiveTopology(敏感用户监控Storm拓扑)代码如下:

/** * @filename:SensitiveTopology.java * * Newland Co. Ltd. All rights reserved. * * @Description:敏感用户监控Storm拓扑 * @author tangjie * @version 1.0 * */ package newlandframework.storm.topology; import java.sql.SQLException; import newlandframework.storm.bolt.SensitiveBatchBolt; import newlandframework.storm.bolt.SensitiveFileAnalyzer; import newlandframework.storm.model.RubbishUsers; import newlandframework.storm.spout.SensitiveFileReader; import org.apache.commons.lang.StringUtils; import backtype.storm.Config; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class SensitiveTopology { String SensitiveSpoutFuZhou = "SensitiveSpout591"; String SensitiveSpoutXiaMen = "SensitiveSpout592"; String SensitiveBoltAnalysis = "SensitiveBoltAnalysis"; String SensitiveBoltPersistence = "SensitiveBolPersistence"; main(String[] args) throws SQLException { System.out.println(StringUtils.center("SensitiveTopology", 40, "*")); TopologyBuilder builder = new TopologyBuilder(); // 构建spout,分别设置并行度为2 builder.setSpout(SensitiveSpoutFuZhou, new SensitiveFileReader( SensitiveFileReader.InputFuZhouPath), 2); builder.setSpout(SensitiveSpoutXiaMen, new SensitiveFileReader( SensitiveFileReader.InputXiaMenPath), 2); // 构建bolt设置并行度为4 builder.setBolt(SensitiveBoltAnalysis, new SensitiveFileAnalyzer(), 4) .shuffleGrouping(SensitiveSpoutFuZhou) .shuffleGrouping(SensitiveSpoutXiaMen); // 构建bolt设置并行度为4 SensitiveBatchBolt persistenceBolt = new SensitiveBatchBolt(); builder.setBolt(SensitiveBoltPersistence, persistenceBolt, 4) .fieldsGrouping( SensitiveBoltAnalysis, new Fields(RubbishUsers.HOMECITY_COLUMNNAME, RubbishUsers.USERID_COLUMNNAME, RubbishUsers.MSISDN_COLUMNNAME)); Config conf = new Config(); conf.setDebug(true); // 设置worker,集群里面最大就8个slots了,全部使用上 conf.setNumWorkers(8); // 3秒监控一次敏感信息入库MySQL情况 conf.put("RUBBISHMONITOR_INTERVAL", 3); { StormRunner.runTopologyRemotely(builder.createTopology(),"SensitiveTopology", conf); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } } }

  到此为止,所有的Storm组件已经开发完毕!现在,我们把上述工程打成jar包,放到Storm集群中运行,具体可以到Nimbus对应的Storm安装目录下面的bin目录,输入:storm jar + {jar路径}。

 

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

相关文章
  • 探索 vuex 2.0 以及使用 vuejs 2.0 + vuex 2.0 构建记事本应用 - nzbin

    探索 vuex 2.0 以及使用 vuejs 2.0 + vuex 2.0 构建记事本应用 - nzb

    2017-04-25 09:02

  • Session分布式共享 = Session + Redis + Nginx - 傲翼飞寒

    Session分布式共享 = Session + Redis + Nginx - 傲翼飞寒

    2017-03-10 16:00

  • 如何一秒钟从头构建一个 ASP.NET Core 中间件 - Savorboard

    如何一秒钟从头构建一个 ASP.NET Core 中间件 - Savorboard

    2017-02-23 16:01

  • 构建自动化前端样式回归测试——BackstopJS篇 - laden666666

    构建自动化前端样式回归测试——BackstopJS篇 - laden666666

    2017-02-13 16:03

网友点评