A Deep Dive into the Query Execution Engine of Spark SQL

Spark SQL enables Spark to perform efficient, fault-tolerant relational query processing using techniques from analytical database systems. Relational queries are compiled into executable physical plans consisting of transformations and actions on RDDs, backed by generated Java code. That code is compiled to Java bytecode, executed by the JVM, and optimized by the JIT compiler into native machine code at runtime. This talk takes a deep dive into the Spark SQL execution engine, covering pipelined execution, whole-stage code generation, UDF execution, memory management, vectorized readers, and lineage-based RDD transformations and actions.

1. Deep Dive: Query Execution of Spark SQL. Maryann Xue, Xingbo Jiang, Kris Mok. Apr. 2019

2. About Us: Software Engineers
• Maryann Xue: PMC of Apache Calcite & Apache Phoenix, @maryannxue
• Xingbo Jiang: Apache Spark Committer, @jiangxb1987
• Kris Mok: OpenJDK Committer, @rednaxelafx

3. Databricks Unified Analytics Platform
• Databricks Workspace: Notebooks, Jobs, Models, APIs, Dashboards (end-to-end ML lifecycle)
• Databricks Runtime: Databricks Delta, ML Frameworks (reliable & scalable, simple & integrated)
• Databricks Cloud Service

4. Databricks Customers Across Industries: Financial Services, Healthcare & Pharma, Media & Entertainment, Data & Analytics Services, Technology, Public Sector, Retail & CPG, Consumer Services, Marketing & AdTech, Energy & Industrial IoT

5. Apache Spark 3.x: the software stack
• Spark SQL, Spark ML, Spark Streaming, Graph, 3rd-party Libraries
• SparkSession / DataFrame / Dataset APIs
• Catalyst Optimization & Tungsten Execution
• Data Source Connectors
• Spark Core

7. Spark SQL Engine: Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution (Runtime)

8. Spark SQL Engine - Front End: Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution (Runtime). Reference: A Deep Dive into Spark SQL’s Catalyst Optimizer, Yin Huai, Spark Summit 2017

9. Spark SQL Engine - Back End: Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution (Runtime)

10. Agenda

11. Agenda: Physical Planning

12. Physical Planning
• Transform logical operators into physical operators.
• Choose between different physical alternatives; e.g., broadcast-hash-join vs. sort-merge-join (see the sketch below).
• Include physical traits of the execution engine; e.g., partitioning & ordering.
• Some ops may be mapped into multiple physical nodes; e.g., partial agg -> shuffle -> final agg.
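To make the join choice concrete, here is a minimal Scala sketch (the object name, table sizes, and data are ours, not the deck's) that flips the planner between the two alternatives via the spark.sql.autoBroadcastJoinThreshold setting:

  import org.apache.spark.sql.SparkSession

  object JoinStrategyDemo {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[*]").appName("join-demo").getOrCreate()

      val big   = spark.range(1000000).toDF("key")
      val small = spark.range(100).toDF("key")

      // The small side fits under spark.sql.autoBroadcastJoinThreshold
      // (10 MB by default), so the planner picks BroadcastHashJoin.
      big.join(small, "key").explain()

      // With auto-broadcast disabled, the planner falls back to SortMergeJoin.
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
      big.join(small, "key").explain()
    }
  }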

13. A Physical Plan Example
SQL: SELECT a1, sum(b1) FROM A JOIN B ON A.key = B.key WHERE b1 < 1000 GROUP BY a1
Logical plan: Scan A and Scan B, with a Filter over Scan B; Join; Aggregate on top.
Physical plan: Scan B -> Filter -> BroadcastExchange feeds the build side of BroadcastHashJoin; Scan A feeds the probe side; then HashAggregate (partial) -> ShuffleExchange -> HashAggregate (final).
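A small sketch of how this plan could be reproduced, assuming a SparkSession named spark and temp views A(key, a1) and B(key, b1) already registered, with B small enough to be broadcast:

  val df = spark.sql(
    """SELECT a1, sum(b1)
      |FROM A JOIN B ON A.key = B.key
      |WHERE b1 < 1000
      |GROUP BY a1""".stripMargin)

  // Prints the physical operators from the slide: Scan, Filter,
  // BroadcastExchange, BroadcastHashJoin, HashAggregate, ShuffleExchange.
  // Whether the broadcast join is chosen depends on B's estimated size.
  df.explain()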

14. Scheduling a Physical Plan
• Scalar subquery / broadcast exchange (BroadcastExchange): executed as separate jobs. Job 1, Stage 1: Scan B -> Filter -> BroadcastExchange.
• Partition-local ops: executed in the same stage. Job 2, Stage 1: Scan A -> BroadcastHashJoin -> HashAggregate.
• Shuffle (ShuffleExchange): the stage boundary, a sync barrier across all nodes. Job 2, Stage 2: HashAggregate.
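One way to observe such stage boundaries, sketched under the assumption of a running SparkSession named spark: the lineage dump below indents once per stage, with the shuffle as the split point.

  import org.apache.spark.sql.functions.col

  // The groupBy introduces a shuffle, i.e., a stage boundary between
  // the partial and the final aggregation.
  val counts = spark.range(1000).groupBy(col("id") % 10).count()

  // The RDD lineage shows a ShuffledRowRDD; each indentation level in
  // the debug string corresponds to one scheduling stage.
  println(counts.queryExecution.toRdd.toDebugString)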

15. Agenda: Code Generation

16. Execution, Old: Volcano Iterator Model
• All ops implement the same interface, e.g., next().
• next() on the final op pulls input from the child op by calling child.next(), and so on, ending up with a propagation of next() calls.
• Pros: good abstraction; easy to implement.
• Cons: virtual function calls make it less efficient.
Diagram: Scan -> Filter -> Project -> Result Iterator (iterate), with next() calls propagating from the Result Iterator back to the Scan.
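A toy Scala sketch of the Volcano contract (our own classes, not Spark's): every operator implements the same pull interface, and each row costs a chain of virtual next() calls.

  case class Row(values: Array[Any])

  trait VolcanoOp {
    def next(): Option[Row]  // every operator exposes the same pull interface
  }

  class Scan(data: Iterator[Row]) extends VolcanoOp {
    def next(): Option[Row] = if (data.hasNext) Some(data.next()) else None
  }

  class Filter(child: VolcanoOp, pred: Row => Boolean) extends VolcanoOp {
    def next(): Option[Row] = {
      var r = child.next()                               // a virtual call per row ...
      while (r.isDefined && !pred(r.get)) r = child.next()
      r
    }
  }

  class Project(child: VolcanoOp, f: Row => Row) extends VolcanoOp {
    def next(): Option[Row] = child.next().map(f)        // ... per operator
  }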

17. Execution, New: Whole-Stage Code Generation
• Inspired by Thomas Neumann’s paper.
• Fuse a string of operators (oftentimes the entire stage) into one WSCG op that runs the generated code. For a Scan -> Filter -> Project -> Aggregate pipeline, the generated code collapses to a single loop (the slide's pseudo-code, tightened to valid Java; the Item type and price field are assumed):

  long count = 0;
  for (Item item : sales) {     // Item and its price field assumed from the slide
    if (item.price < 100) {
      count += 1;
    }
  }

• A general-purpose execution engine just like the Volcano model, but without Volcano’s performance downsides:
  - No virtual function calls
  - Data in CPU registers
  - Loop unrolling & SIMD

18. Execution Models: Old vs. New
• Volcano iterator model: pull model, driven by the final operator. Scan -> Filter -> Project -> Result Iterator (iterate), with next() calls propagating backward from the Result Iterator.
• WSCG model: push model, driven by the head/source operator. Scan pushes rows through Filter and Project into the Result Iterator; only the boundary next() on the Result Iterator remains.
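For contrast, a push-model sketch in plain Scala (reusing the Row class from the Volcano sketch above): the source drives the loop and pushes each row down; real WSCG emits the equivalent logic as one fused Java loop, so no virtual calls remain inside it.

  def fusedPipeline(
      table: Iterator[Row],
      pred: Row => Boolean,
      proj: Row => Row,
      emit: Row => Unit): Unit = {
    while (table.hasNext) {   // the source's driving loop (its doProduce)
      val row = table.next()
      if (pred(row)) {        // Filter's logic, inlined (its doConsume)
        emit(proj(row))       // Project's logic, inlined, then hand off
      }
    }
  }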

19. A Physical Plan Example - WSCG
• Job 1, Stage 1: WSCG [Scan B -> Filter] -> BroadcastExchange
• Job 2, Stage 1: WSCG [Scan A -> BroadcastHashJoin -> HashAggregate] -> ShuffleExchange
• Job 2, Stage 2: WSCG [HashAggregate]

20. Implementation
• The top node, WholeStageCodegenExec, implements the iterator interface to interoperate with other code-gen or non-code-gen physical ops.
• All underlying operators implement a code-generation interface: doProduce() & doConsume().
• Dump the generated code: df.queryExecution.debug.codegen
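Usage sketch for the dump API named on the slide, assuming a running SparkSession named spark:

  // Build any query with code-generated operators.
  val df = spark.range(1000).selectExpr("id % 10 AS k").groupBy("k").count()

  // Prints each WholeStageCodegen subtree followed by its generated Java source.
  df.queryExecution.debug.codegen()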

21. Single Dependency
• A WSCG node contains a linear list of physical operators that support code generation.
• No multi dependency between enclosed ops.
• A WSCG node may consist of one or more pipelines.
Diagram: WSCG [Pipeline 1: Op1 -> Op2 -> Op3 | Pipeline 2: Op4 -> Op5]

22. A Single Pipeline in WSCG
• A string of non-blocking operators forms a pipeline in WSCG.
• The head/source:
  - Implements doProduce(): the driving loop producing source data.
• The rest:
  - doProduce() falls through to the head of the pipeline.
  - Implement doConsume() for their own processing logic.
Diagram: produce() calls propagate from the WSCG node back through Op3 -> Op2 -> Op1; consume() calls then flow forward Op1 -> Op2 -> Op3 in the generated code. A simplified sketch of this interface follows.
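A pared-down Scala sketch of that interface; Spark's real trait is CodegenSupport, whose methods also take a codegen context and bound variables, omitted here for clarity:

  trait CodegenOp {
    var parent: CodegenOp = null

    // Head/source ops emit the driving loop here; all other ops just
    // delegate produce to their child so it falls through to the head.
    def doProduce(): String

    // Emit this op's per-row logic for the row named by `rowVar`, then
    // append the parent's consume so the generated code chains forward.
    def doConsume(rowVar: String): String
  }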

23. A Single Pipeline Example
SELECT sid FROM emps WHERE age < 36
WholeStageCodegen starts produce; the call falls through Project -> Filter -> Scan, and Scan emits the driving loop (generated for RowIterator):

  while (table.hasNext()) {
    InternalRow row = table.next();
    if (shouldStop()) return;
  }

24. A Single Pipeline Example
SELECT sid FROM emps WHERE age < 36
Scan's loop then calls Filter's consume, which adds the predicate:

  while (table.hasNext()) {
    InternalRow row = table.next();
    if (row.getInt(2) < 36) {
    }
    if (shouldStop()) return;
  }

25. A Single Pipeline Example
SELECT sid FROM emps WHERE age < 36
Filter in turn calls Project's consume, which writes the projected column:

  while (table.hasNext()) {
    InternalRow row = table.next();
    if (row.getInt(2) < 36) {
      String sid = row.getString(0);
      rowWriter.write(0, sid);
    }
    if (shouldStop()) return;
  }

26. A Single Pipeline Example
SELECT sid FROM emps WHERE age < 36
Finally, the WholeStageCodegen node's consume emits the result row:

  while (table.hasNext()) {
    InternalRow row = table.next();
    if (row.getInt(2) < 36) {
      String sid = row.getString(0);
      rowWriter.write(0, sid);
      ret = rowWriter.getRow();
    }
    if (shouldStop()) return;
  }

27. Multiple Pipelines in WSCG
• Head (source) operator: the source, w/ or w/o input RDDs; e.g., Scan, SortMergeJoin.
• Non-blocking operators: in the middle of the pipeline; e.g., Filter, Project.
• Blocking operators: end of the previous pipeline and start of a new one; e.g., HashAggregate, Sort.
• End (sink): RowIterator, which pulls the result from the last pipeline.
Diagram: WSCG [Pipeline 1: Op1 (source) -> Op2 (non-blocking) -> Op3 (blocking) | Pipeline 2: Op4 (non-blocking) -> Op5] -> RowIterator (sink)

28. Blocking Operators in WSCG
• A blocking operator, e.g., HashAggregateExec or SortExec, breaks the pipeline, so there may be multiple pipelines in one WSCG node.
• A blocking operator's doConsume() implements the callback that builds the intermediate result.
• A blocking operator's doProduce():
  - Consumes the entire output from upstream to finish building the intermediate result.
  - Starts a new loop and produces output for downstream based on the intermediate result.
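The same pattern in a plain-Scala sketch (reusing the Row class from the Volcano sketch; illustrative code, not what Spark generates): consume() only builds state, and produce() starts the new loop that feeds the next pipeline.

  class HashAggSketch(groupKey: Row => Int) {
    private val hashMap = scala.collection.mutable.Map.empty[Int, Long]

    // doConsume: invoked once per upstream row, only builds the intermediate result.
    def consume(row: Row): Unit = {
      val k = groupKey(row)
      hashMap(k) = hashMap.getOrElse(k, 0L) + 1
    }

    // doProduce: runs after upstream is exhausted, drives the next pipeline.
    def produce(emit: (Int, Long) => Unit): Unit =
      hashMap.foreach { case (k, c) => emit(k, c) }
  }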

29. A Blocking Operator Example - HashAgg
SELECT age, count(*) FROM emps GROUP BY age
HashAggregate's doProduce() calls child.produce(); the Scan's driving loop then feeds HashAggregate's consume, which builds the hash map keyed on age:

  while (table.hasNext()) {
    InternalRow row = table.next();
    int age = row.getInt(2);
    hashMap.insertOrIncrement(age);
  }