- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 文档嵌入链接
- <iframe src="https://www.slidestalk.com/u6/Migrating_from_MR_to_Spark_in_Baidu?embed" frame border="0" width="640" height="360" scrolling="no" allowfullscreen="true">复制
- 微信扫一扫分享
MR大规模迁移Spark在Baidu的实践
展开查看详情
1 .叶先进 ( advancedxy ) MR 大规模迁移 Spark 在 Baidu 的实践
2 .About Me 高级研发工程师 百度智能云技术二部 – 分布式计算研发 参与百度内部 MR, Spark 和 Bigflow 的研发
3 .Agenda 迁移背景 迁移准备工作 一个具体的案例 -- 成果 , 经验和教训 展望
4 .迁移背景 – Baidu 的大数据架构 Spark over Yarn, Yarn over Normandy(as a plugin) 各种计算引擎均对接 Normandy 调度 , 机器上各个计算 引擎应用可混用 Spark 和 MR 是两个最大的离线计算引擎 , 整体机器量级 达到 ~10W
5 .迁移背景 – Spark in Baidu 2014 2015 2016 2017 2018 Spark import to Baidu Version: 0.8 Build standalone cluster Integrate with in-house FS\Pub-Sub\DW Version: 1.4 Build Cluster over YARN Integrate with in-house Resource Scheduler System Version: 1.6 SQL\Graph Service over Spark OAP Version: 2.1 MR to Spark Version: 2.3 5W+ ~3w
6 .迁移背景 MR 稳定但技术陈旧 , 易用性差 . 社区和厂内的 Spark 生态演进完善 . 相对于 MR, Spark 在易用性和性能上有显著优势 MR 和 Spark 计算在百度内部逐渐混布 . 从 17 年起 , 不再提供纯离线的计算资源 (MR). 新的计算资源由在线资源混布提供 . 同期 Baidu 内部 MR 和 Spark 计算团队合并成为批量计算团队
7 .准备工作 – 指导思想 资源打平 重点业务 , 高优支持 选择合适的作业 , 支持业务方改写 , 凸显 Spark 性能优势 表示层的作业 ( Bigflow /SQL(on QE)) 后续无缝推动升级
8 .准备工作 资源打平 : MR 和 Spark 合并物理队列 , 用户 ugi 无缝提交 MR/Spark 作业
9 .准备工作 资源打平 : MR 和 Spark 合并物理队列 , 用户 ugi 无缝提交 MR/Spark 作业 选择合适的任务改写 , 和重点业务方进行沟通 , 推动升级 定性角度 : 大并发量 , 计算拓扑复杂 , 强行将逻辑拆分成 Map Reduce 阶段 定量角度 : 并发数 > 1W, Input + Shuffle > 1T, 作业平均核时数 > 平台利用率 根据任务和重点业务方沟通 , 前期 RD 点对点支持业务方改写 , 测试 , 验证 , 上线
10 .准备工作 资源打平 : MR 和 Spark 合并物理队列 , 用户 ugi 无缝提交 MR/Spark 作业 选择合适的任务改写 , 和重点业务方进行沟通 , 推动升级 定性角度 : 大并发量 , 计算拓扑复杂 , 强行将逻辑拆分成 Map Reduce 阶段 定量角度 : 并发数 > 1W, Input + Shuffle > 1T, 作业平均核时数 > 平台利用率 根据任务和重点业务方沟通 , 前期 RD 点对点支持业务方改写 , 测试 , 验证 , 上线 推动表示层作业迁移 , 目前正在做适配和验证过程 .
11 .一个具体的案例 : 支持 UDW IDMapping 任务迁移 典型作业改写 设备聚合一期 设备聚合二期
12 .典型作业改写 作业类型 Hadoop Streaming Hadoop 代码 Python 代码 + 胶水脚本 Spark 代码 Spark RD 帮助其改造成 Scala 版本 作业逻辑 有用户定制的 InputFormat / OutputFormat 业务逻辑相对简单 迁移效果 Spark 产出和 MR 完全一致 , 占用的存储大小一致 相同资源 Quota 下 : Spark 性能提升 3 – 5 倍 (20min -> 4 min, 12min -> 4min) Spark 资源使用量是 Quota 的 ¼, MR 基本用满 Quota
13 .改写碰到的坑 - 1 用户 MR 代码自定义了 InputFormat 和 OutputFormat , 基于厂内的 MR(1.x 版本 ). 和 Spark 依赖的 Hadoop (2.x 版本 , 厂内定制版本 ) 存在部分接口不兼容的问题 /** * FileSystem.java */ public BlockLocation [][] getFileBlockLocations ( FileStatus [] file , long [] start , long [] len ) throws IOException { if ( file . length != start . length || file . length != len . length ) { throw new IOException ( "check argument length failed." ); } BlockLocation [][] bls = new BlockLocation [ file . length ][]; for ( int i = 0 ; i < file . length ; i ++) { bls [ i ] = getFileBlockLocations ( file [ i ], start [ i ], len [ i ]); } return bls ; }
14 .改写碰到的坑 - 2 问题 : UdwCombineFileInputFormat 和 Spark 中的 UDW 库冲突 . Hive Serde jar 冲突 , 用户程序依赖旧版本的 0.7 hive- serde 解决 : –jars 上传用户的 jars 配置 Spark 参数 : spark.driver.userClassPathFirst =true spark.executor.userClassPathFirst =true
15 .改写碰到的坑 -- Sorting Hadoop 作业 , reduce 端默认有序 Spark reduce 端除非指定排序 , 否则乱序 . ` repartitionAndSortWithInPartitions ` 是个好方法
16 .设备聚合数据流迁移 挑战 数据量特别大:最大的任务输入数据 30+ T 存储, 32 w 文件, 需要 S park 在 S pli t, S huflle , C ounter 等环节提供良好支持 计算逻辑非常复杂: IDMapping 拥有非常复杂的属性合并、反作弊等计算逻辑
17 .设备聚合数据流迁移效果 运行时间 ( h) Cpu( 标准核*天) 内存 ( TB* 天) 迁移前 13.63 30062.03704 3.066990741 迁移后 6.7 16504.05833 2.574039352 对比 ( 前 / 后 ) 49.16% 54.90% 83.93% 主要效果 整体数据流延迟降低 6.9h( 减少 50.84% ) CPU 消耗 每天可以节约 1.36 万标准核*天 ( 节约 45.1%) 内存节约 493 GB * 天 ( 节约 16.07%)
18 .数据流迁移碰到的坑 – map only task 用户的一个作业需要在 map only task 中完成读取和写入 hdfs 操作 , 同时在写输出的时候 , 需要访问 InputFormat 设置的 map.input.file 来决定输出路径 Spark 的 saveAsHadoopFile 生成计算逻辑时 , 使用的是 driver 端的 hadoop job conf , 其序列化后再发送给 Executor. 对于 Map Only 的 Spark task, 读取和写入 hdfs 使用的两个 hadoop job conf 并不是同一个实例 Soltution : 将 map input file name 编码到 key 中 . 改写 OutputFormat , 从 key 中反编码出来 map input file name
19 .数据流迁移碰到的坑 – 大 shuffle 作业 Reduce 并发较大时 , Reduce Stage 一启动就出现了 FetchFailedException : failed to allocate 16777216 byte(s) of direct memory (used: xxxxxxx , max: yyyyyyy ) 触发场景 : Map Stage 有足够多的 shuffle writer(1W+) Reduce Stage 有足够多的 partitions (UDW 场景 8 W+), 导致每个 reduce 读取的数据足够小 (< 默认 48 MB) 但又必须发起较多的请求 . 触发原因 : Spark Shuffle 框架 Reducer 端读取数据时 , 默认配置 Int.MaxValue 的最大连接并发数 . 在该配置下 , 单 executor 发起过多的请求 ( 每个 request netty 会为其创建 native byte buffer), 导致 direct memory oom 解决方案 : 调小 spark.reducer.maxSizeInFlight , 控制 reduce 端的并发 . 但这个值过小的时候 , 会影响 Reduce 的读取速度 调小 spark.reducer.maxReqsInFlight , 控制总并发数 , 比如说 100 - 400 之间 . 这个参数应和 maxSizeInFlight 配合使用 . 另外一个思路就是减小 reduce stage 的 partitions, 那么每个 reduce 需要读取的数据就会变多 , 向 Map Stage 的 executor 发起的网络请求会被切分成多轮 总体来讲 , 一般的作业不需要关注这个问题 . 平台改进 : Spark 框架在 AE 或者普通的作业中 , 可以考虑自动计算出来合适的网络请求并发数 ( Spark 启动下一轮 task 之前 , 能够拿到上一轮 stage 的 shuffle write records/size).
20 .数据流迁移的坑 – 数据 diff 问题 数据 Diff 问题是个天坑 . 启动之前一定要预先想好做 diff 的方案 Diff 方案的设计有时候需要紧贴业务 , 业务方能更有建设性地设计 diff 方案 Python 的 Double 的表示和 Scala 是不一致的 ...
21 .Baidu 内部 MR 迁移 Spark 进展 确认 Spark 相对于 MR 的性能优势 , 多个业务方已经完成一期的迁移 , 均取得明显的资源和时效性收益 . 种子业务作业迁移反向驱动了我们批处理计算团队持续改善 Spark 计算引擎和周边组件 : 厂内定制版本的 Hadoop 改造 , 以便和老的 MapReduce 兼容 HadoopStreamingRDD 的开发 , 支持 Hadoop streaming/ bistreaming Spark 对接厂内可持久化 Shuffle System (DCE Shuffle)
22 .展望 19 年将会有更多的 MR 迁移至 Spark, 预期 3 – 5 年内 MR 将全部迁移至 Spark Spark 对接可持久化 Shuffle System 完成后 , 将能更好地支持百度内部已有的大 shuffle ( 最大 Shuffle 量级为 PB) MR 作业 内部 MR 迁移 Spark 过程中 , 预期对 Spark 组件提出更多的稳定性和功能性需求 , 反推计算团队做出更多的改善 , 择通用性 bug fixes/ 改善反哺社区 .
23 .We are hiring Email: yexianjin@baidu.com