Smart Join Algorithms for Fighting Skew at Scale

Consumer apps like Yelp generate log data at huge scale, and this data is often distributed according to a power law: a small number of users, businesses, locations, or pages account for a disproportionately large share of it. This kind of data skew causes problems for distributed algorithms, especially joins, where all the rows with the same key must be processed by the same executor; even a single over-represented entity can slow down or fail an entire job. One approach is to remove outliers before joining, which may be fine when training a machine learning model, but sometimes you need to retain all the data. Thankfully, there are a few tricks you can use to counteract the negative effects of skew while joining, by artificially redistributing data across more machines. This talk walks through some of them, with code examples.

1. Spark+AI Summit, April 24, 2019

2. Smart Join Algorithms for Fighting Skew at Scale. Andrew Clegg, Applied Machine Learning Group @ Yelp (@andrew_clegg)

3. Overview
- Data skew, outliers, power laws, and their symptoms
- How do joins work in Spark, and why skewed data hurts
- Joining skewed data more efficiently
- Extensions and tips

4. Data skew

5. DATA SKEW: Classic skewed distributions. Image by Rodolfo Hermans, CC BY-SA 3.0, via Wikiversity

6. DATA SKEW: Outliers. Image from Hedges & Shah, “Comparison of mode estimation methods and application in molecular clock analysis”, CC BY 4.0

7. DATA SKEW: Power laws. Word rank vs. word frequency in an English text corpus

8. Power laws are everywhere (approximately)
- Electrostatic and gravitational forces (inverse square law)
- Distribution of earthquake magnitudes
- ‘80/20 rule’ in distribution of income (Pareto principle)
- Relationship between body size and metabolism (Kleiber’s law)

9. Power laws are everywhere (approximately)
- Word frequencies in natural language corpora (Zipf’s law)
- Degree distribution in social networks (‘Bieber problem’)
- Participation inequality on wikis and forum sites (‘1% rule’)
- Popularity of websites and their content

10. Why is this a problem?

11. Popularity hotspots: symptoms and fixes
- Hot shards in databases — salt keys, change schema
- Slow load times for certain users — look for O(n²) mistakes
- Hot mappers in map-only tasks — repartition randomly
- Hot reducers during joins and aggregations — … ?
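To see why a hot key overloads one worker, here is a plain-Python simulation (not Spark; all names are made up for illustration): partitioning rows by key sends every row for the hot key to one partition, while random repartitioning, which is fine for map-only work but breaks key grouping, spreads rows evenly.

```python
# Illustrative sketch: key-based partitioning vs. random repartitioning
# under skew. Hypothetical data; not the talk's actual workload.
from collections import Counter
import random

NUM_PARTITIONS = 8

# Skewed data: one "celebrity" key owns most of the rows
rows = ['celebrity'] * 9000 + [f'user_{i}' for i in range(1000)]

# Partitioning by key (what a shuffle-by-key does): all 9000 rows
# for the hot key land in the same partition
by_key = Counter(hash(k) % NUM_PARTITIONS for k in rows)

# Random repartitioning: even spread, but rows with the same key
# no longer end up together
rng = random.Random(42)
by_random = Counter(rng.randrange(NUM_PARTITIONS) for _ in rows)

print(max(by_key.values()))     # >= 9000: one partition does most of the work
print(max(by_random.values()))  # close to 10000 / 8
```

This is why random repartitioning fixes hot mappers but cannot fix hot reducers in a join: the join still needs equal keys co-located.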

12. POPULARITY HOTSPOTS: Diagnosing hot executors

13. Spark joins under the hood

14. SPARK JOINS: Shuffled hash join (diagram: DataFrame 1 and DataFrame 2 are both shuffled so that rows with matching keys land in the same partitions)

15. SPARK JOINS: Shuffled hash join with very skewed data (diagram: the shuffle sends most rows of both DataFrames to a single overloaded partition)

16. SPARK JOINS: Broadcast join can help (diagram: DataFrame 2 is broadcast to every executor and joined map-side against DataFrame 1, with no shuffle)

17. SPARK JOINS: Broadcast join can help, sometimes (diagram: the broadcast DataFrame must fit in RAM on each executor)
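The mechanics behind a broadcast join can be sketched in plain Python: it is a hash join in which the small side is copied to every executor, so each executor builds an in-memory lookup and streams its slice of the large side past it, with no shuffle of the large side. The tables and names below are hypothetical.

```python
# Illustrative sketch of map-side hash-join mechanics (one "executor").
# Hypothetical data, for demonstration only.

small_table = [  # e.g. business metadata -- must fit in RAM
    ('biz_1', 'coffee'),
    ('biz_2', 'pizza'),
]
large_table = [  # e.g. review events, possibly very skewed
    ('biz_1', 'review_a'),
    ('biz_1', 'review_b'),
    ('biz_2', 'review_c'),
    ('biz_9', 'review_d'),  # no match: dropped by an inner join
]

lookup = dict(small_table)  # the "broadcast" copy

joined = [
    (key, payload, lookup[key])
    for key, payload in large_table
    if key in lookup
]

print(joined)
# [('biz_1', 'review_a', 'coffee'), ('biz_1', 'review_b', 'coffee'),
#  ('biz_2', 'review_c', 'pizza')]
```

Skew on the large side is harmless here because its rows are never shuffled by key; the limitation is purely that the small side must fit in memory on every executor.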

18. Joining skewed data faster

19. Splitting a single key across multiple tasks (R = replication factor)
- Append a random int in [0, R) to each key in the skewed data
- Replicate each row in the non-skewed data R times
- Append the replica ID to the original key in the non-skewed data
- Join on this newly-generated key
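These four steps can be simulated in plain Python (no Spark) to check the bookkeeping: the hot key is spread across R distinct composite keys, yet every skewed row still finds its match because the other side carries a replica for each possible salt. All names and data below are hypothetical.

```python
# Illustrative simulation of the salted (replicated) join; hypothetical data.
import random

R = 3  # replication factor
rng = random.Random(0)

skewed = [('key_hot', f'event_{i}') for i in range(6)]  # skewed side
uniform = [('key_hot', 'metadata')]                     # non-skewed side

# Step 1: append a random int in [0, R) to each key in the skewed data
skewed_tagged = [
    (f'{key}@{rng.randrange(R)}', value) for key, value in skewed
]

# Steps 2-3: replicate each non-skewed row R times, appending the replica ID
uniform_replicated = [
    (f'{key}@{replica_id}', value)
    for key, value in uniform
    for replica_id in range(R)
]

# Step 4: join on the composite key (toy hash join stands in for the shuffle)
lookup = dict(uniform_replicated)
joined = [
    (composite_key, event, lookup[composite_key])
    for composite_key, event in skewed_tagged
    if composite_key in lookup
]

# No rows lost, but the hot key's rows now hash to up to R different
# join keys -- i.e. up to R reducers in a real shuffle.
print(len(joined), sorted({k for k, _, _ in joined}))
```

The cost of the trick is R copies of the non-skewed side, which is why R should be kept as small as the skew allows.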

20. FASTER JOINS: Replicated join (diagram: DataFrame 2 is replicated into DataFrame 2_0, 2_1, and 2_2 with replica IDs 0–2; both sides are shuffled on the composite key into separate logical partitions)

21.
from pyspark.sql import functions as F  # 'spark' is an active SparkSession

replication_factor = 10

# spark.range creates an 'id' column: 0 <= id < replication_factor
replication_ids = F.broadcast(
    spark.range(replication_factor).withColumnRenamed('id', 'replica_id')
)

# Replicate uniform data, one copy of each row per bucket
# composite_key looks like: 12345@3 (original_id@replica_id)
uniform_data_replicated = (
    uniform_data
    .crossJoin(replication_ids)
    .withColumn(
        'composite_key',
        F.concat('original_id', F.lit('@'), 'replica_id')
    )
)

22.
def randint(limit):
    return F.least(
        F.floor(F.rand() * limit),
        F.lit(limit - 1),  # just to avoid unlikely edge case
    )

# Randomly assign rows in skewed data to buckets
# composite_key has same format as in uniform data
skewed_data_tagged = (
    skewed_data
    .withColumn(
        'composite_key',
        F.concat(
            'original_id',
            F.lit('@'),
            randint(replication_factor),
        )
    )
)

23.
# Join them together on the composite key
joined = skewed_data_tagged.join(
    uniform_data_replicated,
    on='composite_key',
    how='inner',
)

24.
# Join them together on the composite key
joined = skewed_data_tagged.join(
    uniform_data_replicated,
    on='composite_key',
    how='inner',  # WARNING: inner and left outer joins only
)

WARNING: this technique supports inner and left outer joins only. With a right or full outer join, every unmatched row in the replicated DataFrame would survive once per replica, producing R spurious copies.

25. FASTER JOINS: Remember duplicates… (diagram: the R replicas of each non-skewed row are the same row, while the randomly tagged skewed rows are all different rows)

26. Experiments with synthetic data
- 100 million rows of data with uniformly-distributed keys
- 100 billion rows of data with Zipf-distributed keys
- Standard inner join ran for 7+ hours, then I killed it!
- 10x replicated join completed in 1h16m

27. Can we do better?

28. Differential replication
- Very common keys should be replicated many times
- Rare keys don’t need to be replicated as much (or at all?)
- Identify frequent keys before replication
- Use a different replication policy for those

29.
replication_factor_high = 50
replication_high = F.broadcast(
    spark
    .range(replication_factor_high)
    .withColumnRenamed('id', 'replica_id')
)

replication_factor_low = 10
replication_low = …  # as above

# Determine which keys are highly over-represented
# (freqItems names its output column '<col>_freqItems')
top_keys = F.broadcast(
    skewed_data
    .freqItems(['original_id'], 0.0001)  # return keys with frequency > this
    .select(
        F.explode('original_id_freqItems').alias('original_id_freqItems')
    )
)
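The rest of the policy, which the slide leaves off, can be sketched in plain Python: look up each key's frequency, give frequent keys the high replication factor and everything else the low one, and salt accordingly. The threshold, data, and names below are made up for illustration (a stand-in for freqItems), not the talk's actual code.

```python
# Illustrative sketch of differential replication; hypothetical data.
import random
from collections import Counter

R_HIGH, R_LOW = 50, 10
rng = random.Random(1)

skewed_keys = ['key_hot'] * 9000 + [f'key_{i}' for i in range(1000)]

# Identify highly over-represented keys (stand-in for freqItems)
counts = Counter(skewed_keys)
threshold = 0.001 * len(skewed_keys)
top_keys = {k for k, c in counts.items() if c > threshold}

def replication_factor(key):
    # Frequent keys are split across many buckets; rare keys across few
    return R_HIGH if key in top_keys else R_LOW

salted = [f'{k}@{rng.randrange(replication_factor(k))}' for k in skewed_keys]

# The hot key is now spread over up to R_HIGH buckets, while each rare
# key only costs R_LOW replicas on the non-skewed side of the join.
hot_buckets = {s for s in salted if s.startswith('key_hot@')}
print(len(hot_buckets))
```

The replicated side of the join must apply the same policy: cross-join frequent rows against replication_high and the rest against replication_low, so that every possible salt has a matching replica.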