The SensitiveFileAnalyzer bolt parses the monitored SMS content. After receiving data from the SensitiveFileReader spout, this Bolt component parses each line of the file according to the format defined above, and then emits the parsed content downstream to the next Bolt component: SensitiveBatchBolt (sensitive-information collection and processing). Let's look at the implementation of the SensitiveFileAnalyzer bolt:
/**
 * @filename:SensitiveFileAnalyzer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: parses each line of the monitored SMS content
 * @author tangjie
 * @version 1.0
 */
package newlandframework.storm.bolt;

import java.util.Map;

import newlandframework.storm.model.RubbishUsers;

import org.apache.storm.guava.base.Splitter;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SensitiveFileAnalyzer extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        // Each line is a sequence of key=value pairs joined by '&'
        Map<String, String> join = Splitter.on("&").withKeyValueSeparator("=").split(line);
        collector.emit(new Values(
                join.get(RubbishUsers.HOMECITY_COLUMNNAME),
                join.get(RubbishUsers.USERID_COLUMNNAME),
                join.get(RubbishUsers.MSISDN_COLUMNNAME),
                join.get(RubbishUsers.SMSCONTENT_COLUMNNAME)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(
                RubbishUsers.HOMECITY_COLUMNNAME,
                RubbishUsers.USERID_COLUMNNAME,
                RubbishUsers.MSISDN_COLUMNNAME,
                RubbishUsers.SMSCONTENT_COLUMNNAME));
    }
}
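To make the parsing concrete, here is a minimal standalone sketch of what the Splitter call above does to one input line. The sample field names and values are assumptions for illustration only; the real column names come from the constants in RubbishUsers.

import java.util.Map;

import org.apache.storm.guava.base.Splitter;

public class SplitterDemo {
    public static void main(String[] args) {
        // Hypothetical sample line, assuming the RubbishUsers column-name
        // constants are "homeCity", "userId", "msisdn" and "smsContent"
        String line = "homeCity=1&userId=10001&msisdn=13900000001&smsContent=hello";
        // Same call as in SensitiveFileAnalyzer.execute():
        // '&' separates pairs, '=' separates each key from its value
        Map<String, String> join = Splitter.on("&").withKeyValueSeparator("=").split(line);
        System.out.println(join.get("homeCity"));   // prints: 1
        System.out.println(join.get("smsContent")); // prints: hello
    }
}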
The last Bolt component, SensitiveBatchBolt (sensitive-information collection and processing), takes the data emitted by the upstream SensitiveFileAnalyzer bolt and matches it against the business-defined sensitive keywords. If a match succeeds, that user is one we need to monitor closely, so we persist the record to MySQL via Hibernate for centralized management. Finally, note that SensitiveBatchBolt also implements a monitoring feature: it periodically prints out the sensitive-user data collected so far. Here is the implementation of SensitiveBatchBolt:
/**
 * @filename:SensitiveBatchBolt.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: collects and persists users matching sensitive keywords
 * @author tangjie
 * @version 1.0
 */
package newlandframework.storm.bolt;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.iterators.FilterIterator;
import org.apache.commons.lang.StringUtils;
import org.hibernate.Criteria;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.criterion.MatchMode;
import org.hibernate.criterion.Restrictions;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import newlandframework.storm.model.RubbishUsers;

public class SensitiveBatchBolt implements IBasicBolt {

    String HIBERNATE_APPLICATIONCONTEXT = "newlandframework/storm/resource/jdbc-hibernate-bean.xml";

    ApplicationContext hibernate = new ClassPathXmlApplicationContext(HIBERNATE_APPLICATIONCONTEXT);

    SessionFactory sessionFactory = (SessionFactory) hibernate.getBean("sessionFactory");

    public SensitiveBatchBolt() throws SQLException {
        super();
    }

    private static List<String> list = new ArrayList<String>(
            Arrays.asList(RubbishUsers.SENSITIVE_KEYWORDS));

    // Predicate that matches when the SMS content contains the given keyword
    class SensitivePredicate implements Predicate {
        private String sensitiveWord = null;

        SensitivePredicate(String sensitiveWord) {
            this.sensitiveWord = sensitiveWord;
        }

        public boolean evaluate(Object object) {
            return this.sensitiveWord.contains((String) object);
        }
    }

    // Monitor thread: periodically queries and prints the sensitive users collected so far
    class SensitiveMonitorThread implements Runnable {
        private int sensitiveMonitorTimeInterval = 0;
        private Session session = null;

        SensitiveMonitorThread(int sensitiveMonitorTimeInterval) {
            this.sensitiveMonitorTimeInterval = sensitiveMonitorTimeInterval;
            session = sessionFactory.openSession();
        }

        public void run() {
            while (true) {
                try {
                    Criteria criteria1 = session.createCriteria(RubbishUsers.class);
                    criteria1.add(Restrictions.and(
                            Restrictions.or(
                                    Restrictions.like("smsContent",
                                            StringUtils.center(RubbishUsers.SENSITIVE_KEYWORD1,
                                                    RubbishUsers.SENSITIVE_KEYWORD1.length() + 2, "%"),
                                            MatchMode.ANYWHERE),
                                    Restrictions.like("smsContent",
                                            StringUtils.center(RubbishUsers.SENSITIVE_KEYWORD2,
                                                    RubbishUsers.SENSITIVE_KEYWORD2.length() + 2, "%"),
                                            MatchMode.ANYWHERE)),
                            Restrictions.in("homeCity", RubbishUsers.SENSITIVE_HOMECITYS)));
                    List<RubbishUsers> rubbishList = (List<RubbishUsers>) criteria1.list();
                    System.out.println(StringUtils.center("[SensitiveTrace sensitive user list]", 40, "-"));
                    if (rubbishList != null) {
                        System.out.println("[SensitiveTrace sensitive user count]: " + rubbishList.size());
                        for (RubbishUsers rubbish : rubbishList) {
                            System.out.println(rubbish + rubbish.getSmsContent());
                        }
                    } else {
                        System.out.println("[SensitiveTrace sensitive user count]: 0");
                    }
                } catch (HibernateException e) {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(sensitiveMonitorTimeInterval * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void save(Tuple input) {
        Session session = sessionFactory.openSession();
        try {
            RubbishUsers users = new RubbishUsers();
            users.setUserId(Integer.parseInt(input
                    .getStringByField(RubbishUsers.USERID_COLUMNNAME)));
            users.setHomeCity(Integer.parseInt(input
                    .getStringByField(RubbishUsers.HOMECITY_COLUMNNAME)));
            users.setMsisdn(Integer.parseInt(input
                    .getStringByField(RubbishUsers.MSISDN_COLUMNNAME)));
            users.setSmsContent(input
                    .getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME));

            // Persist the user only if the SMS content contains at least one sensitive keyword
            Predicate isSensitiveFileAnalysis = new SensitivePredicate(
                    input.getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME));
            FilterIterator iterator = new FilterIterator(list.iterator(), isSensitiveFileAnalysis);
            if (iterator.hasNext()) {
                session.beginTransaction();
                // Persist to MySQL
                session.save(users);
                session.getTransaction().commit();
            }
        } catch (HibernateException e) {
            e.printStackTrace();
            session.getTransaction().rollback();
        } finally {
            session.close();
        }
    }

    // In many cases, Storm runtime errors are caused by an exception thrown inside execute(),
    // so scrutinize the execute() logic first. The most common symptom is a report like:
    //   ERROR backtype.storm.daemon.executor - java.lang.RuntimeException: java.lang.NullPointerException
    //     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java ...)
    // Errors like this can look baffling: everything runs fine at first, then an NPE suddenly appears.
    // I initially assumed it was a Storm deployment problem, but tracing with jstack showed the real
    // cause was the execute() logic. So when you hit this class of problem, stay calm and use jstack
    // to help pinpoint it.
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        save(input);
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        final int sensitiveMonitorTimeInterval = Integer.parseInt(
                stormConf.get("RUBBISHMONITOR_INTERVAL").toString());
        SensitiveMonitorThread monitor = new SensitiveMonitorThread(sensitiveMonitorTimeInterval);
        new Thread(monitor).start();
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }
}
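For context, here is a minimal sketch of how the spout and the two bolts described above might be wired into a topology and run on a local cluster. The component IDs, the package of SensitiveFileReader, its no-arg constructor, and the interval value of 60 seconds are all assumptions for illustration; the original text only confirms that SensitiveBatchBolt.prepare() reads RUBBISHMONITOR_INTERVAL from the topology configuration.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

import newlandframework.storm.bolt.SensitiveBatchBolt;
import newlandframework.storm.bolt.SensitiveFileAnalyzer;
import newlandframework.storm.spout.SensitiveFileReader; // assumed package

public class SensitiveTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Spout reads the monitored files, emitting one line per tuple
        // (constructor arguments, if any, are omitted here)
        builder.setSpout("SensitiveFileReader", new SensitiveFileReader());
        // Analyzer bolt splits each line into the declared fields
        builder.setBolt("SensitiveFileAnalyzer", new SensitiveFileAnalyzer())
               .shuffleGrouping("SensitiveFileReader");
        // Batch bolt matches keywords and persists hits via Hibernate
        builder.setBolt("SensitiveBatchBolt", new SensitiveBatchBolt())
               .shuffleGrouping("SensitiveFileAnalyzer");

        Config config = new Config();
        // prepare() reads this interval (in seconds) for the monitor thread;
        // 60 is an assumed value
        config.put("RUBBISHMONITOR_INTERVAL", "60");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("SensitiveTopology", config, builder.createTopology());
    }
}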
Since the records are persisted to MySQL through Hibernate, here is the Hibernate configuration, starting with hibernate.cfg.xml: