How to Extend Apache Spark with Customized Optimizations

There is a growing set of optimization mechanisms that allow you to achieve competitive SQL performance. Spark has extension points that let third parties add customizations and optimizations without needing those changes to be merged into Apache Spark. This is very powerful and aids extensibility. We have added enhancements to the existing extension points framework to enable finer-grained control. This talk is a deep dive into the extension points available in Spark today. We will also cover the enhancements we developed to make this API more powerful. The talk will benefit developers who are looking to customize Spark in their deployments.

1.How to extend Spark with customized optimizations Sunitha Kambhampati, IBM #UnifiedAnalytics #SparkAISummit

2.Center for Open Source Data and AI Technologies (CODAIT) – codait.org
IBM Watson West Building, 505 Howard St., San Francisco, CA
CODAIT aims to make AI solutions dramatically easier to create, deploy, and manage in the enterprise. Relaunch of the IBM Spark Technology Center (STC) to reflect an expanded mission.
Improving the Enterprise AI Lifecycle in Open Source: we contribute to foundational open source software across the enterprise AI lifecycle. 36 open-source developers! https://ibm.biz/BdzF6Q

3.Agenda
•  Introduce Spark Extension Points API
•  Deep dive into the details
   –  What you can do
   –  How to use it
   –  What you need to be aware of
•  Enhancements to the API
   –  Why
   –  Performance results

4.I want to extend Spark
•  Performance benefits
   –  Support for informational referential integrity (RI) constraints
   –  Add data skipping indexes
•  Enabling third-party applications
   –  The application uses Spark but requires some additions or small changes to Spark

5.Problem
You have developed customizations to Spark. How do you add them to your Spark cluster?

6.Possible Solutions
•  Option 1: Get the code merged into Apache Spark
   –  Maybe it is application specific
   –  Maybe it is a value add
   –  Not something that can be merged into Spark
•  Option 2: Modify the Spark code, i.e. fork it
   –  Maintenance overhead
•  Extensible solution: Use Spark’s Extension Points API

7.Spark Extension Points API
•  Added in Spark 2.2 in SPARK-18127
•  Pluggable & extensible
•  Extends SparkSession with custom optimizations
•  Marked as an Experimental API
   –  Relatively stable
   –  Has not seen any changes except the addition of more customization hooks

8. Query Execution (diagram): a SQL query, DataFrame, or ML pipeline yields an Unresolved Logical Plan, which the Analyzer (applying its rules) resolves into an Analyzed Logical Plan.

9. Query Execution (diagram): Parser → Unresolved Logical Plan → Analyzer (rules) → Analyzed Logical Plan → Optimizer (rules) → Optimized Logical Plan → SparkPlanner (strategies) → Physical Plan.

10. Supported Customizations (diagram): each stage can be extended with your own pieces – a custom parser alongside the Spark parser, custom Analyzer rules, custom Optimizer rules, and custom SparkPlanner strategies.

11.Extensions API: At a High Level
•  New SparkSessionExtensions class
   –  Methods to pass in the customizations
   –  Holds the customizations
•  Pass customizations to Spark
   –  withExtensions method in SparkSession.builder

12.SparkSessionExtensions
•  @DeveloperApi @Experimental @InterfaceStability.Unstable
•  Inject methods
   –  Pass the custom user rules to Spark
•  Build methods
   –  Pass the rules to the Spark components
   –  Used by Spark internals

13. Extension Hooks: Inject Methods (diagram)
•  Parser: injectParser
•  Analyzer: injectResolutionRule, injectCheckRule, injectPostHocResolutionRule
•  Optimizer: injectOptimizerRule
•  SparkPlanner: injectPlannerStrategy
•  injectFunction: new in master, SPARK-25560

14.Pass custom rules to SparkSession
•  Use ‘withExtensions’ in SparkSession.Builder
     def withExtensions(f: SparkSessionExtensions => Unit): Builder
•  Or use the Spark configuration parameter
   –  spark.sql.extensions
   –  Takes the name of a class that implements Function1[SparkSessionExtensions, Unit]
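To illustrate the configuration route, here is a minimal sketch. The package, class, and rule names (com.example, MyExtensions, NoOpRule) are made up for the example, and the class should have a no-argument constructor because Spark instantiates it by reflection.

    package com.example

    import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Trivial optimizer rule used only to show the wiring (hypothetical).
    case class NoOpRule(spark: SparkSession) extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan   // leaves the plan unchanged
    }

    // The class named in spark.sql.extensions must implement
    // Function1[SparkSessionExtensions, Unit].
    class MyExtensions extends (SparkSessionExtensions => Unit) {
      def apply(e: SparkSessionExtensions): Unit = {
        e.injectOptimizerRule(NoOpRule)
      }
    }

With this on the classpath, starting Spark with --conf spark.sql.extensions=com.example.MyExtensions should apply the customization to every SparkSession that is created, without touching the application code.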

15.Deep Dive

16. Use Case #1 You want to add your own optimization rule to Spark’s Catalyst Optimizer

17.Add your custom optimizer rule
•  Step 1: Implement your optimizer rule
     case class GroupByPushDown(spark: SparkSession) extends Rule[LogicalPlan] {
       def apply(plan: LogicalPlan): LogicalPlan = plan transform { …. }
     }
•  Step 2: Create your ExtensionsBuilder function
     type ExtensionsBuilder = SparkSessionExtensions => Unit
     val f: ExtensionsBuilder = { e => e.injectOptimizerRule(GroupByPushDown) }
•  Step 3: Use the withExtensions method in SparkSession.builder to create your custom SparkSession
     val spark = SparkSession.builder().master(..).withExtensions(f).getOrCreate()
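Putting the three steps together, here is a minimal self-contained sketch. The rule is a toy one (it removes Filter nodes whose condition is literally true, something Spark's optimizer already does on its own) rather than the GroupByPushDown rule named above; it is meant only to show the wiring end to end.

    import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule

    // Toy rule: drop Filter nodes whose condition is the literal `true`.
    case class RemoveTrivialFilter(spark: SparkSession) extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Filter(Literal(true, _), child) => child
      }
    }

    // Builder function that registers the rule with the extensions object.
    val f: SparkSessionExtensions => Unit = { e => e.injectOptimizerRule(RemoveTrivialFilter) }

    // Session built with the custom optimizer rule injected.
    val spark = SparkSession.builder()
      .master("local[*]")
      .withExtensions(f)
      .getOrCreate()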

18.How does the rule get added?
•  Catalyst Optimizer
   –  Rules are grouped in batches (i.e. RuleExecutor.Batch)
   –  One of the fixed batches has a placeholder for custom optimizer rules
   –  The extendedOperatorOptimizationRules are passed in to that batch:
        def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]]
•  SparkSession stores the SparkSessionExtensions in a transient class variable, extensions
•  The SparkOptimizer instance gets created during SessionState creation for the SparkSession
   –  It overrides the extendedOperatorOptimizationRules method to include the customized rules
   –  Check the optimizer method in BaseSessionStateBuilder

19.Things to Note
•  The rule gets added to a predefined batch
•  Batch here refers to RuleExecutor.Batch
•  In master, it is added to the following batches:
   –  “Operator Optimization before Inferring Filters”
   –  “Operator Optimization after Inferring Filters”
•  Check the defaultBatches method in the Optimizer class

20. Use Case #2 You want to add some parser extensions

21.Parser Customization
•  Step 1: Implement your parser customization
     case class RIExtensionsParser(
         spark: SparkSession,
         delegate: ParserInterface) extends ParserInterface { … }
•  Step 2: Create your ExtensionsBuilder function
     type ExtensionsBuilder = SparkSessionExtensions => Unit
     val f: ExtensionsBuilder = { e => e.injectParser(RIExtensionsParser) }
•  Step 3: Use the withExtensions method in SparkSession.builder to create your custom SparkSession
     val spark = SparkSession.builder().master("…").withExtensions(f).getOrCreate()

22.How do the parser extensions work?
•  Customize the parser for any new syntax you want to support
•  Delegate the rest of the Spark SQL syntax to the SparkSqlParser
•  sqlParser is created by calling buildParser on the extensions object in the SparkSession
   –  See sqlParser in the BaseSessionStateBuilder class
   –  SparkSqlParser (the default Spark parser) is passed in along with the SparkSession
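A minimal sketch of such a delegating parser, assuming the Spark 2.x ParserInterface (newer releases add one or two more methods that would also need forwarding). The isMyCustomSyntax/parseMyCustomSyntax helpers are hypothetical stand-ins for whatever new syntax you support.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.parser.ParserInterface
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.types.{DataType, StructType}

    case class RIExtensionsParser(spark: SparkSession, delegate: ParserInterface)
        extends ParserInterface {

      // Handle the new syntax here; fall back to the default parser otherwise.
      override def parsePlan(sqlText: String): LogicalPlan =
        if (isMyCustomSyntax(sqlText)) parseMyCustomSyntax(sqlText)
        else delegate.parsePlan(sqlText)

      // Everything else is simply forwarded to the delegate (SparkSqlParser).
      override def parseExpression(sqlText: String): Expression = delegate.parseExpression(sqlText)
      override def parseTableIdentifier(sqlText: String): TableIdentifier = delegate.parseTableIdentifier(sqlText)
      override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = delegate.parseFunctionIdentifier(sqlText)
      override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)
      override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)

      // Hypothetical helpers for the custom syntax.
      private def isMyCustomSyntax(sqlText: String): Boolean = ???
      private def parseMyCustomSyntax(sqlText: String): LogicalPlan = ???
    }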

23. Use Case #3 You want to add some specific checks in the Analyzer

24.Analyzer Customizations
•  Analyzer rules: injectResolutionRule
•  Post-hoc resolution rules: injectPostHocResolutionRule
•  Check rules: injectCheckRule

25.Analyzer Rule Customization
•  Step 1: Implement your Analyzer rule
     case class MyRIRule(spark: SparkSession) extends Rule[LogicalPlan] {
       def apply(plan: LogicalPlan): LogicalPlan = plan transform { …. }
     }
•  Step 2: Create your ExtensionsBuilder function
     type ExtensionsBuilder = SparkSessionExtensions => Unit
     val f: ExtensionsBuilder = { e => e.injectResolutionRule(MyRIRule) }
•  Step 3: Use the withExtensions method in SparkSession.builder to create your custom SparkSession
     val spark = SparkSession.builder().master("..").withExtensions(f).getOrCreate()

26.How is the rule added to the Analyzer?
•  The Analyzer has rules in batches
   –  A batch has a placeholder, extendedResolutionRules, for custom rules
   –  The “Post-Hoc Resolution” batch holds the postHocResolutionRules
•  SparkSession stores the SparkSessionExtensions in extensions
•  When the SessionState is created, the custom rules are passed to the Analyzer by overriding the following class member variables:
   –  val extendedResolutionRules
   –  val postHocResolutionRules
   –  val extendedCheckRules
•  Check the BaseSessionStateBuilder.analyzer method
•  Check the HiveSessionStateBuilder.analyzer method

27.Things to Note
•  The custom resolution rules get added at the end of the ‘Resolution’ batch
•  The check rules get called at the end of the checkAnalysis method, after all of Spark’s own checks are done
•  In the Analyzer.checkAnalysis method: extendedCheckRules.foreach(_(plan))
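For completeness, a check rule is just a function from LogicalPlan to Unit that throws when the plan violates your constraint. A minimal sketch follows; the constraint (rejecting joins with no join condition) and the MyCheckRule name are made up for illustration.

    import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
    import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

    // Hypothetical check: reject joins that have no join condition.
    case class MyCheckRule(spark: SparkSession) extends (LogicalPlan => Unit) {
      def apply(plan: LogicalPlan): Unit = plan foreach {
        case j: Join if j.condition.isEmpty =>
          // Spark's own checks throw AnalysisException; any exception thrown
          // here will surface when the plan is analyzed, so keep it simple.
          throw new IllegalStateException("Joins without a join condition are not allowed here")
        case _ => // everything else passes
      }
    }

    val f: SparkSessionExtensions => Unit = { e => e.injectCheckRule(MyCheckRule) }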

28. Use Case #4 You want to add custom planning strategies

29.Add new physical plan strategy
•  Step 1: Implement your new physical plan Strategy class
     case class IdxStrategy(spark: SparkSession) extends SparkStrategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = { ….. }
     }
•  Step 2: Create your ExtensionsBuilder function
     type ExtensionsBuilder = SparkSessionExtensions => Unit
     val f: ExtensionsBuilder = { e => e.injectPlannerStrategy(IdxStrategy) }
•  Step 3: Use the withExtensions method in SparkSession.builder to create your custom SparkSession
     val spark = SparkSession.builder().master(..).withExtensions(f).getOrCreate()
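A slightly fuller sketch of the strategy itself: the planner consults it for every logical operator, and it should return Nil whenever it does not apply so the built-in strategies get their turn. Strategy is the public alias for SparkStrategy; the matched pattern and physical operator are left as placeholders, so IdxStrategy here is not a real index implementation.

    import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    case class IdxStrategy(spark: SparkSession) extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        // Match the logical pattern your physical operator can handle and return
        // the corresponding physical plan, e.g.
        //   case MyIndexablePattern(...) => MyIndexScanExec(...) :: Nil
        case _ => Nil   // not applicable: let the built-in strategies plan this node
      }
    }

    val f: SparkSessionExtensions => Unit = { e => e.injectPlannerStrategy(IdxStrategy) }

    val spark = SparkSession.builder()
      .master("local[*]")
      .withExtensions(f)
      .getOrCreate()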