Making Nested Columns as First Citizen in Apache Spark SQL

Apple Siri is the world’s largest virtual assistant service, powering every iPhone, iPad, Mac, Apple TV, Apple Watch, and HomePod. We use large amounts of data to provide our users the best possible personalized experience. Our raw event data is cleaned and pre-joined into a unified dataset for our data consumers to use. To keep the rich hierarchical structure of the data, our schemas are deeply nested structures. In this talk, we discuss how Spark 2.4 handles nested structures, and we show fundamental design issues in reading nested fields that were not well considered when Spark SQL was designed. As a result, Spark SQL reads unnecessary data in many operations. Given that Siri’s data is deeply nested and enormous, this quickly becomes a bottleneck in our pipelines. We then talk about the various approaches we have taken to tackle this problem. By making nested columns first-class citizens in Spark SQL, we can achieve dramatic performance gains: in some of our production queries, the speed-up is 20x in wall-clock time, with 8x less data being read. All of our work will be open source, and some of it has already been merged upstream.

1. Making Nested Columns as First Citizens in Apache Spark SQL
Cesar Delgado @hpcfarmer
DB Tsai @dbtsai
Spark+AI SF

2. Siri
The world’s most popular intelligent assistant service, powering every iPhone, iPad, Mac, Apple TV, Apple Watch, and HomePod

3. Siri Open Source Team
• We’re Spark, Hadoop, and HBase PMC members / committers / contributors
• We’re advocates for open source
• Pushing our internal changes back upstream
• Working with the communities to review pull requests, develop new features, and fix bugs

4. Siri Data
• Machine learning is used to personalize your experience throughout your day
• We believe privacy is a fundamental human right

5. Siri Scale
• Large volumes of requests, data centers all over the world
• Hadoop / YARN cluster with thousands of nodes
• HDFS with hundreds of PB
• Hundreds of TB of raw event data per day
• More than 90% of jobs are Spark
• Less than 10% are legacy Pig and MapReduce jobs

6. Details about our data
• Deeply nested relational data with a couple of top-level columns
• More than 2k nested fields in total
• Stored in Parquet format, partitioned by UTC day
• Most queries touch only a small subset of the data

7. An Example of a Hierarchically Organized Table
Real estate information can be naturally modeled as:

case class Address(houseNumber: Int, streetAddress: String, city: String, state: String, zipCode: String)
case class Facts(price: Int, size: Int, yearBuilt: Int)
case class School(name: String)
case class Home(address: Address, facts: Facts, schools: List[School])
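As a minimal sketch (not from the talk) of how such a table could be materialized for the examples that follow; the /tmp/homes path and the sample rows are illustrative, and a SparkSession named spark is assumed:

import spark.implicits._

// Build a tiny homes table from the case classes above and register it
// as the view the later sql("... from homes") queries read from.
val homes = Seq(
  Home(Address(1, "Infinite Loop", "Cupertino", "CA", "95014"),
       Facts(price = 2500000, size = 1500, yearBuilt = 1997),
       List(School("Cupertino High"))),
  Home(Address(1, "Apple Park Way", "Cupertino", "CA", "95014"),
       Facts(price = 5000000, size = 2500, yearBuilt = 2017),
       List(School("Monta Vista High")))
).toDF()

homes.write.mode("overwrite").parquet("/tmp/homes")
spark.read.parquet("/tmp/homes").createOrReplaceTempView("homes")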

8. Nested SQL Schema

sql("select * from homes").printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- houseNumber: integer (nullable = true)
 |    |-- streetAddress: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zipCode: string (nullable = true)
 |-- facts: struct (nullable = true)
 |    |-- price: integer (nullable = true)
 |    |-- size: integer (nullable = true)
 |    |-- yearBuilt: integer (nullable = true)
 |-- schools: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)

9. Find cities with houses worth more than 2M

sql("select address.city from homes where facts.price > 2000000").explain(true)

== Physical Plan ==
*(1) Project [address#55.city AS city#75]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56],
        DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)],
        Format: Parquet, PushedFilters: [IsNotNull(facts)],
        ReadSchema: struct<address:struct<houseNumber:int,streetAddress:string,city:string,state:string,zipCode:string>,facts:struct<price:int,size:int,yearBuilt:int>>

• We only need two nested fields, address.city and facts.price
• But the entire address and facts structs are read

10. [SPARK-4502], [SPARK-25363] Parquet with Nested Columns
• Parquet is a columnar storage format designed with complex nested data structures in mind
• It supports very efficient compression and encoding schemes
• As a columnar format, each nested column is stored separately, as if it were a flattened table
• There is no easy way to cherry-pick a couple of nested columns in Spark (a manual workaround is sketched below)
• Foundation: allow reading a subset of nested columns right after the Parquet FileScan
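One manual workaround (a sketch, not from the talk; the /tmp/homes path assumes the setup shown earlier) is to hand Spark a pruned schema at read time, so the Parquet reader only materializes the requested nested fields:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Ask for only address.city and facts.price; the Parquet reader then
// skips every other leaf column on disk.
val prunedSchema = StructType(Seq(
  StructField("address", StructType(Seq(
    StructField("city", StringType)))),
  StructField("facts", StructType(Seq(
    StructField("price", IntegerType))))))

val pruned = spark.read.schema(prunedSchema).parquet("/tmp/homes")
pruned.filter(col("facts.price") > 2000000)
      .select(col("address.city"))
      .show()

Maintaining such hand-written schemas is impractical with 2k+ nested fields, which is why pruning needs to happen automatically in the optimizer.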

11. Find cities with houses worth more than 2M
• With [SPARK-4502], [SPARK-25363]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56]
        DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)],
        Format: Parquet, PushedFilters: [IsNotNull(facts)],
        ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Only two nested columns are read!

12. Find cities with houses worth more than 2M
• With [SPARK-4502], [SPARK-25363]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56]
        DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)],
        Format: Parquet, PushedFilters: [IsNotNull(facts)],
        ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Parquet predicate pushdown does not work for nested fields in Spark: PushedFilters still contains only IsNotNull(facts)

13. Find cities with houses worth more than 2M
• With [SPARK-25556]

sql("select address.city from homes where facts.price > 2000000")

== Physical Plan ==
*(1) Project [address#55.city AS city#77]
+- *(1) Filter (isnotnull(facts#56) && (facts#56.price > 2000000))
   +- *(1) FileScan parquet [address#55,facts#56]
        DataFilters: [isnotnull(facts#56), (facts#56.price > 2000000)],
        Format: Parquet, PushedFilters: [IsNotNull(facts), GreaterThan(facts.price,2000000)],
        ReadSchema: struct<address:struct<city:string>,facts:struct<price:int>>

• Predicate pushdown into Parquet for nested fields provides a significant performance gain: non-matches are eliminated early, so less data is read and less processing is wasted (see the config sketch below)
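As a rough sketch (not from the talk): in the Spark versions where this work has landed, nested schema pruning sits behind an internal optimizer flag, and Parquet filter pushdown must also stay enabled for the pushed predicates to reach the reader. Treat the exact flag names and defaults as version-dependent:

// Nested schema pruning (SPARK-4502 / SPARK-25363); internal config,
// off by default in Spark 2.4 and on by default in later releases.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

// Parquet filter pushdown must be on for PushedFilters to take effect
// (it is on by default).
spark.conf.set("spark.sql.parquet.filterPushdown", "true")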

14. Applying a UDF after repartition

val areaUdf = udf { (city: String, state: String, zipCode: String) =>
  s"$city, $state $zipCode"
}

sql("select * from homes").repartition(1).select(
  areaUdf(col("address.city"), col("address.state"), col("address.zipCode"))
).explain()

== Physical Plan ==
*(2) Project [UDF(address#58.city, address#58.state, address#58.zipCode) AS UDF(address.city, address.state, address.zipCode)#70]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [address#58]
      +- *(1) FileScan parquet [address#58] Format: Parquet,
           ReadSchema: struct<address:struct<houseNumber:int,streetAddress:string,city:string,state:string,zipCode:string>>

• Even though only three nested fields are used, the entire address struct is read

15. Problems in Supporting Nested Structures in Spark
• Root-level columns are represented by Attribute, the base of leaf named expressions
• To get a nested field from a root-level column, a GetStructField expression with an Attribute child has to be used
• All column-pruning logic works at the Attribute level, so either the entire root column is kept or the entire root column is pruned
• There is no easy way to cherry-pick a couple of nested columns in this model (see the sketch below)
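A quick way to see this (a sketch, not from the talk; assumes the homes view from earlier) is to inspect the analyzed plan. The nested reference resolves to a GetStructField wrapping the root-level AttributeReference, so Attribute-level pruning only ever sees address:

import org.apache.spark.sql.functions.col

val analyzed = spark.table("homes")
  .select(col("address.city"))
  .queryExecution.analyzed

// Prints something like: Project [address#55.city AS city#75]
// The city reference is a GetStructField over the root-level
// AttributeReference address#55; there is no standalone leaf Attribute
// for city, so pruning keeps or drops all of address.
println(analyzed)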

16. [SPARK-25603] Generalize Nested Column Pruning
• [SPARK-4502] and [SPARK-25363] are the foundation for better nested-structure support with Parquet in Spark
• If an operator such as Repartition, Sample, or Limit is applied after the Parquet FileScan, nested column pruning does not kick in
• We address this by flattening the nested fields into Aliases right after the data is read (a manual version of this rewrite is sketched below)
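For intuition, the same rewrite can be done by hand today (a sketch, not from the talk; areaUdf is the UDF from slide 14): aliasing the nested fields to top-level columns before the repartition is essentially what [SPARK-25603] automates inside the optimizer:

// Project the three needed leaves to top-level aliases first, so the
// scan below the Exchange only has to read those leaves.
val manuallyPruned = sql("select * from homes")
  .select(
    col("address.city").as("city"),
    col("address.state").as("state"),
    col("address.zipCode").as("zipCode"))
  .repartition(1)
  .select(areaUdf(col("city"), col("state"), col("zipCode")))

manuallyPruned.explain()  // the FileScan now reads only the three leaf fields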

17. Applying a UDF after repartition
• With [SPARK-25603]

sql("select * from homes").repartition(1).select(
  areaUdf(col("address.city"), col("address.state"), col("address.zipCode"))
).explain()

== Physical Plan ==
*(2) Project [UDF(_gen_alias_84#84, _gen_alias_85#85, _gen_alias_86#86) AS UDF(address.city, address.state, address.zipCode)#64]
+- Exchange RoundRobinPartitioning(1)
   +- *(1) Project [address#55.city AS _gen_alias_84#84, address#55.state AS _gen_alias_85#85, address#55.zipCode AS _gen_alias_86#86]
      +- *(1) FileScan parquet [address#55]
           ReadSchema: struct<address:struct<city:string,state:string,zipCode:string>>

• Nested fields are replaced by Aliases with flattened structures
• Only the three nested fields that are used are read from the Parquet files

18. Production Query - Finding a Needle in a Haystack

19. Spark 2.3.1: 1.2h wall-clock time, 7.1TB read

20. Spark 2.4 with [SPARK-4502], [SPARK-25363], and [SPARK-25556]: from 1.2h down to 3.3min, and from 7.1TB down to 840GB read

21.
• 21x faster in wall-clock time
• 8x less data being read
• More power efficient

22. Other work
• Enhancing Dataset performance by analyzing JVM bytecode and turning closures into Catalyst expressions
• Please check our other presentation tomorrow at 11am for more

23. Conclusions
With engineering rigor and some targeted optimizations, Spark can run at very large scale at lightning speed

24.
• [SPARK-4502]
• [SPARK-25363]
• [SPARK-25556]
• [SPARK-25603]

25. Thank you