Koalas - Making an Easy Transition from Pandas to Apache Spark

In this talk, we present Koalas, a new open-source project that aims to bridge the gap between big data and small data for data scientists and to simplify Apache Spark for people who are already familiar with the pandas library in Python.

Pandas is the standard tool for data science in Python, and it is typically the first tool data scientists reach for to explore and manipulate a data set. The problem is that pandas does not scale well to big data: it was designed for small data sets that a single machine can handle.

When data scientists work today with very large data sets, they either have to migrate to PySpark to leverage Spark or downsample their data so that they can use pandas. This presentation will give a deep dive into the conversion between Spark and pandas dataframes.

Through live demonstrations and code samples, you will understand:
- how to effectively leverage both pandas and Spark inside the same code base
- how to leverage powerful pandas concepts such as lightweight indexing with Spark
- technical considerations for unifying the different behaviors of Spark and pandas


1. Koalas: Unifying Spark and pandas APIs
Tim Hunter, Takuya Ueshin
Spark + AI Summit Europe 2019

2. About
Tim Hunter, Software Engineer at Databricks
• Co-creator of the Koalas project
• Contributes to Apache Spark MLlib, GraphFrames, TensorFrames and Deep Learning Pipelines libraries
• Ph.D. in Machine Learning from Berkeley, M.S. in Electrical Engineering from Stanford
Takuya Ueshin, Software Engineer at Databricks
• Apache Spark committer and PMC member
• Focusing on Spark SQL and PySpark
• Koalas committer

3. Outline
● pandas vs Spark at a high level
● why Koalas (combine everything in one package)
  ○ key differences
● current status & new features
● demo
● technical topics
  ○ InternalFrame
  ○ Operations on different DataFrames
  ○ Default Index
● roadmap

4. Typical journey of a data scientist
- Education (MOOCs, books, universities) → pandas
- Analyze small data sets → pandas
- Analyze big data sets → DataFrame in Spark

5. pandas
- Authored by Wes McKinney in 2008
- The standard tool for data manipulation and analysis in Python
- Deeply integrated into the Python data science ecosystem, e.g. numpy, matplotlib
- Can deal with a lot of different situations, including:
  - basic statistical analysis
  - handling missing data
  - time series, categorical variables, strings

6. Apache Spark
- De facto unified analytics engine for large-scale data processing (Streaming, ETL, ML)
- Originally created at UC Berkeley by Databricks’ founders
- PySpark API for Python; also API support for Scala, R and SQL

7. pandas DataFrame vs Spark DataFrame
Column
  pandas: df['col']
  Spark:  df['col']
Mutability
  pandas: Mutable
  Spark:  Immutable
Add a column
  pandas: df['c'] = df['a'] + df['b']
  Spark:  df.withColumn('c', df['a'] + df['b'])
Rename columns
  pandas: df.columns = ['a','b']
  Spark:  df.select(df['c1'].alias('a'), df['c2'].alias('b'))
Value count
  pandas: df['col'].value_counts()
  Spark:  df.groupBy(df['col']).count().orderBy('count', ascending = False)

8. A short example
pandas:
  import pandas as pd
  df = pd.read_csv("my_data.csv")
  df.columns = ['x', 'y', 'z1']
  df['x2'] = df.x * df.x
Spark:
  df = (spark.read
        .option("inferSchema", "true")
        .option("comment", True)
        .csv("my_data.csv"))
  df = df.toDF('x', 'y', 'z1')
  df = df.withColumn('x2', df.x*df.x)

9. Koalas
- Announced April 24, 2019
- Pure Python library
- Aims at providing the pandas API on top of Apache Spark:
  - unifies the two ecosystems with a familiar API
  - seamless transition between small and large data

10. Quickly gaining traction
- Bi-weekly releases!
- > 500 patches merged since announcement
- > 20 significant contributors outside of Databricks
- > 8k daily downloads

11. A short example
pandas:
  import pandas as pd
  df = pd.read_csv("my_data.csv")
  df.columns = ['x', 'y', 'z1']
  df['x2'] = df.x * df.x
Koalas:
  import databricks.koalas as ks
  df = ks.read_csv("my_data.csv")
  df.columns = ['x', 'y', 'z1']
  df['x2'] = df.x * df.x

12. Koalas
- Provide discoverable APIs for common data science tasks (i.e., follow pandas)
- Unify the pandas API and the Spark API, but pandas first
- Implement the pandas APIs that are appropriate for distributed datasets
- Easy conversion from/to pandas DataFrame or numpy array

13. Key Differences
- Spark is lazier by nature: most operations only happen when displaying or writing a DataFrame
- Spark does not maintain row order
- Performance when working at scale
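The laziness mentioned on this slide can be illustrated with a toy model in plain Python. This is only a sketch: `LazyFrame`, `load` and `reads` are hypothetical names invented for the illustration and are not part of Spark or Koalas. Transformations merely record a step; the data source is read only when an action (`collect`) asks for results.

```python
from typing import Callable, Iterable, List


class LazyFrame:
    """Toy stand-in for a lazily evaluated dataset (hypothetical, not a Spark class)."""

    def __init__(self, source: Callable[[], Iterable[int]], steps=()):
        self._source = source        # called only when results are requested
        self._steps = tuple(steps)   # recorded transformations, not yet applied

    def map(self, fn: Callable[[int], int]) -> "LazyFrame":
        # Transformation: returns a new plan and touches no data.
        return LazyFrame(self._source, self._steps + (("map", fn),))

    def filter(self, pred: Callable[[int], bool]) -> "LazyFrame":
        # Transformation: also only extends the recorded plan.
        return LazyFrame(self._source, self._steps + (("filter", pred),))

    def collect(self) -> List[int]:
        # Action: the only place where the source is read and the steps run.
        rows = self._source()
        for kind, fn in self._steps:
            rows = map(fn, rows) if kind == "map" else filter(fn, rows)
        return list(rows)


reads = []  # records every time the source is actually read


def load():
    reads.append("read")
    return range(5)


plan = LazyFrame(load).map(lambda x: x * x).filter(lambda x: x % 2 == 0)
reads_before_action = len(reads)  # still 0: building the plan read nothing
result = plan.collect()           # triggers the single read and both steps
```

In this model, building `plan` costs nothing, which mirrors why a chain of DataFrame operations in Spark returns quickly while displaying or writing the result is where the work happens.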

14. Current status
- Bi-weekly releases, very active community with daily changes
- The most common functions have been implemented:
  - 60% of the DataFrame / Series API
  - 60% of the DataFrameGroupBy / SeriesGroupBy API
  - 15% of the Index / MultiIndex API
  - to_datetime, get_dummies, …

15. New features
- 80% of the plot functions (0.16.0-)
- Spark related functions (0.8.0-)
  - IO: to_parquet/read_parquet, to_csv/read_csv, to_json/read_json, to_spark_io/read_spark_io, to_delta/read_delta, ...
  - SQL
  - cache
- Support for multi-index columns (90%) (0.16.0-)
- Options to configure Koalas’ behavior (0.17.0-)

16. Demo

17. How Virgin Hyperloop One reduced processing time from hours to minutes with Koalas
- Challenge: increasing scale and complexity of data operations
- Struggling with the “Spark switch” from pandas
- More than 10X faster with less than 1% code changes

18. InternalFrame
Internal immutable metadata.
- holds the current Spark DataFrame
- manages mapping from Koalas column names to Spark column names
- manages mapping from Koalas index names to Spark column names
- converts between Spark DataFrame and pandas DataFrame

19. InternalFrame
[Diagram: Koalas DataFrame → InternalFrame (column_index, index_map) → Spark DataFrame]

20. InternalFrame
[Diagram: Koalas DataFrame → InternalFrame (column_index, index_map) → Spark DataFrame]
API call: copy with new state
[Diagram: new Koalas DataFrame → new InternalFrame (column_index, index_map) → new Spark DataFrame]

21. InternalFrame
[Diagram: Koalas DataFrame → InternalFrame (column_index, index_map) → Spark DataFrame]
API call: copy with new state. Only updates metadata, e.g. kdf.set_index(...)
[Diagram: new Koalas DataFrame → new InternalFrame (column_index, index_map), with no new Spark DataFrame]

22. InternalFrame
[Diagram: Koalas DataFrame → InternalFrame (column_index, index_map) → Spark DataFrame]
API call: copy with new state, e.g., inplace=True: kdf.dropna(..., inplace=True)
[Diagram: new InternalFrame (column_index, index_map) → new Spark DataFrame, with no new Koalas DataFrame]
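The "copy with new state" pattern on the InternalFrame slides can be sketched with a frozen dataclass. This is a simplified illustration, not Koalas' actual implementation: `InternalFrameSketch`, `FrameSketch` and `with_index` are hypothetical names, the Spark DataFrame is modelled as a plain string label, and `index_map` is reduced to a tuple of column names.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class InternalFrameSketch:
    """Hypothetical, simplified model of the immutable metadata on the slides."""
    sdf: str                       # placeholder label standing in for the Spark DataFrame
    column_index: Tuple[str, ...]  # data column names
    index_map: Tuple[str, ...]     # Spark columns that serve as the index

    def with_index(self, name: str) -> "InternalFrameSketch":
        # Metadata-only change: same underlying sdf, different bookkeeping.
        cols = tuple(c for c in self.column_index if c != name)
        return replace(self, column_index=cols, index_map=(name,))


class FrameSketch:
    """Hypothetical user-facing frame that holds one InternalFrameSketch."""

    def __init__(self, internal: InternalFrameSketch):
        self._internal = internal

    def set_index(self, name: str) -> "FrameSketch":
        # Returns a new frame; the original frame is untouched.
        return FrameSketch(self._internal.with_index(name))

    def dropna(self, inplace: bool = False):
        # Data change: a new Spark plan is needed (modelled as a new label).
        new_internal = replace(self._internal, sdf=self._internal.sdf + ".dropna()")
        if inplace:
            self._internal = new_internal  # swap the reference; old state is not mutated
            return None
        return FrameSketch(new_internal)


kdf = FrameSketch(InternalFrameSketch("sdf", ("id", "a", "b"), ("__index__",)))
indexed = kdf.set_index("id")   # new frame, new metadata, same sdf label
old_internal = kdf._internal
kdf.dropna(inplace=True)        # same frame object, new internal state
```

Because the metadata object is never mutated, an `inplace=True` call amounts to pointing the same user-facing frame at a fresh internal state, while `set_index` can reuse the existing Spark DataFrame unchanged.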

23. Operations on different DataFrames
We only allow Series derived from the same DataFrame by default.
OK:
- df.a + df.b
- df['c'] = df.a * df.b
Not OK:
- df1.a + df2.b
- df1['c'] = df2.a * df2.b

24. Operations on different DataFrames
We only allow Series derived from the same DataFrame by default.
OK:
- df.a + df.b
- df['c'] = df.a * df.b
Equivalent Spark code?
  sdf.select(sdf['a'] + sdf['b'])
Not OK:
- df1.a + df2.b
- df1['c'] = df2.a * df2.b
Equivalent Spark code? ???
  sdf1.select(sdf1['a'] + sdf2['b'])

25. Operations on different DataFrames
We only allow Series derived from the same DataFrame by default.
OK:
- df.a + df.b
- df['c'] = df.a * df.b
Equivalent Spark code?
  sdf.select(sdf['a'] + sdf['b'])
Not OK:
- df1.a + df2.b
- df1['c'] = df2.a * df2.b
Equivalent Spark code? ???
  sdf1.select(sdf1['a'] + sdf2['b'])
  AnalysisException!!

26. Operations on different DataFrames
ks.set_option('compute.ops_on_diff_frames', True)
OK:
- df.a + df.b
- df['c'] = df.a * df.b
Equivalent Spark code?
  sdf.select(sdf['a'] + sdf['b'])
OK:
- df1.a + df2.b
- df1['c'] = df2.a * df2.b
Equivalent Spark code?
  sdf1.join(sdf2, on="_index_").select('a * b')
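Why the cross-frame case needs a join can be shown with a small model in plain Python, where each "frame" is a dict from index value to a row. This is only a sketch: the helper names are hypothetical, and an inner join is used for brevity; the slides do not say which join type Koalas actually applies.

```python
def add_same_frame(rows, left, right):
    # Same frame: both values live in the same row, so no join is needed.
    return {idx: row[left] + row[right] for idx, row in rows.items()}


def add_across_frames(rows1, col1, rows2, col2):
    # Different frames: rows must first be matched on the index (inner join here).
    shared = rows1.keys() & rows2.keys()
    return {idx: rows1[idx][col1] + rows2[idx][col2] for idx in sorted(shared)}


# Two toy frames keyed by index value; they overlap only on indexes 1 and 2.
df1 = {0: {"a": 1, "b": 10}, 1: {"a": 2, "b": 20}, 2: {"a": 3, "b": 30}}
df2 = {1: {"b": 200}, 2: {"b": 300}, 3: {"b": 400}}

same = add_same_frame(df1, "a", "b")             # {0: 11, 1: 22, 2: 33}
across = add_across_frames(df1, "a", df2, "b")   # {1: 202, 2: 303}
```

The same-frame version is a single pass over the rows, whereas the cross-frame version has to match rows by index first. On a distributed engine that matching is a join, which is presumably why the behavior is opt-in through `compute.ops_on_diff_frames`.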

27. Default Index
- Koalas manages a group of columns as index.
- The index behaves the same as pandas’.
- If no index is specified when creating a Koalas DataFrame, it attaches a “default index” automatically.
- Each “default index” has Pros and Cons.

28. Default Indexes
Configurable by the option “compute.default_index_type”

index type            requires collecting data into a single node   requires shuffle   continuous increments
sequence              YES                                           YES                YES
distributed-sequence  NO                                            YES / NO           YES / NO
distributed           NO                                            NO                 NO

See also: https://koalas.readthedocs.io/en/latest/user_guide/options.html#default-index-type
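The trade-offs in this table can be made concrete with a toy simulation over a list of partitions. This is a sketch under assumptions, not Koalas' code: the three functions are hypothetical, and the "distributed" variant assumes an ID layout like Spark's `monotonically_increasing_id` (partition ID in the upper bits, row position in the lower 33 bits).

```python
from itertools import accumulate

# Six rows spread unevenly over three partitions (one inner list per worker).
partitions = [["r0", "r1", "r2"], ["r3"], ["r4", "r5"]]


def sequence_index(parts):
    # Gather every row in one place, then number the rows 0..n-1.
    collected = [row for part in parts for row in part]
    return list(range(len(collected)))


def distributed_sequence_index(parts):
    # Pass 1: count rows per partition. Pass 2: number locally from an offset.
    sizes = [len(p) for p in parts]
    offsets = [0] + list(accumulate(sizes))[:-1]
    return [off + i for off, p in zip(offsets, parts) for i in range(len(p))]


def distributed_index(parts):
    # No coordination at all: partition id in the high bits, row position in the low 33 bits.
    return [(pid << 33) + i for pid, p in enumerate(parts) for i in range(len(p))]


seq = sequence_index(partitions)
dist_seq = distributed_sequence_index(partitions)
dist = distributed_index(partitions)
```

In this model the first two variants both yield 0 through 5, but only the first needs all rows in one place. The third needs no coordination between partitions, at the price of gaps: the second partition starts at 8589934592 (1 << 33).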

29. What to expect?
• Improve pandas API coverage
  - rolling/expanding
• Support categorical data types
• More time-series related functions
• Improve performance
  - Minimize the overhead at Koalas layer
  - Optimal implementation of APIs