1. Large-scale Feature Aggregation Using Apache Spark
   Pulkit Bhanot, Amit Nene, Risk Platform
2. Agenda
   • Motivation
   • Challenges
   • Architecture Deep Dive
   • Role of Spark
   • Takeaways
3. Team Mission
   Build a scalable, self-service Feature Engineering Platform for predictive decisioning (based on ML and Business Rules).
   • Feature Engineering: use of domain knowledge to create Features
   • Self-Service for Data Scientists and Analysts, without reliance on Engineers
   • Generic Platform: consolidating work towards wider ML efforts at Uber
4. Sample use case: Fraud detection
   Detect and prevent bad actors in real time. Example features:
   • number of trips over X hours/weeks
   • trips cancelled over Y months
   • count of referrals over lifetime
   • ...
   Other use cases: Payments, Promotions
5. Time Series Aggregations
   Needs:
   • Lifetime of entity
   • Sliding long-window: days/weeks/months
   • Sliding short-window: mins/hours
   • Real-time
   Existing solutions (warehouse, streaming aggregations, indexed databases):
   • None satisfies all of the above: none fits the bill!
   • Complex onboarding
6. Technical Challenges
   • Scale: 1000s of aggregations for 100s of millions of business entities
   • Long-window aggregation queries are slow (seconds) even with indexes; milliseconds at high QPS are needed
   • Onboarding complexity: many moving parts
   • Recovery from accrual of errors
7. Our approach
   One-stop shop for aggregation:
   • Single system to interact with
   • Single spec: automate configuration of the underlying systems
   Scalable:
   • Leverage the scale of the batch system for long windows
   • Combine with real-time aggregation for freshness
   • Rollups: aggregate over time intervals
   • Fast queries over rolled-up aggregates
   • Caveat: summable functions only (see the sketch below)
   Self-healing:
   • The batch system auto-corrects any accrued errors
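The summable-functions caveat is what makes the rollups composable: a long window is answered by adding up interval aggregates rather than re-reading raw events. A minimal sketch in plain Java, with invented daily counts:

    import java.util.Map;

    public class RollupWindow {
        public static void main(String[] args) {
            // Daily rollups of a trip count (values invented for illustration).
            Map<String, Long> dailyTripCounts = Map.of(
                    "2018-04-07", 3L, "2018-04-08", 5L, "2018-04-09", 2L,
                    "2018-04-10", 4L, "2018-04-11", 1L, "2018-04-12", 6L,
                    "2018-04-13", 2L);

            // A 7-day window is just the sum of seven daily rollups. This works
            // for summable functions (sum, count) but not for, e.g., median or
            // exact distinct count, hence the caveat above.
            long trips7d = dailyTripCounts.values().stream()
                    .mapToLong(Long::longValue).sum();
            System.out.println("trips over last 7 days = " + trips7d); // 23
        }
    }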
8. Aggregator as a black box
   Input parameters to the black box:
   • Source events: raw events from streaming + Hive
   • Aggregation functions: sum, count, etc.
   • Aggregation windows: LTD (life-to-date), 7 days, 30 days, etc.
   • Grain size: 5 min (realtime), 1 day (offline), etc.
   Output: aggregated features.
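As an illustration only, the single spec feeding the black box could be modeled as below; the AggregationSpec type, its fields, and the source URIs are invented here, not the platform's actual API:

    import java.time.Duration;
    import java.util.List;

    public class SpecExample {
        // Hypothetical mirror of the black-box inputs: one spec carries
        // everything needed to wire up both batch and streaming aggregators.
        record AggregationSpec(
                List<String> sourceEvents,   // raw events: streaming + Hive
                List<String> functions,      // sum, count, ... (summable only)
                List<String> windows,        // LTD, 7 days, 30 days, ...
                Duration grainSize) {}       // 5 min (realtime), 1 day (offline)

        public static void main(String[] args) {
            AggregationSpec spec = new AggregationSpec(
                    List.of("hive://trips", "kafka://trip_events"),
                    List.of("count", "sum"),
                    List.of("LTD", "7d", "30d"),
                    Duration.ofMinutes(5));
            System.out.println(spec);
        }
    }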
9. Overall architecture
   • Batch Aggregator (Spark apps)
     – Long-window: weeks, months
     – Bootstrap and incremental modes
   • Real-time Aggregator (streaming, e.g. Kafka events)
     – Short-window (< 24 hrs)
     – Near-real-time
   • Feature Access (microservice)
     – Real-time access: merges offline and streaming aggregates
   • Feature Store
     – Saves rolled-up aggregates in Hive and Cassandra
   All components are configured from shared Specs.
10. Batch Spark Engine
    The Batch Aggregator (Spark apps) is composed of a Scheduler, Feature Extractor, Rollup Generator, Snapshot Optimizer, Dispersal, and Periodic Snapshot Manager. It reads daily Hive source partitions (e.g. Tbl1:<2018-04-09>, Tbl1:<2018-04-10>), supports bootstrap and incremental snapshot computation, produces full and optimized snapshots, and disperses rollups to the Feature Store (Cassandra). Feature Specs drive the pipeline, and the Decisioning System consumes the results through feature access.
11. Batch Storage (Hive)
    Daily-partitioned source tables:
    • Table-1: partitions <2018-04-10> .. <2018-04-13>, columns (col1, col2, col3, col4)
    Rolled-up tables:
    • ATable-1_LTD: features involving lifetime computation; partitions <2018-04-10> .. <2018-04-13>, columns (uuid, f1_ltd, f2_ltd)
    • ATable-1_daily: features involving sliding-window computation; daily incremental rollup, dispersed to the real-time store; partitions <2018-04-10> .. <2018-04-13>, columns (uuid, f1, f2)
    • ATable-1_joined: daily lifetime snapshot combining both; partitions <2018-04-10> .. <2018-04-13>, columns (uuid, f1, f2, f1_ltd, f2_ltd)
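A minimal sketch of the incremental lifetime step this layout enables, under assumed table and column names (atable_1_ltd, atable_1_daily, f1): today's LTD partition is yesterday's LTD outer-joined with today's daily rollup, summing the summable columns:

    import static org.apache.spark.sql.functions.coalesce;
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.lit;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class IncrementalLtd {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("incremental-ltd").master("local[2]")
                    .enableHiveSupport().getOrCreate();

            // Yesterday's lifetime rollup and today's daily rollup.
            Dataset<Row> prevLtd = spark.table("atable_1_ltd")
                    .where(col("datestr").equalTo("2018-04-12")).alias("l");
            Dataset<Row> daily = spark.table("atable_1_daily")
                    .where(col("datestr").equalTo("2018-04-13")).alias("d");

            // Full outer join: keeps entities with no events today (LTD is
            // carried forward) and new entities (LTD seeded from today).
            Dataset<Row> newLtd = prevLtd
                    .join(daily, col("l.uuid").equalTo(col("d.uuid")), "outer")
                    .select(
                            coalesce(col("l.uuid"), col("d.uuid")).alias("uuid"),
                            coalesce(col("l.f1_ltd"), lit(0L))
                                    .plus(coalesce(col("d.f1"), lit(0L)))
                                    .alias("f1_ltd"));

            // Becomes the <2018-04-13> partition of the LTD table.
            newLtd.write().mode("overwrite").saveAsTable("atable_1_ltd_2018_04_13");
            spark.stop();
        }
    }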
12. Role of Spark
    • Orchestrator of ETL pipelines
      – Scheduling of subtasks
      – Records incremental progress
    • Optimally resizes HDFS files: scales with the size of the data set
    • Rich set of APIs to enable complex optimizations

    Example of an optimization in bootstrap dispersal:
      dailyDataset.join(
          ltdData,
          JavaConverters.asScalaIteratorConverter(
                  Arrays.asList(pipelineConfig.getEntityKey()).iterator())
              .asScala()
              .toSeq(),
          "outer");

    Sample rolled-up row:
      uuid: 44b7dc88
      _ltd: 1534
      daily_buckets: [{"2017-10-24":"4"},{"2017-08-22":"3"},{"2017-09-21":"4"},
                      {"2017-08-08":"3"},{"2017-10-03":"3"},{"2017-10-19":"5"},
                      {"2017-09-06":"1"},{"2017-08-17":"5"},{"2017-09-09":"12"},
                      {"2017-10-05":"5"},{"2017-09-25":"4"},{"2017-09-17":"13"}]
13. Role of Spark (continued)
    • Ability to disperse billions of records: HashPartitioner to the rescue
      // Partition the data by hash
      HashPartitioner hashPartitioner = new HashPartitioner(partitionNumber);
      JavaPairRDD<String, Row> hashedRDD = keyedRDD.partitionBy(hashPartitioner);

      // Fetch each hash partition and process it (raise an error on partition mismatch)
      foreach partition {
          JavaRDD<Tuple2<String, Row>> filteredHashRDD = filterRows(hashedRDD, index, partitionId);
          Dataset<Row> filteredDataSet = etlContext.getSparkSession().createDataset(
              filteredHashRDD.map(tuple -> tuple._2()).rdd(),
              data.org$apache$spark$sql$Dataset$$encoder);
          // repartition filteredDataSet; update checkpoint and records processed after successful completion
      }
    (Diagram: the parent RDD is split into hash partitions P1 .. Pn, each processed independently. A runnable sketch follows below.)
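A self-contained sketch of the same partition-at-a-time pattern, with toy data and without the checkpointing plumbing; HashPartitioner, partitionBy, and mapPartitionsWithIndex are real Spark APIs, everything else is illustrative:

    import java.util.Collections;
    import java.util.List;

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    public class BatchedDispersal {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("batched-dispersal").master("local[2]").getOrCreate();
            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

            int numPartitions = 4; // assumption: sized to what the store absorbs per batch

            // Toy keyed dataset standing in for billions of (entityKey, row) pairs.
            JavaPairRDD<String, Long> keyed = jsc
                    .parallelize(List.of("a", "b", "c", "d", "e", "f"))
                    .mapToPair(k -> new Tuple2<>(k, (long) k.length()));

            // Co-locate each entity's rows in a deterministic hash bucket.
            JavaPairRDD<String, Long> hashed =
                    keyed.partitionBy(new HashPartitioner(numPartitions)).cache();

            for (int pid = 0; pid < numPartitions; pid++) {
                final int target = pid;
                // Pull back exactly one hash bucket per iteration, so every
                // write batch is bounded and progress can be checkpointed.
                List<Tuple2<String, Long>> batch = hashed
                        .mapPartitionsWithIndex(
                                (idx, it) -> idx == target
                                        ? it
                                        : Collections.<Tuple2<String, Long>>emptyIterator(),
                                true)
                        .collect();
                System.out.printf("bucket %d -> %d rows%n", target, batch.size());
                // Real pipeline: write the batch to the feature store, then checkpoint.
            }
            spark.stop();
        }
    }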
14. Role of Spark in Dispersal
    • Global throttling
      – The Feature Store can be the bottleneck
      – coalesce() to limit the number of concurrent executors
    • Inspect data
      – Disperse only if any column has changed (see the sketch below)
    • Monitoring and alerting
      – Create custom metrics
    (Diagram: full computation snapshots for 2018-02-01 .. 2018-03-01 are reduced to optimized snapshots before dispersal to C*.)
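A sketch of throttling plus change detection together, under assumed table names and a stubbed sink; except() and coalesce() are real Spark APIs:

    import org.apache.spark.api.java.function.ForeachPartitionFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ThrottledDispersal {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("throttled-dispersal").master("local[2]")
                    .enableHiveSupport().getOrCreate();

            // Today's and yesterday's optimized snapshots (assumed names).
            Dataset<Row> today = spark.table("snapshot_2018_02_02");
            Dataset<Row> yesterday = spark.table("snapshot_2018_02_01");

            // Inspect data: rows identical to yesterday's snapshot drop out,
            // so unchanged features are never re-written to the feature store.
            Dataset<Row> changed = today.except(yesterday);

            // Global throttling: coalesce() caps the number of parallel writer
            // tasks, which caps the QPS the feature store sees from this job.
            changed.coalesce(32).foreachPartition((ForeachPartitionFunction<Row>) rows -> {
                while (rows.hasNext()) {
                    Row row = rows.next();
                    // Real pipeline: batched Cassandra writes + custom metrics here.
                }
            });
            spark.stop();
        }
    }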
15. Real-time streaming engine
    • Real-time, summable window aggregations for < 24 hours
    • Semantically equivalent to the offline computation
    • Aggregation rollups (5-minute grain) maintained in the feature store (Cassandra)
    Pipeline: raw Kafka events → event enrichment (RPCs to Uber microservices) → transformation pipelines (xform_0, xform_1, xform_2) → time-window aggregator → C*, built on Uber's Athena streaming platform.
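For concreteness, one way the 5-minute grain can be computed (a sketch, not the engine's actual code): each event maps to the bucket containing its timestamp, and the streaming job increments that bucket's partial aggregate in Cassandra:

    import java.time.Instant;

    public class GrainBucket {
        static final long GRAIN_SECONDS = 5 * 60; // the 5-minute rollup grain

        // Map an event timestamp to the start of its 5-minute bucket.
        static long bucketOf(Instant eventTime) {
            return (eventTime.getEpochSecond() / GRAIN_SECONDS) * GRAIN_SECONDS;
        }

        public static void main(String[] args) {
            Instant t = Instant.parse("2018-05-08T17:03:42Z");
            System.out.println(bucketOf(t)); // epoch second of the 17:00:00 bucket
        }
    }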
16. Final aggregation in real time
    • Uses time-series and clustering-key support in Cassandra:
      – One table for lifetime (LTD) values: entity_common_aggr_bt_ltd (UUID PK, trip_count_ltd)
      – Batch table bucketed by event time: entity_common_aggr_bt (UUID PK, eventbucket CK, trip_count)
      – Multiple tables for realtime values at 5-minute grain: entity_common_aggr_rt_2018_05_08, _09, _10, ... (UUID PK, eventbucket CK, trip_count)
    • The Feature Access Service consults the Metadata Service and its Query Planner to assemble a single result at feature access time (see the sketch below), e.g. lifetime trip count, trips over the last 51 hrs, trips over the previous 2 days.
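A toy sketch of that read-time assembly with stubbed Cassandra reads (the values and helper names are invented): the served lifetime count is the batch LTD value plus the sum of the realtime buckets written after the batch snapshot:

    import java.util.List;

    public class FinalAggregation {
        // Stand-ins for Cassandra reads against entity_common_aggr_bt_ltd and
        // entity_common_aggr_rt_* (real code queries by UUID and bucket range).
        static long readLtd(String uuid) { return 1534L; }
        static List<Long> readRealtimeBuckets(String uuid) { return List.of(2L, 1L, 4L); }

        static long lifetimeTripCount(String uuid) {
            long batchLtd = readLtd(uuid);
            long realtimeDelta = readRealtimeBuckets(uuid)
                    .stream().mapToLong(Long::longValue).sum();
            return batchLtd + realtimeDelta; // 1534 + 7 = 1541
        }

        public static void main(String[] args) {
            System.out.println(lifetimeTripCount("44b7dc88"));
        }
    }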
17. Self-service onboarding
    Create Query (Spark SQL) → Configure Spec → Test Spec → Commit to Prod
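As an example of the first step, a hypothetical onboarding query (table and column names invented) producing the per-entity daily rollup that the spec then attaches windows and grains to:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class OnboardingQuery {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("onboarding-query").master("local[2]")
                    .enableHiveSupport().getOrCreate();

            // The Spark SQL a data scientist would write in step 1.
            Dataset<Row> daily = spark.sql(
                    "SELECT user_uuid AS uuid, "
                  + "       COUNT(*) AS trip_count, "
                  + "       SUM(CASE WHEN status = 'CANCELLED' THEN 1 ELSE 0 END) AS trips_cancelled "
                  + "FROM trips "
                  + "WHERE datestr = '2018-04-13' "
                  + "GROUP BY user_uuid");
            daily.show();
            spark.stop();
        }
    }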
18. Machine learning support
    Backfill support: what was the value of feature f1 for entity E1 at each point from Thist to Tnow?
    • Bootstrap to a historic point in time Thist
    • Incrementally compute from Thist to Tnow
    How?
    • Lifetime: feature f1 on Thist → access partition Thist
    • Windowed: feature f2 on Thist with a window of N days → merge partitions Thist-N .. Thist (see the sketch below)
    (Diagram: timeline T-120 .. T, e.g. the last-30-day trip count at T-90, at T-89, ..., and the lifetime value on each date.)
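A sketch of the windowed backfill step under assumed table names: the value of a 30-day feature "as of" a historic date is the sum of the daily rollup partitions covering the 30 days ending on that date, so no raw events are replayed:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.sum;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class WindowedBackfill {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("windowed-backfill").master("local[2]")
                    .enableHiveSupport().getOrCreate();

            // 30-day value of f1 as of 2018-04-13: merge the daily rollup
            // partitions 2018-03-15 .. 2018-04-13 (30 days inclusive).
            Dataset<Row> asOf = spark.table("atable_1_daily")
                    .where(col("datestr").between("2018-03-15", "2018-04-13"))
                    .groupBy("uuid")
                    .agg(sum("f1").alias("f1_30d")); // summable merge of partitions

            asOf.show();
            spark.stop();
        }
    }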
19. Takeaways
    • Use of Spark to achieve massive scale
    • Combined with streaming aggregation for freshness
    • Low-latency access in production (P99 <= 20 ms) at high QPS
    • Simplified onboarding via a single spec: onboarding time measured in hours
    • Large reductions in computational cost
20. Contact
    bhanotp@uber.com
    anene@uber.com
    Proprietary and confidential © 2018 Uber Technologies, Inc. All rights reserved.