HTML5技术

Storm构建分布式实时处理应用初探 - Newland(2)

字号+ 作者:H5之家 来源:博客园 2016-04-24 17:00 我要评论( )

/** * @filename:RubbishUsers.java * * Newland Co. Ltd. All rights reserved. * * @Description:敏感用户实体定义 * @author tangjie * @version 1.0 * */ package newlandframework.storm.model; import org.ap

/** * @filename:RubbishUsers.java * * Newland Co. Ltd. All rights reserved. * * @Description:敏感用户实体定义 * @author tangjie * @version 1.0 * */ package newlandframework.storm.model; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import java.io.Serializable; public class RubbishUsers implements Serializable { Integer homeCity; Integer userId; Integer msisdn; // 短信内容 String smsContent; String HOMECITY_COLUMNNAME = "home_city"; String USERID_COLUMNNAME = "user_id"; String MSISDN_COLUMNNAME = "msisdn"; String SMSCONTENT_COLUMNNAME = "sms_content"; Integer[] SENSITIVE_HOMECITYS = new Integer[] { 591, 592 }; String SENSITIVE_KEYWORD1 = "Bad"; String SENSITIVE_KEYWORD2 = "racketeer"; String[] SENSITIVE_KEYWORDS = new String[] { SENSITIVE_KEYWORD1, SENSITIVE_KEYWORD2 }; public Integer getHomeCity() { return homeCity; } public void setHomeCity(Integer homeCity) { this.homeCity = homeCity; } public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId = userId; } public Integer getMsisdn() { return msisdn; } public void setMsisdn(Integer msisdn) { this.msisdn = msisdn; } public String getSmsContent() { return smsContent; } public void setSmsContent(String smsContent) { this.smsContent = smsContent; } public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("homeCity", homeCity).append("userId", userId) .append("msisdn", msisdn).append("smsContent", smsContent) .toString(); } }

  现在，我们来看一下敏感信息数据源组件SensitiveFileReader的具体实现。它负责从服务器的指定目录下读取疑似垃圾短信用户的短信内容文件，然后把文件中的每一行数据发送给下一个处理的Bolt（SensitiveFileAnalyzer）。每个文件的全部内容发送结束之后，会在当前目录中把原文件重命名为“原文件名+时间戳+.bak”形式的文件（当然，你也可以单独建立一个备份目录，专门用来存放这些处理完毕的文件）。SensitiveFileReader的具体实现如下：

/** * @filename:SensitiveFileReader.java * * Newland Co. Ltd. All rights reserved. * * @Description:用户短信采集器 * @author tangjie * @version 1.0 * */ package newlandframework.storm.spout; import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class SensitiveFileReader extends BaseRichSpout { String InputFuZhouPath = "/home/tj/data/591"; String InputXiaMenPath = "/home/tj/data/592"; String FinishFileSuffix = ".bak"; private String sensitiveFilePath = ""; private SpoutOutputCollector collector; public SensitiveFileReader(String sensitiveFilePath) { this.sensitiveFilePath = sensitiveFilePath; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { Collection<File> files = FileUtils.listFiles( new File(sensitiveFilePath), FileFilterUtils.notFileFilter(FileFilterUtils .suffixFileFilter(FinishFileSuffix)), null); for (File f : files) { try { List<String> lines = FileUtils.readLines(f, "GBK"); for (String line : lines) { System.out.println("[SensitiveTrace]:" + line); collector.emit(new Values(line)); } FileUtils.moveFile(f, new File(f.getPath() + System.currentTimeMillis() + FinishFileSuffix)); } catch (IOException e) { e.printStackTrace(); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sensitive")); } }

 

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

相关文章
  • 探索 vuex 2.0 以及使用 vuejs 2.0 + vuex 2.0 构建记事本应用 - nzbin

    探索 vuex 2.0 以及使用 vuejs 2.0 + vuex 2.0 构建记事本应用 - nzb

    2017-04-25 09:02

  • Session分布式共享 = Session + Redis + Nginx - 傲翼飞寒

    Session分布式共享 = Session + Redis + Nginx - 傲翼飞寒

    2017-03-10 16:00

  • 如何一秒钟从头构建一个 ASP.NET Core 中间件 - Savorboard

    如何一秒钟从头构建一个 ASP.NET Core 中间件 - Savorboard

    2017-02-23 16:01

  • 构建自动化前端样式回归测试——BackstopJS篇 - laden666666

    构建自动化前端样式回归测试——BackstopJS篇 - laden666666

    2017-02-13 16:03

网友点评