In-Memory Storage Evolution in Apache Spark

This talk summarizes recent activities in the Apache Spark developer community to enhance columnar storage in Spark 2.3. Columnar storage is known as an efficient format for keeping consecutive fields of a column. However, previous versions of Spark used columnar storage in only a few places, and only as an internal data structure. Spark 2.3 publishes the abstract class ColumnVector as a public API, and uses it to support several columnar storages with large performance improvements. Versions before Spark 2.3 use columnar storage for reading Apache Parquet and for creating a table cache in programs written with SQL, DataFrame, or Dataset (e.g. df.cache()). These columnar storages were accessed through different internal APIs, which led to performance inefficiency in the table cache. With ColumnVector as a public API, Spark 2.3 can read data in Apache Arrow and Apache ORC through ColumnVector without extra data conversion or data copy. While PySpark before Spark 2.3 had huge serialization and deserialization overhead, Spark 2.3 eliminates this overhead by using pandas with Apache Arrow, improving the performance of PySpark. Spark 2.3 also accesses the columnar storage for the table cache through ColumnVector without a data copy, improving table cache performance. Takeaways of this talk: (1) ColumnVector in Spark 2.3 is a public API for columnar storage that can exchange data with other columnar storages. (2) Spark 2.3 uses ColumnVector to exchange data with the well-known columnar formats Apache Arrow and Apache ORC with low overhead, improving performance. (3) Spark 2.3 and later versions improve the performance of PySpark by using pandas. (4) Spark 2.3 and later versions use ColumnVector for the table cache and improve its performance.

1. In-Memory Storage Evolution in Apache Spark / Kazuaki Ishizaki, IBM Research – Tokyo (@kiszk) #UnifiedAnalytics #SparkAISummit

2. About Me – Kazuaki Ishizaki
• Researcher at IBM Research – Tokyo, working on compiler optimizations
• Working on the IBM Java virtual machine for over 20 years, in particular the just-in-time compiler
• Committer of Apache Spark (SQL package) since 2018
• ACM Distinguished Member
• Homepage: http://ibm.biz/ishizaki GitHub: https://github.com/kiszk Twitter: @kiszk Slides: https://slideshare.net/ishizaki
In-Memory Storage Evolution in Apache Spark / Kazuaki Ishizaki #UnifiedAnalytics #SparkAISummit

3. Why In-Memory Storage?
• In-memory storage is mandatory for high performance
• In-memory columnar storage is necessary to
– support Parquet, where the column format is a first-class citizen
– achieve a better compression ratio for the table cache
[Figure: the same three rows ("Spark", 2.0, 1), ("AI", 1.9, 2), ("Summit", 5000.0, 3) laid out in row format and in column format; in the column format, the values of each column (x, y, z) occupy consecutive memory addresses]
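The row-versus-column contrast in the figure can be sketched in plain Python (illustrative only, not Spark internals):

```python
# Illustrative sketch (not Spark code): the same three records laid out
# row-wise and column-wise. In the column layout, all values of a field
# are adjacent, which favors scanning one column and compression.
rows = [("Spark", 2.0, 1), ("AI", 1.9, 2), ("Summit", 5000.0, 3)]

# Column layout: one array per field.
col_x = [r[0] for r in rows]   # ["Spark", "AI", "Summit"]
col_y = [r[1] for r in rows]   # [2.0, 1.9, 5000.0]
col_z = [r[2] for r in rows]   # [1, 2, 3]

# Scanning column y touches only y's values, never x or z.
total = sum(col_y)
```

Because the values of `col_y` are consecutive, a scan over one column does not have to skip over the other fields of each row.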

4. What I Will Talk About
• Columnar storage is used to improve performance for
– table cache, Parquet, ORC, and Arrow
• Columnar storage from Spark 2.3
– improves performance of PySpark with Pandas UDFs using Arrow
– can be connected with other external columnar storages by using the public class "ColumnVector"

5. How Columnar Storage Is Used
• Table cache
df = ...
df.cache
df1 = df.selectExpr("y + 1.2")
• ORC
df = spark.read.format("orc").load("c")
df1 = df.selectExpr("y + 1.2")
• Parquet
df = spark.read.parquet("c")
df1 = df.selectExpr("y + 1.2")
• Pandas UDF
@pandas_udf('double')
def plus(v):
    return v + 1.2
df1 = df.withColumn('yy', plus(df.y))

6. Performance among Spark Versions
• DataFrame table cache from Spark 2.0 to Spark 2.4
[Bar chart: relative elapsed time of df.filter("i % 16 == 0").count for Spark 2.0, 2.3, and 2.4; shorter is better, with Spark 2.3 and 2.4 clearly faster than Spark 2.0]

7. How This Improvement Is Achieved
• Structure of columnar storage
• Generated code to access columnar storage

8. Outline
• Introduction
• Deep dive into columnar storage
• Deep dive into generated code of columnar storage
• Next steps

9. In-Memory Storage Evolution (1/2)
• Spark 1.3: RDD table cache, stored as Java objects
• Spark 1.4 to 1.6: table cache with its own memory layout, introduced by Project Tungsten
• Spark 2.0 to 2.2: Parquet vectorized reader with its own memory layout, but a different class from the table cache
[Figure: timeline showing the RDD table cache, the Tungsten table cache, and the Parquet vectorized reader, each holding the sample columns ("Spark", "AI") and (2.0, 1.9)]

10. In-Memory Storage Evolution (2/2)
• From Spark 2.3, the ColumnVector class becomes public
• Table cache, the Parquet vectorized reader, Pandas UDFs with Arrow, and (from Spark 2.4) the ORC vectorized reader use the common ColumnVector class
[Figure: timeline for Spark 2.3 and 2.4 showing these components sharing ColumnVector]

11. Implementation in Spark 1.4 to 1.6
• Table cache uses CachedBatch, which is not accessed directly from generated code
case class CachedBatch(
  buffers: Array[Array[Byte]],
  stats: Row)
[Figure: the columns ("Spark", "AI") and (2.0, 1.9) serialized into CachedBatch.buffers]
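Why generated code cannot read CachedBatch directly can be sketched in plain Python (an illustrative analogue, not Spark's actual CachedBatch): each column is an opaque byte buffer, so every read pays a decoding cost.

```python
import struct

# Illustrative sketch: a "cached batch" keeps a column as one packed byte
# buffer, the way a compressed cache might. Reading a single value back
# requires decoding the buffer first -- generated code cannot index it
# directly the way it can index a plain float array.
def make_cached_batch(float_column):
    # Pack the whole column into one byte array (32-bit floats).
    return struct.pack(f"{len(float_column)}f", *float_column)

def read_value(buffer, i):
    # Every access goes through deserialization of the byte buffer.
    n = len(buffer) // 4
    return struct.unpack(f"{n}f", buffer)[i]

batch = make_cached_batch([2.0, 1.9])
y = read_value(batch, 1)  # decoding on the read path
```

The overhead here is the `struct.unpack` step on every read; a directly indexable typed array would avoid it entirely.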

12. Implementation in Spark 2.0
• Parquet uses the ColumnVector class, which has well-defined methods that can be called from generated code
public abstract class ColumnVector {
  float getFloat(...);
  UTF8String getUTF8String(...);
  ...
}
public final class OnHeapColumnVector extends ColumnVector {
  private byte[] byteData;
  private float[] floatData;
  ...
}
[Figure: a ColumnarBatch of ColumnVectors holding the sample columns ("Spark", "AI") and (2.0, 1.9); data is copied into the on-heap arrays]
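The idea behind the class pair above can be sketched in Python (an illustrative analogue of the design, not the Java implementation): an abstract per-type accessor API over a concrete, directly indexable on-heap array.

```python
from abc import ABC, abstractmethod

# Illustrative sketch of the ColumnVector design: an abstract accessor
# API (get_float here) with a concrete on-heap implementation backed by
# a plain array, so a consumer can read element i without deserialization.
class ColumnVectorSketch(ABC):
    @abstractmethod
    def get_float(self, i):
        """Return the float value at row i of this column."""

class OnHeapFloatVector(ColumnVectorSketch):
    def __init__(self, data):
        self._data = list(data)   # directly indexable backing array

    def get_float(self, i):
        return self._data[i]      # plain indexed read, no decoding

v = OnHeapFloatVector([2.0, 1.9])
```

Generated code that calls only the abstract accessor works unchanged for any concrete subclass, which is what later lets Arrow and ORC plug in.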

13. Implementation in Spark 2.3
• Table cache, Parquet, and Arrow also use ColumnVector
• ColumnVector becomes a public class to define APIs
From ColumnVector.java:
/**
 * An interface representing in-memory columnar data in Spark. This interface defines the main APIs
 * to access the data, as well as their batched versions. The batched versions are considered to be
 * faster and preferable whenever possible.
 */
@Evolving
public abstract class ColumnVector ... {
  float getFloat(...);
  UTF8String getUTF8String(...);
  ...
}
• Table cache and the Parquet vectorized reader use OnHeapColumnVector; Pandas UDFs with Arrow use ArrowColumnVector; both extend ColumnVector
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java

14. ColumnVector for Your Columnar Storage
• Developers can write their own class, which extends ColumnVector, to support a new columnar storage or to exchange data with other formats
[Figure: a columnar data source connected to Spark through MyColumnarClass extends ColumnVector]
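The adapter pattern in the figure can be sketched in Python; `MyColumnarClass` is the slide's hypothetical name, and `array.array` stands in for an arbitrary external columnar buffer:

```python
import array

# Illustrative sketch: exposing an external columnar buffer (here a
# Python array.array of doubles) through a getFloat-style accessor, the
# way a custom ColumnVector subclass would expose a new columnar data
# source to Spark's generated code without copying the data.
class MyColumnarClass:
    def __init__(self, buf):
        self._buf = buf           # wrap the external buffer, no copy

    def get_float(self, i):
        return self._buf[i]       # delegate indexed reads to the buffer

external = array.array("d", [2.0, 1.9, 5000.0])
col = MyColumnarClass(external)
```

Because the wrapper only delegates, the external format keeps ownership of the memory; only the accessor interface is adapted.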

15. Implementation in Spark 2.4
• ORC also uses ColumnVector
• Table cache and the Parquet and ORC vectorized readers use OnHeapColumnVector; Pandas UDFs with Arrow use ArrowColumnVector; both extend the public ColumnVector class
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java

16. Outline
• Introduction
• Deep dive into columnar storage
• Deep dive into generated code of columnar storage
• Next steps

17. How Is a Spark Program Executed?
• A Spark program is translated by Catalyst into Java code that is executed on the Java virtual machine
df = ...
df.cache
df1 = df.selectExpr("y + 1.2")
is translated into code such as
while (rowIterator.hasNext()) {
  Row row = rowIterator.next();
  ...
}
Source: Michael Armbrust et al., Spark SQL: Relational Data Processing in Spark, SIGMOD '15

18. Access Columnar Storage (before 2.0)
• Although columnar storage is used, generated code gets data from row storage, so data conversion from columnar to row storage is required
df1 = df.selectExpr("y + 1.2")
Catalyst generates:
while (rowIterator.hasNext()) {
  Row row = rowIterator.next();
  float y = row.getFloat(1);
  float f = y + 1.2;
  ...
}
[Figure: CachedBatch columnar storage is converted into row storage before the generated loop reads it]
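The pre-2.0 access path above can be sketched in plain Python (illustrative, not the generated Java): columnar data is first materialized into row objects, and only then does the loop read field values.

```python
# Illustrative sketch of the pre-2.0 path: the columns are converted
# into per-row tuples (an extra copy for every row), and the generated
# loop then extracts field 1 from each row object.
col_x = ["Spark", "AI"]
col_y = [2.0, 1.9]

# Step 1: data conversion -- materialize rows from the columns.
rows = list(zip(col_x, col_y))

# Step 2: the row-oriented loop reads field 1 of each row.
results = []
for row in rows:
    y = row[1]
    results.append(y + 1.2)
```

The `zip` step is the cost the later versions remove: it touches every field of every row even though the query only needs column y.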

19. Access Columnar Storage (from 2.0)
• When columnar storage is used, reading data elements directly accesses columnar storage
– Removed the copy for Parquet in 2.0 and for the table cache in 2.3
df1 = df.selectExpr("y + 1.2")
Catalyst generates:
ColumnVector column1 = ...;
while (i++ < numRows) {
  float y = column1.getFloat(i);
  float f = y + 1.2;
  ...
}
[Figure: the generated loop reads y (2.0, 1.9) directly from ColumnVector and computes f (3.2, 3.1)]

20. Access Columnar Storage (from 2.3)
• Generate this pattern for all cases regarding ColumnVector
• Use a for-loop to encourage compiler optimizations
– The HotSpot compiler applies loop optimizations to a well-formed loop
df1 = df.selectExpr("y + 1.2")
Catalyst generates:
ColumnVector column1 = ...;
for (int i = 0; i < numRows; i++) {
  float y = column1.getFloat(i);
  float f = y + 1.2;
  ...
}
[Figure: the generated loop reads y (2.0, 1.9) directly from ColumnVector and computes f (3.2, 3.1)]
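The post-2.3 loop shape can be sketched in Python (illustrative only; the real generated code is Java): a simple counted loop reading straight from the column, with no row object in between.

```python
# Illustrative sketch of the from-2.3 generated pattern: a well-formed
# counted for-loop with a direct indexed read per iteration. This is the
# shape that JIT compilers (such as HotSpot for the real Java code)
# optimize well -- bounds-check elimination, unrolling, and so on.
column1 = [2.0, 1.9]           # stands in for ColumnVector
num_rows = len(column1)

out = []
for i in range(num_rows):
    y = column1[i]             # direct indexed read, no row object
    out.append(y + 1.2)
```

Compared with the earlier row-based loop, there is no per-row materialization step; each iteration is one array read and one add.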

21. How Columnar Storage Is Used in PySpark
• Share data between the columnar storages of Spark and pandas through Arrow
– No serialization and deserialization
– 3-100x performance improvements
Source: "Introducing Pandas UDF for PySpark", Databricks blog
@pandas_udf('double')
def plus(v):
    return v + 1.2
Details in "Apache Arrow and Pandas UDF on Apache Spark" by Takuya Ueshin
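The key property of a Pandas UDF can be sketched without Spark or pandas (illustrative only): the function receives a whole column at once and returns a whole column, rather than being invoked once per row.

```python
# Illustrative sketch (plain Python, no Spark/pandas): a "vectorized"
# UDF in the Pandas UDF style takes an entire column as its argument
# and returns an entire column, so the per-row call overhead and the
# per-row serialization of the old PySpark UDF path disappear.
def plus(v):
    # v is the whole column; one call processes the whole batch.
    return [x + 1.2 for x in v]

column_y = [2.0, 1.9]
result = plus(column_y)        # a single invocation for all rows
```

In real PySpark, `v` would be a pandas Series backed by Arrow columnar buffers, so the batch crosses the JVM/Python boundary without being serialized row by row.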

22. Outline
• Introduction
• Deep dive into columnar storage
• Deep dive into generated code of columnar storage
• Next steps

23. Next Steps
• Short term
– Support an array type in ColumnVector for the table cache
– Support additional external columnar storages
• Middle term
– Exploit SIMD instructions to process multiple rows in a column in generated code
• Extension of SPARK-25728 (Tungsten IR)

24. Integrate Spark with Others
• Frameworks: DL/ML frameworks
– SPARK-24579, SPARK-26413
• Resources: GPU, FPGA, ...
– SPARK-27396
• See also SAIS 2019: "Apache Arrow-Based Unified Data Sharing and Transferring Format Among CPU and Accelerators"
[Figure: Spark exchanging columnar data with a GPU (from rapids.ai) and an FPGA]

25. Takeaway
• Columnar storage is used to improve performance for
– table cache, Parquet, ORC, and Arrow
• Columnar storage from Spark 2.3
– improves performance of PySpark with Pandas UDFs using Arrow
– can be connected with other external columnar storages by using the public class "ColumnVector"

26. Thanks to the Spark Community
• Especially @andrewor14, @bryanCutler, @cloud-fan, @dongjoon-hyun, @gatorsmile, @hvanhovell, @mgaido91, @ueshin, @viirya