spark 将 dataframe 写到 hdfs 为什么会如此耗时?

2017-12-25 16:25:23 +08:00
 wudc
公司这边有个数据聚合的任务,聚合的过程中遇到了数据倾斜,通过局部聚合和全局聚合的方式解决,速度提升了很多,12G 的数据大概需要 15 分钟左右聚合完,但是在最后以 json 格式的文件存储到 hdfs 上时特别耗时,而且偶尔会报内存溢出,spark.default.parallelism 设置的是 200,最后保存时是这么写的:df.repartition(20).write.json(savePath),求 spark 大神指点我哪里有问题,该怎么解决这个问题?谢谢。
7030 次点击
所在节点    程序员
18 条回复
linuxchild
2017-12-25 16:54:10 +08:00
压缩一下再写试试
wudc
2017-12-25 17:22:51 +08:00
@linuxchild 嗯,谢谢,我加上了这段代码 df.persist(StorageLevel.MEMORY_AND_DISK_SER),现在程序在跑
liprais
2017-12-25 17:25:25 +08:00
df.repartition(20) 这样不是只起二十个 partition 在写么
直接 df.write.json 试试?
wudc
2017-12-25 18:32:42 +08:00
@liprais 想最后把结果写到 20 个文件中
mind3x
2017-12-25 18:35:51 +08:00
用 coalesce(20) 试试
zhusimaji
2017-12-25 18:39:03 +08:00
@wudc 你这个只是数据持久化
zhusimaji
2017-12-25 18:40:00 +08:00
@wudc 可以看下 spark job 看看卡在哪了
Mondoz
2017-12-25 18:46:45 +08:00
同 coalesce
wudc
2017-12-25 18:47:36 +08:00
@zhusimaji 卡在 df.repartition(20).write.json(savePath)这行了,数据量比较小时还可以但是数据量一大就栈溢出
wudc
2017-12-25 18:49:06 +08:00
@Mondoz
@mind3x
好的,改成 coalesce 已经在测试,谢谢。
liprais
2017-12-25 19:20:27 +08:00
可以先 df.repartition(20) 看看执行计划
然后你就明白为啥这么慢了
wudc
2017-12-25 19:28:13 +08:00
@liprais 嗯,明白了,去掉 reparation 确实快了不少,谢谢!
zhusimaji
2017-12-25 22:43:30 +08:00
@wudc 一般情况下不要指定分区数,因为这个都会自动计算出合理的分区,分区设置的过少,必然导致每个 job 处理的时间变长
wudc
2017-12-26 13:46:32 +08:00
@zhusimaji 嗯嗯,听你这么说我明白了,我分区是想防止过多结果文件的产生,现在看来有点画蛇添足了。
wudc
2017-12-26 13:47:55 +08:00
@zhusimaji 我这程序在处理大数据量的聚合时偶尔会报 OOM 内存溢出,您对此有什么比较好的解决方法吗?
zhusimaji
2017-12-26 15:23:43 +08:00
@wudc 不知道你是不是在 yarn 上执行,如果内存够的情况下给每个 executer 分配内存大一点
zhusimaji
2017-12-26 15:26:06 +08:00
oom 产生的原因很多,最好根据打印的 log 日志去找相对应的资料
wudc
2017-12-26 16:24:21 +08:00
@zhusimaji 是在 yarn 上,executor-cores、num-executors 和 executor-memory 已经设置成当前集群所允许的最大值了,我再想想吧,还是非常感谢你的指导。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/417480

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX