Vectorized Query Execution in Apache Spark at Facebook
1. Vectorized Query Execution in Apache Spark at Facebook
Chen Yang, Spark Summit | 04/24/2019
2. About me
Chen Yang
• Software Engineer at Facebook (Data Warehouse Team)
• Working on Spark execution engine improvements
• Worked on Hive and ORC in the past
3. Agenda
• Spark at Facebook
• Row format vs Columnar format
• Row-at-a-time vs Vector-at-a-time processing
• Performance results
• Road ahead
5. Spark at Facebook
• Largest SQL query engine at Facebook (by CPU usage)
• We run Spark on disaggregated compute/storage clusters
  • Scale/upgrade clusters independently
• Efficiency is the top priority for Spark at Facebook given the scale
  • Compute efficiency: optimize CPU and memory usage
  • Storage efficiency: optimize on-disk size and IOPS
6. Spark at Facebook
• Compute efficiency: optimize CPU and memory usage
  • A significant percentage (>40%) of CPU time is spent reading and writing data
• Storage efficiency: optimize on-disk size and IOPS
  • The storage format has a big impact on on-disk size and IOPS
  • The Facebook data warehouse uses the ORC format
8. Row format vs Columnar format
Logical table (user_id, os): (1, android), (1, ios), (3, ios), (6, android)
• Row format stores values row by row: 1, android, 1, ios, 3, ios, 6, android
• Columnar format stores each column contiguously: user_id = [1, 1, 3, 6], os = [android, ios, ios, android]
9. Row format vs Columnar format
Same logical table as the previous slide, with examples of row formats:
• On-disk row format: CSV
• In-memory row formats: UnsafeRow, OrcLazyRow
10. Row format vs Columnar format
Same logical table, adding examples of columnar formats:
• On-disk row format: CSV; in-memory row formats: UnsafeRow, OrcLazyRow
• On-disk columnar formats: Parquet, ORC; in-memory columnar formats: Arrow, VectorizedRowBatch
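To make the two layouts concrete, here is a minimal Java sketch of the same logical (user_id, os) table stored row-wise and column-wise. The class and field names here are illustrative only, not Spark or ORC internals:

```java
// Sketch: one logical table, two in-memory layouts.
// Names are illustrative, not Spark/ORC classes.
public class Layouts {
    // Row format: one object per row; values of different columns sit together.
    static final class Row {
        final long userId;
        final String os;
        Row(long userId, String os) { this.userId = userId; this.os = os; }
    }

    // Columnar format: one array per column; values of the same column are
    // contiguous, which compresses better and scans faster per column.
    static final class ColumnBatch {
        final long[] userId;
        final String[] os;
        ColumnBatch(long[] userId, String[] os) { this.userId = userId; this.os = os; }
    }

    static Row[] rows() {
        return new Row[] {
            new Row(1, "android"), new Row(1, "ios"),
            new Row(3, "ios"), new Row(6, "android")
        };
    }

    static ColumnBatch columns() {
        return new ColumnBatch(
            new long[] {1, 1, 3, 6},
            new String[] {"android", "ios", "ios", "android"});
    }
}
```

Both hold identical data; the difference is purely which values end up adjacent in memory.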
11. Columnar on-disk format
• Better compression ratio
  • Type-specific encoding
• Minimize I/O
  • Projection push-down (column pruning)
  • Predicate push-down (filtering based on stats)
Diagram: the logical table (user_id, os) stored column-wise as user_id = [1, 1, 3, 6], os = [android, ios, ios, android]
12. Better compression ratio
• Encoding
  • Dictionary encoding
  • Run-length encoding
  • Delta encoding
• Compression
  • zstd
  • zlib
  • snappy
Diagram: dictionary encoding maps the os column [android, ios, ios, android] to the dictionary {0: android, 1: ios} and the id stream [0, 1, 1, 0]
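The dictionary-encoding step shown in the diagram can be sketched in a few lines of Java. This is a simplified illustration of the idea, not the ORC writer's actual encoder (real ORC also applies run-length encoding and general-purpose compression on top):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of dictionary encoding: distinct strings go into a dictionary,
// and the column stores small integer ids instead of repeated strings.
public class DictionaryEncoder {
    final List<String> dictionary = new ArrayList<>();
    private final Map<String, Integer> ids = new HashMap<>();

    int[] encode(String[] values) {
        int[] encoded = new int[values.length];
        for (int i = 0; i < values.length; i++) {
            Integer id = ids.get(values[i]);
            if (id == null) {
                // First time we see this value: assign the next dictionary id.
                id = dictionary.size();
                dictionary.add(values[i]);
                ids.put(values[i], id);
            }
            encoded[i] = id;
        }
        return encoded;
    }

    String[] decode(int[] encoded) {
        String[] out = new String[encoded.length];
        for (int i = 0; i < encoded.length; i++) out[i] = dictionary.get(encoded[i]);
        return out;
    }
}
```

For the os column [android, ios, ios, android] this yields the dictionary {android, ios} and ids [0, 1, 1, 0], matching the diagram.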
13. Minimize I/O
Query: SELECT COUNT(*) FROM user_os_info WHERE user_id = 3
• Projection push-down: column pruning reads only the user_id column, skipping os
• Predicate push-down: filtering based on stats skips data whose statistics show it cannot contain user_id = 3
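The stats-based filtering step can be sketched as follows. This is an illustration of the principle, assuming hypothetical per-chunk min/max statistics, not the ORC reader API:

```java
// Sketch of predicate push-down using min/max statistics: a chunk of rows
// whose [min, max] range cannot contain the predicate value is skipped
// entirely, without reading its data. (Illustrative; not the ORC reader API.)
public class StatsFilter {
    static final class ChunkStats {
        final long min, max;
        ChunkStats(long min, long max) { this.min = min; this.max = max; }
    }

    // Returns true if the chunk may contain rows with user_id == value and
    // therefore must be read; false means the whole chunk can be skipped.
    static boolean mightContain(ChunkStats stats, long value) {
        return value >= stats.min && value <= stats.max;
    }
}
```

For WHERE user_id = 3, a chunk with stats [1, 6] must be read, while one with stats [10, 20] is skipped, saving both I/O and CPU.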
14. ORC in open source vs at Facebook
Open source:
• ORC (Optimized Row Columnar) is a self-describing, type-aware columnar file format designed for Hadoop workloads.
• It is optimized for large streaming reads, with integrated support for finding required rows quickly.
Facebook:
• A fork of Apache ORC, also called DWRF
• Various performance improvements (IOPS, memory, etc.)
• Some format extensions for Facebook-specific use cases (FlatMap)
16. Row-at-a-time vs vector-at-a-time
Running example: SELECT COUNT(*) FROM user_os_info WHERE user_id = 3, over the table (user_id, os) = (1, android), (1, ios), (3, ios), (6, android)
17. Row-at-a-time processing
The Orc Reader emits one row at a time, each operator consumes and produces single rows, and the Orc Writer receives one row at a time.
18. Row-at-a-time processing
In the row pipeline, the Orc Reader produces OrcLazyRow, the operators exchange UnsafeRow, and the Orc Writer consumes OrcRow.
19. Row-at-a-time processing
Generated code (simplified), paying one iterator call per row:

    private void agg_doAggregateWithoutKey() {
      while (inputadapter_input.hasNext()) {
        inputadapter_row = inputadapter_input.next();
        value = inputadapter_row.getLong(0);
        // do aggr
      }
    }
20. Vector-at-a-time processing
The Orc Reader emits a vector of rows at a time, each operator consumes and produces vectors, and the Orc Writer receives a vector at a time.
21. Vector-at-a-time processing
In the vectorized pipeline, the Orc Reader produces a VectorizedRowBatch (e.g. the user_id column vector [1, 1, 3, 6]), the operators exchange VectorizedRowBatch, and the Orc Writer consumes VectorizedRowBatch.
22. Vector-at-a-time processing
Generated code (simplified), paying one call per batch and running a tight loop over the column vector:

    private void agg_doAggregateWithoutKey() {
      while (orc_scan_batch != null) {
        int numRows = orc_scan_batch.size;
        while (orc_scan_batch_idx < numRows) {
          value = orc_scan_col_0.vector[orc_scan_batch_idx];
          orc_scan_batch_idx++;
          // do aggr
        }
        nextBatch();
      }
    }

    private void nextBatch() {
      if (orc_scan_input.hasNext()) {
        orc_scan_batch = orc_scan_input.next();
        orc_scan_col_0 = orc_scan_batch.cols[0];
      }
    }
23. Row-at-a-time vs vector-at-a-time
• Lower overhead per row
  • Avoids virtual function dispatch cost per row
  • Better cache locality
• Avoids unnecessary copy/conversion
  • No need to convert between OrcLazyRow and UnsafeRow

Row-at-a-time processing:

    private void agg_doAggregateWithoutKey() {
      while (inputadapter_input.hasNext()) {
        inputadapter_row = inputadapter_input.next();
        value = inputadapter_row.getLong(0);
        // do aggr
      }
    }

Vector-at-a-time processing:

    private void agg_doAggregateWithoutKey() {
      while (orc_scan_batch != null) {
        int numRows = orc_scan_batch.size;
        while (orc_scan_batch_idx < numRows) {
          value = orc_scan_col_0.vector[orc_scan_batch_idx];
          orc_scan_batch_idx++;
          // do aggr
        }
        nextBatch();
      }
    }

    private void nextBatch() {
      if (orc_scan_input.hasNext()) {
        orc_scan_batch = orc_scan_input.next();
        orc_scan_col_0 = orc_scan_batch.cols[0];
      }
    }
24. Row-at-a-time vs vector-at-a-time
• Lower overhead per row
  • Avoids virtual function dispatch cost per row
  • Better cache locality
• Avoids unnecessary copy/conversion
  • No need to convert between OrcLazyRow and UnsafeRow
Pipeline comparison:
• Row-at-a-time: Orc Reader → OrcLazyRow / UnsafeRow → WholeStageCodeGen
• Vector-at-a-time: Orc Reader → RowBatch → WholeStageCodeGen
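The per-row vs per-batch overhead can be sketched as plain runnable Java. These are simplified stand-ins for the generated code on the slides, not Spark's actual classes: the row path pays an iterator call per row, while the vector path pays one call per batch and then runs a tight loop over a primitive array:

```java
import java.util.Iterator;

// Sketch: counting rows matching user_id == target, row-at-a-time vs
// vector-at-a-time. (Illustrative; not Spark's generated code.)
public class CountAgg {
    // Row-at-a-time: one virtual hasNext()/next() pair per row.
    static long countRowAtATime(Iterator<long[]> rows, long target) {
        long count = 0;
        while (rows.hasNext()) {
            long[] row = rows.next();              // per-row dispatch
            if (row[0] == target) count++;
        }
        return count;
    }

    // Vector-at-a-time: one dispatch per batch, then a tight loop over the
    // column vector, which the JIT can keep in cache and often vectorize.
    static long countVectorAtATime(Iterator<long[]> batches, long target) {
        long count = 0;
        while (batches.hasNext()) {
            long[] userIdVector = batches.next();  // per-batch dispatch
            for (int i = 0; i < userIdVector.length; i++) {
                if (userIdVector[i] == target) count++;
            }
        }
        return count;
    }
}
```

With a batch size of a few thousand rows, the per-batch dispatch cost is amortized to near zero, which is where the lower overhead per row comes from.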
25. Vectorized ORC reader/writer in open source vs at Facebook
Open source:
• The vectorized reader only supports simple types
• A vectorized writer is not supported
Facebook:
• The vectorized reader supports all types and is integrated with codegen
• The vectorized writer supports all types; integration with codegen is planned
26. Vector-at-a-time + Whole-Stage CodeGen
• Currently only HiveTableScan and InsertIntoHiveTable understand the ORC columnar format
• Most Spark operators still process one row at a time
28. Vectorized reader + vector-at-a-time microbenchmark
Up to 8x speedup when reading 10M rows from a single-column table
29. Vectorized reader and writer + vector-at-a-time microbenchmark
Up to 3.5x speedup when reading and writing 10M rows of a single-column table