来自百度的工程师介绍其场内应用从MR迁移到Spark的动机,MR稳定但是技术相对陈旧易用性差,公司内部和外部Spark生态演进相对完善,Spark的易用性和性能上也有较大优势。百度内部统一作业调度系统(Normandy)也让Spark取代MR作业更加便利,但是其推进过程涉及很多历史遗留作业,性能,还有兼容性问题的干扰,这部分是本次分享的核心内容和经验之谈。

注脚

1.叶先进 ( advancedxy ) MR 大规模迁移 Spark 在 Baidu 的实践

2.About Me 高级研发工程师 百度智能云技术二部 – 分布式计算研发 参与百度内部 MR, Spark 和 Bigflow 的研发

3.Agenda 迁移背景 迁移准备工作 一个具体的案例 -- 成果 , 经验和教训 展望

4.迁移背景 – Baidu 的大数据架构 Spark over Yarn, Yarn over Normandy(as a plugin) 各种计算引擎均对接 Normandy 调度 , 机器上各个计算 引擎应用可混用 Spark 和 MR 是两个最大的离线计算引擎 , 整体机器量级 达到 ~10W

5.迁移背景 – Spark in Baidu 2014 2015 2016 2017 2018 Spark import to Baidu Version: 0.8 Build standalone cluster Integrate with in-house FS\Pub-Sub\DW Version: 1.4 Build Cluster over YARN Integrate with in-house Resource Scheduler System Version: 1.6 SQL\Graph Service over Spark OAP Version: 2.1 MR to Spark Version: 2.3 5W+ ~3w

6.迁移背景 MR 稳定但技术陈旧 , 易用性差 . 社区和厂内的 Spark 生态演进完善 . 相对于 MR, Spark 在易用性和性能上有显著优势 MR 和 Spark 计算在百度内部逐渐混布 . 从 17 年起 , 不再提供纯离线的计算资源 (MR). 新的计算资源由在线资源混布提供 . 同期 Baidu 内部 MR 和 Spark 计算团队合并成为批量计算团队

7.准备工作 – 指导思想 资源打平 重点业务 , 高优支持 选择合适的作业 , 支持业务方改写 , 凸显 Spark 性能优势 表示层的作业 ( Bigflow /SQL(on QE)) 后续无缝推动升级

8.准备工作 资源打平 : MR 和 Spark 合并物理队列 , 用户 ugi 无缝提交 MR/Spark 作业

9.准备工作 资源打平 : MR 和 Spark 合并物理队列 , 用户 ugi 无缝提交 MR/Spark 作业 选择合适的任务改写 , 和重点业务方进行沟通 , 推动升级 定性角度 : 大并发量 , 计算拓扑复杂 , 强行将逻辑拆分成 Map Reduce 阶段 定量角度 : 并发数 > 1W, Input + Shuffle > 1T, 作业平均核时数 > 平台利用率 根据任务和重点业务方沟通 , 前期 RD 点对点支持业务方改写 , 测试 , 验证 , 上线

10.准备工作 资源打平 : MR 和 Spark 合并物理队列 , 用户 ugi 无缝提交 MR/Spark 作业 选择合适的任务改写 , 和重点业务方进行沟通 , 推动升级 定性角度 : 大并发量 , 计算拓扑复杂 , 强行将逻辑拆分成 Map Reduce 阶段 定量角度 : 并发数 > 1W, Input + Shuffle > 1T, 作业平均核时数 > 平台利用率 根据任务和重点业务方沟通 , 前期 RD 点对点支持业务方改写 , 测试 , 验证 , 上线 推动表示层作业迁移 , 目前正在做适配和验证过程 .

11.一个具体的案例 : 支持 UDW IDMapping 任务迁移 典型作业改写 设备聚合一期 设备聚合二期

