[Track 2, Talk 05 - Wenrui Meng, Miao Yu] Uber Business Performance Metrics Generation and Management

1.Uber Business Metrics Generation and Management Through Apache Flink
Wenrui Meng, Miao Yu
Engineers at Uber Marketplace

2.Uber Mission
We ignite opportunity by setting the world in motion.

3.Outline
● Marketplace
● Project background
● Architecture
● Use case walkthrough
● Challenges
● Future work

4.Marketplace Team
● Marketplace Platform
● Marketplace Data
● Marketplace Dynamics
● Marketplace Fare

5.Marketplace Data
● Mission: Empower teams throughout Uber to understand and improve the efficiency of the marketplace by providing real-time data for our Marketplace levers, modeling, and analysis
● Data sources: trip lifecycle, matching, rider, driver, fare, and mobile, across 100+ Apache Kafka topics
● 24 Elasticsearch clusters, 1000 TB of data, 300+ million queries per week, 30+ customer services

6.Marketplace Data
[Figure panels: Trip Flow Patterns; Human Consumption - Incentives; Demand Patterns; Human Consumption - Regional Metrics]

8.Real-Time Analytics

9.Project Background
● Usage pattern
● Issues
● Solution

10.Usage Pattern
● Metrics are aggregated on the fly from raw data.
● Metric definitions have a DAG structure.
● Metrics are usually aggregated with a fixed pattern over a limited set of dimensions, e.g., time and geo.

11.Issues
● Resource waste
● Poor performance
● Inconsistency
● ...

12.Solution
● An E2E platform
○ Defines metrics with a unified DSL for multiple data sources such as Kafka and Elasticsearch
○ Optimizes the execution plan based on objectives such as performance and isolation
○ Generates final metrics instead of an intermediate raw dataset

13.Architecture

14.Metric Definition
● Formula: a DSL describing an aggregation, or an arithmetic operation over aggregations
  metric_A + metric_B * metric_C - avg((table_A where (exists(column_A) AND (column_B != 0))).column_C)
● Dimensions: the dimensions over which this metric will be aggregated
  Time, geo, product type, and ...
● Sinks: the physical storage of this metric
  Elasticsearch, Cassandra, MySQL, and ...
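A definition with these three parts could be held in a small record type. The following is only a minimal sketch: the class `MetricDefinition`, its field names, the `validate` helper, and the example metric `avg_column_c` are hypothetical and are not the platform's actual DSL or schema.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MetricDefinition:
    """Hypothetical container for the three parts of a metric definition."""
    name: str
    formula: str             # DSL expression, kept as plain text in this sketch
    dimensions: tuple = ()   # e.g. ("time", "geo")
    sinks: tuple = ()        # e.g. ("elasticsearch",)


def validate(metric):
    """Return a list of problems; an empty list means all three parts are present."""
    problems = []
    if not metric.formula.strip():
        problems.append("formula is empty")
    if not metric.dimensions:
        problems.append("no dimensions")
    if not metric.sinks:
        problems.append("no sinks")
    return problems


# Example definition reusing the table and column names from the slide's formula.
avg_column_c = MetricDefinition(
    name="avg_column_c",
    formula="avg((table_A where (column_B != 0)).column_C)",
    dimensions=("time", "geo", "product_type"),
    sinks=("elasticsearch", "cassandra"),
)
```

Keeping the formula, dimensions, and sinks as separate fields mirrors the slide's split between what is computed, how it is grouped, and where it is stored.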

15.Execution Optimizer
1. Parse all metric definitions
2. Build a DAG to link all metrics based on data operations
3. Optimize the DAG to remove duplication, promote isolation, and improve performance
4. Generate metric group config files and feed them into the execution plan generator
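Steps 2 and 3 can be illustrated with a toy DAG builder that gives identical sub-expressions a single shared node, so a computation used by several metrics is done once. This is a sketch under assumptions: the nested-tuple expression format, the function `build_dag`, and the metric names are invented for illustration and say nothing about how the actual optimizer works.

```python
def build_dag(metrics):
    """Build a DAG from metric expressions, sharing identical sub-expressions.

    metrics maps a metric name to a nested tuple such as
    ("div", ("sum", "busy_seconds"), ("sum", "online_seconds")).
    Returns (nodes, roots): nodes maps node id -> (operator, child ids),
    roots maps metric name -> node id.
    """
    ids = {}     # expression -> node id, so duplicates collapse into one node
    nodes = {}   # node id -> (operator or source name, tuple of child ids)

    def intern(expr):
        if expr in ids:
            return ids[expr]
        if isinstance(expr, str):              # leaf: a raw data source
            node = ("source:" + expr, ())
        else:
            op, *args = expr
            node = (op, tuple(intern(a) for a in args))
        ids[expr] = len(nodes)                 # children were numbered first
        nodes[ids[expr]] = node
        return ids[expr]

    roots = {name: intern(expr) for name, expr in metrics.items()}
    return nodes, roots


nodes, roots = build_dag({
    "utilization": ("div", ("sum", "busy_seconds"), ("sum", "online_seconds")),
    "total_online": ("sum", "online_seconds"),
})
# "total_online" reuses the node already created for "utilization",
# so the DAG has 5 nodes rather than 7.
```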

16.Execution Plan Generator
1. Parse metric group config files
2. Given the configs, build a DAG of stream operators. Each stream operator may itself contain a DAG of event operators
3. Apply pre-defined operator templates and generate the final Flink execution graph
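The idea of ordering a DAG of stream operators and filling in a template per operator can be sketched with the standard-library `graphlib` module (Python 3.9+). Everything here is hypothetical: the `TEMPLATES` table, the operator names, and the text output stand in for real operator templates, and no Flink API is used.

```python
from graphlib import TopologicalSorter

# Hypothetical operator templates; real templates would produce Flink operators.
TEMPLATES = {
    "source": "read from {arg}",
    "filter": "keep events where {arg}",
    "window": "aggregate per {arg}",
    "sink": "write to {arg}",
}


def generate_plan(operators, upstream):
    """Order stream operators so each comes after its inputs, then fill templates.

    operators: name -> (template kind, argument)
    upstream: name -> set of operator names it consumes from
    """
    plan = []
    for name in TopologicalSorter(upstream).static_order():
        kind, arg = operators[name]
        plan.append(f"{name}: " + TEMPLATES[kind].format(arg=arg))
    return plan


operators = {
    "src": ("source", "driver_status topic"),
    "flt": ("filter", "exists(hexagon_id)"),
    "win": ("window", "1 minute"),
    "out": ("sink", "elasticsearch"),
}
upstream = {"flt": {"src"}, "win": {"flt"}, "out": {"win"}}
plan = generate_plan(operators, upstream)
```

For this linear chain the order is fixed; with branching DAGs, `static_order()` still guarantees inputs come first but the relative order of independent operators is not specified.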

17.Additional Functionalities
● Given a specific set of metrics, export the corresponding SQL
● Convert ad hoc queries to leverage data that already exists in the platform
● Auto debugging to find the root cause of data abnormalities
● Metric discovery
● ...
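As an illustration of the SQL export idea, a set of simple aggregate metrics that share a table and dimensions could be rendered into one query. This is a hedged sketch: `export_sql`, the metric tuple format, and the table and column names are assumptions for the example, and identifiers are interpolated without escaping, so it is only suitable for trusted definitions.

```python
def export_sql(metrics, table, dimensions):
    """Render one SELECT computing several metrics grouped by shared dimensions.

    metrics: name -> (aggregate function, column), e.g. ("AVG", "fare")
    table, dimensions: hypothetical table name and list of grouping columns
    """
    dims = ", ".join(dimensions)
    aggs = ", ".join(
        f"{fn}({col}) AS {name}" for name, (fn, col) in metrics.items()
    )
    return f"SELECT {dims}, {aggs} FROM {table} GROUP BY {dims}"


sql = export_sql(
    {"avg_fare": ("AVG", "fare"), "trips": ("COUNT", "trip_id")},
    table="trips_table",
    dimensions=["city", "hour"],
)
```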

18.Benefits
● Avoid inconsistency and duplicated effort
● Avoid cluster resource waste
● Improve the metrics definition
● Decouple metrics definition and generation techniques
● Decouple platform from storage

19.Use Case Walkthrough
[Diagram labels: every 4 seconds, Back-end service, Kafka, Gairos Ingestion, Cluster 1, Cluster 2, Cluster 3]
Driver status summary of one minute:
1. Driver utilization per hexagon, per minute
2. Driver utilization per city, per minute
3. Total minutes worked per driver, per hour
4. Open cars in each hexagon at 1 PM in San Francisco

20.Use Case Walkthrough
Dimension    Value
Hexagon id   Hex_1
Driver id    Driver_1
Status       Driving_client
Others..     ..
There are two solutions that support these use cases:
1. Aggregate a 1-minute summary into Elasticsearch and query it on the fly
2. Each team calculates the exact data it needs through a streaming job
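To make the 1-minute summary concrete, the sketch below rolls driver status samples up by minute and hexagon. It rests on assumptions not stated in the slides: utilization is taken to be the share of samples in a busy status, `Driving_client` is treated as the only busy status, and `Open` is an invented second status value.

```python
from collections import defaultdict


def utilization_per_hexagon(events, busy_statuses=("Driving_client",)):
    """Aggregate driver status samples into per-minute, per-hexagon utilization.

    events: iterable of (timestamp_seconds, hexagon_id, driver_id, status)
    Returns {(minute_start, hexagon_id): busy_samples / total_samples}.
    """
    busy = defaultdict(int)
    total = defaultdict(int)
    for ts, hexagon, _driver, status in events:
        key = (ts - ts % 60, hexagon)      # truncate the timestamp to its minute
        total[key] += 1
        if status in busy_statuses:
            busy[key] += 1
    return {key: busy[key] / total[key] for key in total}


events = [
    (0, "Hex_1", "Driver_1", "Driving_client"),
    (4, "Hex_1", "Driver_1", "Driving_client"),
    (8, "Hex_1", "Driver_1", "Open"),
    (12, "Hex_1", "Driver_2", "Open"),
    (64, "Hex_1", "Driver_1", "Open"),
]
summary = utilization_per_hexagon(events)
```

Under the first solution, a summary like this would be stored once and each team would query it; under the second, each team would run its own variant of this aggregation.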

21.Use Case Walkthrough
Our new framework supports all of these use cases without any duplication or inconsistency.

22.Challenges
Complex topology
● Hundreds of metrics
● Cluster the tasks and split them into separate jobs connected via Apache Kafka
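Splitting a large task DAG into jobs can be pictured as follows: edges that stay inside one job remain direct connections, while edges that cross a job boundary are replaced by a Kafka topic. This is a simplified sketch; the clustering is assumed to be given as input, and `split_into_jobs`, the task names, and the `<task>.out` topic naming are all hypothetical.

```python
def split_into_jobs(edges, job_of):
    """Split a task DAG into jobs; edges that cross jobs become Kafka topics.

    edges: list of (upstream task, downstream task)
    job_of: task -> job name (the clustering itself is assumed to be given)
    Returns (internal, topics): internal maps job -> edges kept inside that job,
    topics maps topic name -> {"producer": job, "consumers": set of jobs}.
    """
    internal = {job: [] for job in set(job_of.values())}
    topics = {}
    for up, down in edges:
        if job_of[up] == job_of[down]:
            internal[job_of[up]].append((up, down))
        else:
            # One topic per upstream task, shared by all downstream jobs.
            topic = topics.setdefault(
                f"{up}.out", {"producer": job_of[up], "consumers": set()}
            )
            topic["consumers"].add(job_of[down])
    return internal, topics


edges = [
    ("parse", "minute_summary"),
    ("minute_summary", "hex_util"),
    ("minute_summary", "city_util"),
]
job_of = {
    "parse": "job_a",
    "minute_summary": "job_a",
    "hex_util": "job_b",
    "city_util": "job_c",
}
internal, topics = split_into_jobs(edges, job_of)
```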

23.Challenges
Unstable sinks
● Metrics are stored in different storage systems maintained by different teams with different SLAs
● It's desirable to separate the sink from the processor

24.Challenges
Metrics evolution
● Add new metrics and update existing metrics
● Distinguish the golden sets from ad hoc cases

25.Challenges
Onboarding
● 15 teams and 20+ services query the existing platform
● Auto conversion through the query gateway

26.Future Work
● Improve the execution optimizer
● Improve tooling
