- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 视频嵌入链接 文档嵌入链接
- 复制
- 微信扫一扫分享
- 已成功复制到剪贴板
Carmel- Optimizing SparkSQL for Interactive Analysis
Carmel- Optimizing SparkSQL for Interactive Analysis
展开查看详情
1 .Carmel – Optimizing SparkSQL for Interactive Analysis Nov 2019 Ken, Wang
2 .Introduction • eBay 大数据平台部门首席架构师 • Work on Big Data since 2014 • Work on streaming processing, Jetstream at first • Work on Druid since 2015 • Work on Kylin since 2016 • Work on Spark/Spark SQL since 2017 Data Platform Engineering 2
3 .Agenda • eBay SQL-on Hadoop平台的背景 • Carmel Spark SQL的架构 • Carmel Spark SQL的开发和优化 • 未来的plan,挑战和机会 Data Platform Engineering 3
4 .SQL on Hadoop in eBay • Background eBay is offloading Teradata and moving to Hadoop • Challenges and Gap Easy to use Query Performance Easy to tune, Adaptive optimization Update/Delete support Data Security Workload management • Our Goal 4
5 .The High-Level Architecture of Carmel JDBC/ ODBC Hercules Zeta Gateway Tenant Tenant Tenant …… A B C Alation L B Carmel Hive Metastore Gateway Tableau Shuffle Cache VDM Prod DBs …… Gateway SSD HDD Interactive Analysis Cluster General Cluster 5
6 .Carmel Spark SQL的开发和优化 • Long Running Cluster of Thrift servers • Spark Driver Optimization • Bucketing • Better data placement – Range Partition + Bucketing • Caches • Data Security • Spark Adaptive Execution • Index 6
7 .Long running cluster of Thrift Servers • Carmel Gateway • Spark Thrift Servers on Yarn • Support multiple Authentications • Impersonation • Automatic session dispatch • Standard JDBC driver • Mac/Linux/Windows ODBC drivers • Monitoring and Alerting • System Audit table to record query behaviors • Long running query detection Data Platform Engineering 7
8 .Spark Driver Optimization • New Task Scheduler Implementation - Optimized for concurrency and throughput • Large query result spill to disk • Fix memory leaks in Spark Thrift Server and Driver • Cache • Query Dumper, easy for query plan debug • Spark History server - Log files organized by session - On demand load by sessionid Data Platform Engineering 8
9 .Concurrency/Throughput Test Test on local HDFS with SSD, Executors num = 850 Client Nums Throughput (sqls/min) Avg Latency(ms) Table Type 15 37.8 22583 Partition 20 34.8 32106 Partition 25 35.4 41774 Partition Client Nums Throughput (sqls/min) Avg Latency(ms) Table Type 40 75 30207 Partition 40 115.2 20317 RangePartition + Bucket 2x throughput Data Platform Engineering 9
10 . Bucket Join • Spark SQL Bucket Table Join Require: hash partitioning Sort by join key SortMergeJoin SortMergeJoin If join on bucket column Physical Optimize SortExec SortExec TableScanExec TableScanExec ShuffleExchangeExec ShuffleExchangeExec A B Output: Hash partitioned by bucket columns TableScanExec TableScanExec Sorted by bucket column A B PRESENTATION TITLE GOES HERE 10
11 .Bucket join • One bucket only generate one file • Default Sort by bucket column • Indexed bucket • Support multiple bucketing number • Support Join keys are the super set of bucket keys • Partial Sorted RDD, move sort to RDD scan • Does not support Hive bucket Data Platform Engineering 11
12 .Combine Bucket and Partition? Bucketing is good for large tables join, partitioning is good for data prune and data refreshment, can we combine them? Usually you can’t, partition + bucket will generate (M*N) number of tiny HDFS files and kill the Hadoop NN. Data Platform Engineering 12
13 .A better data placement – Range Partition + Bucket Range partition can provide the flexibility to better manage the partition base on needs. 18 * 365 * n 17 * n + 12 * n Partition 2000-01-01 2017-12-31 2000-01-01 2001-01-01 2017-12-01 2000-01-02 2000-12-31 2001-12-31 2017-12-31 Bucket-1 Bucket-1 Bucket-1 Bucket-1 Bucket-1 Bucket-1 Bucket-2 Bucket-2 Bucket-2 Bucket-2 Bucket-2 Bucket-2 Bucket … … Bucket-3 Bucket-3 Bucket-3 Bucket-3 Bucket-3 Bucket-3 …… …… …… …… …… …… Bucket-n Bucket-n Bucket-n Bucket-n Bucket-n Bucket-n With Range partition + bucket, we can also apply Dynamic Partition Prune(Spark 3.0) to the bucket table. Data Platform Engineering 13
14 .Range Partitions Benchmark result Range PARTITION DDL Query RB IB Run 1 Run 2 Run 3 Run 1 Run 2 Run 3 create LISTING_TABLE q1 30667 9385 8566 29676 110543 26205 PARTITIONED BY RANGE (PARTITION_COL DATE) q2 15225 9639 10166 19998 20427 16260 ( q3 15093 12122 13068 31320 37314 26412 PARTITION p_20050101 VALUES LESS THAN ('2005-01-01'), q4 15034 11543 10800 18280 17564 16217 PARTITION p_20060101 VALUES LESS THAN ('2006-01-01'), q5 4847 4017 4622 8845 9157 9268 q6 16941 8062 7576 24141 26178 20375 PARTITION p_20070101 VALUES LESS THAN ('2007-01-01'), q7 9772 8160 7278 25490 28394 24386 PARTITION p_20080101 VALUES LESS THAN ('2008-01-01'), q8 9389 5140 4623 12845 14457 11050 PARTITION p_20090101 VALUES LESS THAN ('2009-01-01'), q9 4179 3273 3170 9795 10123 9240 PARTITION p_20100101 VALUES LESS THAN ('2010-01-01'), q10 10121 4966 4826 13778 11854 10706 PARTITION p_20110101 VALUES LESS THAN ('2011-01-01'), q11 7867 6633 5932 22442 24968 21133 PARTITION p_20120101 VALUES LESS THAN ('2012-01-01'), q12 5403 6394 6368 12806 11369 12082 q13 10866 6640 5272 13602 13439 11617 PARTITION p_20130101 VALUES LESS THAN ('2013-01-01'), q14 12471 8764 8272 17182 14977 14538 PARTITION p_20140101 VALUES LESS THAN ('2014-01-01'), q15 5984 7459 5626 9625 9032 10147 PARTITION p_20150101 VALUES LESS THAN ('2015-01-01'), q16 9822 8086 8036 14772 16758 14726 PARTITION p_20160101 VALUES LESS THAN ('2016-01-01'), q18 6223 6501 5768 12919 12329 12541 PARTITION p_20170101 VALUES LESS THAN ('2017-01-01'), q19 10738 7766 8254 19342 16175 14469 PARTITION p_20180101 VALUES LESS THAN ('2018-01-01'), q20 6066 5951 5836 15489 12777 11675 PARTITION p_20190101 VALUES LESS THAN ('2019-01-01'), q22 2721 2946 2422 7917 7653 7245 q23 8972 22132 8089 24636 20007 20439 PARTITION p_20200101 VALUES LESS THAN ('2020-01-01'), q28 9367 8629 8745 15229 16969 14250 PARTITION p_99991231 VALUES LESS THAN ('9999-12-31') q30 20940 18554 28292 20343 46132 18715 ) q35 11611 9166 9280 11282 8608 8215 CLUSTERED BY (COL_1) q36 8483 8496 7886 8016 7208 8217 SORTED BY (COL_1, COL_2) q57 3500 3841 3667 9499 9360 9422 INTO 1000 BUCKETS AS q58 3620 4375 4156 9424 9439 9040 select * from LISTING_TABLE_VIEW; q59 7593 8455 8437 23694 20757 18963 q60 10668 10273 9202 56688 30424 25992 q66 28232 11891 10953 31305 16917 16214 sum 323135 249784 235729 550909 611884 450278 Data Platform Engineering 14
15 .Caches • Data Cache - Cache remote hot tables to local SSD • Statement level metadata Cache • Spark Broadcast Cache • Cross session Hive metadata Cache(Working on) • Relational Cache/Materialize View(Working on) Data Platform Engineering 15
16 .Relation Cache/MV Cache and reuse the common query pattern to avoid duplicate calcuate pre-calculation to accelerate the query, make the answer more close to the query. Flexible way to change the data placement, transparent to upstream and downstream Need to implement a query rewriter in logical optimize phase to rewrite logical plan. Refer to Oracle MV rewriter design. A query query Aggregate F Join F query query Compatibility Compatibility Data Sufficiency Check Check Check Rewrite with …... Find common Rewrite with additional subgraph additional filter aggregate MV MV MV MV 16
17 .Relation Cache Query Rewriter • Join Compatibility Check ○ Inner join can derive from outer join by filtering anti-join rows ○ …... • Data Sufficiency Check ○ id=102 ○ id between 1 and 100 ○ id>10 and id<50 ○ id in (20,30,40) ○ …... • Group Compatibility Check • Aggregate Compatibility Check • Lattice for MV dependencies CREATE MATERIALIZED VIEW MV_TARGET SELECT MV_SOURCE.COL_1,SUM(MV_SOURCE.COL_2) FROM MV_SOURCE GROUP BY MV_SOURCE.COL_1; 17
18 .Data Security • SQL/View based access control View level Access - Grant/Revoke/Role Syntax support Access Views • Transparent column level encryption/decryption - Parquet column encryption/decryption Table level Access - Parquet footer encryption/decryption Hive/Spark Tables - Key Management service • Define and manage the same Access Bundle in Hadoop File level Access as Teradata HDFS File Data Platform Engineering 18
19 .Spark Adaptive Execution • A Must have for long running clusters • Enabled by default for all the queues/tenants • Implement Skewed join based on new AE framework in Spark 3.0 • Support 3 table skew joins • Improve Driver memory usage for UnionRDD • Optimization for NAAJ(Null-Aware-Anti-Join) • Optimization for Create/Insert • Limit query optimization, do not submit all the tasks. Data Platform Engineering 19
20 .Future work • Automated MV/Index/Cache life cycle management • CBO Stats collecting automation • Range Filter Simplify • Infer additional Filters from constraints and pushdown • Join pushdown(Adaptive Runtime filter) • Skewed join hint • Support Dela Lake • UDF performance • …. Data Platform Engineering 20
21 .Challenges and Opportunities • Workload management Teradata has very solid workload management • CBO, new CBO framework CBO can do more besides Join reordering Volcano/Cascade style cost based optimization framework commonly adopted by many DB engines(SQL Server, Greenplum Orca, Apache Calcite) Solid Stats derivation, more accurate cost model • Adaptive Execution AE can be more adaptive Data Platform Engineering 21
22 .Q&A Thanks you Data Platform Engineering 22