12.典型作业改写 作业类型 Hadoop Streaming Hadoop 代码 Python 代码 + 胶水脚本 Spark 代码 Spark RD 帮助其改造成 Scala 版本 作业逻辑 有用户定制的 InputFormat / OutputFormat 业务逻辑相对简单 迁移效果 Spark 产出和 MR 完全一致 , 占用的存储大小一致 相同资源 Quota 下 : Spark 性能提升 3 – 5 倍 (20min -> 4 min, 12min -> 4min) Spark 资源使用量是 Quota 的 ¼, MR 基本用满 Quota

13.改写碰到的坑 - 1 用户 MR 代码自定义了 InputFormat 和 OutputFormat , 基于厂内的 MR(1.x 版本 ). 和 Spark 依赖的 Hadoop (2.x 版本 , 厂内定制版本 ) 存在部分接口不兼容的问题 /** * FileSystem.java */ public BlockLocation [][] getFileBlockLocations ( FileStatus [] file , long [] start , long [] len ) throws IOException { if ( file . length != start . length || file . length != len . length ) { throw new IOException ( "check argument length failed." ); } BlockLocation [][] bls = new BlockLocation [ file . length ][]; for ( int i = 0 ; i < file . length ; i ++) { bls [ i ] = getFileBlockLocations ( file [ i ], start [ i ], len [ i ]); } return bls ; }

14.改写碰到的坑 - 2 问题 : UdwCombineFileInputFormat 和 Spark 中的 UDW 库冲突 . Hive Serde jar 冲突 , 用户程序依赖旧版本的 0.7 hive- serde 解决 : –jars 上传用户的 jars 配置 Spark 参数 : spark.driver.userClassPathFirst =true spark.executor.userClassPathFirst =true

15.改写碰到的坑 -- Sorting Hadoop 作业 , reduce 端默认有序 Spark reduce 端除非指定排序 , 否则乱序 . ` repartitionAndSortWithInPartitions ` 是个好方法

16.设备聚合数据流迁移 挑战 数据量特别大:最大的任务输入数据 30+ T 存储, 32 w 文件, 需要 S park 在 S pli t, S huflle , C ounter 等环节提供良好支持 计算逻辑非常复杂: IDMapping 拥有非常复杂的属性合并、反作弊等计算逻辑

17.设备聚合数据流迁移效果   运行时间 ( h) Cpu( 标准核*天) 内存 ( TB* 天) 迁移前 13.63 30062.03704 3.066990741 迁移后 6.7 16504.05833 2.574039352 对比 ( 前 / 后 ) 49.16% 54.90% 83.93% 主要效果 整体数据流延迟降低 6.9h( 减少 50.84% ) CPU 消耗 每天可以节约 1.36 万标准核*天 ( 节约 45.1%) 内存节约 493 GB * 天 ( 节约 16.07%)

18.数据流迁移碰到的坑 – map only task 用户的一个作业需要在 map only task 中完成读取和写入 hdfs 操作 , 同时在写输出的时候 , 需要访问 InputFormat 设置的 map.input.file 来决定输出路径 Spark 的 saveAsHadoopFile 生成计算逻辑时 , 使用的是 driver 端的 hadoop job conf , 其序列化后再发送给 Executor. 对于 Map Only 的 Spark task, 读取和写入 hdfs 使用的两个 hadoop job conf 并不是同一个实例 Soltution : 将 map input file name 编码到 key 中 . 改写 OutputFormat , 从 key 中反编码出来 map input file name

