How to Extend Apache Spark with Customized Optimizations
1 .How to extend Spark with customized optimizations Sunitha Kambhampati, IBM #UnifiedAnalytics #SparkAISummit
2 .Center for Open Source Data and AI Technologies (CODAIT), codait.org. IBM Watson West Building, 505 Howard St., San Francisco, CA. CODAIT aims to make AI solutions dramatically easier to create, deploy, and manage in the enterprise. Relaunch of the IBM Spark Technology Center (STC) to reflect an expanded mission. Improving Enterprise AI Lifecycle in Open Source: we contribute to foundational open source software across the enterprise AI lifecycle. 36 open-source developers! https://ibm.biz/BdzF6Q
3 .Agenda • Introduce Spark Extension Points API • Deep Dive into the details – What you can do – How to use it – What things you need to be aware of • Enhancements to the API – Why – Performance results
4 .I want to extend Spark • Performance benefits – Support for informational referential integrity (RI) constraints – Add Data Skipping Indexes • Enabling third-party applications – The application uses Spark but requires some additions or small changes to Spark
5 .Problem: You have developed customizations to Spark. How do you add them to your Spark cluster?
6 .Possible Solutions • Option 1: Get the code merged into Apache Spark – Maybe it is application-specific – Maybe it is a value add – Not something that can be merged into Spark • Option 2: Modify the Spark code and fork it – Maintenance overhead • Extensible solution: Use Spark’s Extension Points API
7 .Spark Extension Points API • Added in Spark 2.2 in SPARK-18127 • Pluggable and extensible • Extend SparkSession with custom optimizations • Marked as an Experimental API – relatively stable – has not seen any changes except the addition of more customization points
8 . Query Execution (diagram): SQL Query / DataFrame → Unresolved Logical Plan → Analyzer (rules) → Analyzed Logical Plan
9 . Query Execution (diagram): Parser → Unresolved Logical Plan → Analyzer (rules) → Analyzed Logical Plan → Optimizer (rules) → Optimized Logical Plan → SparkPlanner (Spark strategies) → Physical Plan
10 . Supported Customizations (diagram): Parser → Custom Spark Parser; Analyzer (rules) → Custom Rules; Optimizer (rules) → Custom Rules; SparkPlanner (Spark strategies) → Custom Strategies
11 .Extensions API: At a High Level • New SparkSessionExtensions class – Methods to pass in the customizations – Holds the customizations • Pass customizations to Spark – withExtensions method in SparkSession.builder
12 .SparkSessionExtensions • @DeveloperApi @Experimental @InterfaceStability.Unstable • Inject Methods – Pass the custom user rules to Spark • Build Methods – Pass the rules to Spark components – Used by Spark Internals
13 . Extension Hooks: Inject Methods. Parser: injectParser. Analyzer: injectResolutionRule, injectCheckRule, injectPostHocResolutionRule. Optimizer: injectOptimizerRule. SparkPlanner: injectPlannerStrategy. New in master (SPARK-25560): injectFunction.
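For example, injectFunction (available where SPARK-25560 is included) takes a FunctionDescription, i.e. a tuple of (FunctionIdentifier, ExpressionInfo, function builder). A minimal sketch, assuming a hypothetical my_upper function that simply reuses Spark’s built-in Upper expression:

  import org.apache.spark.sql.SparkSessionExtensions
  import org.apache.spark.sql.catalyst.FunctionIdentifier
  import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Upper}

  // Hypothetical function "my_upper": an alias for the built-in Upper expression.
  val myUpper = (
    FunctionIdentifier("my_upper"),
    new ExpressionInfo(classOf[Upper].getCanonicalName, "my_upper"),
    (args: Seq[Expression]) => Upper(args.head))

  // Register it through the extensions object.
  val f: SparkSessionExtensions => Unit = { e => e.injectFunction(myUpper) }

Once a session is built with these extensions, my_upper can be called from SQL like any built-in function.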
14 .Pass custom rules to SparkSession • Use ‘withExtensions’ in SparkSession.Builder def withExtensions( f: SparkSessionExtensions => Unit): Builder • Use the Spark configuration parameter – spark.sql.extensions • Takes a class name that implements Function1[SparkSessionExtensions, Unit]
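As a minimal sketch of the configuration route (the class and package names here are hypothetical), the class just needs to implement Function1[SparkSessionExtensions, Unit]:

  import org.apache.spark.sql.SparkSessionExtensions

  // Referenced by name, e.g. --conf spark.sql.extensions=com.example.MyExtensions
  class MyExtensions extends (SparkSessionExtensions => Unit) {
    override def apply(e: SparkSessionExtensions): Unit = {
      e.injectOptimizerRule(GroupByPushDown)   // GroupByPushDown as defined in Use Case #1 below
    }
  }

With spark.sql.extensions set to the fully qualified class name, the customizations are applied without any withExtensions call in application code.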
15 .Deep Dive
16 . Use Case #1 You want to add your own optimization rule to Spark’s Catalyst Optimizer
17 .Add your custom optimizer rule
• Step 1: Implement your optimizer rule
  case class GroupByPushDown(spark: SparkSession) extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform { …. }
  }
• Step 2: Create your ExtensionsBuilder function
  type ExtensionsBuilder = SparkSessionExtensions => Unit
  val f: ExtensionsBuilder = { e => e.injectOptimizerRule(GroupByPushDown) }
• Step 3: Use the withExtensions method in SparkSession.builder to create your custom SparkSession
  val spark = SparkSession.builder().master(..).withExtensions(f).getOrCreate()
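The rule body above is elided; as a concrete illustration of the same shape, here is a small, hypothetical rule that drops Filter nodes whose condition is the literal true:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.expressions.Literal
  import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
  import org.apache.spark.sql.catalyst.rules.Rule

  // Hypothetical example rule: drop a Filter whose condition is always true.
  case class RemoveTrueFilter(spark: SparkSession) extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case Filter(Literal(true, _), child) => child
    }
  }

Substituting RemoveTrueFilter for GroupByPushDown in Step 2 injects it the same way.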
18 .How does the rule get added? • Catalyst Optimizer – Rules are grouped in batches (i.e. RuleExecutor.Batch) – One of the fixed batches has a placeholder for custom optimizer rules – It passes the extendedOperatorOptimizationRules to that batch: def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] • SparkSession stores the SparkSessionExtensions in the transient class variable extensions • The SparkOptimizer instance gets created during SessionState creation for the SparkSession – It overrides the extendedOperatorOptimizationRules method to include the customized rules – Check the optimizer method in BaseSessionStateBuilder
19 .Things to Note • The rule gets added to a predefined batch • Batch here refers to RuleExecutor.Batch • In master, it is added to the following batches: – “Operator Optimization before Inferring Filters” – “Operator Optimization after Inferring Filters” • Check the defaultBatches method in the Optimizer class
20 . Use Case #2 You want to add some parser extensions
21 .Parser Customization
• Step 1: Implement your parser customization
  case class RIExtensionsParser(
    spark: SparkSession,
    delegate: ParserInterface) extends ParserInterface { … }
• Step 2: Create your ExtensionsBuilder function
  type ExtensionsBuilder = SparkSessionExtensions => Unit
  val f: ExtensionsBuilder = { e => e.injectParser(RIExtensionsParser) }
• Step 3: Use the withExtensions method in SparkSession.builder to create your custom SparkSession
  val spark = SparkSession.builder().master("…").withExtensions(f).getOrCreate()
22 .How do the parser extensions work? • Customize the parser for any new syntax you want to support • Delegate the rest of the Spark SQL syntax to the SparkSqlParser • The sqlParser is created by calling buildParser on the extensions object in the SparkSession – See sqlParser in the BaseSessionStateBuilder class – The SparkSqlParser (the default Spark parser) is passed in along with the SparkSession
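A minimal sketch of such a delegating parser, assuming the Spark 2.4 ParserInterface (the RI-specific parsing logic is omitted; only the delegation pattern is shown):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
  import org.apache.spark.sql.catalyst.expressions.Expression
  import org.apache.spark.sql.catalyst.parser.ParserInterface
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.types.{DataType, StructType}

  case class RIExtensionsParser(spark: SparkSession, delegate: ParserInterface)
      extends ParserInterface {

    // Hook point: handle custom syntax here, otherwise fall back to the default parser.
    override def parsePlan(sqlText: String): LogicalPlan =
      delegate.parsePlan(sqlText)

    // Everything else is delegated unchanged to the SparkSqlParser.
    override def parseExpression(sqlText: String): Expression = delegate.parseExpression(sqlText)
    override def parseTableIdentifier(sqlText: String): TableIdentifier = delegate.parseTableIdentifier(sqlText)
    override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = delegate.parseFunctionIdentifier(sqlText)
    override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)
    override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)
  }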
23 . Use Case #3 You want to add some specific checks in the Analyzer
24 .Analyzer Customizations • Analyzer rules: injectResolutionRule • Post-hoc resolution rules: injectPostHocResolutionRule • Check rules: injectCheckRule
25 .Analyzer Rule Customization
• Step 1: Implement your Analyzer rule
  case class MyRIRule(spark: SparkSession) extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform { …. }
  }
• Step 2: Create your ExtensionsBuilder function
  type ExtensionsBuilder = SparkSessionExtensions => Unit
  val f: ExtensionsBuilder = { e => e.injectResolutionRule(MyRIRule) }
• Step 3: Use the withExtensions method in SparkSession.builder to create your custom SparkSession
  val spark = SparkSession.builder().master("..").withExtensions(f).getOrCreate()
26 .How is the rule added to the Analyzer? • The Analyzer has its rules in batches – A batch has a placeholder, extendedResolutionRules, to add custom rules – The batch “Post-Hoc Resolution” holds the postHocResolutionRules • SparkSession stores the SparkSessionExtensions in extensions • When the SessionState is created, the custom rules are passed to the Analyzer by overriding the following class member variables – val extendedResolutionRules – val postHocResolutionRules – val extendedCheckRules • Check the BaseSessionStateBuilder.analyzer method • Check the HiveSessionStateBuilder.analyzer method
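As a small, hypothetical illustration of a post-hoc resolution rule, here is one that logs the resolved plan and returns it unchanged:

  import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.catalyst.rules.Rule

  // Hypothetical rule: runs after resolution, logs the plan, changes nothing.
  case class LogResolvedPlan(spark: SparkSession) extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = {
      logInfo(s"Resolved plan:\n$plan")   // Rule extends Logging, so logInfo is available
      plan
    }
  }

  val f: SparkSessionExtensions => Unit = { e => e.injectPostHocResolutionRule(LogResolvedPlan) }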
27 .Things to Note • A custom resolution rule gets added at the end of the ‘Resolution’ batch • The check rules get called at the end of the checkAnalysis method, after all the Spark checks are done • In the Analyzer.checkAnalysis method: extendedCheckRules.foreach(_(plan))
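A check rule is just a function from LogicalPlan to Unit that throws to fail analysis. A minimal sketch (the check itself is hypothetical):

  import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

  // Hypothetical check: fail analysis if the resolved plan still has missing inputs.
  val myCheckRule: SparkSession => LogicalPlan => Unit = { _ => plan =>
    if (plan.missingInput.nonEmpty) {
      throw new IllegalStateException(s"Plan has missing inputs: ${plan.missingInput}")
    }
  }

  val f: SparkSessionExtensions => Unit = { e => e.injectCheckRule(myCheckRule) }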
28 . Use Case #4 You want to add custom planning strategies
29 .Add new physical plan strategy
• Step 1: Implement your new physical plan Strategy class
  case class IdxStrategy(spark: SparkSession) extends SparkStrategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = { ….. }
  }
• Step 2: Create your ExtensionsBuilder function
  type ExtensionsBuilder = SparkSessionExtensions => Unit
  val f: ExtensionsBuilder = { e => e.injectPlannerStrategy(IdxStrategy) }
• Step 3: Use the withExtensions method in SparkSession.builder to create your custom SparkSession
  val spark = SparkSession.builder().master(..).withExtensions(f).getOrCreate()
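The strategy body above is elided; a minimal sketch of the same shape is below. The custom physical operator is hypothetical and omitted, so this version defers to the built-in strategies by returning Nil for every plan:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}

  case class IdxStrategy(spark: SparkSession) extends SparkStrategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      // case SomeIndexableScan(...) => MyIndexScanExec(...) :: Nil   // hypothetical custom physical node
      case _ => Nil   // returning Nil lets the default strategies plan this node
    }
  }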