PingCAP-Infra-Meetup-94-wanggang-Exploration-and-improvements
1 .Some explorations and improvements on Spark PLATFORM ENGINEERING REVIEW 1
2 .Agenda
Ø Bloom Filter Index
Ø Bucket Join
Ø Range Partition
Ø Materialized View
Ø New CBO
3 .Bloom Filter Index
• Problem Statement
  ○ A full table scan is performed when partition pruning does not apply.
• Solution
  ○ An index helps locate the target data efficiently. The BF index works more like a data-skipping index.
DDL
  CREATE INDEX index_name ON TABLE table_name(column_name) AS 'BF';
  ALTER INDEX index_name ON table_name REBUILD;
Query
  SELECT * FROM table_name WHERE index_name=...;
[Diagram labels: Driver, IndexMetadata, Hive Metastore, Spark Job, FileFormatWriter, Index Files, HDFS, PruneFiles, PruneRowGroup]
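The data-skipping idea can be sketched in a few lines of Python. This is a minimal illustration, not the implementation from the talk: the `BloomFilter` class, the `build_index` and `prune_files` helpers, the filter sizes, and the file names are all hypothetical. It keeps one filter per file and skips any file whose filter says the value is definitely absent.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: k hash positions over an m-bit array (sizes are assumptions)."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as the bit array

    def _positions(self, value):
        # Derive k positions by salting a single hash function with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        # False means "definitely absent"; True means "possibly present".
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


def build_index(files):
    """files: {file_name: list of column values}. Returns {file_name: BloomFilter}."""
    index = {}
    for name, values in files.items():
        bf = BloomFilter()
        for value in values:
            bf.add(value)
        index[name] = bf
    return index


def prune_files(index, lookup_value):
    """Keep only the files whose filter says the value may be present."""
    return [name for name, bf in index.items() if bf.might_contain(lookup_value)]


# Hypothetical files and column values.
files = {
    "part-0": ["seller-1", "seller-2"],
    "part-1": ["seller-3"],
    "part-2": ["seller-4", "seller-5"],
}
index = build_index(files)
candidates = prune_files(index, "seller-3")  # always includes "part-1"
```

A Bloom filter can return false positives but never false negatives, so pruning may keep a few extra files but never drops a file that holds matching rows.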
4 .Bloom Filter Index
  Select tranDate, sum(tranTotal) from Tran where sellerID=xxx group by tranDate;
Saves 98.2% of IO and 44% of CPU time.
5 .Bucket Join
• Problem Statement
  Shuffle and sort are the most expensive operations when joining big tables.
• Solution
  Map-side join (Bucket Join)
Without bucket join:
  Sort merge join
    Sort (a1) <- Exchange (a1) <- TableScan a
    Sort (b1) <- Exchange (b1) <- TableScan b
With bucket join:
  Sort Merge Join
    TableScan a
    TableScan b
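A minimal Python sketch of why a bucket join needs no shuffle, assuming both tables are written with the same hash function and the same bucket count, and that each bucket is stored sorted by the join key. The function names and sample rows are hypothetical; rows are tuples with the join key in position 0.

```python
def bucketize(rows, num_buckets):
    """Hash-partition rows by their key (position 0) and sort each bucket by key."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[hash(row[0]) % num_buckets].append(row)
    for bucket in buckets:
        bucket.sort(key=lambda r: r[0])
    return buckets


def merge_join(left, right):
    """Sort-merge join of two lists that are already sorted by key."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lv, rv = left[i][0], right[j][0]
        if lv < rv:
            i += 1
        elif lv > rv:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with
            # every left row that carries the same key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lv:
                j_end += 1
            while i < len(left) and left[i][0] == lv:
                out.extend((left[i], r) for r in right[j:j_end])
                i += 1
            j = j_end
    return out


def bucket_join(a_buckets, b_buckets):
    """Join bucket i of a with bucket i of b; no row ever changes bucket."""
    assert len(a_buckets) == len(b_buckets)
    result = []
    for a_bucket, b_bucket in zip(a_buckets, b_buckets):
        result.extend(merge_join(a_bucket, b_bucket))
    return result


# Hypothetical sample data: (join key, payload).
table_a = [(1, "a"), (2, "b"), (2, "c"), (5, "d"), (8, "e")]
table_b = [(2, "x"), (2, "y"), (5, "z"), (9, "w")]
joined = bucket_join(bucketize(table_a, 4), bucketize(table_b, 4))
```

Because equal keys always hash to the same bucket index on both sides, each pair of buckets can be joined independently, which is what removes the Exchange and Sort steps from the plan.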
6 .One to Many Bucket Join
• Support one-to-many bucket join.
• Some improvements for one-to-many bucket join:
  ○ Apply merge sort on the table scan to accelerate the sort operator.
  ○ Re-bucket during the scan to increase concurrency.
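The slide gives no details on how buckets are matched, so the following Python sketch rests on two assumptions: bucket ids are `key % bucket_count`, and the larger bucket count is a multiple of the smaller one. Under those assumptions, one bucket on the small side corresponds to several buckets on the large side, and merging those already-sorted buckets yields a sorted stream without a full sort. `matching_buckets` and `merged_scan` are hypothetical names.

```python
import heapq


def matching_buckets(small_bucket_id, small_n, large_n):
    """Bucket ids on the large side that can hold keys of one small-side bucket."""
    if large_n % small_n != 0:
        raise ValueError("large_n must be a multiple of small_n")
    # key % large_n == j implies key % small_n == j % small_n.
    return list(range(small_bucket_id, large_n, small_n))


def merged_scan(sorted_buckets, bucket_ids):
    """Merge several already-sorted buckets into one sorted list."""
    return list(heapq.merge(*(sorted_buckets[i] for i in bucket_ids)))


# Hypothetical keys, stored in 4 sorted buckets on the large side.
keys = [7, 3, 12, 5, 18, 9, 0, 14, 1, 6]
large_n, small_n = 4, 2
large_buckets = [sorted(k for k in keys if k % large_n == b) for b in range(large_n)]

ids = matching_buckets(1, small_n, large_n)  # [1, 3]
stream = merged_scan(large_buckets, ids)     # [1, 3, 5, 7, 9]
```

`heapq.merge` performs a k-way merge of sorted inputs, so the cost is linear in the number of rows times the log of the number of merged buckets, rather than a full re-sort.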
7 .Optimizations for bucket join
Ø Case 1:
• Table a (a1, a2, a3), bucket by (a1, a3), bucket number n.
• Table b (b1, b2, b3), bucket by (b1, b3), bucket number n.
• Table a join table b on (a1=b1, a2=b2, a3=b3)
Before:
  Sort merge join
    Sort (a1,a2,a3) <- Exchange (a1,a2,a3) <- TableScan a
    Sort (b1,b2,b3) <- Exchange (b1,b2,b3) <- TableScan b
After:
  Sort merge join
    Sort (a1,a2,a3) <- TableScan a
    Sort (b1,b2,b3) <- TableScan b
8 .Optimizations for bucket join
Ø Case 2:
• Table a (a1, a2, a3), bucket by (a1, a2), bucket number n.
• Table b (b1, b2, b3), bucket by (b1), bucket number k.
• Table a join table b on (a1=b1, a2=b2, a3=b3)
Before:
  Sort merge join
    Sort (a1,a2,a3) <- Exchange (a1,a2,a3) <- TableScan a
    Sort (b1,b2,b3) <- Exchange (b1,b2,b3) <- TableScan b
After:
  Sort merge join
    Sort (a1,a2,a3) <- TableScan a
    Sort (b1,b2,b3) <- Exchange (b1,b2) <- TableScan b
9 .Optimizations for bucket join
Ø Case 3:
• Table a (a1, a2, a3), bucket by (a1, a2), bucket number n.
• Table b (b1, b2, b3), non-bucketed.
• Table a join table b on (a1=b1, a2=b2, a3=b3)
Before:
  Sort merge join
    Sort (a1,a2,a3) <- Exchange (a1,a2,a3) <- TableScan a
    Sort (b1,b2,b3) <- Exchange (b1,b2,b3) <- TableScan b
After:
  Sort merge join
    Sort (a1,a2,a3) <- TableScan a
    Sort (b1,b2,b3) <- Exchange (b1,b2) <- TableScan b
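The three cases share one pattern: when a table's bucket columns are a subset of the join keys, its existing layout can be kept, and at most the other side is shuffled to match. The Python sketch below is one possible reading of that rule, not the optimizer code from the talk. `plan_exchanges` is a hypothetical helper, and preferring table a when both sides are bucketed differently is an assumption made to match Case 2.

```python
def plan_exchanges(join_pairs, bucket_a=None, bucket_b=None):
    """Decide which side needs an Exchange and on which columns.

    join_pairs: list of (a_column, b_column) equality pairs.
    bucket_a / bucket_b: (bucket_columns, bucket_count) or None if not bucketed.
    Returns {"a": columns or None, "b": columns or None}; None means no Exchange.
    """
    a_to_b = dict(join_pairs)
    b_to_a = {b: a for a, b in join_pairs}

    def usable(spec, mapping):
        # A bucket layout is usable if every bucket column is a join key.
        return spec is not None and all(col in mapping for col in spec[0])

    a_ok = usable(bucket_a, a_to_b)
    b_ok = usable(bucket_b, b_to_a)

    if a_ok and b_ok:
        same_layout = (
            tuple(a_to_b[c] for c in bucket_a[0]) == tuple(bucket_b[0])
            and bucket_a[1] == bucket_b[1]
        )
        if same_layout:
            return {"a": None, "b": None}  # Case 1: no shuffle at all
    if a_ok:
        # Cases 2 and 3: keep a, shuffle b on the columns matching a's buckets.
        return {"a": None, "b": tuple(a_to_b[c] for c in bucket_a[0])}
    if b_ok:
        return {"a": tuple(b_to_a[c] for c in bucket_b[0]), "b": None}
    # Neither layout helps: shuffle both sides on the full join key.
    return {"a": tuple(a for a, _ in join_pairs), "b": tuple(b for _, b in join_pairs)}


JOIN_PAIRS = [("a1", "b1"), ("a2", "b2"), ("a3", "b3")]
N, K = 8, 4  # hypothetical bucket counts
case1 = plan_exchanges(JOIN_PAIRS, (("a1", "a3"), N), (("b1", "b3"), N))
case2 = plan_exchanges(JOIN_PAIRS, (("a1", "a2"), N), (("b1",), K))
case3 = plan_exchanges(JOIN_PAIRS, (("a1", "a2"), N), None)
```

In all three cases the Sort operators remain, because the buckets are hashed on a subset of the join keys and are not assumed to be sorted on the full key.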
10 .Bucket Join
TPC-DS Benchmark (103 queries)
Total duration:
  Spark2.3.0: 9,496s
  eBay Spark: 8,365s
Saving 11.9% overall; maximum speedup 10 times

Query    Base-20180717   eBay-20180717   eBay/Base
q64      751             70              0.09
q93      150             16              0.11
q80      263             50              0.19
q50      87              22              0.26
q40      41              16              0.38
q22      19              8               0.46
q49      45              25              0.56
q29      150             84              0.56
q95      256             146             0.57
q85      52              32              0.62
q25      142             89              0.62
q39b     11              7               0.62
q17      145             91              0.63
q39a     12              8               0.64
q78      354             238             0.67
q5       45              32              0.70
q24a     611             443             0.72
q24b     596             435             0.73
ss_max   20              15              0.74
q86      6               5               0.88
q70      20              18              0.90
q37      18              17              0.92
11 .Range Partition
• Problem Statement
  Partition + bucket distribution produces M*N files, a huge number that puts heavy pressure on HDFS.
  A partial update causes a whole-table reload.
• Solution
  Range partitioning gives the flexibility to manage partitions based on need.
Daily partitions (18 * 365 * n):
  2000-01-01, 2000-01-02, …, 2017-12-31, each holding Bucket-1 … Bucket-n
Range partitions (17 * n + 12 * n):
  2000-01-01 to 2000-12-31, 2001-01-01 to 2001-12-31, …, 2017-12-01 to 2017-12-31, each holding Bucket-1 … Bucket-n
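The two file counts on this slide can be checked with simple arithmetic. The sketch below reads 17 * n + 12 * n as 17 yearly partitions plus 12 monthly partitions for the most recent year, which is an interpretation of the diagram, and uses a hypothetical bucket count of n = 100.

```python
def daily_partition_files(years, buckets):
    """One partition per day, each with `buckets` files (365 days per year, as on the slide)."""
    return years * 365 * buckets


def range_partition_files(yearly_partitions, monthly_partitions, buckets):
    """Coarse yearly ranges for old data plus monthly ranges for recent data."""
    return (yearly_partitions + monthly_partitions) * buckets


n = 100  # hypothetical bucket count
daily = daily_partition_files(18, n)        # 18 * 365 * n
ranged = range_partition_files(17, 12, n)   # 17 * n + 12 * n
reduction = daily / ranged
```

The reduction factor is independent of n: 18 * 365 / 29, or roughly 227 times fewer files.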
12 .Range Partition DDL & DML
  CREATE TABLE sales (......)
  PARTITION BY RANGE (dt Date) (
    PARTITION sales_2006 VALUES LESS THAN (TO_DATE('01-01-2007','dd-mm-yyyy')),
    PARTITION sales_2007 VALUES LESS THAN (TO_DATE('01-01-2008','dd-mm-yyyy'))
  );

  ALTER TABLE sales ADD PARTITION (
    PARTITION sales_2008 VALUES LESS THAN (TO_DATE('01-01-2009','dd-mm-yyyy')));

  INSERT INTO TABLE sales SELECT ...;

  SELECT * FROM sales
  WHERE dt BETWEEN '01-01-2008' AND '01-01-2010'
[Diagram labels: RangePartition Expression, HiveMetastore, rearrange data by partition & bucket, FileFormatWriter, Spark Job, HDFS, Query Optimizer, pruneRangeByPredicates]
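Pruning range partitions by a predicate can be sketched with a binary search over the partition upper bounds. This is an illustrative Python sketch, not the `pruneRangeByPredicates` code from the talk; `prune_ranges` is a hypothetical helper, and the partition list mirrors the DDL above with `VALUES LESS THAN` treated as an exclusive upper bound.

```python
from bisect import bisect_right
from datetime import date

# (partition name, exclusive upper bound), sorted by bound.
partitions = [
    ("sales_2006", date(2007, 1, 1)),
    ("sales_2007", date(2008, 1, 1)),
    ("sales_2008", date(2009, 1, 1)),
]


def prune_ranges(partitions, low, high):
    """Return the partitions that can hold rows with low <= dt <= high."""
    bounds = [upper for _, upper in partitions]
    # A value v lives in the first partition whose upper bound is > v.
    first = bisect_right(bounds, low)
    last = bisect_right(bounds, high)
    return [name for name, _ in partitions[first:last + 1]]


# dt BETWEEN 2008-01-01 AND 2010-01-01 only needs sales_2008.
selected = prune_ranges(partitions, date(2008, 1, 1), date(2010, 1, 1))
```

Values above the last bound fall outside every partition, so the slice simply stops at the end of the list.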
13 .Materialized View
• Reuse the cache DDL in Spark SQL: CACHE TABLE... AS SELECT
• Each MV is represented as a CachedData object, managed by CacheManager
• Data in an MV can be stored in memory and on disk
  CACHE TABLE sum_sales_time_product_mv AS
  SELECT mv.prod_name, SUM(mv.amount_sold)
  FROM join_sales_time_product_mv mv
  GROUP BY mv.prod_name;
14 .Materialized View
[Diagram: Logical Phase and Physical Phase.
 Query1: logical plan agg1 over join of t1 and t2; Planner; physical plan AggExec, joinExec, T1 scan, T2 scan; cache; CacheManager.
 Query2: logical plan agg2 over join of t1 and t2; Rewriter; logical plan agg2 over InMemoryRelation; Planner; physical plan AggExec over InMemoryTableScan.]
15 .Materialized View
• Problem Statement
  Queries must be rewritten to leverage an MV.
• Solution
  Implement a rewriter in the logical optimization phase to rewrite the logical plan, following the Oracle MV rewriter design.
[Diagram: query and MV plan pairs (nodes labeled A, F, Aggregate, Join) with the steps Compatibility Check, Data Sufficiency Check, Find common subgraph, Rewrite with additional filter, Rewrite with additional aggregate, …...]
16 .Aggregation Materialized View
  cache table q1_mv as
  select
    l_returnflag,
    l_linestatus,
    l_shipdate,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    count(l_quantity) as cnt_qty,
    count(l_extendedprice) as cnt_price,
    count(l_discount) as cnt_disc,
    sum(l_discount) as sum_disc,
    count(*) as count_order
  from lineitem
  group by l_returnflag, l_linestatus, l_shipdate;

Query Example Leveraging MV
  select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
  from lineitem
  where l_shipdate <= date '1998-09-02' and l_shipdate >= date '1997-09-02'
  group by l_returnflag, l_linestatus
  order by l_returnflag, l_linestatus;

Disable MV Rewrite: 16.365 seconds, # of rows: 903165785
Enable MV Rewrite: 1.55 seconds, # of rows: 2412
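The query above can be answered from the MV because the MV groups by a superset of the query's grouping columns and stores sums and counts, from which averages can be derived. The Python sketch below illustrates that roll-up on a toy table with only `l_quantity`. The rows and the helpers `build_mv`, `query_direct`, and `query_from_mv` are hypothetical, and the sketch ignores NULL handling.

```python
from collections import defaultdict
from datetime import date

# Hypothetical rows: (l_returnflag, l_linestatus, l_shipdate, l_quantity).
lineitem = [
    ("A", "F", date(1997, 10, 1), 10),
    ("A", "F", date(1997, 10, 1), 30),
    ("A", "F", date(1998, 3, 5), 20),
    ("N", "O", date(1998, 3, 5), 5),
    ("N", "O", date(1999, 1, 1), 50),  # outside the query's date range
]


def build_mv(rows):
    """Group by (flag, status, shipdate); keep [sum_qty, cnt_qty] per group."""
    mv = defaultdict(lambda: [0, 0])
    for flag, status, shipdate, qty in rows:
        acc = mv[(flag, status, shipdate)]
        acc[0] += qty
        acc[1] += 1
    return dict(mv)


def query_direct(rows, lo, hi):
    """Answer the query from the base table: (sum_qty, avg_qty, count) per group."""
    out = defaultdict(lambda: [0, 0])
    for flag, status, shipdate, qty in rows:
        if lo <= shipdate <= hi:
            acc = out[(flag, status)]
            acc[0] += qty
            acc[1] += 1
    return {k: (s, s / c, c) for k, (s, c) in out.items()}


def query_from_mv(mv, lo, hi):
    """Answer the same query from the MV with an extra filter and an extra aggregate."""
    out = defaultdict(lambda: [0, 0])
    for (flag, status, shipdate), (s, c) in mv.items():
        if lo <= shipdate <= hi:       # additional filter on a grouping column
            acc = out[(flag, status)]  # additional aggregate: roll up over shipdate
            acc[0] += s                # sum of sums
            acc[1] += c                # sum of counts
    return {k: (s, s / c, c) for k, (s, c) in out.items()}


lo, hi = date(1997, 9, 2), date(1998, 9, 2)
direct = query_direct(lineitem, lo, hi)
rewritten = query_from_mv(build_mv(lineitem), lo, hi)
```

The filter can be applied to the MV only because `l_shipdate` is one of its grouping columns, and the average is recomputed as sum of sums divided by sum of counts rather than by averaging per-group averages.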
17 .Materialized View
• TODOs
  ○ More intelligent rewrite algorithm
  ○ Cost-based MV selection
  ○ Centralized metadata storage
  ○ Centralized data storage
  ○ Storage in a more efficient format
18 .New CBO
• Problem Statement
  ○ Only join reorder is cost-based.
  ○ The current CBO runs in the logical phase, which rules out selecting the best plan based on physical cost.
  ○ The current cost model is based on table/column-level statistics, with no knowledge of the physical operation cost.
• Solution
  ○ Use the Cascades framework to do top-down, rule-based search space generation:
    ■ Join Enumeration
    ■ Aggregate Pushdown/Pullup
    ■ Physical implementation: Join impl, scan impl……
    ■ MV rewrite
    ■ Even task parallelism….
  ○ Refine the cost model according to the physical implementations.
19 .New CBO
[Diagram: Cascades Optimizer. Logical Plan -> copyIn -> SearchSpace -> copyOut -> Physical Plan. The RuleSet generates the SearchSpace:]
• Join Enumeration
• Aggregate Pushdown/Pullup
• Physical implementation: Join impl, scan impl……
• MV rewrite
• ...
20 .New CBO
• Cost Model
  ○ Modeled on the Hive cost model.
  ○ Accounts for CPU, IO, and network cost:
    Cost(operator) = Cost(CPU) + Cost(IO) + Cost(Network)
  ○ CPU cost is determined by row count; IO and network cost are determined by data size.
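A minimal Python sketch of the cost formula above. The per-unit weights, the `OperatorStats` fields, the sample statistics, and summing operator costs into a plan cost are all assumptions for illustration; the slide specifies only the three cost components and what drives each of them.

```python
from dataclasses import dataclass


@dataclass
class OperatorStats:
    row_count: int      # drives CPU cost
    io_bytes: int       # drives IO cost
    network_bytes: int  # drives network cost


# Hypothetical per-unit weights; real values would need tuning.
CPU_COST_PER_ROW = 1.0
IO_COST_PER_BYTE = 0.5
NETWORK_COST_PER_BYTE = 2.0


def operator_cost(stats):
    """Cost(operator) = Cost(CPU) + Cost(IO) + Cost(Network)."""
    cpu = stats.row_count * CPU_COST_PER_ROW
    io = stats.io_bytes * IO_COST_PER_BYTE
    network = stats.network_bytes * NETWORK_COST_PER_BYTE
    return cpu + io + network


def plan_cost(operators):
    """Assumption: a plan's cost is the sum of its operators' costs."""
    return sum(operator_cost(op) for op in operators)


# Hypothetical statistics for a shuffle join versus a bucket join.
scan = OperatorStats(row_count=1000, io_bytes=8000, network_bytes=0)
exchange = OperatorStats(row_count=1000, io_bytes=0, network_bytes=8000)
shuffle_join_plan = [scan, exchange, scan, exchange]
bucket_join_plan = [scan, scan]
cheaper = min((shuffle_join_plan, bucket_join_plan), key=plan_cost)
```

With any positive network weight, the plan without Exchange operators comes out cheaper, which is the kind of physical-level distinction a purely logical cost model cannot express.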
21 .New CBO
• TODOs
  ○ Stats estimation is not good enough
  ○ Selectivity estimation is not accurate
  ○ The cost model needs to be verified and adjusted in the production environment
  ○ Join enumeration is time-consuming
  ○ Star join is not considered in join enumeration
  ○ …...
22 .Thanks