In-Memory Storage Evolution in Apache Spark
1. In-Memory Storage Evolution in Apache Spark
Kazuaki Ishizaki, IBM Research – Tokyo, @kiszk
#UnifiedAnalytics #SparkAISummit
2. About Me – Kazuaki Ishizaki
• Researcher at IBM Research in compiler optimizations
• Working on the IBM Java virtual machine for over 20 years
– In particular, the just-in-time compiler
• Committer of Apache Spark (SQL package) since 2018
• ACM Distinguished Member
• Homepage: http://ibm.biz/ishizaki
• GitHub: https://github.com/kiszk
• Twitter: @kiszk
• Slides: https://slideshare.net/ishizaki
3. Why In-Memory Storage?
• In-memory storage is mandatory for high performance
• In-memory columnar storage is necessary to
– support Parquet, a first-class-citizen column format
– achieve a better compression rate for the table cache
(Figure: the same table in row format vs. column format. Row format lays out each row contiguously in memory, e.g. Row 0 = [Spark, 2.0, 1], Row 1 = [AI, 1.9, 2], Row 2 = [Summit, 5000.0, 3]; column format lays out each column contiguously, e.g. Column y = [2.0, 1.9, 5000.0].)
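The compression benefit can be seen with a small self-contained sketch (plain Python with zlib; the table data and all names here are illustrative, not Spark code). Packing a table column by column groups similar values together, so a generic compressor does noticeably better on the columnar layout than on the row-interleaved one.

```python
import struct
import zlib

# The table from the slide, as rows of (x, y, z).
rows = [("Spark", 2.0, 1), ("AI", 1.9, 2), ("Summit", 5000.0, 3)]

# Column format: each column's values become one contiguous sequence.
col_x, col_y, col_z = zip(*rows)

# A larger numeric table: an increasing int column and a repetitive
# float column, roughly like a typical fact table.
n = 1000
ints = list(range(n))
floats = [1.25] * n

# Row-major layout interleaves the columns record by record...
row_bytes = b"".join(struct.pack("if", i, f) for i, f in zip(ints, floats))
# ...while column-major layout keeps each column contiguous.
col_bytes = struct.pack(f"{n}i", *ints) + struct.pack(f"{n}f", *floats)

# Similar values sit next to each other in the columnar layout,
# so it compresses to fewer bytes than the row-interleaved layout.
assert len(zlib.compress(col_bytes)) < len(zlib.compress(row_bytes))
```

This is the same reason the table cache benefits from a columnar layout: the compressor sees long runs of similar values instead of alternating types.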
4. What I Will Talk About
• Columnar storage is used to improve performance for
– table cache, Parquet, ORC, and Arrow
• Columnar storage, from Spark 2.3,
– improves performance of PySpark with Pandas UDFs using Arrow
– can be connected to other external columnar storages through the public class ColumnVector
5. How Columnar Storage Is Used
• Table cache
  df = ...
  df.cache
  df1 = df.selectExpr("y + 1.2")
• ORC
  df = spark.read.format("orc").load("c")
  df1 = df.selectExpr("y + 1.2")
• Parquet
  df = spark.read.parquet("c")
  df1 = df.selectExpr("y + 1.2")
• Pandas UDF
  @pandas_udf('double')
  def plus(v):
      return v + 1.2
  df1 = df.withColumn('yy', plus(df.y))
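A Pandas UDF receives a whole column at a time rather than a single row. A minimal pure-Python sketch of that vectorized contract (no Spark or pandas required; the plain list below stands in for the pandas Series that Spark would actually pass):

```python
def plus(v):
    # Vectorized UDF contract: take a whole column, return a whole column.
    return [x + 1.2 for x in v]

# Spark would hand over column "y" via Arrow with no per-row conversion;
# here we use a plain list with the values from the slides.
column_y = [2.0, 1.9, 5000.0]
column_yy = plus(column_y)
```

The per-column calling convention is what lets Spark hand the data over through Arrow without serializing each row.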
6. Performance among Spark Versions
• DataFrame table cache, from Spark 2.0 to Spark 2.4
(Chart: relative elapsed time of df.filter("i % 16 == 0").count on Spark 2.0, 2.3, and 2.4; shorter is better; the newer versions run faster.)
7. How This Improvement Is Achieved
• Structure of the columnar storage
• Generated code that accesses the columnar storage
8. Outline
• Introduction
• Deep dive into columnar storage
• Deep dive into generated code of columnar storage
• Next steps
9. In-Memory Storage Evolution (1/2)
• Spark 1.3: RDD table cache, stored as Java objects
• Spark 1.4 to 1.6: table cache with its own memory layout, introduced by Project Tungsten
• Spark 2.0 to 2.2: Parquet vectorized reader with its own memory layout, but in a different class from the table cache
10. In-Memory Storage Evolution (2/2)
• Spark 2.3: ColumnVector becomes a public class; the table cache, the Parquet vectorized reader, and Pandas UDFs with Arrow use the common ColumnVector class
• Spark 2.4: the ORC vectorized reader also uses ColumnVector
11. Implementation in Spark 1.4 to 1.6
• The table cache uses CachedBatch, which is not accessed directly from generated code
  case class CachedBatch(
    buffers: Array[Array[Byte]],
    stats: Row)
(CachedBatch.buffers holds one byte buffer per column.)
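A rough Python sketch of the CachedBatch idea: the cache keeps one compressed byte buffer per column, and generated code cannot index into it directly; a whole buffer must be decompressed and unpacked before any value can be read. The function names and the use of zlib are illustrative assumptions, not the actual Spark implementation.

```python
import struct
import zlib

def make_cached_batch(columns):
    """Build one compressed byte buffer per column (doubles only)."""
    return [zlib.compress(struct.pack(f"{len(c)}d", *c)) for c in columns]

def read_column(batch, i):
    """Reading even one value requires decoding the whole buffer,
    which is why generated code could not access it directly."""
    raw = zlib.decompress(batch[i])
    return list(struct.unpack(f"{len(raw) // 8}d", raw))

batch = make_cached_batch([[2.0, 1.9, 5000.0], [1.0, 2.0, 3.0]])
```

The later ColumnVector design removes exactly this indirection by exposing per-element accessors to generated code.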
12. Implementation in Spark 2.0
• Parquet uses the ColumnVector class, which has well-defined methods that can be called from generated code
  public abstract class ColumnVector {
    float getFloat(…) …
    UTF8String getUTF8String(…) …
    …
  }
  public final class OnHeapColumnVector extends ColumnVector {
    // Array for each type.
    private byte[] byteData;
    …
    private float[] floatData;
    …
  }
(Figure: data is still copied from the source into the ColumnVectors of a ColumnarBatch.)
13. Implementation in Spark 2.3
• Table cache, Parquet, and Arrow also use ColumnVector
• ColumnVector becomes a public class to define APIs
From ColumnVector.java:
  /**
   * An interface representing in-memory columnar data in Spark. This interface defines the main APIs
   * to access the data, as well as their batched versions. The batched versions are considered to be
   * faster and preferable whenever possible.
   */
  @Evolving
  public abstract class ColumnVector … {
    float getFloat(…) …
    UTF8String getUTF8String(…) …
    …
  }
Implementations:
  public final class OnHeapColumnVector extends ColumnVector {  // table cache, Parquet vectorized readers
    // Array for each type.
    private byte[] byteData;
    …
    private float[] floatData;
    …
  }
  public final class ArrowColumnVector extends ColumnVector {  // Pandas UDF with Arrow
    …
  }
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
14. ColumnVector for Your Columnar Storage
• Developers can write their own class that extends ColumnVector to support a new columnar storage or to exchange data with other formats
(Figure: a columnar data source feeds MyColumnarClass extends ColumnVector.)
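The shape of such an extension can be sketched in Python. The real API is the Java ColumnVector class shown on the previous slide; the Python names below only mirror it loosely and are made up for illustration.

```python
from abc import ABC, abstractmethod
from array import array

class ColumnVector(ABC):
    """Accessor API, loosely mirroring Spark's ColumnVector."""
    @abstractmethod
    def get_float(self, row_id):
        ...

class MyColumnarClass(ColumnVector):
    """A custom backing store: here, a plain in-memory double array.
    A real implementation might wrap GPU memory, Arrow buffers, etc."""
    def __init__(self, values):
        self._data = array("d", values)

    def get_float(self, row_id):
        return self._data[row_id]

col = MyColumnarClass([2.0, 1.9, 5000.0])
```

As long as the accessor contract is honored, the code that consumes the column never needs to know how the data is physically stored.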
15. Implementation in Spark 2.4
• ORC also uses ColumnVector
• OnHeapColumnVector now backs the table cache and the Parquet and ORC vectorized readers; ArrowColumnVector backs Pandas UDFs with Arrow (same ColumnVector.java API as on the previous slide)
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
16. Outline
• Introduction
• Deep dive into columnar storage
• Deep dive into generated code of columnar storage
• Next steps
17. How Is a Spark Program Executed?
• A Spark program is translated by Catalyst into Java code, which is executed on the Java virtual machine
  df = ...
  df.cache
  df1 = df.selectExpr("y + 1.2")
is translated into code like:
  while (rowIterator.hasNext()) {
    Row row = rowIterator.next();
    …
  }
Source: Michael Armbrust et al., "Spark SQL: Relational Data Processing in Spark", SIGMOD '15
18. Access Columnar Storage (before 2.0)
• Even when columnar storage (CachedBatch) is used, the generated code gets data from row storage, so a conversion from columnar to row storage is required
  while (rowIterator.hasNext()) {
    Row row = rowIterator.next();
    float y = row.getFloat(1);
    float f = y + 1.2;
    …
  }
19. Access Columnar Storage (from 2.0)
• When columnar storage is used, reading a data element directly accesses the columnar storage
– Removed the copy for Parquet in 2.0 and for the table cache in 2.3
  ColumnVector column1 = …
  while (i++ < numRows) {
    float y = column1.getFloat(i);
    float f = y + 1.2;
    …
  }
20. Access Columnar Storage (from 2.3)
• Generate this pattern for all cases that use ColumnVector
• Use a for-loop to encourage compiler optimizations
– The HotSpot compiler applies loop optimizations to a well-formed loop
  ColumnVector column1 = …
  for (int i = 0; i < numRows; i++) {
    float y = column1.getFloat(i);
    float f = y + 1.2;
    …
  }
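What the generated loop computes can be mimicked in plain Python (the indexing below stands in for the column1.getFloat(i) accessor; the function name is made up for illustration):

```python
def eval_y_plus_1_2(column, num_rows):
    # Counted loop over one column, mirroring the generated Java:
    #   for (int i = 0; i < numRows; i++) {
    #     float y = column1.getFloat(i);
    #     float f = y + 1.2; ... }
    out = []
    for i in range(num_rows):
        y = column[i]          # direct columnar access, no row conversion
        out.append(y + 1.2)
    return out

result = eval_y_plus_1_2([2.0, 1.9], 2)
```

The fixed trip count and simple body are exactly what makes the loop "well-formed" for a JIT compiler: bounds are known before the loop starts, so bounds checks can be hoisted and the body unrolled or vectorized.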
21. How Columnar Storage Is Used in PySpark
• Share data between the columnar storages of Spark and Pandas through Arrow's ColumnVector
– No serialization and deserialization
– 3-100x performance improvements (source: "Introducing Pandas UDF for PySpark", Databricks blog)
  @pandas_udf('double')
  def plus(v):
      return v + 1.2
Details in "Apache Arrow and Pandas UDF on Apache Spark" by Takuya Ueshin
22. Outline
• Introduction
• Deep dive into columnar storage
• Deep dive into generated code of columnar storage
• Next steps
23. Next Steps
• Short term
– Support an array type in ColumnVector for the table cache
– Support additional external columnar storages
• Middle term
– Exploit SIMD instructions to process multiple rows in a column in generated code
– Extension of SPARK-25728 (Tungsten IR)
24. Integrate Spark with Others
• Frameworks: DL/ML frameworks (e.g., RAPIDS on GPU)
– SPARK-24579, SPARK-26413
• Resources: GPU, FPGA, …
– SPARK-27396
• SAIS 2019: "Apache Arrow-Based Unified Data Sharing and Transferring Format Among CPU and Accelerators"
25. Takeaway
• Columnar storage is used to improve performance for
– table cache, Parquet, ORC, and Arrow
• Columnar storage, from Spark 2.3,
– improves performance of PySpark with Pandas UDFs using Arrow
– can be connected to other external columnar storages through the public class ColumnVector
26. Thanks to the Spark Community
• Especially @andrewor14, @bryanCutler, @cloud-fan, @dongjoon-hyun, @gatorsmile, @hvanhovell, @mgaido91, @ueshin, @viirya