Smart Join Algorithms for Fighting Skew at Scale

Consumer apps like Yelp generate log data at huge scale, and this data is often distributed according to a power law: a small number of users, businesses, locations, or pages account for a disproportionately large share of it. This kind of data skew causes problems for distributed algorithms, especially joins, where all the rows with the same key must be processed by the same executor; even a single over-represented entity can slow down or fail an entire job. One approach is to remove outliers before joining, which may be fine when training a machine learning model, but sometimes you need to retain all the data. Thankfully, there are a few tricks you can use to counteract the negative effects of skew while joining, by artificially redistributing data across more machines. This talk walks through some of them, with code examples.

1. Spark+AI Summit, April 24, 2019

2. Smart Join Algorithms for Fighting Skew at Scale. Andrew Clegg, Applied Machine Learning Group @ Yelp (@andrew_clegg)

3. Overview
- Data skew, outliers, power laws, and their symptoms
- How do joins work in Spark, and why skewed data hurts
- Joining skewed data more efficiently
- Extensions and tips

4. Data skew

5. DATA SKEW: Classic skewed distributions. Image by Rodolfo Hermans, CC BY-SA 3.0, via Wikiversity

6. DATA SKEW: Outliers. Image from Hedges & Shah, “Comparison of mode estimation methods and application in molecular clock analysis”, CC BY 4.0

7. DATA SKEW: Power laws. Word rank vs. word frequency in an English text corpus

8. Power laws are everywhere (approximately)
- Electrostatic and gravitational forces (inverse square law)
- Distribution of earthquake magnitudes
- ‘80/20 rule’ in distribution of income (Pareto principle)
- Relationship between body size and metabolism (Kleiber’s law)

9. Power laws are everywhere (approximately)
- Word frequencies in natural language corpora (Zipf’s law)
- Degree distribution in social networks (‘Bieber problem’)
- Participation inequality on wikis and forum sites (‘1% rule’)
- Popularity of websites and their content

10. Why is this a problem?

11. Popularity hotspots: symptoms and fixes
- Hot shards in databases — salt keys, change schema
- Slow load times for certain users — look for O(n²) mistakes
- Hot mappers in map-only tasks — repartition randomly
- Hot reducers during joins and aggregations — … ?
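To see why a hot key overloads one worker, here is a plain-Python simulation (not Spark; all names are made up for illustration): partitioning rows by key sends every row for the hot key to one partition, while random repartitioning, which is fine for map-only work but breaks key grouping, spreads rows evenly.

```python
# Illustrative sketch: key-based partitioning vs. random repartitioning
# under skew. Hypothetical data; not the talk's actual workload.
from collections import Counter
import random

NUM_PARTITIONS = 8

# Skewed data: one "celebrity" key owns most of the rows
rows = ['celebrity'] * 9000 + [f'user_{i}' for i in range(1000)]

# Partitioning by key (what a shuffle-by-key does): all 9000 rows
# for the hot key land in the same partition
by_key = Counter(hash(k) % NUM_PARTITIONS for k in rows)

# Random repartitioning: even spread, but rows with the same key
# no longer end up together
rng = random.Random(42)
by_random = Counter(rng.randrange(NUM_PARTITIONS) for _ in rows)

print(max(by_key.values()))     # >= 9000: one partition does most of the work
print(max(by_random.values()))  # close to 10000 / 8
```

This is why random repartitioning fixes hot mappers but cannot fix hot reducers in a join: the join still needs equal keys co-located.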

12. POPULARITY HOTSPOTS: Diagnosing hot executors

13. Spark joins under the hood

14. SPARK JOINS: Shuffled hash join (diagram: DataFrame 1 and DataFrame 2 are both shuffled so that rows with matching keys land in the same partitions)

15. SPARK JOINS: Shuffled hash join with very skewed data (diagram: the shuffle sends most rows of both DataFrames to a single overloaded partition)

16. SPARK JOINS: Broadcast join can help (diagram: DataFrame 2 is broadcast to every executor and joined map-side against DataFrame 1, with no shuffle)

17. SPARK JOINS: Broadcast join can help, sometimes (diagram: the broadcast DataFrame must fit in RAM on each executor)
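The mechanics behind a broadcast join can be sketched in plain Python: it is a hash join in which the small side is copied to every executor, so each executor builds an in-memory lookup and streams its slice of the large side past it, with no shuffle of the large side. The tables and names below are hypothetical.

```python
# Illustrative sketch of map-side hash-join mechanics (one "executor").
# Hypothetical data, for demonstration only.

small_table = [  # e.g. business metadata -- must fit in RAM
    ('biz_1', 'coffee'),
    ('biz_2', 'pizza'),
]
large_table = [  # e.g. review events, possibly very skewed
    ('biz_1', 'review_a'),
    ('biz_1', 'review_b'),
    ('biz_2', 'review_c'),
    ('biz_9', 'review_d'),  # no match: dropped by an inner join
]

lookup = dict(small_table)  # the "broadcast" copy

joined = [
    (key, payload, lookup[key])
    for key, payload in large_table
    if key in lookup
]

print(joined)
# [('biz_1', 'review_a', 'coffee'), ('biz_1', 'review_b', 'coffee'),
#  ('biz_2', 'review_c', 'pizza')]
```

Skew on the large side is harmless here because its rows are never shuffled by key; the limitation is purely that the small side must fit in memory on every executor.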

18. Joining skewed data faster

19. Splitting a single key across multiple tasks (R = replication factor)
- Append a random int in [0, R) to each key in the skewed data
- Replicate each row in the non-skewed data R times
- Append the replica ID to the original key in the non-skewed data
- Join on this newly-generated key
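These four steps can be simulated in plain Python (no Spark) to check the bookkeeping: the hot key is spread across R distinct composite keys, yet every skewed row still finds its match because the other side carries a replica for each possible salt. All names and data below are hypothetical.

```python
# Illustrative simulation of the salted (replicated) join; hypothetical data.
import random

R = 3  # replication factor
rng = random.Random(0)

skewed = [('key_hot', f'event_{i}') for i in range(6)]  # skewed side
uniform = [('key_hot', 'metadata')]                     # non-skewed side

# Step 1: append a random int in [0, R) to each key in the skewed data
skewed_tagged = [
    (f'{key}@{rng.randrange(R)}', value) for key, value in skewed
]

# Steps 2-3: replicate each non-skewed row R times, appending the replica ID
uniform_replicated = [
    (f'{key}@{replica_id}', value)
    for key, value in uniform
    for replica_id in range(R)
]

# Step 4: join on the composite key (toy hash join stands in for the shuffle)
lookup = dict(uniform_replicated)
joined = [
    (composite_key, event, lookup[composite_key])
    for composite_key, event in skewed_tagged
    if composite_key in lookup
]

# No rows lost, but the hot key's rows now hash to up to R different
# join keys -- i.e. up to R reducers in a real shuffle.
print(len(joined), sorted({k for k, _, _ in joined}))
```

The cost of the trick is R copies of the non-skewed side, which is why R should be kept as small as the skew allows.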

20. FASTER JOINS: Replicated join (diagram: DataFrame 2 is replicated into DataFrame 2_0, 2_1, and 2_2 with replica IDs 0–2; both sides are shuffled on the composite key into separate logical partitions)

21.
from pyspark.sql import functions as F  # 'spark' is an active SparkSession

replication_factor = 10

# spark.range creates an 'id' column: 0 <= id < replication_factor
replication_ids = F.broadcast(
    spark.range(replication_factor).withColumnRenamed('id', 'replica_id')
)

# Replicate uniform data, one copy of each row per bucket
# composite_key looks like: 12345@3 (original_id@replica_id)
uniform_data_replicated = (
    uniform_data
    .crossJoin(replication_ids)
    .withColumn(
        'composite_key',
        F.concat('original_id', F.lit('@'), 'replica_id')
    )
)

22.
def randint(limit):
    return F.least(
        F.floor(F.rand() * limit),
        F.lit(limit - 1),  # just to avoid unlikely edge case
    )

# Randomly assign rows in skewed data to buckets
# composite_key has same format as in uniform data
skewed_data_tagged = (
    skewed_data
    .withColumn(
        'composite_key',
        F.concat(
            'original_id',
            F.lit('@'),
            randint(replication_factor),
        )
    )
)

23.
# Join them together on the composite key
joined = skewed_data_tagged.join(
    uniform_data_replicated,
    on='composite_key',
    how='inner',
)

24.
# Join them together on the composite key
joined = skewed_data_tagged.join(
    uniform_data_replicated,
    on='composite_key',
    how='inner',  # WARNING: inner and left outer joins only
)

WARNING: this technique supports inner and left outer joins only. With a right or full outer join, every unmatched row in the replicated DataFrame would survive once per replica, producing R spurious copies.

25. FASTER JOINS: Remember duplicates… (diagram: the R replicas of each non-skewed row are the same row, while the randomly tagged skewed rows are all different rows)

26. Experiments with synthetic data
- 100 million rows of data with uniformly-distributed keys
- 100 billion rows of data with Zipf-distributed keys
- Standard inner join ran for 7+ hours, then I killed it!
- 10x replicated join completed in 1h16m

27. Can we do better?

28. Differential replication
- Very common keys should be replicated many times
- Rare keys don’t need to be replicated as much (or at all?)
- Identify frequent keys before replication
- Use a different replication policy for those

29.
replication_factor_high = 50
replication_high = F.broadcast(
    spark
    .range(replication_factor_high)
    .withColumnRenamed('id', 'replica_id')
)

replication_factor_low = 10
replication_low = …  # as above

# Determine which keys are highly over-represented
# (freqItems names its output column '<col>_freqItems')
top_keys = F.broadcast(
    skewed_data
    .freqItems(['original_id'], 0.0001)  # return keys with frequency > this
    .select(
        F.explode('original_id_freqItems').alias('original_id_freqItems')
    )
)
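The rest of the policy, which the slide leaves off, can be sketched in plain Python: look up each key's frequency, give frequent keys the high replication factor and everything else the low one, and salt accordingly. The threshold, data, and names below are made up for illustration (a stand-in for freqItems), not the talk's actual code.

```python
# Illustrative sketch of differential replication; hypothetical data.
import random
from collections import Counter

R_HIGH, R_LOW = 50, 10
rng = random.Random(1)

skewed_keys = ['key_hot'] * 9000 + [f'key_{i}' for i in range(1000)]

# Identify highly over-represented keys (stand-in for freqItems)
counts = Counter(skewed_keys)
threshold = 0.001 * len(skewed_keys)
top_keys = {k for k, c in counts.items() if c > threshold}

def replication_factor(key):
    # Frequent keys are split across many buckets; rare keys across few
    return R_HIGH if key in top_keys else R_LOW

salted = [f'{k}@{rng.randrange(replication_factor(k))}' for k in skewed_keys]

# The hot key is now spread over up to R_HIGH buckets, while each rare
# key only costs R_LOW replicas on the non-skewed side of the join.
hot_buckets = {s for s in salted if s.startswith('key_hot@')}
print(len(hot_buckets))
```

The replicated side of the join must apply the same policy: cross-join frequent rows against replication_high and the rest against replication_low, so that every possible salt has a matching replica.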