最近利用闲暇时间,又重新研读了一下Storm。认真对比了一下Hadoop,前者更擅长的是,实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算。对于Hadoop,本身不擅长实时的数据分析处理。两者的共同点都是分布式的架构,而且,都类似有主/从关系的概念。本文中我就不具体阐述Storm集群和Zookeeper集群如何部署的问题,我想通过一个实际的案例切入,分析一下如何利用Storm,完成实时分析处理数据的。
Storm本身是Apache托管的开源的分布式实时计算系统,它的前身是Twitter Storm。在Storm问世以前,处理海量的实时数据信息,大部分是类似于使用消息队列,加上工作进程/线程的方式。这使得构建这类的应用程序,变得异常的复杂。很多的业务逻辑中,你不得不考虑消息的发送和接收,线程之间的并发控制等等问题。而其中的业务逻辑可能只是占据整个应用的一小部分,而且很难做到业务逻辑的解耦。但是Storm的出现改变了这种局面,它首先抽象出数据流Stream的抽象概念,一个Stream指的是tuples组成的无边界的序列。后面又继续提出Spouts、Bolts的概念。Spouts在Storm里面是数据源,专门负责生成流。而Bolts则是以流作为输入,并重新生成流作为输出,并且Bolts还会继续指定它输入的流应该如何划分。最后Storm是通过拓扑(Topology)这种抽象概念,组织起若干个Spouts、Bolts构成的分布式数据处理网络。Storm设计的时候,就有意的把Spouts、Bolts组成的拓扑(Topology)网络通过Thrift服务方式进行封装,这个做法,使得Storm的Spouts、Bolts组件可以通过目前主流的任意语言实现,使得整个框架的兼容性和扩展性更加的优秀。
在Storm里面拓扑(Topology)的概念,非常类似Hadoop里面MapReduce的Job的概念。不同的是Storm的拓扑(Topology)只要你启动了,它就会一直运行下去,除非你kill掉;而MapReduce的Job最终它是会结束的。基于这样的模式,使得Storm非常适合处理实时性的数据分析,持续计算,DRPC(分布式RPC)等。
好了,我就结合实际的案例,设计分析一下,如何利用Storm改善应用的处理性能。
移动公司的垃圾短信监控平台,实时地上传每个省的疑似垃圾短信用户的垃圾短信内容文件,每个省则根据文件中垃圾短信的内容,解析过滤出,包含指定敏感关键字的垃圾短信进行入库。被入库的垃圾短信用户被列为敏感用户,是重点监控对象,毕竟乱发这些垃圾短信是非常不对的。垃圾短信监控平台生成的文件速度非常惊人,原来的传统做法是,根据每个省的每一个地市,对应一个独立应用,串行化地解析、过滤敏感关键字,来进行入库处理。但是,从现状来看,程序处理的性能并不高效,常常造成文件积压,没有及时处理入库。
现在,我们就通过Storm,来重新梳理、组织一下上述的应用场景。
首先,我先说明一下,该案例中,Storm集群和Zookeeper集群的部署情况,如下图所示:
Nimbus对应的主机是192.168.95.134是Storm主节点,其余两台从节点Supervisor对应的主机分别是192.168.95.135(主机名:slave1)、192.168.95.136(主机名:slave2)。同样的,Zookeeper集群也是部署在上述节点上。Storm集群和Zookeeper集群会互相通信,因为Storm就是基于Zookeeper的。然后先启动每个节点的Zookeeper服务,其次分别启动Storm的Nimbus、Supervisor服务。具体可以到Storm安装的bin目录下面启动服务,启动命令分别为storm nimbus > /dev/null 2 > &1 &和storm supervisor > /dev/null 2 > &1 &。然后用jps观察启动的效果。没有问题的话,在Nimbus服务对应的主机上启动Storm UI监控对应的服务,在Storm安装目录的bin目录输入命令:storm ui >/dev/null 2>&1 &。然后打开浏览器输入:{Nimbus服务对应的主机ip}:8080,这里就是输入::8080/。观察Storm集群的部署情况,如下图所示:
可以发现,我们的Storm的版本是0.9.5,它的从节点(Supervisor)有2个,分别是slave1、slave2。一共的woker的数量是8个(Total slots)。Storm集群我们已经部署完毕,也启动成功了。现在我们就利用Storm的方式,来重新改写一下这种敏感信息实时监控过滤的应用。首先看下,Storm方式的拓扑结构图:
其中的SensitiveFileReader-591、SensitiveFileReader-592(用户短信采集器,分地市)代表的是Storm中的Spouts组件,表示一个数据的源头,这里是表示从服务器的指定目录下,读取疑似垃圾短信用户的垃圾短信内容文件。当然Spouts的组件你可以根据实际的需求,扩展出许多Spouts。
然后读取出文件中每一行的内容之后,就是分析文件的内容组件了,这里是指:SensitiveFileAnalyzer(监控短信内容拆解分析),它负责分析出文件的格式内容。
为了简单演示起见,我这里定义文件的格式为如下内容(随便写一个例子):home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1。每个列之间用&进行连接。其中home_city=591表示疑似垃圾短信的用户归属地市编码,591表示福州、592表示厦门;user_id=5911000表示疑似垃圾短信的用户标识;msisdn=10000表示疑似垃圾短信的用户手机号码;sms_content=abc-slave1代表的就是垃圾短信的内容了。SensitiveFileAnalyzer代表的就是Storm中的Bolt组件,用来处理Spouts“流”出的数据。
最后,就是我们根据解析好的数据,匹配业务规定的敏感关键字,进行过滤入库了。这里我们是把过滤好的数据存入MySQL数据库中。负责这项任务的组件是:SensitiveBatchBolt(敏感信息采集处理),当然它也是Storm中的Bolt组件。好了,以上就是完整的Storm拓扑(Topology)结构了。
现在,我们对于整个敏感信息采集过滤监控的拓扑结构,有了一个整体的了解之后,我们再来看下如何具体编码实现!先来看下整个工程的代码层次结构,它如下图所示:
首先来看下,我们定义的敏感用户的数据结构RubbishUsers,假设,我们要过滤的敏感用户的短信内容中,要包含“racketeer”、“Bad”等敏感关键字。具体代码如下: