A Deep Dive into Query Execution Engine of Spark SQL
1 .Deep Dive: Query Execution of Spark SQL Maryann Xue, Xingbo Jiang, Kris Mok Apr. 2019
2 .About Us Software Engineers • Maryann Xue, PMC of Apache Calcite & Apache Phoenix, @maryannxue • Xingbo Jiang, Apache Spark Committer, @jiangxb1987 • Kris Mok, OpenJDK Committer, @rednaxelafx
3 .Databricks Unified Analytics Platform • DATABRICKS WORKSPACE: Notebooks, Jobs, Models, APIs, Dashboards; End to end ML lifecycle • DATABRICKS RUNTIME: Databricks Delta, ML Frameworks; Reliable & Scalable, Simple & Integrated • DATABRICKS CLOUD SERVICE
4 .Databricks Customers Across Industries Financial Services, Healthcare & Pharma, Media & Entertainment, Data & Analytics Services, Technology, Public Sector, Retail & CPG, Consumer Services, Marketing & AdTech, Energy & Industrial IoT
5 .Apache Spark 3.x • Libraries: SQL, Spark ML, Spark Streaming, Spark Graph, 3rd-party Libraries • SparkSession / DataFrame / Dataset APIs • Catalyst Optimization & Tungsten Execution • Data Source Connectors • Spark Core
7 .Spark SQL Engine Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution Runtime
8 .Spark SQL Engine - Front End Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution Runtime Reference: A Deep Dive into Spark SQL’s Catalyst Optimizer, Yin Huai, Spark Summit 2017
9 .Spark SQL Engine - Back End Analysis -> Logical Optimization -> Physical Planning -> Code Generation -> Execution Runtime
10 .Agenda
11 .Agenda Physical Planning
12 .Physical Planning • Transform logical operators into physical operators • Choose between different physical alternatives - e.g., broadcast-hash-join vs. sort-merge-join • Includes physical traits of the execution engine - e.g., partitioning & ordering • Some ops may be mapped into multiple physical nodes - e.g., partial agg -> shuffle -> final agg
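The choice between physical alternatives can be illustrated with a toy planner. This is only a sketch under stated assumptions, not Spark's planner: the class and method names are hypothetical, and the only rule modeled is a size cutoff, set here to 10 MB to mirror the default of `spark.sql.autoBroadcastJoinThreshold`. Spark's real join selection takes more inputs (hints, join type, statistics availability).

```java
import java.util.List;

/** Toy sketch of physical planning decisions; all names are hypothetical. */
class JoinPlanningSketch {
    // Assumed cutoff, mirroring the 10 MB default of spark.sql.autoBroadcastJoinThreshold.
    static final long BROADCAST_THRESHOLD_BYTES = 10L * 1024 * 1024;

    /** Pick a physical join operator from the estimated sizes of both inputs. */
    static String chooseJoin(long leftBytes, long rightBytes) {
        long smaller = Math.min(leftBytes, rightBytes);
        // A small side can be shipped to every task; otherwise both sides are sorted and merged.
        return smaller <= BROADCAST_THRESHOLD_BYTES ? "BroadcastHashJoin" : "SortMergeJoin";
    }

    /** One logical Aggregate maps to several physical nodes. */
    static List<String> planAggregate() {
        return List.of("HashAggregate(partial)", "ShuffleExchange", "HashAggregate(final)");
    }

    public static void main(String[] args) {
        long kb = 1024L;
        long gb = 1024L * 1024 * 1024;
        System.out.println(chooseJoin(512 * kb, 4 * gb)); // small side fits under the cutoff
        System.out.println(chooseJoin(2 * gb, 4 * gb));   // neither side fits
        System.out.println(planAggregate());
    }
}
```

The aggregate expansion corresponds to the last bullet: a partial aggregation per partition, a shuffle on the grouping key, then a final aggregation.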
13 .A Physical Plan Example • Query: SELECT a1, sum(b1) FROM A JOIN B ON A.key = B.key WHERE b1 < 1000 GROUP BY a1 • Logical plan: Join (Scan A, Filter <- Scan B) -> Aggregate • Physical plan: BroadcastHashJoin (Scan A, BroadcastExchange <- Filter <- Scan B) -> HashAggregate -> ShuffleExchange -> HashAggregate
14 .Scheduling a Physical Plan • Scalar subquery / Broadcast exchange: - Executed as separate jobs • Partition-local ops: - Executed in the same stage • Shuffle: - The stage boundary - A sync barrier across all nodes (Diagram: Job 1, Stage 1: Scan B -> Filter -> BroadcastExchange; Job 2, Stage 1: Scan A -> BroadcastHashJoin -> HashAggregate -> ShuffleExchange; Job 2, Stage 2: HashAggregate)
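The rule that a shuffle marks the stage boundary can be sketched on a linear chain of operators. This is a simplified illustration with hypothetical names: real plans are trees, and Spark's scheduler derives stages from RDD dependencies rather than from operator names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stage splitter over a linear operator chain; names are hypothetical. */
class StageSplitSketch {
    /** Cut the chain at every ShuffleExchange; ops between cuts share a stage. */
    static List<List<String>> splitStages(List<String> ops) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String op : ops) {
            if (op.equals("ShuffleExchange")) {
                // The shuffle is the boundary itself: close the current stage.
                stages.add(current);
                current = new ArrayList<>();
            } else {
                current.add(op);
            }
        }
        stages.add(current);
        return stages;
    }

    public static void main(String[] args) {
        List<String> job2 = List.of(
            "Scan A", "BroadcastHashJoin", "HashAggregate", "ShuffleExchange", "HashAggregate");
        System.out.println(splitStages(job2));
    }
}
```

Applied to Job 2 of the example, the partition-local operators before the shuffle land in one stage and the final HashAggregate in the next.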
15 .Agenda Code Generation
16 .Execution, Old: Volcano Iterator Model • Volcano iterator model - All ops implement the same interface, e.g., next() - next() on final op -> pull input from child op by calling child.next() -> goes on and on, ending up with a propagation of next() calls • Pros: Good abstraction; Easy to implement • Cons: Virtual function calls -> less efficient (Diagram: Result Iterator iterates; next() calls chain through Project, Filter, and Scan)
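A minimal sketch of the Volcano model, assuming a hypothetical `Operator` interface with a single `next()` method that returns null at end of input. None of these names are Spark classes; rows are modeled as (sid, age) pairs to match the example query used later in the deck.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Minimal Volcano-style operators; hypothetical names, not Spark classes. */
class VolcanoSketch {
    /** Every operator exposes the same pull interface; null signals end of input. */
    interface Operator<T> {
        T next();
    }

    static <T> Operator<T> scan(List<T> rows) {
        Iterator<T> it = rows.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    static <T> Operator<T> filter(Operator<T> child, Predicate<T> pred) {
        return () -> {
            T row;
            // Keep pulling from the child until a row passes or input ends.
            while ((row = child.next()) != null) {
                if (pred.test(row)) return row;
            }
            return null;
        };
    }

    static <T, R> Operator<R> project(Operator<T> child, Function<T, R> fn) {
        return () -> {
            T row = child.next();
            return row == null ? null : fn.apply(row);
        };
    }

    /** SELECT sid FROM emps WHERE age < 36, with rows as (sid, age) entries. */
    static List<String> selectSid(List<Map.Entry<String, Integer>> emps) {
        Operator<String> plan =
            project(filter(scan(emps), e -> e.getValue() < 36), e -> e.getKey());
        List<String> out = new ArrayList<>();
        String sid;
        // The result iterator drives execution by calling next() on the final operator.
        while ((sid = plan.next()) != null) out.add(sid);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(selectSid(List.of(
            Map.entry("s1", 30), Map.entry("s2", 40), Map.entry("s3", 35))));
    }
}
```

Each output row costs one interface call per operator in the chain, which is the per-row overhead the slide lists under "Cons".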
17 .Execution, New: Whole-Stage Code Generation • Inspired by Thomas Neumann’s paper • Fuse a string of operators (oftentimes the entire stage) into one WSCG op that runs the generated code. • A general-purpose execution engine just like Volcano model but without Volcano’s performance downsides: - No virtual function calls - Data in CPU registers - Loop unrolling & SIMD (Diagram: Scan -> Filter -> Project -> Aggregate fused into the loop below)
    long count = 0;
    for (item in sales) {
      if (price < 100) {
        count += 1;
      }
    }
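The fused loop on this slide is pseudocode. A compilable Java equivalent might look like the following, assuming the scanned column is available as an `int[]` of prices; the class and method names are hypothetical, and the code is hand-written here rather than produced by Spark.

```java
/** Hand-fused scan + filter + aggregate; a sketch, not Spark-generated code. */
class FusedLoopSketch {
    /** Counts rows with price < 100 in a single tight loop. */
    static long countCheap(int[] prices) {
        long count = 0;
        for (int price : prices) {   // Scan
            if (price < 100) {       // Filter
                count += 1;          // Aggregate
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countCheap(new int[] {50, 150, 99, 100}));
    }
}
```

There are no per-row calls between operators, and `count` and `price` are plain locals that the JIT compiler can keep in registers.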
18 .Execution Models: Old vs. New • Volcano iterator model: Pull model; Driven by the final operator (Diagram: Result Iterator iterates; next() calls chain through Project, Filter, and Scan) • WSCG model: Push model; Driven by the head/source operator (Diagram: Result Iterator iterates with a single next() call into the fused Scan, Filter, Project)
19 .A Physical Plan Example - WSCG • Job 1, Stage 1: WSCG (Scan B -> Filter) -> BroadcastExchange • Job 2, Stage 1: WSCG (Scan A -> BroadcastHashJoin -> HashAggregate) -> ShuffleExchange • Job 2, Stage 2: WSCG (HashAggregate)
20 .Implementation • The top node WholeStageCodegenExec implements the iterator interface to interop with other code-gen or non-code-gen physical ops. • All underlying operators implement a code-generation interface: doProduce() & doConsume() • Dump the generated code: df.queryExecution.debug.codegen
21 .Single dependency • A WSCG node contains a linear list of physical operators that support code generation. • No multi dependency between enclosed ops. • A WSCG node may consist of one or more pipelines. (Diagram: a WSCG node containing Op1 to Op5, split into Pipeline 1 and Pipeline 2)
22 .A Single Pipeline in WSCG • A string of non-blocking operators forms a pipeline in WSCG • The head/source: - Implement doProduce() - the driving loop producing source data. • The rest: - doProduce() - fall through to head of the pipeline. - Implement doConsume() for its own processing logic. (Diagram: WSCG generates code; produce calls propagate from WSCG through Op3 and Op2 down to Op1, and consume calls propagate back up from Op1 to WSCG)
23 .A Single Pipeline Example SELECT sid FROM emps WHERE age < 36 (Diagram: produce calls run WholeStageCodegen (START) -> Project -> Filter -> Scan) Generated for RowIterator:
    while (table.hasNext()) {
      InternalRow row = table.next();
      if (shouldStop()) return;
    }
24 .A Single Pipeline Example SELECT sid FROM emps WHERE age < 36 (Diagram: produce calls as before; consume: Scan -> Filter) Generated for RowIterator:
    while (table.hasNext()) {
      InternalRow row = table.next();
      if (row.getInt(2) < 36) {
      }
      if (shouldStop()) return;
    }
25 .A Single Pipeline Example SELECT sid FROM emps WHERE age < 36 (Diagram: produce calls as before; consume: Scan -> Filter -> Project) Generated for RowIterator:
    while (table.hasNext()) {
      InternalRow row = table.next();
      if (row.getInt(2) < 36) {
        String sid = row.getString(0);
        rowWriter.write(0, sid);
      }
      if (shouldStop()) return;
    }
26 .A Single Pipeline Example SELECT sid FROM emps WHERE age < 36 (Diagram: produce calls as before; consume: Scan -> Filter -> Project -> WholeStageCodegen) Generated for RowIterator:
    while (table.hasNext()) {
      InternalRow row = table.next();
      if (row.getInt(2) < 36) {
        String sid = row.getString(0);
        rowWriter.write(0, sid);
        ret = rowWriter.getRow();
      }
      if (shouldStop()) return;
    }
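The produce/consume walk-through can be mimicked with a toy code generator that emits Java source as a string. This is a heavily simplified sketch: the `Op` class and its subclasses are hypothetical, and the argument-less `doProduce()` / `doConsume()` signatures are an assumption made for brevity. Spark's real code-generation interface passes a codegen context and input variables.

```java
/** Toy produce/consume code generator; hypothetical classes, simplified signatures. */
class ProduceConsumeSketch {
    static abstract class Op {
        final Op child;
        Op parent;

        Op(Op child) {
            this.child = child;
            if (child != null) child.parent = this;
        }

        /** Default: fall through to the head of the pipeline. */
        String doProduce() { return child.doProduce(); }

        /** Emit this operator's per-row logic, nesting the parent's logic inside it. */
        abstract String doConsume();
    }

    static final class Scan extends Op {
        Scan() { super(null); }

        /** The head operator emits the driving loop and asks its parent to fill the body. */
        @Override String doProduce() {
            return "while (table.hasNext()) {\n"
                 + "  InternalRow row = table.next();\n"
                 + indent(parent.doConsume())
                 + "  if (shouldStop()) return;\n"
                 + "}\n";
        }

        @Override String doConsume() {
            throw new UnsupportedOperationException("a source has no input to consume");
        }
    }

    static final class Filter extends Op {
        Filter(Op child) { super(child); }
        @Override String doConsume() {
            return "if (row.getInt(2) < 36) {\n" + indent(parent.doConsume()) + "}\n";
        }
    }

    static final class Project extends Op {
        Project(Op child) { super(child); }
        @Override String doConsume() {
            return "String sid = row.getString(0);\n"
                 + "rowWriter.write(0, sid);\n"
                 + parent.doConsume();
        }
    }

    static final class WholeStage extends Op {
        WholeStage(Op child) { super(child); }
        @Override String doConsume() { return "ret = rowWriter.getRow();\n"; }
        /** START: kick off the produce chain from the top node. */
        String generate() { return doProduce(); }
    }

    /** Prefix every line with two spaces. */
    static String indent(String code) {
        return code.replaceAll("(?m)^", "  ");
    }

    static String generateExample() {
        return new WholeStage(new Project(new Filter(new Scan()))).generate();
    }

    public static void main(String[] args) {
        System.out.print(generateExample());
    }
}
```

Running `main` prints a loop with the same shape as the final slide of the example: the produce calls travel down to `Scan`, and each `doConsume()` nests the next operator's code inside its own.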
27 .Multiple Pipelines in WSCG • Head (source) operator: - The source, w/ or w/o input RDDs - e.g., Scan, SortMergeJoin • Non-blocking operators: - In the middle of the pipeline - e.g., Filter, Project • Blocking operators: - End of the previous pipeline - Start of a new pipeline - e.g., HashAggregate, Sort • End (sink): RowIterator - Pulls result from the last pipeline (Diagram: a WSCG node containing Op1 to Op5, split into Pipeline 1 and Pipeline 2, with operators labeled source, non-blocking, blocking, non-blocking, and the RowIterator as sink)
28 .Blocking Operators in WSCG • Blocking operators, e.g., HashAggregateExec and SortExec, break pipelines, so there may be multiple pipelines in one WSCG node. • A blocking operator’s doConsume(): - Implement the callback to build the intermediate result. • A blocking operator’s doProduce(): - Consume the entire output from upstream to finish building the intermediate result. - Start a new loop and produce output for downstream based on the intermediate result.
29 .A Blocking Operator Example - HashAgg SELECT age, count(*) FROM emps GROUP BY age (Diagram: produce starts at WholeStageCodegen (START) and reaches HashAggregate; HashAggregate's doProduce() calls child.produce() on Scan, and Scan's rows are consumed by HashAggregate) HashAggregate doProduce(), upstream loop from child.produce():
    while (table.hasNext()) {
      InternalRow row = table.next();
      int age = row.getInt(2);
      hashMap.insertOrIncrement(age);
    }
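The two-phase shape of a blocking operator can be sketched in plain Java. This is an illustration under assumptions, not Spark-generated code: the class name is hypothetical, a `LinkedHashMap` stands in for the aggregation hash map so that output order is deterministic, and `merge` plays the role of `insertOrIncrement`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Two-loop shape of a blocking hash aggregate; a sketch with hypothetical names. */
class BlockingAggSketch {
    /** SELECT age, count(*) FROM emps GROUP BY age, over a list of ages. */
    static List<String> countByAge(List<Integer> ages) {
        Map<Integer, Long> hashMap = new LinkedHashMap<>();

        // Pipeline 1: consume the entire upstream output to build the intermediate result.
        for (int age : ages) {
            hashMap.merge(age, 1L, Long::sum); // insert 1, or increment the existing count
        }

        // Pipeline 2: start a new loop and produce output rows for downstream.
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : hashMap.entrySet()) {
            out.add(e.getKey() + "," + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(countByAge(List.of(30, 41, 30)));
    }
}
```

No output row can be emitted until the first loop has seen all input, which is why the operator ends one pipeline and starts another.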