/**
 * @filename: RubbishUsers.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Entity describing a suspicious ("rubbish" SMS) user record.
 * @author tangjie
 * @version 1.0
 */
package newlandframework.storm.model;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

import java.io.Serializable;

public class RubbishUsers implements Serializable {

    // Explicit version id: instances travel between Storm workers via serialization.
    private static final long serialVersionUID = 1L;

    // Database column names backing the four fields below.
    public static final String HOMECITY_COLUMNNAME = "home_city";
    public static final String USERID_COLUMNNAME = "user_id";
    public static final String MSISDN_COLUMNNAME = "msisdn";
    public static final String SMSCONTENT_COLUMNNAME = "sms_content";

    // Home-city codes that are monitored for sensitive traffic (591 = FuZhou, 592 = XiaMen
    // per the reader paths elsewhere in this project — TODO confirm).
    public static final Integer[] SENSITIVE_HOMECITYS = new Integer[] { 591, 592 };

    // Keywords that mark an SMS as sensitive.
    public static final String SENSITIVE_KEYWORD1 = "Bad";
    public static final String SENSITIVE_KEYWORD2 = "racketeer";
    public static final String[] SENSITIVE_KEYWORDS = new String[] {
        SENSITIVE_KEYWORD1, SENSITIVE_KEYWORD2 };

    private Integer homeCity;
    private Integer userId;
    private Integer msisdn;
    // SMS message body
    private String smsContent;

    public Integer getHomeCity() {
        return homeCity;
    }

    public void setHomeCity(Integer homeCity) {
        this.homeCity = homeCity;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getMsisdn() {
        return msisdn;
    }

    public void setMsisdn(Integer msisdn) {
        this.msisdn = msisdn;
    }

    public String getSmsContent() {
        return smsContent;
    }

    public void setSmsContent(String smsContent) {
        this.smsContent = smsContent;
    }

    @Override
    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("homeCity", homeCity).append("userId", userId)
                .append("msisdn", msisdn).append("smsContent", smsContent)
                .toString();
    }
}
现在，我们来看敏感信息数据源组件 SensitiveFileReader 的具体实现。它负责从服务器的指定目录下读取疑似垃圾短信用户的短信内容文件，并把文件中的每一行数据发送给下一个处理环节的 Bolt（SensitiveFileAnalyzer）。每个文件的内容全部发送完毕之后，在当前目录中把原文件重命名为后缀 bak 的文件（当然，你也可以另建一个备份目录，专门存放这类处理完毕的文件）。SensitiveFileReader 的具体实现如下：
/**
 * @filename: SensitiveFileReader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Spout that collects user SMS content from files on disk.
 * @author tangjie
 * @version 1.0
 */
package newlandframework.storm.spout;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Reads suspicious-SMS content files from a configured directory, emits each
 * line as a one-field tuple ("sensitive"), and renames a fully-emitted file
 * with a ".bak" suffix so it is not picked up again.
 */
public class SensitiveFileReader extends BaseRichSpout {

    // Spouts are serialized when shipped to workers; pin the version id.
    private static final long serialVersionUID = 1L;

    /** Input directory for FuZhou (area code 591) SMS files. */
    public static final String InputFuZhouPath = "/home/tj/data/591";
    /** Input directory for XiaMen (area code 592) SMS files. */
    public static final String InputXiaMenPath = "/home/tj/data/592";
    /** Suffix marking a file whose lines have all been emitted. */
    public static final String FinishFileSuffix = ".bak";

    /** Directory this spout instance scans for new files. */
    private final String sensitiveFilePath;
    private SpoutOutputCollector collector;

    public SensitiveFileReader(String sensitiveFilePath) {
        this.sensitiveFilePath = sensitiveFilePath;
    }

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // List files in the input directory that are not yet processed
        // (i.e. do not carry the ".bak" suffix); null dir filter = no recursion.
        Collection<File> files = FileUtils.listFiles(
                new File(sensitiveFilePath),
                FileFilterUtils.notFileFilter(
                        FileFilterUtils.suffixFileFilter(FinishFileSuffix)),
                null);
        if (files.isEmpty()) {
            // Nothing to read: back off briefly so Storm does not busy-spin
            // calling nextTuple() in a tight loop.
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        for (File f : files) {
            try {
                // Input files are encoded in GBK.
                List<String> lines = FileUtils.readLines(f, "GBK");
                for (String line : lines) {
                    System.out.println("[SensitiveTrace]:" + line);
                    collector.emit(new Values(line));
                }
                // Mark the file as done: append a timestamp plus ".bak" so the
                // filter above skips it on subsequent scans.
                FileUtils.moveFile(f, new File(f.getPath()
                        + System.currentTimeMillis() + FinishFileSuffix));
            } catch (IOException e) {
                // Surface the failure to Storm instead of silently printing.
                // The file keeps its original name, so it will be retried on
                // the next nextTuple() invocation.
                collector.reportError(e);
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sensitive"));
    }
}