Delta Engine: High Performance Query Engine for Data Lakes

1 .Delta Engine: High performance query engine for data lakes
Reynold Xin, Chief Architect & Co-founder, Databricks

2 .Delta Engine
§ Builds on Apache Spark 3.0
§ Fully API compatible
§ Accelerates SQL and DataFrame workloads with:
  ▪ Improved query optimizer
  ▪ Native vectorized execution engine
  ▪ Caching
[Diagram: SQL, Spark DataFrame, and Koalas APIs on top of three layers: Query Optimizer, Native Execution Engine, Caching]

3 .Delta Engine’s Improved Query Optimizer
§ Extends Spark’s cost-based optimizer and adaptive query execution with advanced stats
§ Up to 18x performance increase for star schema workloads

4 .Delta Engine’s Caching
§ Automatically caches input data
§ Transcodes data into a more CPU-efficient format, fully leveraging NVMe SSDs
§ Up to 5X scan performance increase

5 .[Diagram: Delta Engine stack, with the SQL, Spark DataFrame, and Koalas APIs on top of the Query Optimizer and Native Execution Engine]

6 .Hardware Workloads

7 .Spark Summit 2015 Keynote

8 .Hardware Changes since 2015
          2010             2015             2020              Change
Storage   50 MB/s (HDD)    500 MB/s (SSD)   16 GB/s (NVMe)    10X
Network   1 Gbps           10 Gbps          100 Gbps          10X
CPU       ~3 GHz           ~3 GHz           ~3 GHz            (no change)
CPUs continue to be the bottleneck. How do we achieve next-level performance?

9 .Workload Trends
Businesses are moving faster, and as a result organizations spend less time on data modeling, leading to worse performance:
§ Most columns don’t have “NOT NULL” defined
§ Strings are convenient, and many date columns are stored as strings
§ Data are more denormalized and continuously ingested
Can we get both agility and performance?

10 .Photon New execution engine for Delta Engine to accelerate Spark SQL Built from scratch in C++, for performance: § Vectorization: exploit data-level parallelism and instruction-level parallelism § Optimize for modern structured and semi-structured workloads

11 .CPU 101
CPU frequency is not getting higher, but parallelism is improving:
§ Data-level parallelism: SIMD register width
  ▪ SSE: 128 bits (MMX: 64 bits)
  ▪ AVX2: 256 bits
  ▪ AVX512: 512 bits
§ Instruction-level parallelism: out-of-order window (entries)
  ▪ Sandy Bridge: 168
  ▪ Haswell: 192
  ▪ Skylake: 224
https://en.wikichip.org/wiki/File:skylake_buff_window.png

12 .

13 .Photon New execution engine to accelerate Spark SQL, as part of Delta Engine Built from scratch in C++, for performance: § Vectorization: exploit data-level parallelism and instruction-level parallelism § Optimize for modern structured and semi-structured workloads

14 .Vectorization: Columnar In-memory Format
Row Format:
  1  alex   4.1
  2  bart   3.5
  3  ala    6.4
  4  ankur  9.9
Column Format:
  1     2     3     4
  alex  bart  ala   ankur
  3.5   4.1   6.4   9.9
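
As a rough illustration of the two layouts on this slide, the sketch below stores the same table once as an array of records and once as one array per column. The names `Row`, `ColumnBatch`, `to_columns`, and `sum_scores` are hypothetical and chosen only for this example; this is not Photon's actual in-memory format.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Row format: one struct per record, so the fields of a record are adjacent.
struct Row {
    int32_t id;
    std::string name;
    double score;
};

// Column format: one contiguous array per field.
struct ColumnBatch {
    std::vector<int32_t> id;
    std::vector<std::string> name;
    std::vector<double> score;
};

// Transposes a row-oriented table into a column-oriented batch.
ColumnBatch to_columns(const std::vector<Row>& rows) {
    ColumnBatch batch;
    batch.id.reserve(rows.size());
    batch.name.reserve(rows.size());
    batch.score.reserve(rows.size());
    for (const Row& r : rows) {
        batch.id.push_back(r.id);
        batch.name.push_back(r.name);
        batch.score.push_back(r.score);
    }
    assert(batch.id.size() == rows.size());
    return batch;
}

// Summing one field reads only that field's array in the column layout;
// the names and ids are never touched.
double sum_scores(const ColumnBatch& batch) {
    double total = 0.0;
    for (double s : batch.score) total += s;
    return total;
}
```

With the column layout, a query that reads only one column scans a single dense array, which is what makes the tight loops on the following slides possible.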

15 .Simple Example: select col1 + col3 from table
    for (int32_t i = 0; i < num_rows; ++i) {
      out[i] = col1[i] + col3[i];
    }
Col1  1     2     3     4
Col2  alex  bart  ala   ankur
Col3  3.5   4.1   6.4   9.9
Out   4.5   6.1   9.4   13.9
Benefits:
- Compact memory access (better prefetching and caching)
- Simpler code snippet, easier for compilers to optimize (e.g. loop unrolling)
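
The loop on this slide can be made self-contained roughly as follows. This is a minimal sketch that assumes `col1` is a 32-bit integer column and `col3` a double column; the function name `add_columns` is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Computes out[i] = col1[i] + col3[i] for one column batch.
// The loop body has no branches and reads both inputs sequentially,
// which leaves the compiler free to unroll and auto-vectorize it.
std::vector<double> add_columns(const std::vector<int32_t>& col1,
                                const std::vector<double>& col3) {
    assert(col1.size() == col3.size());
    const std::size_t num_rows = col1.size();
    std::vector<double> out(num_rows);
    const int32_t* a = col1.data();
    const double* b = col3.data();
    double* o = out.data();
    for (std::size_t i = 0; i < num_rows; ++i) {
        o[i] = static_cast<double>(a[i]) + b[i];
    }
    return out;
}
```

Calling `add_columns({1, 2, 3, 4}, {3.5, 4.1, 6.4, 9.9})` yields the `Out` row of the slide, up to floating-point rounding.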

16 .Vectorized: non-SIMD version vs. SIMD version (“v” instructions)
[Assembly listings not recoverable from the slide image]

17 .Exploiting Data-Level Parallelism
[Chart: col1 + col3, billion rows processed per core per sec (higher is better), SIMD vs. non-SIMD; x-axis from 0 to 8]

18 .Photon New execution engine to accelerate Spark SQL, as part of Delta Engine Built from scratch in C++, for performance: § Vectorization: exploit data-level parallelism and instruction-level parallelism § Optimize for modern structured and semi-structured workloads

19 .Hash Table: Most Important Data Structure
Example: select sum(value) from table group by key
    for (int32_t i = 0; i < batchSize; ++i) {
      int32_t bucket = hash(keyCol[i]) % ht->size;
      if (ht[bucket].key == keyCol[i]) {
        ht[bucket].value += valueCol[i];
      }
    }
[Diagram: Scan → Hash → Probe into the hash table (ht) of key/value entries]
Note: “Over-simplified”. Hash table collisions are not handled.
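
The slide's loop deliberately ignores collisions. For reference, here is a minimal runnable sketch of the same group-by sum that does handle them, using open addressing with linear probing. The collision strategy, the fixed capacity, and the names `Slot`, `SumTable`, and `aggregate_batch` are assumptions made for this example; the slides do not say how Photon resolves collisions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

struct Slot {
    int64_t key = 0;
    int64_t value = 0;
    bool occupied = false;
};

// Fixed-capacity hash table for "sum(value) group by key".
// Assumption: no resizing; add() reports failure when the table is full.
class SumTable {
public:
    explicit SumTable(std::size_t capacity) : slots_(capacity) {
        assert(capacity > 0);
    }

    // Adds `value` to the running sum for `key`, probing linearly on collision.
    bool add(int64_t key, int64_t value) {
        std::size_t bucket = std::hash<int64_t>{}(key) % slots_.size();
        for (std::size_t probes = 0; probes < slots_.size(); ++probes) {
            Slot& s = slots_[bucket];
            if (!s.occupied) {
                s.key = key;
                s.value = value;
                s.occupied = true;
                return true;
            }
            if (s.key == key) {
                s.value += value;
                return true;
            }
            bucket = (bucket + 1) % slots_.size();
        }
        return false;  // every slot holds a different key
    }

    // Returns the sum for `key`, or 0 if the key was never added.
    int64_t sum_for(int64_t key) const {
        std::size_t bucket = std::hash<int64_t>{}(key) % slots_.size();
        for (std::size_t probes = 0; probes < slots_.size(); ++probes) {
            const Slot& s = slots_[bucket];
            if (!s.occupied) return 0;
            if (s.key == key) return s.value;
            bucket = (bucket + 1) % slots_.size();
        }
        return 0;
    }

private:
    std::vector<Slot> slots_;
};

// Aggregates one batch of (key, value) pairs into the table.
void aggregate_batch(SumTable& ht, const std::vector<int64_t>& keyCol,
                     const std::vector<int64_t>& valueCol) {
    assert(keyCol.size() == valueCol.size());
    for (std::size_t i = 0; i < keyCol.size(); ++i) {
        ht.add(keyCol[i], valueCol[i]);
    }
}
```

The probe loop makes the per-row work even less predictable than in the slide's version, which is part of why the memory-access pattern matters so much for this data structure.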

20 .Hash Table: Most Important Data Structure
Example: select sum(value) from table group by key
    for (int32_t i = 0; i < batchSize; ++i) {
      int32_t bucket = hash(keyCol[i]) % ht->size;
      if (ht[bucket].key == keyCol[i]) {
        ht[bucket].value += valueCol[i];
      }
    }
CPU breakdown: useful work 36%, memory stalls 64%

21 .How do we optimize the following?
    for (int32_t i = 0; i < batchSize; ++i) {
      int32_t bucket = hash(keyCol[i]) % ht->size;
      if (ht[bucket].key == keyCol[i]) {
        ht[bucket].value += valueCol[i];
      }
    }
Loop body is large (key comparison can be very expensive, e.g. comparing strings)
§ Instructions that access memory are intermixed with instructions that compute the hash code, compare keys, and add values.
§ Large loop body -> the CPU sees fewer memory access instructions within its out-of-order window -> the CPU issues fewer memory accesses at once.
§ Solution: make loops smaller.

22 .How do we optimize the following?
    for (int32_t i = 0; i < batchSize; ++i) {
      int32_t bucket = hash(keyCol[i]) % ht->size;
      if (ht[bucket].key == keyCol[i]) {
        ht[bucket].value += valueCol[i];
      }
    }
Break the big loop into smaller loops:
    for (int32_t i = 0; i < batchSize; ++i) {
      buckets[i] = hash(keyCol[i]) % ht->size;
    }
    for (int32_t i = 0; i < batchSize; ++i) {
      keys[i] = ht[buckets[i]].key;
    }
    for (int32_t i = 0; i < batchSize; ++i) {
      if (keys[i] == keyCol[i]) {
        ht[buckets[i]].value += valueCol[i];
      }
    }
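
A self-contained version of the three small loops might look like the sketch below. It keeps the slide's simplification (no collision handling) and assumes a table pre-populated so that each key already sits in its own bucket. The names `Entry`, `bucket_of`, and `aggregate_split`, and the use of the key value itself as the hash, are assumptions made for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Entry {
    int32_t key;
    int64_t value;
};

// Stand-in hash for the sketch: the key itself, reduced modulo the table size.
inline std::size_t bucket_of(int32_t key, std::size_t table_size) {
    return static_cast<std::size_t>(static_cast<uint32_t>(key)) % table_size;
}

// Group-by sum over one batch, split into three small loops.
// As on the slide, collisions are not handled: a row whose key does not
// match the entry in its bucket is skipped.
void aggregate_split(std::vector<Entry>& ht,
                     const std::vector<int32_t>& keyCol,
                     const std::vector<int64_t>& valueCol) {
    assert(!ht.empty());
    assert(keyCol.size() == valueCol.size());
    const std::size_t batchSize = keyCol.size();
    std::vector<std::size_t> buckets(batchSize);
    std::vector<int32_t> keys(batchSize);

    // Loop 1: pure computation, no hash table access.
    for (std::size_t i = 0; i < batchSize; ++i) {
        buckets[i] = bucket_of(keyCol[i], ht.size());
    }
    // Loop 2: only loads from the table. The loads are independent of each
    // other, so the CPU can keep several cache misses in flight at once.
    for (std::size_t i = 0; i < batchSize; ++i) {
        keys[i] = ht[buckets[i]].key;
    }
    // Loop 3: compare and accumulate.
    for (std::size_t i = 0; i < batchSize; ++i) {
        if (keys[i] == keyCol[i]) {
            ht[buckets[i]].value += valueCol[i];
        }
    }
}
```

The result is identical to the single-loop version; the only change is that the memory loads are grouped in a short loop of their own, at the cost of two temporary arrays per batch.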

23 .Exploiting Instruction-Level Parallelism
§ Pipeline memory access, load multiple memory addresses in parallel
§ Prefetch to eliminate cache misses
§ Minimize TLB misses with huge pages
[Chart: memory stalls vs. useful work as stacked bars; Traditional Systems: 84 and 47; Photon: 23 and 23]
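
One common way to apply the prefetching bullet is to request the bucket a few rows ahead of the one being processed. The sketch below uses the GCC/Clang builtin `__builtin_prefetch` and falls back to a no-op on other compilers. The lookahead distance of 8, the names `Bucket`, `PREFETCH`, and `aggregate_with_prefetch`, and the collision-free table are assumptions made for this example, not a description of Photon's implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

#if defined(__GNUC__) || defined(__clang__)
#define PREFETCH(addr) __builtin_prefetch(addr)
#else
#define PREFETCH(addr) ((void)0)  // no-op where the builtin is unavailable
#endif

struct Bucket {
    int32_t key;
    int64_t value;
};

// Group-by sum that prefetches the bucket needed kDistance rows ahead.
// Collisions are not handled, matching the simplified loop on the slides.
void aggregate_with_prefetch(std::vector<Bucket>& ht,
                             const std::vector<int32_t>& keyCol,
                             const std::vector<int64_t>& valueCol) {
    assert(!ht.empty());
    assert(keyCol.size() == valueCol.size());
    constexpr std::size_t kDistance = 8;  // assumed lookahead; tune per machine
    const std::size_t n = keyCol.size();

    // Compute all bucket indices first so future addresses are known.
    std::vector<std::size_t> buckets(n);
    for (std::size_t i = 0; i < n; ++i) {
        buckets[i] = static_cast<uint32_t>(keyCol[i]) % ht.size();
    }
    for (std::size_t i = 0; i < n; ++i) {
        if (i + kDistance < n) {
            PREFETCH(&ht[buckets[i + kDistance]]);  // hint only, never faults
        }
        Bucket& b = ht[buckets[i]];
        if (b.key == keyCol[i]) {
            b.value += valueCol[i];
        }
    }
}
```

A prefetch is only a hint, so the result is the same with or without it; whether it helps depends on the table size relative to the cache and on the chosen distance.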

24 .TPC-DS 30TB Queries/Hour (Higher is better)
Delta Engine with Photon: 110
Delta Engine w/o Photon: 32
3.3x speedup

25 .Photon New execution engine to accelerate Spark SQL, as part of Delta Engine Built from scratch in C++, for performance: § Vectorization: exploit data-level parallelism and instruction-level parallelism § Optimize for modern structured and semi-structured workloads

26 .Faster String Processing
[Chart: MBs processed per core per sec, UPPER() function (higher is better), JVM vs. C++]
[Chart: MBs processed per core per sec, SUBSTRING() function (higher is better), JVM vs. C++]
Can we do better?

27 .UTF-8 String Encoding
UTF-8 is a variable-length encoding:
  A              1 byte   41
  ©              2 bytes  c2 a9
  辛 (U+8F9B)    3 bytes  e8 be 9b
  💩 (U+1F4A9)   4 bytes  f0 9f 92 a9
Great for memory (more compact), but computationally expensive:
§ substring(), ... need to look at the bytes one by one.
§ With a fixed-length encoding, character positions follow directly from byte offsets (e.g. for ASCII, the length is simply the number of bytes).
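
To make the cost concrete, the sketch below counts characters and takes a substring of a UTF-8 string by walking it one code point at a time. It assumes well-formed input and does no validation; `utf8_seq_len`, `utf8_length`, and `utf8_substring` are hypothetical names for this example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Length in bytes of the UTF-8 sequence that starts with `lead`.
// Assumes well-formed input; a stray continuation byte counts as 1.
inline std::size_t utf8_seq_len(unsigned char lead) {
    if (lead < 0x80) return 1;            // 0xxxxxxx
    if ((lead & 0xE0) == 0xC0) return 2;  // 110xxxxx
    if ((lead & 0xF0) == 0xE0) return 3;  // 1110xxxx
    if ((lead & 0xF8) == 0xF0) return 4;  // 11110xxx
    return 1;
}

// Number of code points: has to inspect every lead byte.
std::size_t utf8_length(const std::string& s) {
    std::size_t count = 0;
    for (std::size_t i = 0; i < s.size();
         i += utf8_seq_len(static_cast<unsigned char>(s[i]))) {
        ++count;
    }
    return count;
}

// Substring of `len` code points starting at code point `start` (0-based).
// The byte offset of `start` is unknown until the prefix has been walked.
std::string utf8_substring(const std::string& s, std::size_t start,
                           std::size_t len) {
    std::size_t i = 0;
    for (std::size_t n = 0; n < start && i < s.size(); ++n) {
        i += utf8_seq_len(static_cast<unsigned char>(s[i]));
    }
    const std::size_t begin = std::min(i, s.size());
    for (std::size_t n = 0; n < len && i < s.size(); ++n) {
        i += utf8_seq_len(static_cast<unsigned char>(s[i]));
    }
    const std::size_t end = std::min(i, s.size());
    assert(begin <= end);
    return s.substr(begin, end - begin);
}
```

For the four characters on this slide concatenated into one string (10 bytes), `utf8_length` returns 4, whereas an ASCII-only string could simply report its byte count.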

28 .Fixed Length vs Variable Length Performance Not realistic to ask users to define whether a column is ASCII vs UTF-8, even though most data are ASCII. Can we get UTF-8’s memory saving, and the performance of ASCII when data are ASCII?

29 .Faster String Processing
Goal: only pay the performance penalty when necessary.
Separate processing into two steps:
Step 1. Fully SIMD (AVX) ASCII detection. Runs at ~60GB/s/core.
Step 2. If the column batch is ASCII, run the specialized fixed-length version; otherwise, run the variable-length version.
https://lemire.me/blog/2018/10/19/validating-utf-8-bytes-using-only-0-45-cycles-per-byte-avx-edition/
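
The two-step scheme can be sketched as follows. Note that the ASCII check here is a portable scalar stand-in that tests 8 bytes per iteration, not the AVX version the slide refers to, and it will not reach the quoted throughput. All function names (`is_ascii`, `substring_ascii`, `substring_utf8`, `substring_batch`) are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Step 1: true if every byte is below 0x80. Checks 8 bytes at a time by
// testing the high bit of each byte in a 64-bit word.
bool is_ascii(const char* data, std::size_t n) {
    const std::uint64_t kHighBits = 0x8080808080808080ULL;
    std::size_t i = 0;
    for (; i + 8 <= n; i += 8) {
        std::uint64_t word;
        std::memcpy(&word, data + i, 8);  // safe unaligned load
        if (word & kHighBits) return false;
    }
    for (; i < n; ++i) {
        if (static_cast<unsigned char>(data[i]) & 0x80) return false;
    }
    return true;
}

// Fixed-length path: one character per byte, so offsets are byte offsets.
std::string substring_ascii(const std::string& s, std::size_t start,
                            std::size_t len) {
    if (start >= s.size()) return std::string();
    return s.substr(start, len);
}

// Advances past one code point by skipping its continuation bytes (10xxxxxx).
inline std::size_t next_code_point(const std::string& s, std::size_t i) {
    ++i;
    while (i < s.size() && (static_cast<unsigned char>(s[i]) & 0xC0) == 0x80) {
        ++i;
    }
    return i;
}

// Variable-length path: walk the string code point by code point.
std::string substring_utf8(const std::string& s, std::size_t start,
                           std::size_t len) {
    std::size_t i = 0;
    for (std::size_t n = 0; n < start && i < s.size(); ++n) {
        i = next_code_point(s, i);
    }
    const std::size_t begin = i;
    for (std::size_t n = 0; n < len && i < s.size(); ++n) {
        i = next_code_point(s, i);
    }
    return s.substr(begin, i - begin);
}

// Step 2: choose the path once per column batch, not once per character.
std::vector<std::string> substring_batch(const std::vector<std::string>& column,
                                         std::size_t start, std::size_t len) {
    bool all_ascii = true;
    for (const std::string& s : column) {
        if (!is_ascii(s.data(), s.size())) {
            all_ascii = false;
            break;
        }
    }
    std::vector<std::string> out;
    out.reserve(column.size());
    for (const std::string& s : column) {
        out.push_back(all_ascii ? substring_ascii(s, start, len)
                                : substring_utf8(s, start, len));
    }
    return out;
}
```

Both paths return the same result on ASCII data, so the detection step affects only speed: a batch that is entirely ASCII never pays for the byte-by-byte walk.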
