HTML5技术

Storm构建分布式实时处理应用初探 - Newland(3)

字号+ 作者:H5之家 来源:博客园 2016-04-24 17:00 我要评论( )

监控短信内容拆解分析器SensitiveFileAnalyzer,这个Bolt组件,接收到数据源SensitiveFileReader的数据之后,就按照上面定义的格式,对文件中每一行的内容进行解析,然后把解析完毕的内容,继续发送给下一个Bolt组件:SensitiveBatchBolt(敏感信息采集处理)

  监控短信内容拆解分析器SensitiveFileAnalyzer,这个Bolt组件,接收到数据源SensitiveFileReader的数据之后,就按照上面定义的格式,对文件中每一行的内容进行解析,然后把解析完毕的内容,继续发送给下一个Bolt组件:SensitiveBatchBolt(敏感信息采集处理)。现在,我们来看下SensitiveFileAnalyzer这个Bolt组件的实现:

/** * @filename:SensitiveFileAnalyzer.java * * Newland Co. Ltd. All rights reserved. * * @Description:监控短信内容拆解分析 * @author tangjie * @version 1.0 * */ package newlandframework.storm.bolt; import java.util.Map; import newlandframework.storm.model.RubbishUsers; import org.apache.storm.guava.base.Splitter; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class SensitiveFileAnalyzer extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { String line = input.getString(0); Map<String, String> join = Splitter.on("&").withKeyValueSeparator("=").split(line); collector.emit(new Values((String) join .get(RubbishUsers.HOMECITY_COLUMNNAME), (String) join .get(RubbishUsers.USERID_COLUMNNAME), (String) join .get(RubbishUsers.MSISDN_COLUMNNAME), (String) join .get(RubbishUsers.SMSCONTENT_COLUMNNAME))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(RubbishUsers.HOMECITY_COLUMNNAME, RubbishUsers.USERID_COLUMNNAME, RubbishUsers.MSISDN_COLUMNNAME, RubbishUsers.SMSCONTENT_COLUMNNAME)); } }

  最后一个Bolt组件SensitiveBatchBolt(敏感信息采集处理)根据上游Bolt组件SensitiveFileAnalyzer发送过来的数据,然后跟业务规定的敏感关键字进行匹配,如果匹配成功,说明这个用户,就是我们要重点监控的用户,我们把他,通过hibernate采集到MySQL数据库,统一管理。最后要说明的是,SensitiveBatchBolt组件还实现了一个监控的功能,就是定期打印出,我们已经采集到的敏感信息用户数据。现在给出SensitiveBatchBolt的实现:

