Delta Engine: High Performance Query Engine for Data Lakes


1. Delta Engine: High Performance Query Engine for Data Lakes
Reynold Xin, Chief Architect & Co-founder, Databricks

2. Delta Engine
§ Builds on Apache Spark 3.0
§ Fully API compatible
§ Accelerates SQL and DataFrame workloads with:
▪ Improved query optimizer
▪ Native vectorized execution engine
▪ Caching
[Architecture diagram: SQL / Spark DataFrame / Koalas APIs on top of the Query Optimizer, Native Execution Engine, and Caching layers]

3. Delta Engine's Improved Query Optimizer
§ Extends Spark's cost-based optimizer and adaptive query execution with advanced statistics
§ Up to 18x performance increase for star schema workloads

4. Delta Engine's Caching
§ Automatically caches input data
§ Transcodes data into a more CPU-efficient format, fully leveraging NVMe SSDs
§ Up to 5x scan performance increase

5. [Architecture diagram: SQL / Spark DataFrame / Koalas APIs on top of the Query Optimizer, Native Execution Engine, and Caching layers]

6. Hardware & Workloads

7. Spark Summit 2015 Keynote

8. Hardware Changes since 2015
           2010             2015             2020
Storage:   50 MB/s (HDD)    500 MB/s (SSD)   16 GB/s (NVMe)    10X
Network:   1 Gbps           10 Gbps          100 Gbps          10X
CPU:       ~3 GHz           ~3 GHz           ~3 GHz            flat
CPUs continue to be the bottleneck. How do we achieve next-level performance?

9. Workload Trends
Businesses are moving faster, so organizations spend less time on data modeling, which leads to worse performance:
§ Most columns don't have "NOT NULL" defined
§ Strings are convenient, so many date columns are stored as strings
§ Data is more denormalized and continuously ingested
Can we get both agility and performance?

10. Photon
New execution engine for Delta Engine to accelerate Spark SQL.
Built from scratch in C++ for performance:
§ Vectorization: exploit data-level parallelism and instruction-level parallelism
§ Optimize for modern structured and semi-structured workloads

11. CPU 101
CPU frequency is not getting higher, but parallelism is improving:
§ Data-level parallelism: SIMD register width
▪ SSE: 128 bits
▪ AVX2: 256 bits
▪ AVX-512: 512 bits
§ Instruction-level parallelism: out-of-order window
▪ Sandy Bridge: 168
▪ Haswell: 192
▪ Skylake: 224
https://en.wikichip.org/wiki/File:skylake_buff_window.png

13. Photon
New execution engine to accelerate Spark SQL, as part of Delta Engine.
Built from scratch in C++ for performance:
§ Vectorization: exploit data-level parallelism and instruction-level parallelism
§ Optimize for modern structured and semi-structured workloads

14. Vectorization: Columnar In-memory Format
Row format (one record at a time):
(1, alex, 3.5), (2, bart, 4.1), (3, ala, 6.4), (4, ankur, 9.9)
Column format (one array per column):
[1, 2, 3, 4], [alex, bart, ala, ankur], [3.5, 4.1, 6.4, 9.9]
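
As a rough illustration of the two layouts (illustrative C++ types, not Photon's internals): a row-oriented table is an array of structs, while a column-oriented table keeps one contiguous array per column.

#include <cstdint>
#include <string>
#include <vector>

// Row format: one struct per row. Scanning a single column still drags the
// whole row through the cache, wasting bandwidth on unused fields.
struct Row {
  int32_t id;
  std::string name;
  double value;
};
using RowTable = std::vector<Row>;

// Column format: one contiguous array per column. Scanning a column reads
// only that column's bytes, which is prefetch-friendly and SIMD-friendly.
struct ColumnTable {
  std::vector<int32_t> id;        // 1, 2, 3, 4
  std::vector<std::string> name;  // alex, bart, ala, ankur
  std::vector<double> value;      // 3.5, 4.1, 6.4, 9.9
};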

15. Simple Example: select col1 + col3 from table

Col1: 1     2     3     4
Col2: alex  bart  ala   ankur
Col3: 3.5   4.1   6.4   9.9
Out:  4.5   6.1   9.4   13.9

for (int32_t i = 0; i < num_rows; ++i) {
  out[i] = col1[i] + col3[i];
}

Benefits:
- Compact memory access (better prefetching and caching)
- Simpler code snippet, easier for compilers to optimize (e.g. loop unrolling)

16. [Assembly listings: vectorized non-SIMD version vs. SIMD version (note the "v"-prefixed AVX instructions)]
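
For illustration, here is a minimal hand-written AVX sketch of the col1 + col3 loop from slide 15, showing the kind of vector ("v"-prefixed) instructions the SIMD version relies on. It assumes col1 is int32 and col3 is double, as in the example; this is a sketch, not Photon's actual code.

#include <immintrin.h>
#include <cstdint>

void add_columns(const int32_t* col1, const double* col3, double* out,
                 int32_t num_rows) {
  int32_t i = 0;
  // Process 4 rows per iteration using 256-bit AVX registers.
  for (; i + 4 <= num_rows; i += 4) {
    __m128i a_i32 = _mm_loadu_si128(reinterpret_cast<const __m128i*>(col1 + i));
    __m256d a = _mm256_cvtepi32_pd(a_i32);   // widen 4 x int32 -> 4 x double
    __m256d b = _mm256_loadu_pd(col3 + i);
    _mm256_storeu_pd(out + i, _mm256_add_pd(a, b));
  }
  // Scalar tail for the remaining rows.
  for (; i < num_rows; ++i) {
    out[i] = col1[i] + col3[i];
  }
}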

17. Exploiting Data-Level Parallelism
[Bar chart: col1 + col3, billion rows processed per core per second (higher is better), SIMD vs. non-SIMD]

18. Photon
New execution engine to accelerate Spark SQL, as part of Delta Engine.
Built from scratch in C++ for performance:
§ Vectorization: exploit data-level parallelism and instruction-level parallelism
§ Optimize for modern structured and semi-structured workloads

19. Hash Table: Most Important Data Structure
Example: select sum(value) from table group by key

for (int32_t i = 0; i < batchSize; ++i) {
  int32_t bucket = hash(keyCol[i]) % ht->size;
  if (ht->buckets[bucket].key == keyCol[i]) {
    ht->buckets[bucket].value += valueCol[i];
  }
}

Note: over-simplified; hash table collisions are not handled.
[Diagram: a scan feeds each key through the hash function into the hash table (ht)]
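
A minimal self-contained version of the over-simplified snippet above. The Bucket/HashTable layout and the hash function are illustrative assumptions, not Photon's implementation, and collisions are still ignored as on the slide.

#include <cstdint>
#include <vector>

struct Bucket { int64_t key; int64_t value; };

struct HashTable {
  std::vector<Bucket> buckets;
  int32_t size;
  explicit HashTable(int32_t n) : buckets(n), size(n) {}
};

// Illustrative hash function.
static inline uint32_t hashKey(int64_t k) {
  return static_cast<uint32_t>(k) * 0x9E3779B1u;
}

// select sum(value) from table group by key, for one column batch.
void hashAggregate(HashTable* ht, const int64_t* keyCol,
                   const int64_t* valueCol, int32_t batchSize) {
  for (int32_t i = 0; i < batchSize; ++i) {
    int32_t bucket = hashKey(keyCol[i]) % ht->size;
    if (ht->buckets[bucket].key == keyCol[i]) {
      ht->buckets[bucket].value += valueCol[i];
    }
  }
}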

20. Hash Table: Most Important Data Structure
Example: select sum(value) from table group by key

for (int32_t i = 0; i < batchSize; ++i) {
  int32_t bucket = hash(keyCol[i]) % ht->size;
  if (ht->buckets[bucket].key == keyCol[i]) {
    ht->buckets[bucket].value += valueCol[i];
  }
}

[CPU breakdown pie chart: memory stalls 64%, useful work 36%]

21. How do we optimize the following?

for (int32_t i = 0; i < batchSize; ++i) {
  int32_t bucket = hash(keyCol[i]) % ht->size;
  if (ht->buckets[bucket].key == keyCol[i]) {    // loop body is large (the key comparison can be
    ht->buckets[bucket].value += valueCol[i];    // very expensive, e.g. comparing strings)
  }
}

§ Instructions that access memory are intermixed with instructions that compute the hash code, compare keys, and do the addition.
§ A large loop body -> the CPU sees fewer memory-access instructions in its window -> the CPU executes fewer memory accesses at once.
§ Solution: make the loops smaller.

22. How do we optimize the following?

for (int32_t i = 0; i < batchSize; ++i) {
  int32_t bucket = hash(keyCol[i]) % ht->size;
  if (ht->buckets[bucket].key == keyCol[i]) {
    ht->buckets[bucket].value += valueCol[i];
  }
}

Break the big loop into smaller loops:

// buckets[] and keys[] are small per-batch scratch arrays
for (int32_t i = 0; i < batchSize; ++i) {
  buckets[i] = hash(keyCol[i]) % ht->size;
}
for (int32_t i = 0; i < batchSize; ++i) {
  keys[i] = ht->buckets[buckets[i]].key;
}
for (int32_t i = 0; i < batchSize; ++i) {
  if (keys[i] == keyCol[i]) {
    ht->buckets[buckets[i]].value += valueCol[i];
  }
}

23. Exploiting Instruction-Level Parallelism
§ Pipeline memory accesses: load from multiple memory addresses in parallel
§ Prefetch to eliminate cache misses
§ Minimize TLB misses with huge pages
[Bar chart of CPU time: traditional systems ~84 memory stalls + ~47 useful work; Photon ~23 memory stalls + ~23 useful work (lower is better)]
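
Reusing the Bucket, HashTable, and hashKey definitions from the sketch after slide 19, here is a hedged illustration of the first two ideas: splitting the loop lets a first pass issue many independent loads (via prefetching) before the probe pass needs them. __builtin_prefetch is a GCC/Clang builtin; the scratch buckets array is assumed to be allocated once per batch.

// Pass 1: compute bucket indices and start pulling those buckets into cache.
void computeBucketsAndPrefetch(const HashTable* ht, const int64_t* keyCol,
                               int32_t batchSize, int32_t* buckets) {
  for (int32_t i = 0; i < batchSize; ++i) {
    buckets[i] = hashKey(keyCol[i]) % ht->size;
    __builtin_prefetch(&ht->buckets[buckets[i]]);  // overlap memory latency with later work
  }
}

// Pass 2: by the time we probe, many bucket cache lines are already in flight,
// so independent iterations overlap their memory accesses.
void probeAndAggregate(HashTable* ht, const int64_t* keyCol,
                       const int64_t* valueCol, int32_t batchSize,
                       const int32_t* buckets) {
  for (int32_t i = 0; i < batchSize; ++i) {
    Bucket& b = ht->buckets[buckets[i]];
    if (b.key == keyCol[i]) {
      b.value += valueCol[i];
    }
  }
}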

24. TPC-DS 30TB: Queries per Hour (higher is better)
Delta Engine with Photon: 110
Delta Engine without Photon: 32
3.3x speedup

25. Photon
New execution engine to accelerate Spark SQL, as part of Delta Engine.
Built from scratch in C++ for performance:
§ Vectorization: exploit data-level parallelism and instruction-level parallelism
§ Optimize for modern structured and semi-structured workloads

26. Faster String Processing
[Bar chart: MBs processed per core per second for the UPPER() function, JVM vs. C++ (higher is better)]
[Bar chart: MBs processed per core per second for the SUBSTRING() function, JVM vs. C++ (higher is better)]
Can we do better?

27. UTF-8 String Encoding
UTF-8 is a variable-length encoding:
'A'    1 byte    41
'©'    2 bytes   c2 a9
'辛'   3 bytes   e8 be 9b
'💩'   4 bytes   f0 9f 92 a9
Great for memory (more compact), but computationally expensive:
§ substring(), ... need to look at the bytes one by one.
§ With a fixed-length encoding, these can simply be computed from the number of bytes.
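
A small sketch of why variable-length encoding is costly: counting characters (or locating the i-th character) in UTF-8 has to inspect every byte, while for fixed-width data the answer is immediate. Function names here are illustrative, not Photon APIs.

#include <cstdint>
#include <cstddef>

// Number of characters in a UTF-8 buffer: every byte must be inspected.
// Continuation bytes have the bit pattern 10xxxxxx and are not counted.
size_t utf8_length(const uint8_t* data, size_t num_bytes) {
  size_t chars = 0;
  for (size_t i = 0; i < num_bytes; ++i) {
    if ((data[i] & 0xC0) != 0x80) ++chars;
  }
  return chars;
}

// For a fixed-width encoding (e.g. pure ASCII), no scan is needed.
size_t ascii_length(const uint8_t* /*data*/, size_t num_bytes) {
  return num_bytes;
}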

28. Fixed-Length vs. Variable-Length Performance
It is not realistic to ask users to declare whether a column is ASCII or UTF-8, even though most data is ASCII.
Can we get UTF-8's memory savings, and the performance of ASCII when the data is ASCII?

29. Faster String Processing
Goal: only pay the performance penalty when necessary. Separate processing into two steps:
Step 1. Fully SIMD (AVX) ASCII detection. Runs at ~60 GB/s per core.
Step 2. If the column batch is ASCII, run the specialized fixed-length version; otherwise, run the variable-length version.
https://lemire.me/blog/2018/10/19/validating-utf-8-bytes-using-only-0-45-cycles-per-byte-avx-edition/
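
A minimal sketch of what step 1 could look like with AVX2: ASCII bytes are exactly those with the high bit clear, so a batch can be checked by OR-ing 32-byte chunks and testing the sign bits. This is a simplified stand-in for the SIMD validation approach linked above, not Photon's implementation.

#include <immintrin.h>
#include <cstdint>
#include <cstddef>

// Returns true if every byte in the buffer is ASCII (high bit clear).
bool is_all_ascii(const uint8_t* data, size_t len) {
  size_t i = 0;
  __m256i acc = _mm256_setzero_si256();
  // Accumulate 32 bytes at a time; any non-ASCII byte sets a sign bit in acc.
  for (; i + 32 <= len; i += 32) {
    __m256i chunk = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(data + i));
    acc = _mm256_or_si256(acc, chunk);
  }
  if (_mm256_movemask_epi8(acc) != 0) return false;
  // Scalar tail keeps the check correct for lengths that are not multiples of 32.
  for (; i < len; ++i) {
    if (data[i] & 0x80) return false;
  }
  return true;
}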