2. 国家海洋局数字海洋科学技术重点实验室,天津 300171
海洋地理空间数据具有时间长、范围广、类型多和计算复杂的特点。大规模长时间序列海洋地理空间数据处理是一个极具挑战性的问题。这些处理包括数据检索、计算和可视化处理等。本文重点关注Spark框架下如何利用GPU并行计算机制实现海洋地理空间数据(Oceanographic Geospatial Data,OGD)分布式并行处理的任务调度,以提高大规模长时间序列海洋地理空间数据处理效率,满足实时交互需求。
Apache Spark是一个由AMPLab创建的、通用的、快速的大规模数据处理开源项目,提供了弹性分布式数据集RDD和高级的DAG执行引擎用于支持数据流和内存计算,弥补了MapReduce模式不适应于迭代计算的缺点,补充了Hadoop生态系统。
GPU具有强大的浮点运算能力。除了应用在计算机图形渲染,GPU也越来越多的被用于通用计算领域,尤其在NVIDIA公司推出了通用的并行计算框架CUDA后。在机器学习、模式计算、深度学习、海洋地理空间数据处理领域,GPU都有广泛的应用。
针对Spark框架中如何嵌入GPU并行计算机制,以提高Spark框架分布式并行处理效率,许多研究者做了大量的研究工作,提出了一些大规模分布式并行计算任务的调度策略[1, 3-10]:
Grossman M[1]将OpenCL集成到了MapReduce框架中,使用Aparapi[2]生成OpenCL的kernel代码,允许在分布式系统中使用GPU等异构处理器,构建了异构分布式框架HadoopCL,并为HadoopCL编写了简单易用的Java编程接口。使用Pi和Kmeans等作为benchmark,在10个节点的集群上提高了3.79x和5.05x;Segal O[3]提出了SparkCL,一个开源的基于Java、OpenCL和Spark的统一编程框架。SparkCL把不同的计算设备如GPU、FPGA、DSP、APU同等对待,使用Aparapi生成kernel函数,实现Spark与OpenCL的结合。
Elteir AM[4]提出了StreamMR, 一个OpenCL的MapReduce框架,专门为AMD GPU做了优化,实现了高效的原子算法来加速MapReduce应用。使用矩阵乘法、K-Means等作为benchmark,实现了4x的加速;Grossman M[5]提出了HadoopCL 2,基于HadoopCL构建了为机器学习设计的分布式异构平台,与Mahout对比,实现了20x的加速。
Choi W[6]提出了Vispark,使用GPU在Spark上加速基于数据组的科学计算或者图像处理应用Vispark提供了简单易用的、类python高级语法的API和针对GPU集群的数据抽象。使用高斯过滤和K-means聚类作为benchmark,Vispark的速度分别比Spark快10x和7x;Wang H[7]使用Spark和GPU加速大规模图像检索处理,并构建了工具包IRlib,提供了统一的图像检索的API和利用Spark实现大规模处理的方案。在多种数据集(Oxford5K、Flickr100K)上测试,不同benchmark实现了几十倍至几千倍的加速效果;Li P[8]介绍了通过RMI结合Spark和GPU,提出了HeteroSpark,一个集成于Spark的GPU加速框架。结合了GPU的强大计算能力和CPU的扩展性,用于加速数据密集型和计算密集型应用。使用不同的机器学习应用来作为benchmark,如Logistic Regression、K-Means、Word2Vec等,取得了接近20x的加速效果。Tsiomenko R[9]在Amazon EC2上使用Hadoop和CUDA结合,提高了快速傅里叶变换(FFT)的性能,实现4x的加速。Liu J[10]结合Spark实现了CUDARDD和CUDAFunction类来支持在Spark中运行CUDA程序,并对RDD的存放做了一些优化。实现了在Spark中使用GPU设备加速。
上述研究基本采用以下三种方式实现GPU嵌入Spark/Hadoop平台:
(1) Aparapi生成kernel代码。
(2) OpenCL与Spark/Hadoop结合。
(3) JNI+CUDA。
方式3结构简单、高效,便于与Spark结合,可实现Spark下多种计算任务的加速处理。
上述研究中任务调度均采用Spark原生调度策略:FIFO(First In First Out)或FAIR。而在实际应用问题中,由于异构Spark-GPU集群各节点GPU设备性能和任务自身计算量的差异,会造成任务执行不均衡和长尾效应;同时,在多任务执行调度过程中,由于GPU设备频繁启动(时间间隔较长),造成GPU设备占用率低。因此,任务执行的整体性能较低、难以满足实时交互处理需求,例如大规模长时间序列海洋地理空间数据可视化处理(Oceanographic Geospatial Data Visualization,OGDV)。
针对上述问题,本文提出了Spark-GPU框架(Spark-GPU Framework,SGF),采用无关并行机任务调度算法,实现大规模长时间序列海洋地理空间数据可视化多任务的调度平衡,以提高GPU设备占用率和任务整体性能、平衡各节点的执行效率。SGF包括Spark-GPU调度器(Spark-GPU Scheduler,SGS)和Spark-GPU运行时(Spark-GPU Runtime,SGR)。SGS调度策略以任务计算量和GPU设备计算能力作为调度因子,该调度问题实际上是一个著名的调度问题——无关并行机任务调度[11]。对于该调度问题,有一个多项式的2近似算法。SGS采用此调度算法处理GPU设备OGDV多任务的调度,任务整体处理性能显著提高。
流场可视化作为测试实例,采用线积分卷积[12](Linear Integral Convolution,LIC)算法处理,利用CUDA加速。针对大规模长时间序列海洋地理空间数据可视化任务的调度问题,测试了1 000~2 000场流场可视化任务的执行时间。测试结果表明1 000~2 000场任务下SGF比Spark执行时间缩短了14%~18%,极大加快了OGDV应用处理速度。
本文贡献主要包括两方面:(1)改进Spark-GPU框架,支持大规模计算任务在异构GPU设备上执行的调度均衡;(2)依据计算量、计算能力,规范定义了大规模计算任务在异构GPU设备上执行的调度问题,并给出了一个多项式的2近似求解算法[11]描述。
1 Spark-GPU框架Spark-GPU框架如图 1所示,一个包含多任务的GPU应用处理包括五步。
|
图 1 Spark-GPU框架 Fig. 1 Spark-GPU Framework |
(1) 从HDFS读取数据,split为多个RDD分块;
(2) 依据任务计算量和GPU设备计算能力配置SGS调度器;
(3) SGS通过无关并行机任务调度算法求解调度方案,分配任务到executor上;
(4) Executor通过JNI接口调用CUDA完成计算过程,并将运行状态返回给SGF;
(5) 计算结果保存到HDFS。
1.1 Spark-GPU运行时在Spark上运行单任务有3种常用方法。
方法1 采用JCUDA[13]。JCUDA是CUDA的Java绑定,隐藏了CUDA的原始接口。编译和运行由JCUDA完成。缺点是数据分配、传输、kernel启动等操作每次需要JNI调用本地动态链接库,造成多次JNI调用,程序执行效率低。
方法2 采用Aparapi。Aparapi允许Java开发者在GPU上执行并行代码段,直接将Java字节码转化为OpenCL描述代码在GPU设备上运行。缺点是无法控制转化后的代码,不易对GPU代码进行优化。
方法3 采用JNI[14]+CUDA,通过JNI直接调用CUDA C编写的程序。在本地程序中完成CUDA程序的运行,尽可能的消除了与Java的绑定和通信带来的开销。直接使用CUDA C的API,更为方便的调试CUDA的代码。不同于方法1,每次任务只发起一次JNI调用。
SGR采用方法3,简单高效、节省JNI调用、方便调试。JNI调用过程如图 2所示,GPU程序通过JNI接口调用CUDA动态链接库中的函数,启动GPU设备执行计算过程。
|
图 2 JNI-GPU调用过程 Fig. 2 JNI-GPU Invocation |
在JNI调用中,Spark无法获取JNI下任务的运行状态,当运行一个任务时候,Spark需要获取到任务运行状态。SGR提供了一个简单的容错机制来判断任务是否运行成功。使用cudaLaunchkernel返回一个cudaError_t类型的值V,SGR把V传给JVM来保证一个失败的任务可以重新运行,确保任务正确执行。
1.2 Spark-GPU调度器SGS包含任务配置模块、任务调度算法模块和任务调度控制模块。
任务配置模块包含各任务计算量和各节点的GPU设备计算能力描述,用于估计不同任务在不同GPU设备上的运行时间,构建一个时间代价矩阵。任务调度算法模块依据时间代价矩阵构建一个2近似线性调度模型,将调度任务转化为线性规划问题,求解生成一种具体的调度方案。任务调度控制模块依据此调度方案生成任务列表,并强制划分成特定数目的partition,通过改进的Spark TaskSetScheduler将不同的partition分配到不同的executor上,即任务分发到不同的GPU设备上。
2 分布式并行任务调度 2.1 问题定义将不同计算量的任务调度分配到不同计算能力的GPU设备上属于一个NP-Hard问题。文献[11]描述了该调度问题,给出了一个多项式近似算法构造一个调度使得总时耗不超过最优解的两倍。
问题定义如下:
有m个独立的机器和n个独立的任务,每个任务被分配到一个机器上,任务j在节点i上处理时间为Pij,问题目标是求出一种调度方案是总时耗最小,形式化描述如下。
| $ \min Z = \mathop {\max }\limits_{i = 1..m} \left( {\sum\limits_{j = 1}^n {{x_{ij}}{P_{ij}}} } \right), $ |
| $ {\rm{s}}{\rm{.}}\;{\rm{t}}.\left\{ \begin{array}{l} {x_{ij}} \in \left\{ {0, 1} \right\}\\ \sum\limits_{i = 1..m, j = 1..n} {{x_{ij}} = n} \\ \forall j = 1..n, \sum\limits_{i = 1..m} {{x_{ij}} = 1} \end{array} \right.。$ |
其中xij=1表示任务j在机器i上运行。
本文中OGDV任务对应n个任务,GPU设备对应m个独立的机器。一个任务执行时间可以根据任务计算量和GPU设备计算能力来估计。
2.2 符号说明输入:
n是任务数;
m是GPU设备数;
TC={TCj|j=1..n}是任务计算量列表;
GC={GCi|i=1..m}是GPU设备计算能力列表;
输出:
SS={(i1, j1)..(in, jn)}任务调度解决方案。(i, j)表示任务j分配到GPU设备i上。
其他:
P是一个m×n的时间代价矩阵
L和U分别表示执行时间的上下界
left, right, d用于二分搜索。
2.3 算法描述任务调度算法包含四步:
(1) 根据公式(1),计算时间代价矩阵P
| $ {P_{ij}} = T{C_j}/G{C_i}\;\;fori = 1..m, j = 1..n。$ | (1) |
GC设置为GPU设备浮点计算能力理论值的80%。
(2) 根据公式(2),计算执行时间下界L和上界U
| $ \begin{array}{l} U = \sum\nolimits_{j = 1}^n {\min \left( {{P_{ij}}\left| {1 \le i \le m} \right.} \right)} , \\ \;\;\;\;\;\;\;\;\;\;\;\;\;\;L = U/m。\end{array} $ | (2) |
考虑一个贪心的调度方案,把每个任务分配到执行该任务最快的GPU设备上,那么总的执行时间为上界U,U/m则为下界L。
(3) 二分搜索
设置left, right为L和U,d=(left+right)/2。设决策问题(P, d)表示在时间代价矩阵为P, 执行时间最多为d的时间条件下,是否存在可行的任务分配策略。如果决策问题(P, d)的解是yes,那么设置right为d,否则设置left为d+1, 重复,直到找到最小的执行时间。
使用2-relaxed-decision过程来判定这个决策问题(P, d)是yes还是no。如公式(3)描述,将此决策问题转化为一个线性规划问题,设置d1=d2=…=dm=t=d。如果该线性规划问题有解,则决策问题的解为yes,否则为no。参见文献[11]中rounding定理。
| $ \sum\nolimits_{i \in {M_j}\left( t \right)} {{x_{ij}} = 1\;for\;j = 1..n, } $ | (3) |
| $ \begin{array}{l} \sum\nolimits_{j \in {J_i}\left( t \right)} {{\mathit{\boldsymbol{P}}_{ij}}{x_{ij}} \le {d_i}\;for\;i = 1..m, } \\ {x_{ij}} \ge 0\;for\;j \in {J_i}\left( t \right), i = 1..m。\end{array} $ |
其中:Ji(t)表示在GPU设备i上运行时间不超过t的任务集合;Mj(t)表示执行任务j时间不超过t的GPU设备集合。
(4) 计算调度方案
设x为上述线性规划问题得到的d最小的解。检查xij判断是否为1,如果是,则把(i, j)加入SS。SS为最终的调度方案。
3 实验 3.1 实验环境异构Spark-GPU集群包含10个节点,1个master节点,9个worker节点, 软件配置,操作系统为CentOS 7.0,Spark平台配置为Spark 1.2.0、Hadoop 2.3.0、Java 1.7、Scala 2.10.4,CUDA版本为CUDA 7.5, 9个worker节点中3个节点配置NVIDIA GTX660,3个节点配置NVIDIA GTX970,3个节点配置NVIDIA GTX1080。
流场可视化LIC算法作为测试用例,采用CUDA加速。LIC算法的时间复杂度为O(Gx×Gy×D×D×S),其中Gx×Gy数据网格密度,D插值密度,S积分步数。测试数据网格密度为45×55,插值密度为10,积分步数为1 000。
下面通过单场可视化任务测试作为多场可视化任务的调度测试基准,以说明SGF框架对多场可视化任务调度的效果。
3.2 单任务GPU加速测试表 1为单场可视化任务在不同GPU设备上采用GPU寄存器优化方式的执行时间。
|
|
表 1 单任务不同GPU设备执行时间 Table 1 Single task execution time on differernt GPU devices |
由表 1,简单按照线性关系测算1 000场流场可视化任务在不同设备上的执行时间,如表 2所示
|
|
表 2 多任务不同GPU设备执行时间 Table 2 Multi Tasks Execution time on Differernt GPU Devices |
本文测试了1 000场和2 000场任务在异构Spark-GPU集群下的执行过程,采用不同调度策略、不同worker节点数目下的总执行时间、GPU设备占用率的变化情况。
表 3给出了1 000场、2 000场流场可视化任务在Spark原生调度方案(FIFO)和SGS调度方案下的执行时间对比。每次测试采用多次运行取平均值,右栏计算了执行时间的减少比,公式计算如下:
| $ R = \left( {T1 - T2} \right)/T1。$ |
|
|
表 3 1 000、2 000场任务执行时间 Table 3 1 000, 2 000 tasks execution time |
其中:R表示时间减少比;T1表示Spark原生调度策略(FIFO)下的执行时间;T2表示SGS任务调度策略下的执行时间。测试结果表明1 000~2 000场任务的执行时间减少了14%~18%。
图 3为1 000~2 000场任务在不同调度策略下执行时间的直方图。结果表明SGS调度算法在不同worker节点数和任务数下比Spark原生调度算法减少了任务执行时间。明显可以看出,节点数目越多,运行时间越少。
|
图 3 不同节点数目下的执行时间 Fig. 3 Execution time of different node number |
图 4是在一个i5 4430 CPU + 8G + GTX660的节点上,不同调度算法下的GPU占用率曲线图。蓝色的表示Spark原生调度算法的GPU占用率曲线图,绿色的表示SGS调度算法的GPU占用率曲线图。SGS调度算法的GPU占用率基本在90%~100%之间,范围变化小且较稳定,Spark原生调度算法的GPU占用率在70%~100%之间浮动,范围变化大且不稳定,说明GPU设备没有得到充分利用,其他节点具有相同的情况。这表明SGS可以有效提高GPU设备占用率。
|
图 4 1 000场任务GPU设备占用率曲线 Fig. 4 1 000 tasks GPU devices utilization in a node |
图 5给出了不同调度算法下GPU设备执行时间占用比,表明SGS调度算法得到了较好的性能提升效果,提高了10%~20%。GPU设备执行时间占用比越高说明任务的计算占用时间更高,调度消耗时间显著减少,任务整体性能提升。
|
图 5 不同节点规模下GPU设备执行时间占用比 Fig. 5 GPU devices time occupancy ratio in different node number |
本文优化改进了Spark-GPU框架,以支持异构Spark-GPU集群下多任务的调度处理,并给出了无关并行机任务调度策略的问题求解方法描述,采用一个多项式2近似算法求解该调度问题。通过大规模长时间序列海洋地理空间数据可视化多任务测试,表明SGF框架设计和SGS调度策略具有较好的任务整体性能提升效果,适用于一般性的大规模并行任务调度处理。
将来尚需完善三个方面的工作:(1)需要更多的实例测试验证;(2)完善改进GPU设备计算能力的评估方法;(3)进一步优化大规模并行任务调度算法。
| [1] |
Grossman M, Breternitz M, Sarkar V. Hadoopcl: Mapreduce on distributed heterogeneous platforms through seamless integration of hadoop and opencl[C].Parallel and Distributed Processing Symposium Workshops & PhD Forum (IPDPSW)IEEE: 2013 IEEE 27th International. 2013: 1918-1927.
( 0) |
| [2] |
Gary F. Aparapi: An open source tool for extending the java promise of write once run anywhereto include the gpu[EB/OL].[2012-09-10]http://conferences.oreilly.com/oscon/oscon2012/public/schedule/detail/23434: O'Reilly, 2012.
( 0) |
| [3] |
Segal O, Colangelo P, Nasiri N, et al. SparkCL: A unified programming framework for accelerators on heterogeneous clusters[J]. arXiv Preprint arXiv:1505.01120, 2015.
( 0) |
| [4] |
Elteir M, Lin H, Feng W, et al. StreamMR: an optimized MapReduce framework for AMD GPUs[C]. Parallel and Distributed Systems (ICPADS), IEEE: 2011 IEEE 17th International Conference on. 2011: 364-371.
( 0) |
| [5] |
Grossman M, Breternitz M, Sarkar V. Hadoopcl2: Motivating the design of a distributed, heterogeneous programming system with machine-learning applications[J]. IEEE Transactions on Parallel and Distributed Systems, 2016, 27(3): 762-775. DOI:10.1109/TPDS.2015.2414943
( 0) |
| [6] |
Choi W, Hong S, Jeong W K. Vispark: GPU-accelerated distributed visual computing using spark[J]. SIAM Journal on Scientific Computing, 2016, 38(5): S700-S719. DOI:10.1137/15M1026407
( 0) |
| [7] |
Wang H, Xiao B, Wang L, et al. Accelerating large-scale image retrieval on heterogeneous architectures with Spark[C]. ACM: Proceedings of the 23rd ACM International Conference on Multimedia. 2015: 1023-1026.
( 0) |
| [8] |
Li P, Luo Y, Zhang N, et al. HeteroSpark: A heterogeneous CPU/GPU Spark platform for machine learning algorithms[C]. Networking, Architecture and Storage (NAS), IEEE: 2015 IEEE International Conference on. IEEE, 2015: 347-348.
( 0) |
| [9] |
Tsiomenko R, Rees B S. Accelerating fast fourier transforms using hadoop and CUDA[J]. arXiv Preprint arXiv: 1407.6915, 2014. http://www.oalib.com/paper/4065691
( 0) |
| [10] |
Liu J, Hu Y. Gpu support in spark and gpu/cpu mixed resource scheduling at production scale[EB/OL].[2016-03-25]https://spark-summit.org/2016/events/gpu-support-in-spark-and-gpu-cpu-mixed-resource-scheduling-at-production-scale/: Databricks, 2016.
( 0) |
| [11] |
Lenstra J K, Shmoys D B, Tardos E. Approximation algorithms for scheduling unrelated parallel machines[J]. Mathematical programming, 1990, 46(1-3): 259-271. DOI:10.1007/BF01585745
( 0) |
| [12] |
Cabral B, Leedom L C. Imaging vector fields using line integral convolution[C]. ACM: Proceedings of the 20th Annual Conference on Computer Graphics and Interactive techniques. 1993: 263-270.
( 0) |
| [13] |
Yan Y, Grossman M, Sarkar V. JCUDA: A programmer-friendly interface for accelerating Java programs with CUDA[C]. European Conference on Parallel Processing. Berlin: Springer, 2009: 887-899.
( 0) |
| [14] |
Hunt J. Java native interface[M]. Java for Practitioners. London: Springer, 1999: 417-425.
( 0) |
2. The Key Laboratory of Digital Oceanic Science and Technology, National Marine Data and Information Service, Tianjin 300171, China
2018, Vol. 48



0)