eBay’s Optimized Spark SQL Engine for Interactive Analysis
Yuming Wang (王玉明), Software Engineer at eBay / Spark Committer
2 .About Me: Yuming Wang (GitHub: wangyum) • Software Engineer at eBay • Apache Spark Committer
3 .eBay’s New Optimized Spark SQL Engine for Interactive Analysis
4 .Agenda: System Overview • Feature Enhancements • Performance Improvements • Stability Improvements
5 .PART 01 System Overview
6 .System Overview – Architecture (diagram: multiple Spark Thrift Server instances)
7 .System Overview – Status: 01 2200+ nodes • 02 1+ PB of memory • 03 10+ queues
8 .PART 02 Feature Enhancements
9 .Feature Enhancements – Access Control
⚫ Gateway authentication, with access control for clusters and queues
⚫ SQL-based access control at the database and table level:
GRANT SELECT ON table1 TO USER user1;
GRANT SELECT ON DATABASE db1 TO USER user1;
GRANT SELECT ON table1 TO ROLE role1;
SHOW GRANT USER user1 ON TABLE table1;
REVOKE ...
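The slide shows the GRANT syntax but not how a SELECT check is resolved. Here is a hypothetical Python sketch of one plausible resolution order. It assumes that a database-level grant covers every table in that database, that a role grant applies to every user holding the role, and that tables are named as "db.table". GrantStore and its methods are illustrative names, not eBay's API.

```python
from dataclasses import dataclass, field


@dataclass
class GrantStore:
    """Hypothetical in-memory model of SELECT grants (not eBay's implementation)."""
    user_tables: set = field(default_factory=set)   # {(user, "db.table")}
    user_dbs: set = field(default_factory=set)      # {(user, "db")}
    role_tables: set = field(default_factory=set)   # {(role, "db.table")}
    user_roles: dict = field(default_factory=dict)  # {user: {role, ...}}

    def grant_table_to_user(self, table: str, user: str) -> None:
        self.user_tables.add((user, table))

    def grant_database_to_user(self, db: str, user: str) -> None:
        self.user_dbs.add((user, db))

    def grant_table_to_role(self, table: str, role: str) -> None:
        self.role_tables.add((role, table))

    def revoke_table_from_user(self, table: str, user: str) -> None:
        self.user_tables.discard((user, table))

    def can_select(self, user: str, table: str) -> bool:
        # Assumption: a database-level grant covers every table in that database.
        db = table.split(".", 1)[0]
        if (user, table) in self.user_tables or (user, db) in self.user_dbs:
            return True
        # Otherwise, any role held by the user may carry a table-level grant.
        return any((role, table) in self.role_tables
                   for role in self.user_roles.get(user, set()))


store = GrantStore()
store.grant_table_to_user("db1.table1", "user1")  # GRANT SELECT ON table1 TO USER user1
store.grant_database_to_user("db1", "user2")      # GRANT SELECT ON DATABASE db1 TO USER user2
store.grant_table_to_role("db1.table1", "role1")  # GRANT SELECT ON table1 TO ROLE role1
store.user_roles["user3"] = {"role1"}
```

Storing user, database, and role grants in separate sets reduces each check to a few set lookups.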
10 .Feature Enhancements – UPDATE/DELETE Support: 1 Delta Lake • 2 UPDATE/DELETE SQL syntax • 3 10,000+ Delta tables • 4 Delta table management
11 .Feature Enhancements – Upload/Download API: download large result sets via JDBC • upload CSV files into tables via JDBC • integrate the download API with ODBC
12 .Feature Enhancements – Recommendations for Common Query Problems: 01 Add a filter condition on the partition column • 02 Detect badly skewed data • 03 Detect the bucket read • 04 Check for dangerous join conditions • 05 Detect bad join conditions
13 .Feature Enhancements – Syntax: Temp table • Recursive SQL query • Support column list specification in INSERT statements* • Implement the QUALIFY clause • Support 'LIKE ANY' and 'LIKE ALL' operators* • Allow a SQL query to reference a column by its alias
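The semantics of LIKE ANY and LIKE ALL can be shown in a few lines: the value must match at least one pattern (ANY) or every pattern (ALL). This Python sketch translates each LIKE pattern into a regular expression. It is a simplification: SQL NULL handling and the ESCAPE clause are deliberately left out.

```python
import re


def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE pattern ('%' = any run, '_' = one char) into a regex.
    ESCAPE clauses are not handled in this sketch."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))  # every other character matches literally
    return re.compile("".join(parts), re.DOTALL)


def like(value: str, pattern: str) -> bool:
    # LIKE must match the whole value, not just a substring.
    return like_to_regex(pattern).fullmatch(value) is not None


def like_any(value: str, patterns) -> bool:
    return any(like(value, p) for p in patterns)


def like_all(value: str, patterns) -> bool:
    return all(like(value, p) for p in patterns)
```

For example, `like_any("Spark SQL", ["Hive%", "%SQL"])` is true, while `like_all("Spark SQL", ["Spark%", "Hive%"])` is false.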
14 .Feature Enhancements – Other: Read/write tables across different clusters • Compact tables containing small files / merge small files • Custom ParquetOutputCommitter • Support analyzing all tables in a specific database* • Support spilling results to disk • Thrift Server query audit log / query exception summary • View Point
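The slide does not say how small files are grouped when a table is compacted. One common approach is bin packing, shown here as a Python sketch: files below a size threshold are packed, largest first, into rewrite batches that stay at or below a target size. plan_compaction and both size parameters are hypothetical, and first-fit decreasing is a stand-in technique rather than the method from the talk.

```python
def plan_compaction(files, target_bytes, small_threshold):
    """Group small files into rewrite batches of at most target_bytes each.

    files: list of (path, size_in_bytes). Uses first-fit decreasing bin packing;
    files at or above small_threshold are left untouched.
    """
    small = sorted((f for f in files if f[1] < small_threshold),
                   key=lambda f: f[1], reverse=True)
    bins = []  # each entry: [total_size, [paths]]
    for path, size in small:
        for b in bins:
            if b[0] + size <= target_bytes:  # first batch with enough room
                b[0] += size
                b[1].append(path)
                break
        else:
            bins.append([size, [path]])      # no batch fits: open a new one
    # Rewriting a batch that holds a single file would not reduce the file count.
    return [paths for _, paths in bins if len(paths) > 1]
```

Each returned batch would be rewritten as one output file. Large files are excluded because rewriting them costs I/O without reducing the file count.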
15 .PART 03 Performance Improvements
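16 .Performance Improvements – Transparent Data Cache: 01 Cache frequently used production datasets • 02 The shared HDFS cluster has unstable performance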
17 .Performance Improvements – Index
• Standalone index files, so indexes can be created and dropped more flexibly
CREATE INDEX i_id ON TABLE t1 (id) AS "bf";
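The "bf" index type suggests a bloom filter index. The sketch below shows how a per-file bloom filter, kept separate from the data files, lets a query such as WHERE id = 2 skip files. This is a sketch under assumptions: one filter per data file, and hypothetical filter sizing and hash choice (SHA-256 with a per-hash prefix byte). A bloom filter never gives a false negative, so no file that actually holds the value is skipped. The occasional false positive only means an extra file gets read.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive num_hashes bit positions from SHA-256 with a per-hash prefix byte.
        data = repr(key).encode("utf-8")
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + data).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key) -> None:
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


def build_index(files: dict) -> dict:
    """Hypothetical index build: one bloom filter per data file on the indexed column."""
    index = {}
    for path, ids in files.items():
        bf = BloomFilter()
        for value in ids:
            bf.add(value)
        index[path] = bf
    return index


def files_to_scan(index: dict, value) -> list:
    """Files whose filter cannot rule out `value`; all others are skipped."""
    return [path for path, bf in index.items() if bf.might_contain(value)]
```

Since the index lives in its own files, dropping or rebuilding it leaves the data files untouched, which is the flexibility the slide points to.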
18 .Performance Improvements – Bucket Join
• Joins between bucketed tables whose bucket counts are multiples of each other can skip the shuffle (Coalesce / Rebucket)
• The join condition may be a superset of the bucket columns
CREATE TABLE t1 (id STRING, value STRING) USING parquet CLUSTERED BY(id) INTO 1000 BUCKETS;
CREATE TABLE t2 (id STRING, value STRING) USING parquet CLUSTERED BY(id) INTO 1000 BUCKETS;
SELECT * FROM t1 JOIN t2 ON t1.id=t2.id AND t1.value=t2.value;
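Why no shuffle is needed: when the bucket id is hash % n and n_small divides n_big, then h % n_small == (h % n_big) % n_small. So every row in big bucket j can only match rows in small bucket j % n_small. The Python sketch below applies this in two ways. Coalesce merges the big side's buckets down to n_small groups. Rebucket re-reads each small bucket once per matching big bucket. CRC32 stands in for Spark's Murmur3 hash, and all function names are hypothetical.

```python
import zlib
from collections import defaultdict


def key_hash(key) -> int:
    # Stand-in for Spark's Murmur3 hash: any deterministic non-negative hash works here.
    return zlib.crc32(str(key).encode("utf-8"))


def bucketize(rows, num_buckets):
    """Write-time bucketing: rows of (key, value) go to bucket key_hash(key) % num_buckets."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[key_hash(key) % num_buckets].append((key, value))
    return buckets


def join_rows(left, right):
    return [(k, lv, rv) for k, lv in left for k2, rv in right if k == k2]


def coalesce_join(big, n_big, small, n_small):
    """Coalesce: merge big buckets i, i + n_small, i + 2*n_small, ... and join with small bucket i."""
    assert n_big % n_small == 0, "bucket counts must be multiples"
    out = []
    for i in range(n_small):
        merged = [row for j in range(i, n_big, n_small) for row in big.get(j, [])]
        out.extend(join_rows(merged, small.get(i, [])))
    return out


def rebucket_join(big, n_big, small, n_small):
    """Rebucket: for each big bucket j, re-read small bucket j % n_small and keep
    only the rows that would land in bucket j under n_big buckets."""
    assert n_big % n_small == 0, "bucket counts must be multiples"
    out = []
    for j in range(n_big):
        candidates = [row for row in small.get(j % n_small, [])
                      if key_hash(row[0]) % n_big == j]
        out.extend(join_rows(big.get(j, []), candidates))
    return out
```

Coalesce yields fewer, larger tasks. Rebucket keeps the big side's parallelism but reads the small side several times, so the choice between them trades task count against repeated I/O.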
19 .Performance Improvements – Runtime filter (1)
Diagram (plan before and after): BroadcastHashJoin of Big Input1 and Small Input, with Exchange, HashAggregate, DynamicFilter source, and DynamicFilter nodes; the filter pushed to the big side is id IN (1, 2, …).
20 .Performance Improvements – Runtime filter (2)
Diagram (plan before and after): SortMergeJoin of Big Input1 and Big Input2, joined with Small Input through a BroadcastHashJoin, with Exchange, DynamicFilter source, and DynamicFilter nodes; the pushed filter is id IN (1, 2, …).
21 .Performance Improvements – Runtime filter (3)
Diagram (plan before and after): SortMergeJoin of Small Input1 and Big Input1, with Exchange nodes; the small side builds a bloom filter (BuildBloomFilter, DynamicFilter source), and Big Input1 is filtered through a DynamicFilter with InBloomFilter(id, bf).
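The three diagrams share one idea: collect the join keys on the small side and use them to drop big-side rows that cannot match, before the expensive exchange. The small side's distinct keys become an exact IN list. When there are too many of them, a compact bloom filter takes the IN list's place. The Python sketch below follows that pattern. The max_in_list threshold, the filter sizing, and the function names are assumptions for illustration only.

```python
import hashlib


class BloomFilter:
    """Minimal bloom filter: no false negatives, some false positives."""

    def __init__(self, num_bits: int = 8192, num_hashes: int = 3):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        data = repr(key).encode("utf-8")
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + data).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key) -> None:
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key) -> bool:
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


def build_runtime_filter(small_keys, max_in_list: int = 1000):
    """Build a key predicate from the small (build) side of a join."""
    distinct = set(small_keys)                 # roughly the HashAggregate step
    if len(distinct) <= max_in_list:
        return lambda key: key in distinct     # like pushing id IN (1, 2, ...)
    bf = BloomFilter()
    for key in distinct:
        bf.add(key)
    return bf.might_contain                    # like InBloomFilter(id, bf)


def prefilter(big_rows, key_filter):
    """Drop big-side rows (key first) that cannot find a join partner."""
    return [row for row in big_rows if key_filter(row[0])]
```

Neither variant changes the join result. The IN list removes every non-matching row. The bloom filter may let a few through, and the join itself then discards them.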
22 .Performance Improvements – Range Join
SELECT a FROM t1 INNER JOIN t2 ON t1.ip BETWEEN t2.begin_ip AND t2.end_ip
Range Join with a range index instead of a Nested Loop Join: O(N * M) => O(N * 2 * log(M))
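The complexity drop comes from sorting t2's ranges once and then binary-searching for each t1 row, which replaces comparing every pair. This Python sketch assumes the ranges do not overlap, as is typical for IP-to-location tables. Under that assumption, one binary search per probe row is enough. Overlapping ranges would require a different index. build_range_index, probe, and range_join are hypothetical names.

```python
import bisect
import ipaddress


def ip_to_int(text: str) -> int:
    return int(ipaddress.ip_address(text))


def build_range_index(ranges):
    """ranges: list of (begin_ip, end_ip, payload), assumed non-overlapping.
    Sorting once costs O(M log M)."""
    ordered = sorted(ranges, key=lambda r: r[0])
    begins = [r[0] for r in ordered]
    return begins, ordered


def probe(index, ip):
    begins, ordered = index
    pos = bisect.bisect_right(begins, ip) - 1  # last range with begin_ip <= ip
    if pos >= 0 and ordered[pos][1] >= ip:     # ip must also be <= end_ip
        return ordered[pos][2]
    return None


def range_join(points, ranges):
    """Inner join of points with ranges: O(N log M) probes after the sort."""
    index = build_range_index(ranges)
    out = []
    for ip in points:
        match = probe(index, ip)
        if match is not None:
            out.append((ip, match))
    return out
```

With N = 10^6 probe rows and M = 10^5 ranges, the nested loop needs 10^11 comparisons. The indexed version needs about 10^6 × 17 steps, since log2(10^5) ≈ 17.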
23 .Performance Improvements – Limit Pushdown
• Improve LIMIT-only queries on datasource tables: SELECT * FROM t1 LIMIT 5;
• Push down LIMIT through InnerLike joins when the join condition is empty*: SELECT * FROM t1 CROSS JOIN t2 LIMIT 5;
• Push down LIMIT through Window when partitionSpec is empty*: SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rowId FROM t1 LIMIT 5;
• Push down LIMIT through Project with Join*: SELECT a FROM t1 LEFT JOIN t2 ON a = x AND b = y LIMIT 5;
• Push down LIMIT 1 through Aggregate if it is group-only*: SELECT * FROM (SELECT * FROM t1 UNION SELECT * FROM t2) tmp LIMIT 1;
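Two of these rules can be illustrated with a toy logical plan. Under LIMIT n over a join with no condition, each input needs at most n rows: if both limited sides are non-empty, their cross product already has at least min(n, |L| × |R|) rows. LIMIT 1 over a group-only aggregate (the DISTINCT that UNION introduces) needs only one input row, because any single row forms a valid group. The plan classes below are hypothetical Python stand-ins, not Spark's Catalyst API.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Scan:
    rows: tuple


@dataclass(frozen=True)
class Limit:
    n: int
    child: "Plan"


@dataclass(frozen=True)
class CrossJoin:  # a join with an empty condition
    left: "Plan"
    right: "Plan"


@dataclass(frozen=True)
class Distinct:   # an Aggregate with grouping expressions only
    child: "Plan"


Plan = Union[Scan, Limit, CrossJoin, Distinct]


def push_down_limit(plan: Plan) -> Plan:
    """One bottom-up pass applying the two pushdown rules."""
    if isinstance(plan, Limit):
        child = push_down_limit(plan.child)
        if isinstance(child, CrossJoin):
            return Limit(plan.n, CrossJoin(Limit(plan.n, child.left), Limit(plan.n, child.right)))
        if plan.n == 1 and isinstance(child, Distinct):
            return Limit(1, Distinct(Limit(1, child.child)))
        return Limit(plan.n, child)
    if isinstance(plan, CrossJoin):
        return CrossJoin(push_down_limit(plan.left), push_down_limit(plan.right))
    if isinstance(plan, Distinct):
        return Distinct(push_down_limit(plan.child))
    return plan


def execute(plan: Plan) -> list:
    """Naive interpreter, used to check that the rewrite keeps LIMIT semantics."""
    if isinstance(plan, Scan):
        return list(plan.rows)
    if isinstance(plan, Limit):
        return execute(plan.child)[: plan.n]
    if isinstance(plan, CrossJoin):
        return [l + r for l in execute(plan.left) for r in execute(plan.right)]
    if isinstance(plan, Distinct):
        return list(dict.fromkeys(execute(plan.child)))
    raise TypeError(f"unknown plan node: {plan!r}")
```

Because LIMIT without ORDER BY may return any n rows, a rewrite only has to return n rows of the original result. It does not have to return the same n rows.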
24 .Performance Improvements – Filter Pushdown
A Push down date, byte, short, decimal, and timestamp types to Parquet*: SELECT * FROM t1 WHERE c_date = date '2021-10-16'
B Push down In and String StartsWith predicates to Parquet*: SELECT * FROM t1 WHERE id IN (1, 3, 5) AND name LIKE 'Spark%'
C Filter pushdown through Window: SELECT * FROM t1 QUALIFY RANK() OVER(PARTITION BY a ORDER BY b DESC) = 1
D CombineFilters supports non-deterministic expressions*: CREATE VIEW v1 AS SELECT * FROM t1 WHERE dt NOT IN ('2020-01-01', '2021-01-01'); SELECT * FROM v1 WHERE dt = '2021-05-01' AND rand() <= 0.01;
E Improve Hive Metastore filter pushdown*
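Pushing a predicate into Parquet pays off because the reader can skip whole row groups using their min/max column statistics. The Python sketch below shows the skip tests for IN and for a prefix match (LIKE 'Spark%'). A row group is skipped only when the statistics prove that no row can match. The prefix check relies on two facts: every string that starts with the prefix is greater than or equal to the prefix, and truncating strings to the prefix length preserves their order. ColumnStats and both functions are illustrative, not the parquet-mr API. The same comparisons work for dates, which is what type support such as item A enables.

```python
from dataclasses import dataclass


@dataclass
class ColumnStats:
    min: object  # smallest value of the column in the row group
    max: object  # largest value of the column in the row group


def may_match_in(stats: ColumnStats, values) -> bool:
    """False only if every IN value lies outside [min, max]."""
    return any(stats.min <= v <= stats.max for v in values)


def may_match_starts_with(stats: ColumnStats, prefix: str) -> bool:
    """False only if no string in [min, max] can start with prefix."""
    if stats.max < prefix:                    # everything sorts before the prefix
        return False
    if stats.min[: len(prefix)] > prefix:     # everything sorts after all prefixed strings
        return False
    return True


row_groups = {
    "rg0": ColumnStats("Apache", "Flink"),
    "rg1": ColumnStats("Hive", "Presto"),
    "rg2": ColumnStats("Spark", "Zeppelin"),
    "rg3": ColumnStats("Storm", "Tez"),
}
to_read = [rg for rg, s in row_groups.items() if may_match_starts_with(s, "Spark")]
```

For name LIKE 'Spark%', only rg2 has to be read in this example. rg3 is skipped because every value in it starts with a string greater than "Spark".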
25 .Performance Improvements – Other Filter Optimizations
Constraints inferred from inequality attributes:
SELECT * FROM t1 JOIN t2 ON t1.id = t2.id AND t2.start_dt <= t1.src_dt WHERE t1.src_dt <= date '2021-10-16'; ==> t2.start_dt <= date '2021-10-16'
Reorder filter predicates by selectivity and cost:
⚫ Conjunctive predicates, ranked by (selectivity - 1.0) / expression cost: SELECT * FROM t WHERE udf(name) LIKE '%w%' AND id = 1
⚫ Disjunctive predicates, ranked by (-selectivity) / expression cost: SELECT * FROM t WHERE id > 10 OR id > 0
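The two ranking formulas can be checked with a small calculation. For AND, evaluation stops at the first false predicate, so cheap predicates that reject many rows should come first: sort ascending by (selectivity - 1.0) / cost. For OR, evaluation stops at the first true predicate, so sort ascending by -selectivity / cost. The Python sketch below applies both rankings and compares expected per-row cost, assuming the predicates are independent. The selectivities and costs in the usage example are made-up illustrative numbers, not measurements.

```python
from dataclasses import dataclass


@dataclass
class Predicate:
    sql: str
    selectivity: float  # estimated fraction of rows for which the predicate is true
    cost: float         # estimated evaluation cost per row


def order_conjuncts(preds):
    # AND: put cheap predicates that reject many rows first.
    return sorted(preds, key=lambda p: (p.selectivity - 1.0) / p.cost)


def order_disjuncts(preds):
    # OR: put cheap predicates that accept many rows first.
    return sorted(preds, key=lambda p: -p.selectivity / p.cost)


def expected_cost(ordered, conjunctive=True):
    """Expected per-row cost with short-circuiting, assuming independent predicates."""
    total, reach = 0.0, 1.0  # reach = probability that evaluation gets this far
    for p in ordered:
        total += reach * p.cost
        reach *= p.selectivity if conjunctive else (1.0 - p.selectivity)
    return total


conj = order_conjuncts([Predicate("udf(name) LIKE '%w%'", 0.5, 100.0),
                        Predicate("id = 1", 0.01, 1.0)])
disj = order_disjuncts([Predicate("id > 10", 0.9, 1.0),
                        Predicate("id > 0", 0.99, 1.0)])
```

With these numbers, the conjunction runs id = 1 first, for an expected cost of 1 + 0.01 × 100 = 2.0 instead of 100 + 0.5 × 1 = 100.5. The disjunction runs id > 0 first, for an expected cost of 1 + 0.01 × 1 = 1.01 instead of 1.1.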
26 .Performance Improvements – Other: A Improve statistics estimation* • B Simplify/optimize conditional expressions* • C Skew join improvement (see "Principles and Optimization of Spark Skew Join") • D Parquet read optimization • E Lazy listing support • F Improve casting from string to decimal type*
27 .PART 04 Stability Improvements
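28 .Stability Improvements: A Limit the size of final and intermediate result sets • B Limit the number of tasks and the total task time per SQL query • C Limit the number and total size of files read by a single table scan • D Limits on the SQL optimization phase • E Join expansion ratio limit • F Separate the Spark UI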
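A join expansion ratio limit compares a join's output row count with its input row count and stops queries whose joins explode, typically from a bad or missing join key. The slide gives neither the ratio definition nor the thresholds. This Python sketch therefore assumes input means the total rows read from both sides, and it uses hypothetical thresholds. It also ignores joins with small outputs, so that tiny lookups are never stopped.

```python
class JoinExpansionError(RuntimeError):
    """Raised when a join's output grows too fast relative to its input."""


def check_join_expansion(input_rows: int, output_rows: int,
                         max_ratio: float = 100.0,
                         min_output_rows: int = 1_000_000) -> None:
    """Hypothetical guard; both thresholds are illustrative defaults."""
    if output_rows < min_output_rows:
        return  # small outputs are never worth stopping
    ratio = output_rows / max(input_rows, 1)  # avoid dividing by zero
    if ratio > max_ratio:
        raise JoinExpansionError(
            f"join expanded {ratio:.0f}x (limit {max_ratio:.0f}x)")
```

An engine could evaluate such a check on running task metrics, so that an exploding join fails early instead of exhausting cluster resources.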