19.数据流迁移碰到的坑 – 大 shuffle 作业 Reduce 并发较大时 , Reduce Stage 一启动就出现了 FetchFailedException : failed to allocate 16777216 byte(s) of direct memory (used: xxxxxxx , max: yyyyyyy ) 触发场景 : Map Stage 有足够多的 shuffle writer(1W+) Reduce Stage 有足够多的 partitions (UDW 场景 8 W+), 导致每个 reduce 读取的数据足够小 (< 默认 48 MB) 但又必须发起较多的请求 . 触发原因 : Spark Shuffle 框架 Reducer 端读取数据时 , 默认配置 Int.MaxValue 的最大连接并发数 . 在该配置下 , 单 executor 发起过多的请求 ( 每个 request netty 会为其创建 native byte buffer), 导致 direct memory oom 解决方案 :  调小 spark.reducer.maxSizeInFlight , 控制 reduce 端的并发 . 但这个值过小的时候 , 会影响 Reduce 的读取速度 调小 spark.reducer.maxReqsInFlight , 控制总并发数 , 比如说 100 - 400 之间 . 这个参数应和 maxSizeInFlight 配合使用 . 另外一个思路就是减小 reduce stage 的 partitions, 那么每个 reduce 需要读取的数据就会变多 , 向 Map Stage 的 executor 发起的网络请求会被切分成多轮 总体来讲 , 一般的作业不需要关注这个问题 . 平台改进 : Spark 框架在 AE 或者普通的作业中 , 可以考虑自动计算出来合适的网络请求并发数 ( Spark 启动下一轮 task 之前 , 能够拿到上一轮 stage 的 shuffle write records/size).

20.数据流迁移的坑 – 数据 diff 问题 数据 Diff 问题是个天坑 . 启动之前一定要预先想好做 diff 的方案 Diff 方案的设计有时候需要紧贴业务 , 业务方能更有建设性地设计 diff 方案 Python 的 Double 的表示和 Scala 是不一致的 ...

21.Baidu 内部 MR 迁移 Spark 进展 确认 Spark 相对于 MR 的性能优势 , 多个业务方已经完成一期的迁移 , 均取得明显的资源和时效性收益 . 种子业务作业迁移反向驱动了我们批处理计算团队持续改善 Spark 计算引擎和周边组件 : 厂内定制版本的 Hadoop 改造 , 以便和老的 MapReduce 兼容 HadoopStreamingRDD 的开发 , 支持 Hadoop streaming/ bistreaming Spark 对接厂内可持久化 Shuffle System (DCE Shuffle)

22.展望 19 年将会有更多的 MR 迁移至 Spark, 预期 3 – 5 年内 MR 将全部迁移至 Spark Spark 对接可持久化 Shuffle System 完成后 , 将能更好地支持百度内部已有的大 shuffle ( 最大 Shuffle 量级为 PB) MR 作业 内部 MR 迁移 Spark 过程中 , 预期对 Spark 组件提出更多的稳定性和功能性需求 , 反推计算团队做出更多的改善 , 择通用性 bug fixes/ 改善反哺社区 .

23.We are hiring Email: yexianjin@baidu.com

user picture

相关Slides

  • 本PPT解释了作为支持交易型分布式数据库系统的TiDB核心产品架构及其主要组件,包括TiDB,TiKV,Placement Driver,TiSpark,TheFlash,Tool,TiDB-operator for k8s等,对其基本作用进行阐述,并对其中的核心组件TiKV重点分析,解释了基本数据组织方式,执行方式,数据管理,水平扩展和负载均衡,以及分布式一致性等基本问题。最好对其分析引擎TiSpark也进行了简要功能说明。

  • 介绍了ES的基本结构,功能和原理,重点分析了在实际生产环境中各种运维和监控的指标,以及各种调优经验和配置参数,还有运维自动化的方法论探讨,可以作为ES在实际生产环境中的最佳实践部署和运维监控案例,也可以帮助ES平台维护者理解并思考如何提供更好的ES服务及运维保障。

  • Adaptive Execution @ Spark + AI Summit Europe 2018 Video @ https://databricks.com/session/spark-sql-adaptive-execution-unleashes-the-power-of-cluster-in-large-scale-2

  • Apache Spark作为分布式内存计算引擎,内存使用的优化对于性能提升至关重要,Intel的Optane(傲腾)技术,让内存和SSD之间架设了个新的数据缓存/存储层,并通过PMDK等特殊的API绕过文件系统,系统调用,内存拷贝等一系列额外操作,让性能有极大的提升。Intel开源的OAP(Optimized Analytics Package)for Apache Spark项目,也是基于这个前体,构建即席查询引擎,以及在机器学习算法诸如KMeans算法上也获得了不错的性能回报。