Automating Predictive Modeling at Zynga with PySpark and Pandas UDFs

Building propensity models at Zynga used to be a time-intensive task that required custom data science and engineering work for every new model. We’ve built an automated model pipeline that uses PySpark and automated feature generation to remove most of that manual work. The challenge we faced was that the Featuretools library we wanted to use for automated feature engineering works only on Pandas data frames, which limited the size of the data sets we could handle. Our solution is to use Pandas UDFs to scale the feature engineering process to our entire player base. We start with our full set of players, partition the data into smaller chunks that can be loaded into memory, apply the feature engineering step to each chunk, and then combine the results back into one large data set. This presentation outlines how we use Pandas UDFs in production to automate propensity modeling at Zynga. As a result, we now have hundreds of propensity models in production that teams can use to personalize game experiences. Instead of spending time on feature engineering and model fitting, our data scientists now spend more of their time working with game teams to help build new features.
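The partition, apply, and combine flow described above can be sketched in plain Pandas, without Spark. This is only an illustration under stated assumptions: the column names (`user_id`, `amount`), the modulo bucketing rule, and the `build_features` stand-in are hypothetical and are not taken from the talk.

```python
import pandas as pd

def add_user_group(events: pd.DataFrame, num_groups: int) -> pd.DataFrame:
    """Assign every user to one bucket (hypothetical rule: user_id modulo num_groups)."""
    out = events.copy()
    out["user_group"] = out["user_id"] % num_groups
    return out

def build_features(chunk: pd.DataFrame) -> pd.DataFrame:
    """Stand-in for the feature engineering step: one feature row per user."""
    return (chunk.groupby("user_id")["amount"]
                 .agg(event_count="count", total_amount="sum")
                 .reset_index())

def partition_apply_combine(events: pd.DataFrame, num_groups: int) -> pd.DataFrame:
    # Partition: all events of a user land in the same chunk.
    grouped = add_user_group(events, num_groups).groupby("user_group")
    # Apply: run the feature step on each in-memory chunk.
    parts = [build_features(chunk) for _, chunk in grouped]
    # Combine: stitch the per-chunk results back into one data set.
    combined = pd.concat(parts, ignore_index=True)
    return combined.sort_values("user_id").reset_index(drop=True)

events = pd.DataFrame({
    "user_id": [1, 1, 2, 3, 3, 3, 4],
    "amount":  [1.0, 2.0, 5.0, 0.5, 0.5, 1.0, 3.0],
})
features = partition_apply_combine(events, num_groups=2)
print(features)
```

Because the bucket depends only on the user id, no user is split across chunks, so the per-chunk results can simply be concatenated.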


2.Automating Predictive Modeling at Zynga with Pandas UDFs
Ben Weber, Zynga
#UnifiedAnalytics #SparkAISummit

3.Zynga Analytics

4.Zynga Portfolio

5.Our Challenge
• We want to build game-specific models for behaviors such as likelihood to purchase
• Our games have diverse event taxonomies
• We have tens of millions of players and dozens of games across multiple platforms

6.Our Approach
• Featuretools for automating feature engineering
• Pandas UDFs for distributing Featuretools
• Databricks for building our model pipeline

7.AutoModel
• Zynga’s first portfolio-scale data product
• Generates hundreds of propensity models
• Powers features in our games & live services

8.AutoModel Pipeline
Data Extract -> Feature Engineering -> Feature Application -> Model Training -> Model Publish

9.Data Extraction
Pipeline stage: Data Extract
• S3 & Parquet

10.Feature Engineering
Pipeline stage: Feature Engineering

11.Automated Feature Engineering
• Goals
  – Translate our narrow and deep data tables into a shallow and wide representation
  – Support dozens of titles with diverse event taxonomies
  – Scale to billions of records and millions of players
  – Minimize manual data science workflows
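To make the first goal concrete, here is a small Pandas sketch of turning a narrow and deep event table (one row per event) into a shallow and wide one (one row per user). The table contents and column names are made up for illustration; the talk itself uses Featuretools for this step rather than a hand-written pivot.

```python
import pandas as pd

# Toy "narrow and deep" table: one row per event (hypothetical columns).
events = pd.DataFrame({
    "user_id":    [1, 1, 1, 2, 2],
    "event_type": ["login", "purchase", "login", "login", "level_up"],
    "value":      [1, 4.99, 1, 1, 1],
})

# "Shallow and wide": one row per user, one count column per event type.
wide = events.pivot_table(index="user_id", columns="event_type",
                          values="value", aggfunc="count", fill_value=0)
wide.columns = [f"count_{name}" for name in wide.columns]
wide = wide.reset_index()
print(wide)
```

A hand-written pivot like this has to be redone for every event taxonomy, which is the manual work the automated approach is meant to avoid.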

12.Featuretools
• A Python library for deep feature synthesis
• Represents data as entity sets
• Identifies feature descriptors for transforming your data into new representations

13.Entity Sets
• Define the relationships between tables
• Work with Pandas data frames
Entityset: transactions
  Entities:
    customers (shape = [5, 3])
    transactions (shape = [500, 5])
  Relationships:
    transactions.customer_id -> customers.customer_id

14.Feature Synthesis
import featuretools as ft
feature_matrix, features_defs = ft.dfs(entityset=es, target_entity="customers")
feature_matrix.head(5)

customer_id  zip_code  count(transactions)  sum(transactions.amounts)
1            91000     0                    0
2            91000     10                   120.5
3            91005     5                    17.96
4            91005     2                    9.99
5            91000     3                    29.97
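For readers unfamiliar with the generated columns, the following plain-Pandas sketch shows what aggregation features such as count(transactions) and sum(transactions.amount) amount to. It does not call Featuretools, and the toy tables and values are invented for illustration rather than taken from the slide.

```python
import pandas as pd

# Toy parent and child tables (hypothetical data).
customers = pd.DataFrame({"customer_id": [1, 2, 3],
                          "zip_code": ["91000", "91000", "91005"]})
transactions = pd.DataFrame({
    "transaction_id": [10, 11, 12],
    "customer_id":    [2, 2, 3],
    "amount":         [20.0, 5.5, 9.99],
})

# Aggregate the child table per customer.
agg = (transactions.groupby("customer_id")["amount"]
       .agg(["count", "sum"])
       .rename(columns={"count": "count(transactions)",
                        "sum": "sum(transactions.amount)"}))

# Attach the aggregates to the parent table; customers without transactions get zeros.
feature_matrix = (customers.set_index("customer_id")
                  .join(agg)
                  .fillna({"count(transactions)": 0, "sum(transactions.amount)": 0.0}))
feature_matrix["count(transactions)"] = feature_matrix["count(transactions)"].astype(int)
print(feature_matrix)
```

Deep feature synthesis generates many such aggregations automatically by following the relationships declared in the entity set.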

15.Using Featuretools
import featuretools as ft

# 1-hot encode the raw event data
es = ft.EntitySet(id="events")
es = es.entity_from_dataframe(entity_id="events", dataframe=rawDataDF)
feature_matrix, defs = ft.dfs(entityset=es, target_entity="events", max_depth=1)
encodedDF, encoders = ft.encode_features(feature_matrix, defs)

# perform deep feature synthesis on the encoded data
es = ft.EntitySet(id="events")
es = es.entity_from_dataframe(entity_id="events", dataframe=encodedDF)
es = es.normalize_entity(base_entity_id="events", new_entity_id="users", index="user_id")
generated_features, descriptors = ft.dfs(entityset=es, target_entity="users", max_depth=3)

16.Scaling Up
• Parallelize the process
• Translate feature descriptions to Spark SQL
• Find a way to distribute the task

17.Feature Application
Pipeline stage: Feature Application
• Pandas UDFs

18.Pandas UDFs
• Introduced in Spark 2.3
• Provide Scalar and Grouped map operations
• Partitioned using a groupby clause
• Enable distributing code that uses Pandas

19.Grouped Map UDFs
Spark Input -> Pandas Input (one per group) -> UDF -> Pandas Output (one per group) -> Spark Output

20.When to use UDFs?
• You need to operate on Pandas data frames
• Your data can be represented as a single Spark data frame
• You can partition your data set

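21.Distributing SciPy
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, DoubleType
from scipy.optimize import leastsq
import pandas as pd

# output schema: one row per player with the two fitted coefficients
schema = StructType([StructField('ID', LongType(), True),
                     StructField('b0', DoubleType(), True),
                     StructField('b1', DoubleType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def analyze_player(player_pd):
    # fit is the residual function handed to leastsq; it is not shown on the slide
    result = leastsq(fit, [1, 0], args=(player_pd.shots, player_pd.hits))
    return pd.DataFrame({'ID': [player_pd.player_id[0]],
                         'b0': result[0][0],
                         'b1': result[0][1]})

result_spark_df = spark_df.groupby('player_id').apply(analyze_player)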

22.Step 1: Define the schema
Same code as slide 21. The part relevant to this step:
schema = StructType([StructField('ID', LongType(), True),
                     StructField('b0', DoubleType(), True),
                     StructField('b1', DoubleType(), True)])

23.Step 2: Choose a partition
Same code as slide 21. The part relevant to this step:
result_spark_df = spark_df.groupby('player_id').apply(analyze_player)

24.Step 3: Use Pandas
Same code as slide 21. The part relevant to this step:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def analyze_player(player_pd):
    result = leastsq(fit, [1, 0], args=(player_pd.shots, player_pd.hits))

25.Step 4: Return Pandas
Same code as slide 21. The part relevant to this step:
    return pd.DataFrame({'ID': [player_pd.player_id[0]],
                         'b0': result[0][0],
                         'b1': result[0][1]})
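The four steps above can be exercised locally on a plain Pandas data frame before involving Spark. The sketch below is an assumption-laden illustration: the slides never show `fit`, so the straight-line residual function here is a guess at its form, and `analyze_player_local`, `local_df`, and the toy data are hypothetical names and values.

```python
import numpy as np
import pandas as pd
from scipy.optimize import leastsq

def fit(params, x, y):
    """Residuals of the line y = params[0] + params[1] * x (assumed form of `fit`)."""
    return y - (params[0] + params[1] * x)

def analyze_player_local(player_pd: pd.DataFrame) -> pd.DataFrame:
    """Mirrors the UDF body, but undecorated so it runs on plain Pandas."""
    result = leastsq(fit, [1.0, 0.0],
                     args=(player_pd.shots.to_numpy(), player_pd.hits.to_numpy()))
    return pd.DataFrame({"ID": [player_pd.player_id.iloc[0]],
                         "b0": [result[0][0]],
                         "b1": [result[0][1]]})

# Toy data: player 1 follows hits = 1 + 0.5 * shots, player 2 follows hits = 2 * shots.
shots = np.array([2.0, 4.0, 6.0, 8.0])
local_df = pd.concat([
    pd.DataFrame({"player_id": 1, "shots": shots, "hits": 1 + 0.5 * shots}),
    pd.DataFrame({"player_id": 2, "shots": shots, "hits": 2.0 * shots}),
], ignore_index=True)

# Emulate groupby('player_id').apply(...) without Spark: one call per group.
local_result = pd.concat(
    [analyze_player_local(group) for _, group in local_df.groupby("player_id")],
    ignore_index=True)
print(local_result)
```

Once the undecorated function returns the expected columns for each group, wrapping the same body with the `pandas_udf` decorator is a smaller step.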

26.Distributing Featuretools
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_feature_generation(pandasInputDF):
    # create Entity Set representation
    es = ft.EntitySet(id="events")
    es = es.entity_from_dataframe(entity_id="events", dataframe=pandasInputDF)
    es = es.normalize_entity(base_entity_id="events", new_entity_id="users", index="user_id")

    # apply the feature calculation and return the result
    return ft.calculate_feature_matrix(saved_features, es)

sparkFeatureDF = sparkInputDF.groupby('user_group').apply(apply_feature_generation)

27.Issues with Pandas UDFs
• Debugging is a challenge
• Pushes the limits of Apache Arrow
• Data type mismatches
• Schema needs to be known before execution
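One way to guard against data type mismatches and the fixed output schema is to force the Pandas result into the declared column order and dtypes before returning it. The helper below is a hypothetical sketch, not something shown in the talk; `OUTPUT_SCHEMA`, `conform_to_schema`, and the policy of filling missing values with 0 are all assumptions.

```python
import pandas as pd

# Declared output schema as (column name, Pandas dtype) pairs; names are hypothetical.
OUTPUT_SCHEMA = [("user_id", "int64"), ("event_count", "int64"), ("total_amount", "float64")]

def conform_to_schema(df: pd.DataFrame, schema=OUTPUT_SCHEMA) -> pd.DataFrame:
    """Return df with exactly the declared columns, in order, cast to the declared dtypes."""
    out = pd.DataFrame(index=df.index)
    for name, dtype in schema:
        if name in df.columns:
            out[name] = df[name]
        else:
            out[name] = 0  # assumed policy: a missing feature column becomes zeros
        # assumed policy: missing values become 0 so integer casts cannot fail on NaN
        out[name] = out[name].fillna(0).astype(dtype)
    return out

# A result with a missing column, an extra column, float ids, and a missing value.
raw = pd.DataFrame({"total_amount": [3, None], "user_id": [1.0, 2.0], "extra": ["a", "b"]})
clean = conform_to_schema(raw)
print(clean.dtypes)
```

Calling such a helper as the last line of the UDF body keeps every group's output aligned with the schema passed to the decorator.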

28.Model Training & Scoring
Pipeline stage: Model Training
• MLlib

29.Propensity Models
• Classification models
  – Gradient-Boosted Trees
  – XGBoost
• Hyperparameter tuning
  – ParamGridBuilder
  – CrossValidator
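To give a feel for the cost of grid search with cross-validation, the pure-Python sketch below expands a parameter grid and counts the model fits it implies. It is not MLlib code: `expand_grid` and `kfold_indices` are hypothetical helpers, the round-robin fold assignment is a simplification, and the grid values are invented.

```python
from itertools import product

def expand_grid(grid):
    """Return every combination of parameter values as a list of dicts."""
    names = sorted(grid)
    return [dict(zip(names, values))
            for values in product(*(grid[name] for name in names))]

def kfold_indices(num_rows, num_folds):
    """Yield (train, validation) row-index lists using round-robin folds."""
    folds = [list(range(i, num_rows, num_folds)) for i in range(num_folds)]
    for i, validation in enumerate(folds):
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield train, validation

# Hypothetical grid over two tree-model parameters.
grid = {"maxDepth": [3, 5], "maxIter": [10, 20, 50]}
candidates = expand_grid(grid)
splits = list(kfold_indices(num_rows=10, num_folds=3))

# Every candidate is trained once per fold.
total_fits = len(candidates) * len(splits)
print(len(candidates), "candidates x", len(splits), "folds =", total_fits, "fits")
```

The multiplication is why tuning across hundreds of models benefits from running on a cluster.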