Large-Scale Fuzzy Name Matching in Batch and Streaming

ING is a Dutch multinational, multi-product bank that provides banking services to 33 million retail and commercial customers in more than 40 countries. At this scale, ING naturally faces a large number of data-integration tasks across different data sources. A common merging problem is fuzzy name matching: given a name (streaming) or a list of names (batch), find the most similar names in a different list.

1.Large Scale Fuzzy Name Matching ING Wholesale Banking Advanced Analytics Zhe Sun & Daniel van der Ende 06/06/2018 #MLSAIS17

2. ING in a nutshell: worldwide financial institution, active in over 40 countries, approximately 50k employees, almost 40M customers

3. Wholesale Banking Advanced Analytics (WBAA): Data Engineers, Data Scientists, Business Developers, Product Owners, UX Designers and Software Developers, serving both internal and external users

4. What is Name Matching?
Names to be matched (ING Customer Records): 1 Daniel Dutch, 2 Daniel Irish, 3 General Zhe
Ground truth (Moody's Credit Ratings): Daniel Dutch B.V. (AA), Zhe General Ltd. (AAA)
Matching result:
ID | Ground truth name | Matched name | Similarity score | Moody's rating
1 | Daniel Dutch B.V. | Daniel Dutch | 0.7 | AA
2 | Zhe General Ltd. | General Zhe | 0.8 | AAA

5. From business problem to data science problem: compute pairwise similarity between ground truth names (GT) and names to be matched (NM), and pick the most similar ones as the matching result. Two stages: pairwise similarity computation (every NM name N_i is scored against every GT name GT_j with a similarity function f(N_i, GT_j)) and best-match selection (for each NM name, keep the GT name with the highest score).

6. Name Matching model: token-based cosine similarity. The scale of the problem at ING: match 160 million names to 10 million names ≈ 1.6 × 10^15 similarity computations. Popular approaches such as Levenshtein distance will be very slow! We chose token-based cosine similarity for the sake of speed and accuracy. Pipeline: Preprocessing → Vectorization → Cosine similarity → Candidate selection.
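Token-based cosine similarity can be illustrated with a small pure-Python sketch (this is not the production code; the bigram size and helper names here are our own, and the real pipeline works on sparse TF-IDF vectors rather than raw counts):

```python
import math
import re
from collections import Counter

def ngrams(name, n=2):
    """Split a name into word tokens, then into character n-grams."""
    tokens = re.findall(r"\w+", name.lower())
    grams = []
    for tok in tokens:
        # Tokens shorter than n become a single gram of their own
        grams += [tok[i:i + n] for i in range(len(tok) - n + 1)] or [tok]
    return Counter(grams)

def cosine(a, b):
    """Cosine similarity between two sparse term-frequency vectors."""
    dot = sum(a[k] * b[k] for k in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * \
           math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

score = cosine(ngrams("Daniel Dutch B.V."), ngrams("Daniel Dutch"))
```

Unlike Levenshtein distance, each name is vectorized once; comparing two names is then a sparse dot product, which is what makes the approach scale to matrix form.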

7. Name Matching Pipeline
Steps 1–2, preprocessing and vectorization: GT names (Daniel Dutch, Daniel Irish, Zhe General) and NM names (Daniel Dutch B.V.) are each turned into sparse vectors, e.g. Daniel Dutch → [0, 0.2, …, 0.8, …].
Steps 3–4, cosine similarity and candidate selection: each NM name gets scored GT candidates, e.g. Daniel Dutch B.V. → Daniel Dutch (0.8), Daniel Irish (0.7).

8. Scaling things up: distributed sparse matrix multiplication. The GT matrix (M names × K features) is broadcast to every executor. The NM matrix (P names × K features) is split into chunks NM_1 … NM_x; in the map step, each executor multiplies its chunk against the broadcast GT matrix and keeps the top-N candidate names per row. A reduce step collects the per-chunk results into the final top-N names for all P input names.
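The map step above can be sketched with SciPy (a toy sketch on random matrices; the chunk size, helper names and the full-row argsort are illustrative, and sorting the whole row is exactly the cost that the optimized top-N kernel below avoids):

```python
import numpy as np
from scipy.sparse import random as sparse_random

def topn_per_chunk(nm_chunk, gt_t, n=3):
    """Multiply one chunk of NM vectors against GT^T and keep
    only the n most similar GT column indices per NM row."""
    sims = nm_chunk.dot(gt_t).toarray()          # dense (chunk_rows, num_gt)
    top_idx = np.argsort(-sims, axis=1)[:, :n]   # best n columns per row
    top_scores = np.take_along_axis(sims, top_idx, axis=1)
    return top_idx, top_scores

# Toy data: 100 NM vectors and 20 GT vectors over 50 features
nm = sparse_random(100, 50, density=0.1, format="csr", random_state=0)
gt_t = sparse_random(50, 20, density=0.1, format="csr", random_state=1)

# Emulate the map step: process NM in chunks, as each executor would
chunk_size = 25
results = [topn_per_chunk(nm[i:i + chunk_size], gt_t)
           for i in range(0, nm.shape[0], chunk_size)]
```

In the real cluster setup, `gt_t` is the broadcast variable and each chunk's result feeds the reduce step.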

9. Scaling things up: hack the SciPy implementation
• Combined multiplication and top-N selection in a single operation
• Implemented in C++ and Cython
• Uses less memory and is 40% faster
• Blog post: Boosting the selection of the most similar entities in large scale datasets (https://medium.com/p/450b3242e618)
• https://github.com/ing-bank/sparse_dot_topn

10. Customized stage: easily wrap complex tasks

class CosSimMatcherModel:
    def __init__(self, gt_features):
        self.gt_bc = spark.sparkContext.broadcast(gt_features.T)

    def _transform(self, names_df):
        matched_rdd = (names_df
                       .select(col1, col2, col3)
                       .rdd
                       .mapPartitions(match_chunk_of_names)
                       .flatMap(lambda x: x))
        return matched_rdd.toDF(output_schema)

11. Final Spark ML pipeline: elegant and easily maintainable

stages += [Preprocessor(params['preprocessor'],
                        input_col=params['name_col'],
                        output_col='preprocessed')]
stages += [RegexTokenizer(inputCol='preprocessed', outputCol='tokens',
                          pattern=r"\w+", gaps=False)]
stages += [NGram(inputCol='tokens', outputCol='ngram_tokens', n=params['ngram'])]
stages += [CountVectorizer(inputCol='ngram_tokens', outputCol='tf', vocabSize=2 << 24)]
stages += [NormalizedTfidf(count_col="tf", token_col="ngram_tokens",
                           output_col="features")]
stages += [CosSimMatcher(num_candidates=params['num_candidates'],
                         cos_sim_lower_bound=params['cos_sim_lower_bound'],
                         index_col=params['index_col'],
                         name_col=params['name_col'],
                         chunk_size=params['chunk_size'])]
snm = load_pickle(params['supervised_model_filename'], params['supervised_model_path'])
stages += [SupervisedNMTransformer(snm)]
self.pipeline = Pipeline(stages=stages)

12. 160M names matched to 10M ground truth names on a 10-node cluster in 5 hours: approximately 8,000 names matched per second

13. Name Matching can be applied to multiple problems. Matching 'new' names to existing ones is not specific to one use case; exposing this capability via an API adds value for other products, departments, and perhaps companies. This use case changes the underlying design, however:
• The number of names to match will be significantly lower (thousands instead of millions)
• Near-real-time results are appreciated, especially for small datasets
• The ground truth may change, depending on the needs of the user

14. Structured Streaming: "Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming"

15. Our setup: Structured Streaming + Spark ML

16. Structured Streaming

nm_obj = NameMatching({'streaming': True, 'supervised_on': False})
nm_obj.fit(ground_truth_df)

lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", servers) \
    .option("subscribe", topic_in) \
    .option("failOnDataLoss", "false") \
    .load()

17. Structured Streaming (2)

names_to_match = lines \
    .select(lines.value.cast('string').alias("name"))

nm_obj \
    .transform(names_to_match) \
    .selectExpr("extract_json(candidates, name) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", servers) \
    .option("topic", kafka_topic_out) \
    .start() \
    .awaitTermination()

18. A small, yet big change needed for Structured Streaming: remove all actions, because actions are not allowed in Structured Streaming. The batch version relied on them:

matched_rdd = names_df \
    .select(col1, col2, col3) \
    .rdd \
    .mapPartitions(match_chunk_of_names) \
    .flatMap(lambda x: x)
# Make output a dataframe again
return matched_rdd.toDF(output_schema)

19. No more actions. PySpark does not support map/mapPartitions on a (streaming) dataframe, so we need to use a UDF:

match_name_udf = udf(match_chunk_of_names, candidate_list_schema)
matched_df = names_df \
    .withColumn('candidates', match_name_udf(names_df.features))

20. However…

21. Varying ground truth sizes: 'RDD-method' vs. 'UDF-method'. [Chart: transform time per name (seconds, 0 to 4) for GT sizes of 1M, 2M, 4M, 6M and 12M, comparing the RDD and UDF methods]

22. Python UDFs. [Diagram: the driver program (Python) talks to the SparkSession on the JVM; on each executor, the Spark DataFrame lives in the JVM while the UDF runs in a separate Python process]

23. Possible workarounds
• Scala UDF with a Java Native Interface connection to the C++ matrix multiplication (blog post: https://medium.com/p/b70033dd69b9)
• mapPartitions for Python (streaming) dataframes
• Sparse matrix multiplication for Scala/Spark
• Compute the cosine similarity outside of Spark

24. To broadcast or not to broadcast. [Diagram: the driver broadcasts the full ground truth (GT) to each executor; names Name1–Name4 are distributed across Executors 1 and 2, each holding its own copy of GT]

25. Wrapping up
• We built a large scale fuzzy name matching system in batch and streaming
• Spark ML is an elegant, powerful, easy-to-use abstraction for data science pipelines/models on Spark
• Combining Spark ML with Structured Streaming is easy, but optimizing and tweaking it can be hard and take a lot of time
• Monitoring Spark ML stages in Structured Streaming is challenging; we haven't found a satisfactory solution yet


27. What to broadcast? [Diagram: alternative design where the ground truth is partitioned across executors (partition A on Executor 1, partition B on Executor 2); each name is matched against every partition and the results are combined in a reduce step on the driver]

28. Spark ML pipeline: standard + customized stages

Example input: HANS Investment B.V., Willem Barentszstraat

Step | Stage | Customized | Description | Example output
1 | Preprocessing | Y | Strip punctuation; accents to unicode; all characters to lower case; shorthand and abbreviation replacement | hans investment bv willem barentszstr
2 | Tokenizer | N | Splits the input string by white spaces | [hans, investment, bv, willem, barentszstr]
3 | NGram | N | Converts the input tokens into an array of n-grams | [hans, investment, bv, willem, barentszstr]
4 | CountVectorizer | N | Extracts a vocabulary from document collections | (5, [0, 1, 2, 3, 4], [1.0, 1.0, 1.0, 1.0, 1.0])
5 | Normalized TFIDF | Y | Compute the Term Frequency Inverse Document Frequency (TF-IDF); a custom stage is needed to deal with previously unseen tokens | (5, [0, 1, 2, 3, 4], [0.5, 0.1, 0.01, 0.2, 0.8])
6 | Cosine Similarity | Y | Compute cosine similarity between input and ground truth |
7 | Candidate selection | Y | Pick the most similar names by supervised model |
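The preprocessing stage (Step 1) can be sketched in plain Python. This is a simplified illustration: the `SHORTHANDS` map is a made-up stand-in for the business-specific replacement rules, and it does not abbreviate "barentszstraat" the way the real Preprocessor does:

```python
import re
import string
import unicodedata

# Hypothetical shorthand map; the real rule set is business-specific
SHORTHANDS = {"b.v.": "bv", "n.v.": "nv", "ltd.": "ltd"}

def preprocess(name):
    """Sketch of Step 1: accents to ASCII, lower case,
    shorthand replacement, punctuation stripping."""
    # Accents to plain ASCII via unicode decomposition
    name = unicodedata.normalize("NFKD", name)
    name = name.encode("ascii", "ignore").decode("ascii")
    name = name.lower()
    for short, repl in SHORTHANDS.items():
        name = name.replace(short, repl)
    # Strip remaining punctuation and collapse whitespace
    name = name.translate(str.maketrans("", "", string.punctuation))
    return re.sub(r"\s+", " ", name).strip()

print(preprocess("HANS Investment B.V., Willem Barentszstraat"))
# → hans investment bv willem barentszstraat
```

Replacing "b.v." before stripping punctuation matters: stripping first would leave the legal-form token as a bare "bv" only by accident of spacing, while an explicit map keeps the normalization deterministic.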