At this point we have finished developing all of the Storm components for real-time monitoring of sensitive information. Now we assemble them into a Storm topology. Since a topology can run either locally or on a distributed cluster, we wrap both modes in a utility class, StormRunner (topology runner). The code is as follows:
/**
 * @filename:StormRunner.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Topology runner
 * @author tangjie
 * @version 1.0
 */
package newlandframework.storm.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;

public final class StormRunner {

    private static final int MILLIS_IN_SEC = 1000;

    private StormRunner() {
    }

    // Run the topology in an in-process LocalCluster for a fixed time, then kill it.
    public static void runTopologyLocally(StormTopology topology, String topologyName,
            Config conf, int runtimeInSeconds) throws InterruptedException {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, topology);
        Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
        cluster.killTopology(topologyName);
        cluster.shutdown();
    }

    // Submit the topology to a remote Storm cluster via Nimbus.
    public static void runTopologyRemotely(StormTopology topology, String topologyName,
            Config conf) throws AlreadyAliveException, InvalidTopologyException {
        StormSubmitter.submitTopology(topologyName, conf, topology);
    }
}
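The local-run path follows a simple lifecycle: submit the topology, let it process tuples for a fixed time, then kill it and shut the cluster down. The sketch below illustrates that lifecycle without depending on Storm itself; `MockCluster` is a stand-in invented here for `LocalCluster`, so the sequence can be seen in isolation.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalRunSketch {

    // Stand-in for Storm's LocalCluster: records the lifecycle calls it receives.
    static class MockCluster {
        final List<String> events = new ArrayList<>();
        void submitTopology(String name) { events.add("submit:" + name); }
        void killTopology(String name)   { events.add("kill:" + name); }
        void shutdown()                  { events.add("shutdown"); }
    }

    // Same submit -> sleep -> kill -> shutdown sequence as runTopologyLocally.
    static List<String> runLocally(MockCluster cluster, String name, long runtimeMillis)
            throws InterruptedException {
        cluster.submitTopology(name);
        Thread.sleep(runtimeMillis);   // let the "topology" run for a while
        cluster.killTopology(name);
        cluster.shutdown();
        return cluster.events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runLocally(new MockCluster(), "demo", 100));
        // prints "[submit:demo, kill:demo, shutdown]"
    }
}
```

This timed-kill pattern is what makes local runs suitable for development: the JVM exits cleanly after a bounded test window instead of running forever.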
Now we wire all of the Spouts and Bolts above into a topology. Here we use the distributed mode to deploy and run it. The SensitiveTopology (sensitive-user monitoring Storm topology) code is as follows:
/**
 * @filename:SensitiveTopology.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Sensitive-user monitoring Storm topology
 * @author tangjie
 * @version 1.0
 */
package newlandframework.storm.topology;

import java.sql.SQLException;

import newlandframework.storm.bolt.SensitiveBatchBolt;
import newlandframework.storm.bolt.SensitiveFileAnalyzer;
import newlandframework.storm.model.RubbishUsers;
import newlandframework.storm.spout.SensitiveFileReader;

import org.apache.commons.lang.StringUtils;

import backtype.storm.Config;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class SensitiveTopology {

    public static final String SensitiveSpoutFuZhou = "SensitiveSpout591";
    public static final String SensitiveSpoutXiaMen = "SensitiveSpout592";
    public static final String SensitiveBoltAnalysis = "SensitiveBoltAnalysis";
    public static final String SensitiveBoltPersistence = "SensitiveBolPersistence";

    public static void main(String[] args) throws SQLException {
        System.out.println(StringUtils.center("SensitiveTopology", 40, "*"));
        TopologyBuilder builder = new TopologyBuilder();

        // Build the spouts, each with a parallelism of 2
        builder.setSpout(SensitiveSpoutFuZhou, new SensitiveFileReader(
                SensitiveFileReader.InputFuZhouPath), 2);
        builder.setSpout(SensitiveSpoutXiaMen, new SensitiveFileReader(
                SensitiveFileReader.InputXiaMenPath), 2);

        // Build the analysis bolt with a parallelism of 4
        builder.setBolt(SensitiveBoltAnalysis, new SensitiveFileAnalyzer(), 4)
                .shuffleGrouping(SensitiveSpoutFuZhou)
                .shuffleGrouping(SensitiveSpoutXiaMen);

        // Build the persistence bolt with a parallelism of 4
        SensitiveBatchBolt persistenceBolt = new SensitiveBatchBolt();
        builder.setBolt(SensitiveBoltPersistence, persistenceBolt, 4)
                .fieldsGrouping(
                        SensitiveBoltAnalysis,
                        new Fields(RubbishUsers.HOMECITY_COLUMNNAME,
                                RubbishUsers.USERID_COLUMNNAME,
                                RubbishUsers.MSISDN_COLUMNNAME));

        Config conf = new Config();
        conf.setDebug(true);
        // Set the number of workers; the cluster has at most 8 slots, so use them all
        conf.setNumWorkers(8);
        // Check every 3 seconds how many sensitive records have been written to MySQL
        conf.put("RUBBISHMONITOR_INTERVAL", 3);

        try {
            StormRunner.runTopologyRemotely(builder.createTopology(),
                    "SensitiveTopology", conf);
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        }
    }
}
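Note the two groupings used above: `shuffleGrouping` spreads tuples randomly across the analysis bolt's tasks, while `fieldsGrouping` guarantees that tuples with the same values in the chosen fields (home city, user id, msisdn) always reach the same persistence task, so per-user records are never split across tasks. The sketch below illustrates the idea behind fields grouping with simple hash-mod routing; it is an illustration of the contract, not Storm's exact internal implementation, and the sample key values are made up.

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingSketch {

    // Pick the target task from the hash of the grouping-field values:
    // equal field values always map to the same task index.
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // parallelism of the persistence bolt in the topology above
        // Two tuples carrying the same (home_city, userid, msisdn) key...
        int t1 = chooseTask(Arrays.<Object>asList("591", "10001", "13800000001"), numTasks);
        int t2 = chooseTask(Arrays.<Object>asList("591", "10001", "13800000001"), numTasks);
        System.out.println(t1 == t2); // prints "true": same key, same task
    }
}
```

This is why the persistence bolt can safely batch and deduplicate per user: all tuples for one user are serialized through a single task.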
At this point, all of the Storm components have been developed! Now we package the project above as a jar and run it on the Storm cluster: go to the bin directory under the Storm installation directory on the Nimbus node and run `storm jar {path-to-jar} {main-class}`, where the main class here is newlandframework.storm.topology.SensitiveTopology.
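For concreteness, the submission command might look like the following; the jar path is a placeholder for your actual build output, and the main class is the SensitiveTopology class defined above.

```shell
# Submit the topology to the cluster from the Nimbus node.
# The jar path is a placeholder; substitute your build artifact.
bin/storm jar /path/to/sensitive-topology.jar \
    newlandframework.storm.topology.SensitiveTopology
```

Once submitted, the topology runs continuously on the cluster until it is explicitly killed (for example via `storm kill SensitiveTopology`).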