基于 Spark 打造高效云原生数据分析引擎

议题一:
基于 Spark 打造高效云原生数据分析引擎

辛庸,阿里巴巴计算平台事业部 EMR 技术专家。Apache Hadoop,Apache Spark contributor。对 Hadoop、Spark、Hive、Druid 等大数据组件有深入研究。目前从事大数据云化相关工作,专注于计算引擎、存储结构、数据库事务等内容。

议题简介:
由阿里巴巴 EMR 团队提交的 TPC-DS 成绩在九月份的榜单中取得了排名第一的成绩。这个成绩背后离不开 EMR 团队对 Spark 执行引擎持续不断的优化。
本次分享将选取一些有代表性的优化点,深入到技术细节做详细介绍,包括但不限于动态过滤、CBO增强、TopK排序等等。

展开查看详情

1.基于 Spark 打造高效云 原生数据分析引擎 辛现银(辛庸) 阿里云智能计算平台事业部技术专家

2. 01 TPC-DS BenchMark on EMR 02 Jindo-Spark: Efficient cloud-native Contents analyze engine 目录 01 Runtime Filter 02 Join Reorder 03 TopK optimization 04 Relational Cache 05 Spark Transaction 03 Now and Future

3.TPC-DS Benchmark 1,824,283 0.31USD 14,861,137 0.18USD 5,261,414 0.53CNY

4. EMR-Jindo Jindo-Spark Jindo-Spark DataSource API V1/V2 高效计算引擎,处理多种计算任务 Delta Lake Jindo-Fs Node1 Node2 Node… NodeN Jindo-FS 云原生存储,同时兼顾性能与价格 OSS

5. Jindo-Spark Runtime Filter Enhanced Join Reorder RF 自适应的运行时数据裁剪 JR 遗传算法;外连接重排 TopK File Index TK 推理并下推 TopK 逻辑,运算早期过滤数据 FI 文件级别过滤,min/max/bloom/倒排等 Relational Cache Spark Transaction RC 将查询从分钟级提升为亚秒级 ST 为 Spark 引入 Full ACID 支持 More… M Smart Shuffle SS Push based, 减少 sort-merge 次数

6. Runtime Filter No-RF/RF 加速比 12 10 8 6 35% 4 2 0 q54 q80 q40 q55 q42 q52 q32 q98 q43 q95 q71 q70 运行时动态裁剪数据,避免不必要的计算 SPARK-27227

7. Enhanced Join Reorder 多表 Join 的遗传算法 复杂度 遍历:~O(N!) 动态规划:~O(2^N) 遗传算法:~O(N)*n SPARK-27714

8. Enhanced Join Reorder 多表 Join 的遗传算法 随机式 杂交算法 Query64: 18个表 Join 1750 1800 启发式生成 bushy tree 1400 1350 1050 900 700 450 350 0 0 DP GA 不做Reorder GA 边重组杂交 SPARK-27714

9. Enhanced Join Reorder 外连接重排算法 750 600 450 300 150 0 q80 q40 问题 满足什么条件可以进行 reorder? SIGMOD ’90: Query graphs, implementing trees, and freely-reorderable outerjoins SPARK-27714

10. Top K Problem TopK TopK 问题: Map 1 Filter 2500 order by xxx limit xxx 2000 带 group by 的情形 TopK Map 2 Red 1 TopK window function 的 topK Filter 1500 解法:Reduce 端的 TopK 推至 Map 端 1000 堆排:Nln(N) -> Nln(K) TopK 500 Map 3 Filter TopK 推至 Map: 0 q67 Nln(K) -> MKln(K),其中 MK << N 额外收益:大大减轻 shuffle 的数据量 Data Flow TopK Candidate

11. Relational Cache Relational Cache (RC) RC 创建/维护 RC使用 任意Spark表,视图或者Dataset等关系型 DDL Create Realtional Cache Catalyst Optimizer 数据抽象的数据实体, similar to SQL Materialized View, but more than that EXEC 使用场景 1.亚秒级响应的MOLAP引擎 OSS/HDFS/MEM Rewrite Rules based RC 2.交互式BI,Dashboard。 让数据适配计算,从而大幅提升计算效率 SPARK-26764

12. Relational Cache 数据预计算 数据预组织 雪花模型,PK、FK 优化 文件索引 借助 EMR Spark 快速计算 列式存储 数据立方,存储在主流存储引擎之中 数据分区与 ZOrder 查询自动匹配 Query: Rewritten Query: SELECT empid, deptname, hire_date SELECT empid, deptname, hire_date FROM emps JOIN depts ON (emps.depno = depts.depno) FROM mv WHERE hire_date >= ‘2018-01-01’ AND hire_date <= ‘2018-06-30’ WHERE hire_date >= ‘2018-01-01’ AND hire_date <= ‘2018-06-30’ SPARK-26764

13. Relational Cache BenchMark Cache vs No-Cache Star Schema Benchmark(ms) 218801 250 100000 30018 6014 200 10000 3261 1187 726 824 859 1010 997 1000 150 100 100 10 50 1 0 2 20 Cache 40 No-Cache 200 Sppedup 1000 For More Detail:2019 云栖大会大数据生态专场: 王道远:Spark Relational Cache实现亚秒级响应的交互式分析 SPARK-26764

14. Spark Transaction 场景 数据订正、删除;GDPR 实时数仓:数据流式入库,并发查询。尤其是云原生时代的数据仓库、数据湖建设 方案 悲观机制:类似于 Hive,SQL 执行期间对对象加锁 乐观机制:类似于 Delta,SQL 提交时进行校对

15. Spark Transaction 执行流程 底层存储

16. Spark Txn vs Delta Txn 定制化引擎读写接口 支持该格式的任意引擎 中心化事务管理器 存储层 存储层 SnapShot1 SnapShot2 SnapShot3 元数据+事务管理 Files DeltaFiles 数据 File 1 1 File File 1 1 File AddFiles AddFiles File 1 1 File File 1 1 File AddFiles File 1 1 File DelFiles DelFiles File 1 1 File File 1 1 File Files/ RowId RawRow RawRow AddFiles/ DeltaFiles … … DelFiles …

17. Pessimistic vs Optimistic 悲观机制(Spark/Hive Txn) 乐观机制(Delta) 1. 在并发操作很少的情况下,效率很高 1. 不存在冲突导致的任务失败 2. 底层格式无关,不绑定于某一种格式/引擎 优点 2. 多版本控制在行,粒度更细 3. Merge on write,读性能高 3. Merge on read,写性能高 4. 实现简单 1. 冲突导致的任务失败 1. 修改了数据格式,通用性差 2. 多版本控制在表/分区 缺点 2. Merge on read,读性能低 3. Merge on write, 写性能低,存在写放大的问 3. 实现复杂 题 在并发少、读性能更为重要的数据湖场景,Delta 更优

18. Now and Future 1. Native CodeGen 实现更高效处理效率 2. 基于 Spark over Delta 的高效数据湖建设 3. 围绕 Kafka、Spark、Delta、Kudu 等构件多场景实时数仓解决方案

19.欢迎关注

20.THANKS !