2. 广东工业大学 网络信息与现代教育技术中心,广东 广州 510006
2. Center of Internet Information and Modern Education, Guangdong University of Technology, Guangzhou 510006, China
在当前这个信息爆炸的时代,互联网上的数据正以几何级的速度增长.截止2012年1月,新浪微博注册用户数已超过3亿,用户日平均在线时长60 min,平均每天发布超过1亿条微博[1].在这种背景下,云计算(Cloud Computing)的概念被正式提出,立即引起了学术界和产业界的广泛关注和参与.Google是云计算最早的倡导者,随后各类大型软件公司都争先在“云计算”领域进行了一系列的研究部署工作[2].目前最流行的莫过于Apache的开源项目Hadoop分布式计算平台,Hadoop专注于大规模数据存储和处理.这种模型对以往的许多情形虽已足够,如系统日志分析、网页索引建立(它们往往都是把过去一段时间的数据进行集中处理),但在实时大数据处理方面, Hadoop的MapReduce却显得力不从心.业务场景中需要低延迟的响应,希望在秒级或者毫秒级完成分析,得到响应,并希望能够随着数据量的增大而拓展[3].此时,Twitter公司推出开源分布式、容错的实时流计算系统Storm,它的出现使得大规模数据实时处理成为可能,填补了该领域的空白.
1 Storm简介Storm是一个开源的分布式实时计算系统,可以简单可靠地处理大量数据流[4].其主要应用场景为实时分析、在线机器学习、持续计算、ETL、分布式RPC等[5].此外,Storm支持水平扩展,具有高容错性,可以确保每个消息都被处理到,而且具有很高的处理速度,在一个小的Storm集群中,每个结可以达到每秒数以百万计消息的速度.
与其他大数据解决方案相比,Storm有着不同的处理方式.本质上来看Hadoop[6-7]是一个批处理系统,数据被引入Hadoop HDFS,然后分发到各个节点进行处理,当处理任务完成之后,数据结果会再次返回到HDFS供始发者使用.Storm通过创建拓扑结构来处理没有终点的数据流,与Hadoop作业不同的是,这些转换工作会一直进行,持续处理数据流中新到达的数据[8].
1.1 Storm的工作机制Storm主要有两种类型的节点:主节点(Master)和工作节点(Worker).主节点通常会运行一个后台程序,称为Nimbus.它负责发送代码到集群,分配工作任务给每一个工作节点,并监控其运行状态,作用类似于Hadoop中的Job Tracker.工作节点会运行一个名为Supervisor的后台程序,Supervisor负责监听从Nimbus分配给它执行的任务,据此启动或停止执行任务的工作进程[9].在集群系统中,一般一个节点上运行一个或多个工作进程,每一个工作进程都会执行一个Topology任务的子集.一个Topology任务往往需要分布在不同工作节点上的多个工作进程来执行.
如图 1所示,当一个Topology定义好后被提交,首先会由Storm提供的方法把jar包上传到Nimbus,它会对Storm本身和Topology进行校验,主要检查Storm的状态是否为Active以及Topology是否有同名的实例在运行.接着,Nimbus对每个Topology都会做出详细的预算,如工作量(多少个Task),它会根据Topology中定义的parallelism hint参数,来给Spout/Bolt设定Task数目,并且分配与其对应的Task-id,再把分配好Task的信息写入Zookeeper [10]上的/task目录下.然后Nimbus会给Supervisor分配工作,方法是把任务信息写在Zookeeper的/assignments.Supervisor每隔一定时间都会查看/assignments目录,检查Nimbus是否有新任务分配,当有新提交的任务时,它会先下载代码,然后根据任务信息安排Worker执行这些任务.
|
图 1 Topology提交的流程图 Figure 1 The flow chart of Topology |
如图 2所示,在Storm集群中Nimbus和Supervisor都是无状态的,并且两个模块之间没有直接的数据交互,所有的状态都是保存在Zookeeper,Nimbus通过写入Zookeeper来发布指令.而Supervisor则通过读取Zookeeper节点信息来执行这些指令.同时Supervisor和Task会定时发送心跳信息到Zookeeper,使得Nimbus可以监控整个Storm集群的状态.当有Task节点挂掉时Nimbus能够快速使之重启.这种工作方式使得整个Storm集群十分健壮,任何一台工作机器突然失效都不会影响到整个系统的正常运行,只需重启失效节点后再从Zookeeper上面重新获取状态信息即可.
|
图 2 Storm数据交互图 Figure 2 The data interaction diagram of Storm |
Storm是Twitter开源的一个实时数据处理框架,Twitter每天约3.4亿条的推文正是用Storm进行实时分析处理. Storm实现了一种流式处理模型,流是一组有顺序并连续到达的数据序列[11].在Storm设计思想中,把流中的事件抽象为Tuple即元组,把源头处理抽象为Spout,把流的处理抽象为Bolt.这种思想大大简化了分布式实时并行处理程序的开发难度.
在Storm计算模型中,主要有两种类计算过程,源头处理过程Spout和中间处理过程Bolt.因此需要用户去实现ISpout和IBolt这两种类型的接口.作为Storm中的消息源,Spout用于Topology生产消息,一般会不断地从外部数据源(如Message Queue、NoSQL、RDBMS、Log File)读取然后发送消息给(Tuple元组)Topology.之后消息会以某种方式传给Bolt, 作为Storm的消息处理者,Bolt可以执行过滤、聚合、数据库查询等操作或与外部实体通信,可以根据情况选择储存数据,或是把数据传给下一级Bolt.
Bolt既可实现传统MapReduce之类的功能,也可实现更复杂的操作,如过滤、聚合等.如果对两个组件数据发送有特殊要求,例如在应用场景中需要把相同key值的元组发送到同一个Bolt下进行统计,可使用Storm提供的数据流分发(Stream Grouping)策略来解决这一问题.为提高处理效率,可以在一个流上加入多个Spout和多个Bolt.典型的Storm拓扑结构会实现多个转换,因此需要多个具有独立元组流的Spout.如图 3所示,Storm集群是由许多Bolt组件组成的链式处理结构,每个Bolt对Spout发射出的数据进行各种转换操作.
|
图 3 Storm数据流网络图 Figure 3 The data flow network diagram of Storm |
使用Storm可以轻松地实现MapReduce功能[12].如图 4所示,Spout生成文本数据流,Bolt实现Map功能(令牌化一个流中的各个单词).来自“map” Bolt的流然后流入一个实现Reduce功能的Bolt中.
|
图 4 Storm实现MapReduce功能 Figure 4 Implementation of the function of MapReduce with Storm |
作为一个新生的网络社交平台,微博成功吸引了数以万计的粉丝进行互动交流.Bernard J. Jansen, Mimi Zhang等对150 000条微博进行分析,发现19%的微博会提及某个品牌,其中大约20%的微博会包含一些观点,因此快速并准确地掌握民众实时关注的热点信息对企业在未来的市场竞争中占领先机具有重要作用[13-14].以下为微博热词分析统计系统的设计,通过系统日志实时分析出当前热门词汇.
2.1 算法实现流程用一个Bolt对流进行全局分组,并在内存中维护一个Top N的列表.由于整个流都会发送到一个节点进行处理,所以这种方式对于大的数据量显然没有可伸缩性.一种更好的方式是并行计算流的各部分Top N,然后把局部的Top N合并在一起,再计算出一个全局的Top N.计算流程如图 5所示,计算步骤如下:
|
图 5 热词分析流程图 Figure 5 The flow chart of hot word analysis |
(1) Spout获取微博内容后把消息发送出;
(2) 运行若干个Bolt进行count统计,即每个key值出现的次数,然后把统计结果发射;
(3) 运行一组Bolt进行Rank计算,它们负责一部分数据的Top N计算, 然后把结果数据发送到下一级Bolt;
(4) 利用一个全局的Bolt来合并这些机器上计算出来的局部Top N结果,合并后得到最终全局的Top N结果.
2.2 Spout实现Spout()是topology的数据源Spout,在服务器上不间断的有微博产生,本实例从Redis读取记录, 产生数据流“keyword”,输出Fields是“keyword”,核心代码如下:
public void nextTuple()
{
……
String content=jedis.rpop(“record”); //从Redis读取记录
if(content= =null||”nil”.equals(content)) {
睡眠300微秒等待}
try
{
JSONObject obj=JSONValue.parse(content);
String keyword =obj.get(“keyword”);
collector.emit(new Values(keyword)); //提交keyword
}
……
}
2.3 Count实现“word”流入Count这个Bolt中进行keyword count计算,为了保证同一个keyword的数据被发送到同一个Bolt中进行处理,按照“keyword”字段进行field grouping;在Count中会计算各个keyword的出现次数,然后产生“count”流,输出“keywordcount”和“count”两个Field,核心代码如下:
public void execute (Tuple tuple) {
/*如果keyword不在哈希表中, 则加入哈希表, 如果在其中, 则把计数加1*/
if(!counters.containsKey(str))
{
counters.put (str, 1);
} else
{
Integer c = counters.get (str) + 1;
counters.put (str, c);
}
collector.emit(new Values(obj, totalObjects(obj)));
//确认此元组处理完
collector.ack (input);
}
2.4 Rank实现Rank这个Bolt按照“count”流的“keywordcount”字段进行field grouping;在Bolt内维护Top N个有序的链接,如果超过Top N个链接,则将排在最后的链接数据移除,同时每隔一定时间(2 s)产生“rank”流,输出“list”字段,计算出的局部Top N结果会到下一级数据流“merge”流,核心代码如下:
public void execute(Tuple tuple, BasicOutputCollector collector)
{
Object tag = tuple.getValue(0);
Integer existingIndex = _find(tag);
//如果元素不存在, 再加入集合中, 如果存在就更新值
if (null!= existingIndex)
{
rankings.set(existingIndex,tuple.getValues());
} else
{
rankings.add(tuple.getValues());
}
对rankings列表进行排序
if (rankings.size() > _count) {
移除末尾的元素
}
long currentTime = System.currentTimeMillis();
if(时间过去2 s)
发射列表,更新当前时间
}
2.5 Merge实现Merge这个Bolt会按照“rank”流的进行全局的grouping,即所有上一级Bolt产生的局部Top N结果“rank”流都流到这个“merge”流进行处理;Merge的计算逻辑和Rank类似,只是将各个Merge的Bolt合并后计算得到最终全局的Top N结果,代码不再赘述.
3 实验结果与分析 3.1 实验环境本文使用了5台计算机搭建实验平台,建立了一套Storm集群系统进行分布式并行计算,各个计算节点所用的操作系统为Linux CentOS6.3,内核版本为2.6.18,使用的Storm版本为0.9,JDK版本为jdk1.7.0_40.其中一台计算机用来运行Nimbus守护进程和Supervisor守护进程,其余4台只运行Supervisor,每个Supervisor节点最多启动6个Worker进程.
3.2 性能测试为方便比较,本实验使用Java语言开发了一个普通分布式版本的处理系统[15],运行相同的线程数,在相同机器配置上进行微博热词统计测试.实验结果如图 6所示.
|
图 6 Strom并行数与处理时间对照表 Figure 6 The time table of storm parallel processing |
从图 6中可以看出,Storm版本的处理性能优于普通分布式版本, 处理相同的一批任务,用Storm实现的版本需要花费的时间更短.
3.3 横向扩展测试为测试Storm横向扩展给系统性能带来的提升,笔者进行了分组实验.实验分为3组,每组分配给Topology不同的Worker数目, 实验中Executors和Tasks的个数不变.每组分配的任务数不同,统计每组任务平均消耗的时间,实验结果取3次的平均值.
由图 7可知,随着处理节点数的增加,集群的处理能力也会随着线性增长.遇到性能瓶颈时,可以通过提高瓶颈节点的并行任务数量来提高性能. Storm适用于解决高度并行性的海量数据型实时计算问题,并能将计算结果实时反馈给用户.使用Storm处理并行计算问题时,用户只需把任务分解成单个节点的计算单元,利用传统的串行算法写出计算函数和接收函数.
|
图 7 水平扩展性能测试 Figure 7 The performance test for extending |
本文分析了基于Storm应用的工作机制和编程模型,并基于Storm成功实现了实时微博热词分析统计系统, 实现了在低端机组成的集群上进行的分布式算.
| [1] |
新浪微博数据中心. 2012年新浪微博用户发展报告[EB/OL]. [2014-06-15]. http://data.weibo.com/report/report?copy_ref=AEdhAAT9K9K7&_key=INEZOM.
|
| [2] |
陈康, 郑纬民. 云计算:系统实例与研究现状[J].
软件学报, 2009, 05(5): 1337-1348.
Chen K, Zheng W M. Cloud computing: system instances and current research[J]. Journal of Software, 2009, 05(5): 1337-1348. |
| [3] |
Leibiusky J, Eisbruch G, Simonassi D. Getting Started With Storm[M]. US: O'Reilly Media, 2012.
|
| [4] |
The Apaehe Foundation. Storm official website[EB/OL]. [2014-04-08]. http://storm-project.net/.
|
| [5] |
Github Inc. Storm Wiki[EB/OL]. [2013-12-07 ]. https://github.com/nathanmarz/storm/wiki.
|
| [6] |
The Apaehe Foundation. Apache Hadoop[EB/OL]. [2014-03-03]. http://hadoop.apache.org/. White T. Hadoop: The definitive guide[M]. US: O'Reilly Media, 2012.
|
| [7] |
White T. Hadoop: The definitive guide[M]. US: O'Reilly Media, 2012.
|
| [8] |
金晓军. Trident Storm与流计算经验[J].
程序员, 2012(10): 99-103.
Jin X J. Trident Storm and flow calculation experience[J]. Journal of Programmers, 2012(10): 99-103. |
| [9] |
Petko V. Integrating parallel application development with performance analysis in periscope[J].
IPDPS Workshops, 2010: 1-8.
|
| [10] |
The Apaehe Foundation. Apache ZooKeeper[EB/OL]. [2014-03-18]. http://zookeeper.apache.org/.
|
| [11] |
顾伟. 分布式流数据实时计算框架的研究和开发[D]. 杭州: 浙江理工大学信息电子学院, 2013.
http://cdmd.cnki.com.cn/Article/CDMD-10338-1013290539.htm
|
| [12] |
Dean J, Ghemawat S. MapReduce: simplified data processing on large clusters[J].
Communications of the ACM, 2008, 51(1): 107-113.
DOI: 10.1145/1327452. |
| [13] |
白晓晴. 微博互动营销的优势及策略分析[J].
企业技术开发, 2010, 29(005): 20-21.
Bai X Q. Research on the advantages and strategies of micro-blog interactive marketing[J]. Technological Development of Enterprise, 2010, 29(005): 20-21. |
| [14] |
孙擎. 浅析国内微博营销面临的挑战[J].
中国商贸, 2011(3): 28-29.
Sun Q. Research on the challenges of domestic micro-blog marketing[J]. China Business & Trade, 2011(3): 28-29. |
| [15] |
林昊.
分布式Java应用基础与实践[M]. 北京: 电子工业出版社, 2010.
|
2014, Vol. 31