Vectorized Query Execution in Apache Spark at Facebook

A standard query execution system processes one row at a time. Vectorized query execution batches multiple rows together in a columnar format, and each operator uses simple loops to iterate over the data within a batch. This greatly reduces CPU usage for reading, writing, and query operations such as scanning and filtering. In this talk, we will take a deep dive into Facebook's ORC-based vectorized reader and writer implementation, discuss how vectorization affects the performance of various data types in Hive/Spark, and quantify the improvements vectorization brings to the Facebook Warehouse.

1. Vectorized Query Execution in Apache Spark at Facebook
Chen Yang • Spark Summit | 04/24/2019

2. About me
Chen Yang
• Software Engineer at Facebook (Data Warehouse Team)
• Working on Spark execution engine improvements
• Worked on Hive and ORC in the past

3. Agenda
• Spark at Facebook
• Row format vs Columnar format
• Row-at-a-time vs Vector-at-a-time processing
• Performance results
• Road ahead

4. Agenda
• Spark at Facebook
• Row format vs Columnar format
• Row-at-a-time vs Vector-at-a-time processing
• Performance results
• Road ahead

5. Spark at Facebook
• Largest SQL query engine at Facebook (by CPU usage)
• We use Spark on disaggregated compute/storage clusters
  • Scale/upgrade clusters independently
• Efficiency is the top priority for Spark at Facebook given the scale
  • Compute efficiency: optimize CPU and memory usage
  • Storage efficiency: optimize on-disk size and IOPS

6. Spark at Facebook
• Compute efficiency: optimize CPU and memory usage
  • A significant percentage (>40%) of CPU time is spent in reading/writing
• Storage efficiency: optimize on-disk size and IOPS
  • The storage format has a big impact on on-disk size and IOPS
  • The Facebook data warehouse uses the ORC format

7. Agenda
• Spark at Facebook
• Row format vs Columnar format
• Row-at-a-time vs Vector-at-a-time processing
• Performance results
• Road ahead

8. Row format vs Columnar format
Logical table (user_id, os): (1, android), (1, ios), (3, ios), (6, android)
• Row format stores whole rows contiguously: 1 android | 1 ios | 3 ios | 6 android
• Columnar format stores each column contiguously: 1 1 3 6 | android ios ios android

9. Row format vs Columnar format
Same table as before, with concrete formats for the row layout:
• Row format: 1 android | 1 ios | 3 ios | 6 android
• On-disk row format: CSV; in-memory row formats: UnsafeRow, OrcLazyRow
• Columnar format: 1 1 3 6 | android ios ios android

10. Row format vs Columnar format
• Row format: 1 android | 1 ios | 3 ios | 6 android
  • On-disk row format: CSV; in-memory row formats: UnsafeRow, OrcLazyRow
• Columnar format: 1 1 3 6 | android ios ios android
  • On-disk columnar formats: Parquet, ORC; in-memory columnar formats: Arrow, VectorizedRowBatch
A minimal sketch of the two layouts follows below.
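To make the two layouts concrete, here is a minimal, self-contained Java sketch; the class and field names are illustrative, not Spark's or ORC's. It stores the example table both ways and runs the same scan over each:

    import java.util.List;

    public class LayoutSketch {
        // Row layout: one object per row, fields of a row interleaved in memory.
        record Row(long userId, String os) {}

        // Columnar layout: one array per column, values of a column contiguous.
        static class ColumnBatch {
            final long[] userId;
            final String[] os;
            final int size;
            ColumnBatch(long[] userId, String[] os) {
                this.userId = userId;
                this.os = os;
                this.size = userId.length;
            }
        }

        public static void main(String[] args) {
            List<Row> rows = List.of(
                new Row(1, "android"), new Row(1, "ios"),
                new Row(3, "ios"), new Row(6, "android"));
            ColumnBatch batch = new ColumnBatch(
                new long[] {1, 1, 3, 6},
                new String[] {"android", "ios", "ios", "android"});

            // The same scan in both layouts: count rows where user_id = 3.
            long rowCount = rows.stream().filter(r -> r.userId() == 3).count();
            long colCount = 0;
            for (int i = 0; i < batch.size; i++) {  // tight loop over one array
                if (batch.userId[i] == 3) colCount++;
            }
            System.out.println(rowCount + " " + colCount);  // prints: 1 1
        }
    }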

11. Columnar on-disk format
• Better compression ratio
  • Type-specific encoding
• Minimize I/O
  • Projection push down (column pruning)
  • Predicate push down (filtering based on stats)

12. Better compression ratio
• Encoding: dictionary encoding, run-length encoding, delta encoding
• Compression: zstd, zlib, snappy
Dictionary encoding example: the column android ios ios android becomes the dictionary 0 = android, 1 = ios plus the encoded values 0 1 1 0 (sketched in code below).
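As a sketch of the idea (not the ORC implementation; all names here are invented), dictionary encoding replaces each value with a small integer id, so a low-cardinality string column shrinks to a short dictionary plus an array of ids:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DictionaryEncode {
        public static void main(String[] args) {
            String[] column = {"android", "ios", "ios", "android"};

            Map<String, Integer> ids = new HashMap<>();
            List<String> dictionary = new ArrayList<>();
            int[] encoded = new int[column.length];

            for (int i = 0; i < column.length; i++) {
                Integer id = ids.get(column[i]);
                if (id == null) {              // first occurrence: assign the next id
                    id = dictionary.size();
                    ids.put(column[i], id);
                    dictionary.add(column[i]);
                }
                encoded[i] = id;
            }
            // dictionary = [android, ios], encoded = [0, 1, 1, 0]
            System.out.println(dictionary + " " + Arrays.toString(encoded));
        }
    }

The ids themselves compress well: run-length encoding collapses repeated ids, and a general-purpose codec such as zstd squeezes out the remaining redundancy.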

13. Minimize I/O
• Projection push down: column pruning
• Predicate push down: filtering based on stats
For SELECT COUNT(*) FROM user_os_info WHERE user_id = 3, column pruning reads only the user_id column, and stats-based filtering skips any chunk of rows whose min/max range cannot contain user_id = 3 (a sketch follows below).
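Here is a minimal sketch of stats-based skipping, assuming ORC-style per-stripe min/max statistics; the StripeStats type and its fields are invented for illustration:

    public class PredicatePushDown {
        // Per-stripe column statistics, as a columnar file would store them.
        record StripeStats(long min, long max, long rowCount) {}

        // A stripe can be skipped when its [min, max] range excludes the value.
        static boolean canContain(StripeStats s, long value) {
            return s.min() <= value && value <= s.max();
        }

        public static void main(String[] args) {
            StripeStats[] stripes = {
                new StripeStats(1, 2, 10_000),  // user_id in [1, 2]: skip entirely
                new StripeStats(3, 6, 10_000),  // user_id in [3, 6]: must be read
            };
            long value = 3;  // WHERE user_id = 3
            long rowsToScan = 0;
            for (StripeStats s : stripes) {
                if (!canContain(s, value)) {
                    continue;                   // no I/O at all for this stripe
                }
                rowsToScan += s.rowCount();     // ...read and scan the stripe here...
            }
            System.out.println(rowsToScan);     // prints: 10000
        }
    }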

14. ORC in open source vs at Facebook
Open source:
• ORC (Optimized Row Columnar) is a self-describing, type-aware columnar file format designed for Hadoop workloads.
• It is optimized for large streaming reads, with integrated support for finding required rows quickly.
Facebook:
• A fork of Apache ORC, also called DWRF
• Various performance improvements (IOPS, memory, etc.)
• Special formats for specific Facebook use cases (e.g., FlatMap)

15. Agenda
• Spark at Facebook
• Row format vs Columnar format
• Row-at-a-time vs Vector-at-a-time processing
• Performance results
• Road ahead

16. Row-at-a-time vs vector-at-a-time
Running example: SELECT COUNT(*) FROM user_os_info WHERE user_id = 3 over the table (user_id, os) = (1, android), (1, ios), (3, ios), (6, android).

17. Row-at-a-time processing
For the query SELECT COUNT(*) FROM user_os_info WHERE user_id = 3, data flows through the plan one Row at a time: Orc Reader → Row → operators → Row → Orc Writer.

18. Row-at-a-time processing
Concretely, the Orc Reader produces an OrcLazyRow, which is converted to an UnsafeRow for the operators; on the write path each UnsafeRow is converted to an OrcRow for the Orc Writer.

19. Row-at-a-time processing
For the running query SELECT COUNT(*) FROM user_os_info WHERE user_id = 3, the generated aggregation loop processes one row per iteration:

    private void agg_doAggregateWithoutKey() {
      while (inputadapter_input.hasNext()) {
        inputadapter_row = inputadapter_input.next();  // fetch the next row (a virtual call per row)
        value = inputadapter_row.getLong(0);           // read user_id out of the row
        // do aggr
      }
    }

20. Vector-at-a-time processing
For the query SELECT COUNT(*) FROM user_os_info WHERE user_id = 3, data now flows through the plan one Vector (a batch of rows) at a time: Orc Reader → Vector → operators → Vector → Orc Writer.

21. Vector-at-a-time processing
The Orc Reader emits a VectorizedRowBatch (for the example, the user_id column vector 1 1 3 6), the operators process whole batches, and the Orc Writer consumes a VectorizedRowBatch as well. A simplified model of the batch structure is sketched below.
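As a rough mental model of the batch structure, here is a simplified, hypothetical version in the spirit of Hive's VectorizedRowBatch (the real class also carries null masks, a selected-rows vector, and column vectors for every type):

    public class RowBatchSketch {
        // One column of longs, stored as a primitive array.
        static class LongColumnVector {
            final long[] vector;
            LongColumnVector(int capacity) { vector = new long[capacity]; }
        }

        // A batch: one column vector per column, plus the number of filled rows.
        static class VectorizedRowBatch {
            static final int DEFAULT_SIZE = 1024;  // rows per batch (assumed)
            final LongColumnVector[] cols;
            int size;
            VectorizedRowBatch(int numCols) {
                cols = new LongColumnVector[numCols];
                for (int i = 0; i < numCols; i++) {
                    cols[i] = new LongColumnVector(DEFAULT_SIZE);
                }
            }
        }

        public static void main(String[] args) {
            VectorizedRowBatch batch = new VectorizedRowBatch(1);
            long[] userIds = {1, 1, 3, 6};
            for (int i = 0; i < userIds.length; i++) {
                batch.cols[0].vector[i] = userIds[i];
            }
            batch.size = userIds.length;

            // An operator scans the column with a tight loop over the array.
            long count = 0;
            for (int i = 0; i < batch.size; i++) {
                if (batch.cols[0].vector[i] == 3) count++;
            }
            System.out.println(count);  // prints: 1
        }
    }

Batch sizes are typically chosen large enough to amortize per-batch overhead but small enough that a batch's columns stay cache-resident.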

22. Vector-at-a-time processing
For the same query, the generated loop consumes one VectorizedRowBatch at a time:

    private void agg_doAggregateWithoutKey() {
      while (orc_scan_batch != null) {
        int numRows = orc_scan_batch.size;
        while (orc_scan_batch_idx < numRows) {
          value = orc_scan_col_0.vector[orc_scan_batch_idx];  // read user_id straight from the column vector
          orc_scan_batch_idx++;
          // do aggr
        }
        nextBatch();
      }
    }

    private void nextBatch() {
      if (orc_scan_input.hasNext()) {
        orc_scan_batch = orc_scan_input.next();
        orc_scan_col_0 = orc_scan_batch.cols[0];
        orc_scan_batch_idx = 0;                 // restart at the top of the new batch
      } else {
        orc_scan_batch = null;                  // no more batches: outer loop exits
      }
    }

23. Row-at-a-time vs vector-at-a-time
• Lower overhead per row
  • Avoid virtual function dispatch cost per row
  • Better cache locality
• Avoid unnecessary copy/conversion
  • No need to convert between OrcLazyRow and UnsafeRow
Side by side, the row-at-a-time loop (slide 19) pays inputadapter_input.next() and inputadapter_row.getLong(0) per row, while the vector-at-a-time loop (slide 22) reads col_0.vector[idx] from a primitive array and crosses an operator boundary only once per nextBatch() call.

24. Row-at-a-time vs vector-at-a-time
• Lower overhead per row
  • Avoid virtual function dispatch cost per row
  • Better cache locality
• Avoid unnecessary copy/conversion
  • No need to convert between OrcLazyRow and UnsafeRow
Pipelines compared: row-at-a-time runs Orc Reader → OrcLazyRow → UnsafeRow → WholeStage CodeGen; vector-at-a-time runs Orc Reader → RowBatch → WholeStage CodeGen. A toy example of the dispatch cost follows below.
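The dispatch-cost claim can be made concrete with a toy Java example; the types below are hypothetical, not Spark's. The row path pays interface (virtual) calls per row, while the vector path is a tight loop over a primitive array that the JIT can optimize aggressively:

    import java.util.Iterator;
    import java.util.List;

    public class DispatchCost {
        interface Row { long getLong(int ordinal); }
        record SimpleRow(long userId) implements Row {
            public long getLong(int ordinal) { return userId; }
        }

        public static void main(String[] args) {
            // Row-at-a-time: hasNext()/next()/getLong() are virtual calls per
            // row, and each row object may live anywhere on the heap.
            Iterator<Row> rows = List.<Row>of(
                new SimpleRow(1), new SimpleRow(1),
                new SimpleRow(3), new SimpleRow(6)).iterator();
            long count = 0;
            while (rows.hasNext()) {
                if (rows.next().getLong(0) == 3) count++;
            }

            // Vector-at-a-time: one contiguous array, no per-row calls; the
            // CPU prefetches linearly and the loop body stays in registers.
            long[] userId = {1, 1, 3, 6};
            long vcount = 0;
            for (int i = 0; i < userId.length; i++) {
                if (userId[i] == 3) vcount++;
            }
            System.out.println(count + " " + vcount);  // prints: 1 1
        }
    }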

25. Vectorized ORC reader/writer in open source vs at Facebook
Open source:
• Vectorized reader supports only simple types
• Vectorized writer is not supported
Facebook:
• Vectorized reader supports all types and is integrated with codegen
• Vectorized writer supports all types; integration with codegen is planned

26. Vector-at-a-time + Whole Stage CodeGen
• Currently only HiveTableScan and InsertIntoHiveTable understand the ORC columnar format
• Most Spark operators still process one row at a time

27. Agenda
• Spark at Facebook
• Row format vs Columnar format
• Row-at-a-time vs Vector-at-a-time processing
• Performance results
• Road ahead

28. Vectorized reader + vector-at-a-time microbenchmark
Up to 8x speedup when reading 10M rows from a single-column table

29. Vectorized reader and writer + vector-at-a-time microbenchmark
Up to 3.5x speedup when reading and writing 10M rows from a single-column table