/** * @filename:SensitiveBatchBolt.java * * Newland Co. Ltd. All rights reserved. * * @Description:敏感信息采集处理 * @author tangjie * @version 1.0 * */ package newlandframework.storm.bolt; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import org.apache.commons.collections.Predicate; import org.apache.commons.collections.iterators.FilterIterator; import org.apache.commons.lang.StringUtils; import org.hibernate.Criteria; import org.hibernate.HibernateException; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.hibernate.criterion.MatchMode; import org.hibernate.criterion.Restrictions; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import newlandframework.storm.model.RubbishUsers; public class SensitiveBatchBolt implements IBasicBolt { String HIBERNATE_APPLICATIONCONTEXT = "newlandframework/storm/resource/jdbc-hibernate-bean.xml"; ApplicationContext hibernate = new ClassPathXmlApplicationContext( HIBERNATE_APPLICATIONCONTEXT); SessionFactory sessionFactory = (SessionFactory) hibernate .getBean("sessionFactory"); public SensitiveBatchBolt() throws SQLException { super(); } private static List list = new ArrayList(Arrays.asList(RubbishUsers.SENSITIVE_KEYWORDS)); SensitivePredicate implements Predicate { private String sensitiveWord = null; SensitivePredicate(String sensitiveWord) { this.sensitiveWord = sensitiveWord; } public boolean evaluate(Object object) { return this.sensitiveWord.contains((String) object); } } SensitiveMonitorThread implements Runnable { private int sensitiveMonitorTimeInterval = 0; private Session session = null; SensitiveMonitorThread(int 
sensitiveMonitorTimeInterval) { this.sensitiveMonitorTimeInterval = sensitiveMonitorTimeInterval; session = sessionFactory.openSession(); } public void run() { while (true) { try { Criteria criteria1 = session.createCriteria(RubbishUsers.class); criteria1.add(Restrictions.and(Restrictions.or(Restrictions .like("smsContent", StringUtils .center(RubbishUsers.SENSITIVE_KEYWORD1, RubbishUsers.SENSITIVE_KEYWORD1 .length() + 2, "%"), MatchMode.ANYWHERE), Restrictions.like( "smsContent", StringUtils .center(RubbishUsers.SENSITIVE_KEYWORD2, RubbishUsers.SENSITIVE_KEYWORD2 .length() + 2, "%"), MatchMode.ANYWHERE)), Restrictions.in("homeCity", RubbishUsers.SENSITIVE_HOMECITYS))); List<RubbishUsers> rubbishList = (List<RubbishUsers>) criteria1.list(); System.out.println(StringUtils.center("[SensitiveTrace 敏感用户清单如下]", 40, "-")); if (rubbishList != null) { System.out.println("[SensitiveTrace 敏感用户数量]:" + rubbishList.size()); for (RubbishUsers rubbish : rubbishList) { System.out.println(rubbish + rubbish.getSmsContent()); } } else { System.out.println("[SensitiveTrace 敏感用户数量]:0"); } } catch (HibernateException e) { e.printStackTrace(); } try { Thread.sleep(sensitiveMonitorTimeInterval * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } save(Tuple input) { Session session = sessionFactory.openSession(); try { RubbishUsers users = new RubbishUsers(); users.setUserId(Integer.parseInt(input .getStringByField(RubbishUsers.USERID_COLUMNNAME))); users.setHomeCity(Integer.parseInt(input .getStringByField(RubbishUsers.HOMECITY_COLUMNNAME))); users.setMsisdn(Integer.parseInt(input .getStringByField(RubbishUsers.MSISDN_COLUMNNAME))); users.setSmsContent(input .getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME)); Predicate isSensitiveFileAnalysis = new SensitivePredicate( (String) input.getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME)); FilterIterator iterator = new FilterIterator(list.iterator(),isSensitiveFileAnalysis); if (iterator.hasNext()) { 
session.beginTransaction(); // 入库MySQL session.save(users); session.getTransaction().commit(); } } catch (HibernateException e) { e.printStackTrace(); session.getTransaction().rollback(); } finally { session.close(); } } // 很多情况下面storm运行期执行报错,都是由于execute有异常导致的,重点观察execute的函数逻辑 // 最经常报错的情况是报告:ERROR backtype.storm.daemon.executor - java.lang.RuntimeException:java.lang.NullPointerException // backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java ...) // 类似这样的错误,有点莫名其妙,开始都运行的很正常,后面忽然就报空指针异常了,我开始以为是storm部署的问题, // 后面jstack跟踪发现,主要还是execute逻辑的问题,所以遇到这类的问题不要手忙脚乱,适当结合jstack跟踪定位 @Override public void execute(Tuple input, BasicOutputCollector collector) { save(input); } public Map<String, Object> getComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { final int sensitiveMonitorTimeInterval = Integer.parseInt(stormConf .get("RUBBISHMONITOR_INTERVAL").toString()); SensitiveMonitorThread montor = new SensitiveMonitorThread( sensitiveMonitorTimeInterval); new Thread(montor).start(); } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } }

  由于是通过hibernate入库到MySQL,所以给出hibernate配置,首先是:hibernate.cfg.xml

 

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

相关文章
  • 探索 vuex 2.0 以及使用 vuejs 2.0 + vuex 2.0 构建记事本应用 - nzbin

    探索 vuex 2.0 以及使用 vuejs 2.0 + vuex 2.0 构建记事本应用 - nzbin

    2017-04-25 09:02

  • Session分布式共享 = Session + Redis + Nginx - 傲翼飞寒

    Session分布式共享 = Session + Redis + Nginx - 傲翼飞寒

    2017-03-10 16:00

  • 如何一秒钟从头构建一个 ASP.NET Core 中间件 - Savorboard

    如何一秒钟从头构建一个 ASP.NET Core 中间件 - Savorboard

    2017-02-23 16:01

  • 构建自动化前端样式回归测试——BackstopJS篇 - laden666666

    构建自动化前端样式回归测试——BackstopJS篇 - laden666666

    2017-02-13 16:03

网友点评