2016年1月,中国互联网信息中心(China Internet Network Information Center,CNNIC)发布了《第37次CNNIC报告:中国互联网络发展状况统计》[1],报告统计了目前中国互联网大环境的发展情况。数据显示,截至2015年12月,中国的网民规模达6.88亿,全年共计新增网民3 951万人;互联网普及率为50.3%,较2014年底提升了2.4个百分点,同时根据IDC(International Data Corporation,IDC)发布的数字宇宙报告显示,至2020年数字宇宙将超出预期,达到40 ZB,相当于地球上人均产生5 247 GB的数据[2]。上述数据意味着大数据时代已经来临,大量的信息呈现在用户面前。比如国内最大的电商平台淘宝网每日访问用户达6 000万,每日在线商品数目已经超过了8亿件。面对急速增长的数据规模,用户正面临着“信息超载问题”,如果不借助于大数据分析系统和搜索引擎等辅助技术,用户从海量的互联网资源中找到自己真正感兴趣的信息是一件非常困难的事情,使得信息的有效利用率反而降低了。
在大数据时代,数据的来源已经不再是人们所关心的问题,如何从形态多样的海量数据中高效、快速、及时地挖掘出有用的信息这才是关键,这也大数据分析面临的难题。单机系统不能满足海量数据分析处理要求,Hadoop系统的开源解决了此难题;基于Google GFS(Google File System)[3]实现的HDFS(Hadoop Distributed File System)解决了海量数据的存储问题;MapReduce则实现了海量数据的分布式计算,这大幅度降低了大数据处理的门槛,使得海量数据处理成为一种可能。虽然Hadoop系统具有近乎线性的扩展能力、良好的容错性、分布式的计算能力等优势,是很多公司处理海量数据的首选方案,但是,它仍存在一个关键的缺陷——缺乏实时性。Hadoop主要用于海量数据的离线计算,因此,从数据的产生到得到最终的数据结果之间存在时间差,不能满足实时性要求高的应用要求。
实时化、内存计算、泛在化(普适应计算或环境智能化)、智能化是当今大数据技术的四大发展趋势[4]。实时化作为发展趋势之一,已经受到越来越多的关注。而数据的实时处理,首先需要在数据产生的时候将其转为源源不断的数据流,并将数据流发送给流处理系统,然后由流处理系统对数据流进行在线(实时,real-time)或近线(接近实时,near-real-time)分析。流数据处理系统的核心思想是:从不断流入的新数据中提取感兴趣的、有效的信息,缩减数据从产生到被利用的时间间隔,丢弃部分无效数据或者已经被处理后的原始数据,在获取有效信息的同时避免存储海量原始数据,降低数据的存储成本。
大数据环境下的数据流具有五大特点:实时性、易失性、无序性、突发性、无限性[5-6],因此,在大数据流处理过程中面临着一些新的挑战[7]:实时性要求高、流入数据量不可预知、数据和计算的持续性、数据计算要求高的可靠性。分布式流处理系统是解决大数据流最理想系统,它具备了分布式系统的可扩展性和容错性等优势,同时又很好地应对了大数据流的各种挑战。
分布流处理系统是一个很复杂的系统,它由多个子系统组成,并且需要不同的子系统之间相互分工、共同协作[3]。一个完整的流式数据处理系统由4部分组成:1) 数据收集系统(Data Collection System),用于收集、汇总原始数据。2) 消息队列系统(Message Management System),对收集到的实时数据进行转存、维护,并将数据传送给数据处理和分析系统,是原始数据的中转站、缓冲区。所谓的消息,是接收到的一条一条的数据。3) 数据处理系统(Data Processing System),是整个流式数据处理系统的核心。它主要负责从消息队列系统或其他数据发送系统中获取数据,并进行及时的业务逻辑处理和分析,然后将处理后的结果数据保存到数据库系统中或直接传送给其他业务处理系统,作进一步的分析和展示。4) 数据存储系统(Data Storage System),是处理后的数据的最终归属地,也是连接流式数据处理系统跟其他系统之间的桥梁。本文将对大数据环境下的一个完整分布式流处理系统四个组成部分所采用的技术进行介绍和探析,同时介绍一种分布式拒绝服务(Distributed Denial of Service,DDoS)攻击检测数据流处理系统结构,为大数据环境下的数据流处理理论研究和应用技术开发提供参考。
1 数据收集海量的数据是大数据出现的前提,而数据收集则是大数据的基石。日志数据收集在流数据收集中占有重要比重,许多公司的业务平台每天都会分散的产生大量日志数据,收集并汇总这些业务日志数据,供离线和在线的分析系统使用。日志收集系统所需考虑的基本特征包括:高可靠性、高可用性和高可扩展性。“分散收集,集中处理”是当前日志处理系统的一个主流思想。日志收集也是流式日志处理系统的前提和基础,日志只有被实时收集、汇总之后,才能进行后续的相关处理操作。下面针对当前流行的开源日志数据流收集系统,进行介绍和对比。
1.1 ScribeScribe[8]是Facebook为了满足内部大量日志处理而设计的日志收集系统。它能将分散在不同服务器上的不同应用日志汇总到中央存储系统,通常是将日志存入HDFS中,为日志的集中处理提供了有力的保障。
Scribe数据的传递依赖于Thrift[9],Thrift通过一个中间语言(接口定义语言),来定义远程过程调用(Remote Procedure Call,RPC)的接口和数据类型,然后通过编译器,生成不同语言的代码,是跨语言服务的部署框架)。其架构如图 1所示,由以下三部分构成:
![]() |
图 1 Scribe 体系架构 Figure 1 Scribe architecture |
1) Scribe Agent。位于日志产生的应用服务器上,实质是一个Thrift客户端,通过RPC负责将应用系统产生的日志,发送到汇总服务端。
2) Scribe Collector。完成多个Agent发送过来的数据接收,并将数据存入可靠的存储介质中,如:本地磁盘、HDFS等,此部分并不是Scribe日志收集系统中的必需部分,可以跳过Collector直接将日志从Agent存入到存储系统中。
3) 存储介质。Scribe已经实现了向不同类型的存储介质中写入数据的功能,包括文件系统(如HDFS,位于本地磁盘或共享式的存储系统中),网络(直接发送给其他Scribe),缓存(可满足故障恢复的要求,数据优先写入主存储中,若主存储故障,则存入到备份的存储中),多存储介质(同时将数据写入不同的存储系统中,达到数据备份的目的)。
从架构上分析,Scribe能在一定程度上保证数据不丢失。Scribe进程能将消息在内存中缓存一段时间,但是当Scribe Agent出现故障时,这些缓存的数据就会丢失,因此,从这方面来讲,Scribe不能严格保证数据可靠性。
1.2 FlumeFlume最初是由Cloudera的工程师设计用于合并日志数据的系统[10],后将其开源出来,并逐渐发展成为一款开源、高可靠、高扩展、易管理、支持客户扩展的分布式数据流采集系统,主要是用于日志数据的收集和聚合。
在原始的Flume版本中,一个完整的Flume系统由Agent(用于采集数据)、Master(配置及通信管理)、Collector(对数据进行聚合)构成。而重构后的新版Flume也称为Flume NG(Next Generation),其系统中只有Agent一种角色。图 2为Flume NG的架构,由分布在不同节点的Agent负责收集不同的应用所产生的数据,并发往汇总的Agent节点,最后存入大容量、高可靠的存储系统,如:HDFS。
![]() |
图 2 Flume架构 Figure 2 Flume architecture |
每一个Flume Agent的内部都是由Source、Channel以及Sink组成。Source即为要收集数据的来源,负责产生或接收数据,并发往Channel。Channel则是负责接收来自Source的数据,并传送到Sink,负责对数据提供可靠性保证。Sink则是从Channel拉取数据,并将数据写入到后端的存储系统中,已经实现的Sink包括:HDFS Sink(将数据写入到HDFS中)、Hive Sink(将数据存入Hive中)、Avro Sink(将数据以Avro的方式进行序列化,并发往后端的Avro接收端,也可以是Flume的Avro Source)等若干常见的数据存储和接收系统。
1.3 ChukwaChukwa是Apache旗下的一款开源数据收集软件[11],它可以将不同类型的数据汇聚成适合Hadoop处理的文件,并保存在HDFS中,并与Hadoop集成,可以快速方便地进行各种MapReduce操作。Chukwa本身已经实现了很多内置的功能,能够用于数据的收集和整理。
Chukwa的架构如图 3所示,由Agent、Collector以及HDFS构成。Agent是运行在不同节点上负责收集数据的程序,而Agent又由多个Adapter组成,并由Adapter执行实际的数据收集工作;Collector负责接收不同的Agent发送过来的数据,并负责将数据写入HDFS;HDFS是Chukwa中数据的最终存储系统,能够满足海量数据的存储要求,并具有很好的容错性、可用性、扩展性。Chukwa非常适合于将数据收集后需要进行MapReduce操作的应用场景。
![]() |
图 3 Chukwa架构 Figure 3 Chukwa architecture |
LogStash[12]是著名的开源数据栈ELK (ElasticSearch,Logstash,Kibana)中的那个L,其主要功能就是进行数据的收集,配合ElasticSearch进行数据索引和检索,Kibana用于数据的展示,即为一个完整实时数据分析平台。LogStash是一款轻量级的日志收集处理软件,可以极其方便地把分散的、多样化的日志收集起来,并能根据业务需求实现自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。
图 4为LogStash架构,Input Plugin负责从不同的地方接收或读取数据,转化为LogStash所支持的事件格式,其已经实现了几十种常用的Input Plugin。Filter Plugin则用于根据自定义的规则对事件进行过滤或转为特定的格式。Output Plugin则将事件发往指定的存储位置,将数据进行持久化存储,完成数据的汇总。LogStash已经有非常丰富的Plugin,而且,如果已有的Plugin不能满足要求,还能通过自己编码来实现自定义的Plugin,因此,其灵活性非常好。
![]() |
图 4 LogStash架构 Figure 4 LogStash architecture |
日志数据流收集系统具备三个基本组件,分别是Agent(接收原始数据,并将数据发给Collector)、Collector(接收多个Agent发送过来的数据,汇总后将数据发往Store)、Store(中央存储系统,将汇总后的数据进行持久化存储)。表 1综合对比了Scribe、Flume、Chukwa、LogStash四种日志数据流收集系统。
![]() |
表 1 四种日志数据流收集系统对比 Table 1 Comparison of four kinds of log data stream collection system |
所述四种数据流收集系统都具备一定高可用和扩展性,且都是开源的系统,完全可以进行二次开发,完成功能的自定义扩展。由表 1综合考虑,Flume在各个方面都具备一定优势,是一款通用的数据流收集软件;若需要对数据流进行检索,则LogStash是非常不错的选择;若要实现对收集的数据流进行MapReduce操作,则可以选择Chukwa。
2 消息队列管理技术在离线数据处理系统中,只需要将数据进行汇总到中央存储系统,然后对汇总后的数据定期的集中处理即可。在数据流处理系统中,由于数据是源源不断流入,且需要对新增的数据进行实时处理。相比于离线数据处理系统,数据流处理系统中需要一个消息队列系统充当数据缓冲区的角色,一方面快速接收数据收集系统发送过来的数据,另一方面,当数据处理系统处理能力未达到满负载时,尽量快地发送数据给数据流处理系统,当处理系统达到满负载时,缓存接收到的数据,减缓数据发往数据流处理系统的速度。下面主要介绍当前比较流行的几款消息队列系统。
2.1 RabbitMQRabbitMQ[13]是一个由Erlang开发的、基于高级消息队列(Advanced Message Queue,AMQP)协议[14]的开源消息系统。其最初诞生于金融应用系统,用于转发存储分布式系统中的消息,在扩展性、易用性、高可用性等方面优势突出。
如图 5所示,RabbitMQ中包括Producer(消息产生者)、Broker(消息队列管理者)和Consumer(消息使用者)。
![]() |
图 5 RabbitMQ架构 Figure 5 RabbitMQ architecture |
Broker中的Exchange(消息交换机)负责接收Producer发送过来的消息,并将消息根据Routing Key(路由关键字)绑定到不同的Queue(消息队列)。每一条消息都会被绑定到至少一个Queue,而每一个Queue则是若干条消息的实体。Consumer再从不同的Queue中读取数据,进行后续的分析和计算。
2.2 ZeroMQZeroMQ[15]是一个非常轻量级的消息系统,也是一种基于消息队列的多线程网络库。它是网络通信中新的一层,介于应用层和传输层之间,像框架一样的一个Socket library,大幅度简化了Socket编程,而且性能更高效。与传统消息队列管理系统不同的是,ZeroMQ不再需要一个消息服务器(Broker)来存储转发消息,而是直接在发送端缓存。ZeroMQ是一个可嵌入的并发框架,不需要独立部署任何服务进程,但需要在其提供的API(Application Program Interface)基础上编程实现消息管理逻辑,从这方面来讲,它是一个比较复杂的系统。ZeroMQ设计初衷就是为了尽可能快地发送消息,且其具有良好的跨平台、跨语言特性,能够在Windows、Linux、OS X下运行,能支持超过20种编程语言的编程操作。
图 6ZeroMQ架构图中的I/O(Input/Output)线程所涉及的I/O操作都是异步的。ZeroMQ会在初始化时要求用户传入接口参数,并根据这些参数创建对应的I/O线程,每个I/O线程都有与之绑定的Poller(轮询器),Poller则采用Reactor模型[16]与不同操作系统平台的I/O模型进行通信。主线程与I/O线程通过消息盒子(Mail Box)进行通信。Server开始监听或者Client发起连接时,在主线程中创建连接器或监听器,通过消息盒子以发消息的形式将其绑定到I/O线程,I/O线程会把连接器或监听器添加到Poller中用以侦听读/写事件。Server与Client在第一次通信时,会发送认证标识符,用以进行认证。认证结束后,双方会为此次连接创建会话(Session),以后双方就通过会话进行通信。每个会话都会关联到相应的读/写管道,主线程收发消息只是分别从管道中读/写数据。会话并不直接跟Kernel交换I/O数据,而是通过Plugin到会话中的Engine来与kernel交换I/O数据。
![]() |
图 6 ZeroMQ整体架构 Figure 6 ZeroMQ architecture |
Kafka[17]是由LinkedIn开发的,作为其运营数据处理管道(Pipeline)和活动流(Activity Stream)的基础,并于2010年将其开源,成为Apache下一个子项目。经过几年的发展,现在它已被用作数据管道和消息系统广泛的使用在不同应用领域。Kafka作为一个高性能的分布式发布/订阅(Publish/Subscribe)消息队列系统,其具有以下特性:1) 高吞吐量,能在低性能的设备上达到每秒数十万的消息读写速度;2) 支持水平扩展,当集群吞吐量不能满足需求时,只需要增加设备,就能让其吞吐量近似线性地增长;3) 容错性好,不管消息有没有被消费掉,都可以将数据存储在磁盘上,可以对消息进行多次读取,且可以自动将消息拷贝到不同的机器上,实现数据的冗余;4) 保证消息有序,通过将消息分区存储,能保证每一个分区中的数据都能被有序地消费。
Kafka的整体架构如图 7所示,包括三种角色:生产者(Producer),向Kafka集群发送数据的一端,由不同的数据收集系统和组件构成;代理集群(Broker Cluster),运行Kafka相关进程的一端,负责接收来自Producer的数据,并将数据转发给Consumer;消费者(Consumer),即数据的使用者,如实时数据应用系统等,完成对数据作业务逻辑相关的处理。
![]() |
图 7 Kafka整体架构 Figure 7 Kafka architecture |
在Kafka中每一条消息至少属于某一个主题(Topic),一个Topic则是某一类消息的分组,并根据消息的Topic进行分区(Partition)并分散到不同服务器上的日志(log)文件中按顺序存储。每条消息所在的文件中会有一个不断增长的长整型偏移量(offset),通过offeset能唯一标识一条消息。Kafka中消息存储和消费有关的状态信息,如:offeset,都是通过Zookeeper[18]来保存。虽然Kafka将数据存储到了磁盘中,但是磁盘的顺序读写速度是非常快,甚至能超过内存的随机读写速度,且Kafka中使用了Zero-Copy[19]技术,因此Kafka能保证消息的快速读取。
这三款消息队列系统都是非常优秀的,有很多共性,也有一些区别(如表 2)。
![]() |
表 2 消息队列系统对比 Table 2 Comparison of message queuing systems |
1) RabbixMQ采用通用高级消息队列协议(AMQP),得到很多到公司的支持,且其能很好地支持消息的事物机制、数据的持久化,非常适用于金融行业,但是在相同的配置情况下,其吞吐量比另外两款消息队列系统要低很多。
2) ZeroMQ实质上是一个基于Socket的可嵌入的并发框架,其并没有完整的实现消息队列管理系统,而是需要用户通过调用相关的API来完成对消息的管理,因此,其使用起来要稍微复杂一些。其底层的相关技术,能够尽快地发送消息数据,其吞吐量非常大,但是,不提供数据的持久化支持,即消息被消费者接收后,就不能再次读取,因此在一些需要非常高的吞吐量且不需要多次读取消息,且也能容忍系统故障时丢失部分数据的场景中比较适合。
3) Kafka则是RabbitMQ和ZeroMQ的折中方案,能支持消息的持久化,在尽可能保证数据不丢失的同时,又使用Zero-Copy技术及顺序的存储和读取消息机制,使其具有很高的吞吐量。其扩展性也是非常的出色,只需要增加相应的设备,即能使其吞吐量达到几乎线性地增长。Kafka比较适合互联网的应用场景,在很多的互联网公司都被广泛地使用。
3 分布式流式数据处理技术数据被实时地收集和汇总形成数据流,为了尽快得到实时应用系统需要的数据结果,需要数据分析系统能尽快完成对原始数据的处理。在大数据环境下,单台服务器很难满足短时间内大量的数据计算要求,且考虑到业务和数据的增长,这些都要求数据分析系统具有良好的扩展性。下面介绍目前几种主流的分布式数据流处理系统。
3.1 StormStorm[20]最初是由Twitter开发并开源的、基于分布式的实时数据处理系统,在Twitter、Yahoo、Alibaba等很多知名的大公司都得到广泛的应用。其具有很好的容错性、扩展性,且能到次秒级的延时,非常适合于低延时的应用场景。其组成系统组成为:
1) Nimbus,集群的主节点,负责集群资源的管理、任务的调度分配。
2) Supervisor,负责接收Nimbus分配的任务,启动和停止属于自己管理的工作进程。
3) Zookeeper,是Storm重点依赖的外部组件,提供Supervisor和Nimbus之间协调的服务,Nimbus和Supervisor心跳和任务运行情况都是保存在Zookeeper上。
Storm实现的数据流模型如图 8所示,包括:Topoloy(拓扑),类似于Hadoop上的MapReduce任务,数据在节点之间流动方向所组成的一个图,且包含数据的处理逻辑;Tuple(消息元组),最小的消息处理和传递单元,每个Tuple都是不可变数组;Spout(喷嘴),从Storm外部接收数据转为内部的数据来源,并将原始数据转为处Tuple;Bolt(螺栓),接收来自Spout或上一级的Bolt的Tuple,在其内部作简单的数据转换和计算,并产生多个输出Tuple流,发送给其他的Bolt,协作完成复杂的计算逻辑。
![]() |
图 8 Storm数据流模型 Figure 8 Storm data flow model |
如图 8所示,Storm会通过Spout将外部的流式数据读入Topology中,将其转为消息处理单元Tuple,且给每一个Tuple分配一个消息ID(Identity),开始消息的处理流程。再将Tuple输出到Bolt中,对于复杂的数据处理过程,Storm会将其分解成若干个简单的处理逻辑,并根据特定的顺序在不同的Bolt中进行处理和流通,直到经过最后一个Bolt的计算,此时才会将该消息ID标记为处理完成。
3.2 SamzaSamza[21]是LinkedIn开源的一个分布式流处理系统。Samza具有一些非常优秀的特性:通过简单的API可以非常方便地处理流式数据;具有很好的容错性,能在用户没有感知到的情况下恢复处理失败的任务;任务状态管理,出现故障时,能快速准确地恢复到失败之前的状态。
一个完整的Samza系统由三个组件构成:①Kafka为Samza提供实时的消息数据来源,也可以作为Samza数据处理后的数据存储系统;②Samza进行流式数据处理,用户可以使用它提供的API简单方便的处理流式数据,而不用关心处理过程及容错性等的管理;③Yarn[22]是Samza中的资源分配和任务管理系统,客户端(Client)提交任务时会向Yarn集群中的RM(资源管理器,Resource Manager)申请资源,RM以容器(Container)的形式将资源封装起来,并在容器里执行相应的Samza计算任务。
图 9为Samza系统的流式数据处理模型。当用户提交一个任务时,首先会向Yarn中的Resource Manager申请所需的资源。接着Yarn在Node Manager节点上启动容器,供Samza运行相应的任务。然后Samza进程从Kafka中不同分区中实时拉取数据,并进行相应的计算。最后将处理后的结果再次存入到Kafka进入到下一轮的计算或者输出到其他存储系统。
![]() |
图 9 Samza流处理模型 Figure 9 Samza stream processing model |
Flink[23]起源于柏林理工大学的一个研究性项目,2014年被Apache孵化器所接受,并迅速地成为了ASF(Apache Software Foundation)的顶级项目之一。Flink是一个能同时适用于流数据和批处理的分布式处理引擎,其体现了一个最新的设计理念:数据处理应该是流式的,批处理只是流处理的一个特例,也就是说所有的任务都可以当成流来处理。这也是Flink跟其他流处理系统的最大区别。
图 10展示了Flink流计算的数据处理模型,在分布式数据处理系统中数据会在多个节点(Node)之间进行传输,在不同节点之间的数据传输分为两种情况:
![]() |
图 10 Flink数据处理模型 Figure 10 Flink data processing model |
1) 流处理。对实时达到的数据流在一个节点上处理之后,会将处理后的结果缓存在当前节点中,并立刻将数据传输给后续节点,进行下一步的处理,一直重复这个流程,直到得到最终结果。
2) 批处理。当前节点会把需要处理的所有数据逐条处理,序列化并缓存起来,但不会立刻将该处理后的结果发送给下一个节点,当缓存不足时,会将数据持久化到磁盘,只有当所有数据都被处理完成后,才会将处理后的数据通过网络传输到下一个节点。
Flink通过设置缓存数据的超时时间,来同时应对流处理和批处理系统:若超时时间为0,则会执行上述1) 中的流程;若超时时间为无穷大,则执行2) 的数据处理流程。此外,还可以通过设置超时时间的长短,来达到调节流处理延时的目的。
3.4 Spark StreamingSpark Streaming是Spark[24]中用于流处理的一个组件。Spark是一个通用的并行计算框架,由加州伯克利大学(UCBerkeley)的AMP(Algorithms Machines People)实验室开发,并于2010年开源,2013年成长为Apache旗下为大数据领域最活跃的开源项目之一。Spark也是基于MapReduce模型实现的分布式计算框架,拥有Hadoop MapReduce所具有的优点,并且增加了很多优秀的特性。
Spark同样适用于批处理和流处理,其数据处理的实现都是基于弹性分布式数据集(Resident Distributed Dataset,RDD)[25]。RDD是其本质是一个基于内存的数据集,记录了数据块列表、数据块上的数据如何转化的函数、与父RDD之间的依赖关系(Lineage)、以及针对Key-Value类型数据的分区函数、数据的偏好位置(用于数据计算本地化)。Spark Streaming是Spark生态系统中用于处理流式数据的一个模块,其本质是将用户设定的固定时间内新接收的数据转为一个RDD,进而分成很多小段的批处理,此操作称为离散流(Discretized Stream,DStream)[26]。一个DStream实质是一个时间间隔很短的微批处理(micro-batching),所以,Spark Streaming的本质是将所有的数据处理形式都当作批处理对待。
图 11为Spark Streaming流式数据处理模型,在Spark Streaming中会把每个时间间隔内新接收的数据存入到一个新的RDD中,然后对每一个RDD进行相同的转化操作(Transformation)和动作(Action),且它能支持窗口(Window)操作,即将不同时刻得到的RDD的数据统一进行操作,使之成为一个新的RDD,这样就可以将新数据跟历史数据相结合。
![]() |
图 11 Spark Streaming数据处理模型 Figure 11 Spark Streaming data processing model |
Spark Streaming流处理的理念与其他的流处理系统存在本质区别,但是在很多的应用场景中是可以容忍秒级别的延时,且将流数据进行微批处理在一定程度上能提高系统的吞吐量。此外,它在Spark的全栈式生态系统中,能很好地与批处理、Spark ML(Machine Language)、Spark Graph、Spark SQL(Structured Query Language)等相结合,解决数据的后续处理与分析问题,这是其他系统所不能比拟的。
3.5 分布式流式数据处理系统对比表 3中从不同角度比较Storm、Samza、Flink和Spark Streaming和特性。
![]() |
表 3 流式数据处理系统对比 Table 3 Comparison of stream data processing systems |
在实时性方面,Spark Streaming由于采用微批处理的方式,所以延时最大,会存在秒级的延时,而其他三个流处理系统都是次秒级的延时。
分布式系统都会重点考虑容错性,因此,这些分布式流处理系统都具有很好的容错性。
而在语言支持上,Storm支持C/C++、Python、基于JVM(Java Virtual Machine)等大多数编程语言,相对而言,其语言支持特性是最好的,而且其他的系统一般只支持Python和基于JVM的编程语言。
在状态管理上,Storm最初是不支持状态管理的,后来才提供高层抽象——Trident来支持状态管理,而其他流处理系统都支持状态管理。
数据处理语义方面,都能保证数据至少被处理一次,这种方式在特定场景下,会存在部分数据被多次处理的情况,而Storm、Spark Streaming、Flink通过特定的配置,能达到数据刚好被处理一次的要求。
适用场景及生态系统完整性方面,Spark Streaming是最全面的,既支持批处理,又支持流处理,且还支持分布式图计算、机器学习库等高级功能;Flink紧跟Spark的步伐,也具备非常完善的生态系统,Storm也有部分其他的功能支持,而Samza则只支持流处理,缺乏其他场景的应用支持。
4 数据存储技术一方面,在一些场景中需要将海量的原始数据保存一段很长的时间,供后续的数据分析及防止系统故障导致的数据丢失。另一方面,在流式数据处理系统中,原始数据被处理之后,部分的数据会被立刻交付给应用系统加以应用,但也有部分是需要共享或者长期保存的,这就要求将处理后的结果存储到可靠介质中。
表 4列出了四种常用的数据存储方式,下面从不同方面进行对比介绍:
![]() |
表 4 四种常用的数据存储方式对比 Table 4 Comparison of four commonly used data storage methods |
HDFS(Hadoop Distributed File System),是谷歌GFS(Google File System)的开源实现,是一个分布式的数据存储系统,支持大规模的数据存储,具有很好的容错性,其存储能力随着集群数量的增加呈线性增长,其具备很高的吞吐量,但是不适合低延迟数据访问,无法高效存储大量小文件,不支持多用户写入及任意修改文件。HDFS作为Hadoop生态系统中的主要存储系统,在实时性要求不是很高的情况下,已经成为很多公司的首选存储方案;
HBase[27],作为一个分布式的、面向列存储的开源NoSQl数据库,其理论基础来源于谷歌的BigTable[28],支持上百万列的大表,其数据最终存储在HDFS中;但是,它克服了HDFS实时性和随机读写的缺陷,可以支持数据的随机读写、实时访问,从而弥补了Hadoop生态系统中实时数据读写的空白。在CAP定理[29]中,HBase选择了CP,即:C(Consistency,一致性)和P(Partition tolerance,分区容错性),因此HBase在可用性上稍有欠缺,需要结合Zookeeper来完善其高可用性;
Cassandra[30],最初由Facebook开发,非常适合于社交网络的数据存储,在亚马逊分布式引擎——Dynamo[31]的基础上,结合BigTable的列族(Column Family)数据模型,并采用P2P(Peer to Peer)去中心化节点管理方式,侧重于CAP理论中的AP:A(Availability,可用性)和P(Partition tolerance,分区容错性),而采用最终一致性。支持多数据中心的数据复制,并提供类SQL语言——CQL(Cassandra Query Language)的支持。
Redis[32],是一款基于内存的key-value存储系统。由于基于内存存储,其具有很高的吞吐量,同时也支持将数据持久化到磁盘,提供强大的数据类型支持,包括lists、sets、ordered sets以及hashes等。此外,Redis中所有操作都是原子性的,Redis 3.0[33]以后提供了Cluster(集群)支持,使得其扩展性大幅度增强,但是其数据存储容量比其他分布式数据库系统略小。
表 4中的四种常用数据存储技术分别适用于不同的大数据应用场景,延时大小、扩展性、容错性、高可用性等方面都是大数据环境下需要考虑的关键因素,没有最好,只有更适合业务场景的解决方案。
5 分布式数据流的DDoS攻击检测为了更好地理解分布式数据流处理系统组成,本章介绍一种大数据环境下的分布式拒绝服务(Distributed Denial of Service,DDoS)攻击检测数据流处理系统,其结构如图 12所示。整个系统包括:数据收集、数据分析、数据存储、模型(或算法)训练、入侵检测。
![]() |
图 12 大数据环境下的DDoS攻击检测数据流处理系统 Figure 12 Data stream processing system for DDoS attack detection in big data environment |
系统的数据流向为:1) 数据的来源为不同的服务器,通过各种抓包软件,如:TcpDump、NetFlow、Sniff等,对特定的网卡或端口进行数据包抓取,并通过Flume将不同服务器上的网络数据汇总,将数据抓、分析和检测分离,减轻应用服务器的负担。2) 数据汇聚之后,将所抓取的网络数据作为Kafka Producer的消息源,并传送到Kafka Broker,让Broker对所有网络数据进行有序的管理。3) Spark Streaming则实时从Kafka Broker中拉取数据,再将数据分散到不同的Spark Executor进行分析和统计。4) Spark将抓取的网络数据处理后,一方面可以将结果传给其他的应用,作进一步的分析;另一方面可以将结果持久化,存储在数据库(HDFS或其他数据库)中,供后续分析使用。5) 对得到的实时数据,可以使之与之前得到的历史数据进行合并进行模型(或算法)训练或者直接通过模型进行DDoS检测,并得到检测结果。整个系统数据搜集使用Flume、消息队列管理使用Kafka、数据实时分析使用Spark Streaming、数据存储使用HDFS或其他数据库,具有好的扩展性、容错性和实时处理能力,能充分满足大数据环境下的各种DDoS攻击检测需求。
6 结语本文主要研究了组成大数据环境下分布式数据流处理系统的各个子系统,包括数据收集子系统、消息队列管理子系统、流式数据处理子系统和数据存储子系统,详细介绍了四类子系统中涉及的相关技术,并从不同的应用角度进行了比较,本文的研究内容能为大数据环境下的数据流处理的理论研究和应用系统开发提供参考,有一定的理论和应用价值。
[1] | 国家图书馆研究院. CNNIC发布第37次《中国互联网络发展状况统计报告》[J]. 国家图书馆学刊, 2016 (2) : 20. ( The Research Institute of the National Library. The 37th China Internet network development state statistic report issued by CNNIC[J]. Journal of the National Library of China, 2016 (2) : 20. ) |
[2] | IDC. The digital universe of opportunities:rich data and the increasing value of the Internet of things[EB/OL].[2014-04-15]. http://www.emc.com/leadership/digital-universe/2014iview/executive-summary.htm. |
[3] | 戴永涛.分布式流处理系统研究与应用[D].上海:上海海事大学,2016:1-40. ( DAI Y T. Research and application of distributed streaming system[D]. Shanghai:Shanghai Maritime University, 2016:1-40. ) |
[4] | 赵勇. 架构大数据——大数据技术及算法解析[M]. 北京: 电子工业出版社, 2015 : 394 -410. ( ZHAO Y. Big Data Structure-The Technology and Algorithm Analysis of Big Data[M]. Beijing: Publishing House of Electronics Industry, 2015 : 394 -410. ) |
[5] | 孟小峰, 慈祥. 大数据管理:概念、技术与挑战[J]. 计算机研究与发展, 2013, 50 (1) : 146-169. ( MENG X F, CI X. Big data management:concepts, technology and challenges[J]. Journal of Computer Research and Development, 2013, 50 (1) : 146-169. ) |
[6] | 孙大为, 张广艳, 郑纬民. 大数据流式计算:关键技术及系统实例[J]. 软件学报, 2014, 25 (4) : 839-862. ( SUN D W, ZHANG G Y, ZHENG W M. Big data flow calculation:the key technology and system instance[J]. Journal of Software, 2014, 25 (4) : 839-862. ) |
[7] | MICHAEL K, MILLER K W. Big data:new opportunities and new challenges[J]. Computer, 2013, 46 (6) : 22-24. doi: 10.1109/MC.2013.196 |
[8] | Facebook. Scribe Wiki[EB/OL].[2015-02-03]. https://github.com/facebookarchive/scribe/wiki. |
[9] | Apache Software Foundation. Apache Thrift[EB/OL].[2016-05-01]. http://thrift.apache.org/. |
[10] | Apache Software Foundation. Apache Flume[EB/OL].[2016-04-09]. http://flume.apache.org/. |
[11] | Apache Software Foundation. Apache Chukwa[EB/OL].[2016-04-05]. http://chukwa.apache.org/. |
[12] | Elasticsearch. Elasticsearch Logstash[EB/OL].[2016-04-11]. https://www.elastic.co/products/logstash. |
[13] | Pivotal Software. RabbitMQ[EB/OL].[2016-04-11]. http://www.rabbitmq.com/. |
[14] | Erlang. Erlang introduction[EB/OL].[2016-04-11]. http://www.erlang.org/. |
[15] | HINTJENS P. ZeroMQ:Messaging for Many Applications[M]. Sebastopol: O'Reilly Media, 2013 . |
[16] | SCHMIDT D C. Reactor:an object behavioral pattern for demultiplexing and dispatching handles for synchronous events[J]. Compilers Principles Techniques & Tools, 1999, 261 (2) : 201-208. |
[17] | Apache Software Foundation. Kafka[EB/OL].[2016-04-11]. http://kafka.apache.org/. |
[18] | Apache Software Foundation. Apache Zookeeper[EB/OL].[2016-04-11]. http://zookeeper.apache.org/. |
[19] | PALANIAPPAN S K, NAGARAJA P B. Efficient data transfer through zero copy[EB/OL].[2016-03-13]. https://www.ibm.com/developerworks/linux/library/j-zerocopy/j-zerocopy-pdf.pdf. |
[20] | Apache Software Foundation. Apache Storm[EB/OL].[2016-04-07]. http://storm.apache.org/. |
[21] | Apache Software Foundation. Apache Samza[EB/OL].[2016-04-08]. http://samza.apache.org/. |
[22] | MURTHY A. Apache Hadoop YARN-background and an overview[EB/OL].[2016-04-08]. http://hortonworks.com/blog/apache-hadoop-yarn-background-and-an-overview/. |
[23] | Apache Software Foundation. Apache Flink[EB/OL].[2016-04-08]. http://flink.apache.org/. |
[24] | Apache Software Foundation. Apache Spark[EB/OL].[2016-04-08]. http://spark.apache.org/. |
[25] | ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets:a fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th Usenix Conference on Networked Systems Design and Implementation. Berkely:USENIX Association, 2012:2. |
[26] | ZAHARIA M, DAS T, LI H, et al. Discretized streams:an efficient and fault-tolerant model for stream processing on large clusters[C]//Proceedings of the 4th USENIX Conference on Hot Topics in Cloud Computing. Berkeley, CA:USENIX Association, 2012:10. |
[27] | Apache Software Foundation. Apache Hbase[EB/OL].[2016-04-06]. http://hbase.apache.org/. |
[28] | CHANG F, DEAN J, GHEMAWAT S, et al. Bigtable:a distributed storage system for structured data[C]//Proceedings of the 7th USENIX Symposium on Operating Systems Design and Implementation. Berkeley, CA:USENIX Association, 2006:15. |
[29] | 百度百科.CAP原则[EB/OL].[2016-03-18]. http://baike.baidu.com/link?url=i-7VhglR7AO5k63IHraug1jk0t6LE03jVJsKJQiUL4VU22oNGDa3u2vr_PT8m27-b4ZG_vpxtY7laL8jHNzq9q. ( Baidu Encyclopedia. CAP principle[EB/OL].[2016-03-18]. http://baike.baidu.com/link?url=i-7VhglR7AO5k63IHraug1jk0t6LE03jVJsKJQiUL4VU22oNGDa3u2vr_PT8m27-b4ZG_vpxtY7laL8jHNzq9q. ) |
[30] | LAKSHMAN A, MALIK P. Cassandra:a decentralized structured storage system[J]. ACM SIGOPS Operating Systems Review, 2010, 44 (2) : 35-40. doi: 10.1145/1773912 |
[31] | DECANDIA G, HASTORUN D, JAMPANI M, et al. Dynamo:amazon's highly available key-value store[J]. ACM SIGOPS Operating Systems Review, 2007, 41 (6) : 205-220. doi: 10.1145/1323293 |
[32] | Redis Labs. Redis[EB/OL].[2016-04-11]. http://redis.io/. |
[33] | Redis Labs. Redis cluster tutorial[EB/OL].[2016-04-11]. http://redis.io/topics/cluster-tutorial. |