Spark 是基于内存的分布式计算引擎,以处理的高效和稳定著称。然而在实际的应用开发过程中,开发者还是会遇到种种问题,其中一大类就是和性能相关。在本文中,笔者将结合自身实践,谈谈如何尽可能地提高应用程序性能。
分布式计算引擎在调优方面有四个主要关注方向,分别是 CPU 、内存、网络开销和 I/O ,其具体调优目标如下:
数据倾斜意味着某一个或某几个 Partition 中的数据量特别的大,这意味着完成针对这几个 Partition 的计算需要耗费相当长的时间。
如果大量数据集中到某一个 Partition ,那么这个 Partition 在计算的时候就会成为瓶颈。图 1 是 Spark 应用程序执行并发的示意图,在 Spark 中,同一个应用程序的不同 Stage 是串行执行的,而同一 Stage 中的不同 Task 可以并发执行, Task 数目由 Partition 数来决定,如果某一个 Partition 的数据量特别大,则相应的 task 完成时间会特别长,由此导致接下来的 Stage 无法开始,整个 Job 完成的时间就会非常长。
要避免数据倾斜的出现,一种方法就是选择合适的 key ,或者是自己定义相关的 partitioner 。在 Spark 中 Block 使用了 ByteBuffer 来存储数据,而 ByteBuffer 能够存储的最大数据量不超过 2GB 。如果某一个 key 有大量的数据,那么在调用 cache 或 persist 函数时就会碰到 spark-1476 这个异常。
下面列出的这些 API 会导致 Shuffle 操作,是数据倾斜可能发生的关键点所在
图 1: Spark 任务并发模型
def rdd: RDD[T]
}
// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class DemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
// Here we use a single Long to try to ensure the sort is balanced,
// but for really large dataset, we may want to consider
// using a tuple of many Longs or even a GUID
def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
.grouped(numPartitions).map(t => (t._1._1, t._2))
}
case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
def grouped(size: Int): RDD[T] = {
// TODO Version where withIndex is cached
val withIndex = rdd.mapPartitions(_.zipWithIndex)
val startValues =
withIndex.mapPartitionsWithIndex((i, iter) =>
Iterator((i, iter.toIterable.last))).toArray().toList
.sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)
withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
case (value, index) => (startValues(i) + index.toLong, value)
})
.partitionBy(new Partitioner {
def numPartitions: Int = size
def getPartition(key: Any): Int =
(key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
})
.map(_._2)
}
}
定义隐式的转换
implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] =
new DemoRDD[T](rdd)
implicit def toDemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd)
implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}
在 spark-shell 中就可以使用了
import RDDConversions._
yourRdd.grouped(5)
Spark 的 Shuffle 过程非常消耗资源, Shuffle 过程意味着在相应的计算节点,要先将计算结果存储到磁盘,后续的 Stage 需要将上一个 Stage 的结果再次读入。数据的写入和读取意味着 Disk I/O 操作,与内存操作相比, Disk I/O 操作是非常低效的。
使用 iostat 来查看 disk i/o 的使用情况, disk i/o 操作频繁一般会伴随着 cpu load 很高。
如果数据和计算节点都在同一台机器上,那么可以避免网络开销,否则还要加上相应的网络开销。 使用 iftop 来查看网络带宽使用情况,看哪几个节点之间有大量的网络传输。
图 2 是 Spark 节点间数据传输的示意图, Spark Task 的计算函数是通过 Akka 通道由 Driver 发送到 Executor 上,而 Shuffle 的数据则是通过 Netty 网络接口来实现。由于 Akka 通道中参数 spark.akka.framesize 决定了能够传输消息的最大值,所以应该避免在 Spark Task 中引入超大的局部变量。
图 2: Spark 节点间的数据传输
为了提高 Spark 应用程序的效率,尽可能的提升 CPU 的利用率。并发数应该是可用 CPU 物理核数的两倍。在这里,并发数过低, CPU 得不到充分的利用,并发数过大,由于 spark 是每一个 task 都要分发到计算结点,所以任务启动的开销会上升。
并发数的修改,通过配置参数来改变 spark.default.parallelism ,如果是 sql 的话,可能通过修改 spark.sql.shuffle.partitions 来修改。
repartition 和 coalesce 都能实现数据分区的动态调整,但需要注意的是 repartition 会导致 shuffle 操作,而 coalesce 不会。
groupBy 操作应该尽可能的避免,第一是有可能造成大量的网络开销,第二是可能导致 OOM 。以 WordCount 为例来演示 reduceByKey 和 groupBy 的差异
reduceByKey
sc.textFile(“ README.md ”).map(l=>l.split(“,”)).map(w=>(w,1)).reduceByKey(_ + _)
图 3 : reduceByKey 的 Shuffle 过程
Shuffle 过程如图 2 所示
groupByKey
sc.textFile(“ README.md ”).map(l=>l.split(“,”)).map(w=>(w,1)).groupByKey.map(r=>(r._1,r._2.sum))
图 4 : groupByKey 的 Shuffle 过程
建议: 尽可能使用 reduceByKey, aggregateByKey, foldByKey 和 combineByKey
假设有一 RDD 如下所示,求每个 key 的均值
val data = sc.parallelize( List((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)) )
方法一: reduceByKey
data.map(r=>(r._1, (r.2,1))).reduceByKey((a,b)=>(a._1 + b._1, a._2 + b._2)).map(r=>(r._1,(r._2._1/r._2._2)).foreach(println)
方法二: combineByKey
data.combineByKey(value=>(value,1),
(x:(Double, Int), value:Double)=> (x._1+value, x._2 + 1), (x:(Double,Int), y:(Double, Int))=>(x._1 + y._1, x._2 + y._2))
在 Join 过程中,经常会遇到大表和小表的 join. 为了提高效率可以使用 BroadcastHashJoin, 预先将小表的内容广播到各个 Executor, 这样将避免针对小表的 Shuffle 过程,从而极大的提高运行效率。
其实 BroadCastHashJoin 核心就是利用了 BroadCast 函数,如果理解清楚 broadcast 的优点,就能比较好的明白 BroadcastHashJoin 的优势所在。
以下是一个简单使用 broadcast 的示例程序。
val lst = 1 to 100 toList
val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2)
val broadcastLst = sc.broadcast(lst)
exampleRDD.filter(i=>broadcastLst.valuecontains(i)).collect.foreach(println)
有时需要将计算结果存储到外部数据库,势必会建立到外部数据库的连接。应该尽可能的让更多的元素共享同一个数据连接而不是每一个元素的处理时都去建立数据库连接。
在这种情况下, mapPartitions 和 foreachPartitons 将比 map 操作高效的多。
移动计算的开销远远低于移动数据的开销。
Spark 中每个 Task 都需要相应的输入数据,因此输入数据的位置对于 Task 的性能变得很重要。按照数据获取的速度来区分,由快到慢分别是:
Spark 在 Task 执行的时候会尽优先考虑最快的数据获取方式,如果想尽可能的在更多的机器上启动 Task ,那么可以通过调低 spark.locality.wait 的值来实现, 默认值是 3s 。
除了 HDFS , Spark 能够支持的数据源越来越多,如 Cassandra, HBase,MongoDB 等知名的 NoSQL 数据库,随着 Elasticsearch 的日渐兴起, spark 和 elasticsearch 组合起来提供高速的查询解决方案也成为一种有益的尝试。
上述提到的外部数据源面临的一个相同问题就是如何让 spark 快速读取其中的数据, 尽可能的将计算结点和数据结点部署在一起是达到该目标的基本方法,比如在部署 Hadoop 集群的时候,可以将 HDFS 的 DataNode 和 Spark Worker 共享一台机器。
以 cassandra 为例,如果 Spark 的部署和 Cassandra 的机器有部分重叠,那么在读取 Cassandra 中数据的时候,通过调低 spark.locality.wait 就可以在没有部署 Cassandra 的机器上启动 Spark Task 。
对于 Cassandra, 可以在部署 Cassandra 的机器上部署 Spark Worker ,需要注意的是 Cassandra 的 compaction 操作会极大的消耗 CPU ,因此在为 Spark Worker 配置 CPU 核数时,需要将这些因素综合在一起进行考虑。
这一部分的代码逻辑可以参考源码 TaskSetManager::addPendingTask
private def addPendingTask(index: Int, readding: Boolean = false) {
// Utility method that adds `index` to a list only if readding=false or it's not already there
def addTo(list: ArrayBuffer[Int]) {
if (!readding || !list.contains(index)) {
list += index
}
}
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
case e: HDFSCacheTaskLocation => {
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) => {
for (e <- set) {
addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
}
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
}
case _ => Unit
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
}
}
if (tasks(index).preferredLocations == Nil) {
addTo(pendingTasksWithNoPrefs)
}
if (!readding) {
allPendingTasks += index // No point scanning this whole list to find the old task there
}
}
如果准备让 spark 支持新的存储源,进而开发相应的 RDD ,与位置相关的部分就是自定义 getPreferredLocations 函数,以 elasticsearch-hadoop 中的 EsRDD 为例,其代码实现如下。
override def getPreferredLocations(split: Partition): Seq[String] = {
val esSplit = split.asInstanceOf[EsPartition]
val ip = esSplit.esPartition.nodeIp
if (ip != null) Seq(ip) else Nil
}
使用好的序列化算法能够提高运行速度,同时能够减少内存的使用。
Spark 在 Shuffle 的时候要将数据先存储到磁盘中,存储的内容是经过序列化的。序列化的过程牵涉到两大基本考虑的因素,一是序列化的速度,二是序列化后内容所占用的大小。
kryoSerializer 与默认的 javaSerializer 相比,在序列化速度和序列化结果的大小方面都具有极大的优势。所以建议在应用程序配置中使用 KryoSerializer.
spark.serializer org.apache.spark.serializer.KryoSerializer
默认的 cache 没有对缓存的对象进行序列化,使用的 StorageLevel 是 MEMORY_ONLY,这意味着要占用比较大的内存。可以通过指定 persist 中的参数来对缓存内容进行序列化。
exampleRDD.persist(MEMORY_ONLY_SER)
需要特别指出的是 persist 函数是等到 job 执行的时候才会将数据缓存起来,属于延迟执行; 而 unpersist 函数则是立即执行,缓存会被立即清除。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.