1.Uber Business Metrics Generation and Management Through Apache Flink Uber ࠟӱᚆຽኞ౮Өᓕቘ Uber Engineer at Uber Marketplace Wenrui Meng Miao Yu
2.Uber Mission We ignite opportunity by setting the world in motion. ౯ժୌԻ᭗๐ۓଘ҅ݣԅقቖᅩᆮ᭬ԏᅉ ŉᬩŊۖقቖ҅ᆮՄտ
3.Outline Ø Marketplace Ø ᮱ᳪՕᕨ Ø Project background Ø ᶱፓᙧว Ø Architecture Ø ᔮᕹຝ Ø User case walkthrough Ø ໜֺړຉ Ø Challenges Ø Ø Future work Ø ๕
4.Marketplace Team Marketplace Platform Marketplace Data ૱ଘݣᕟ ૱හഝᕟ Marketplace Dynamics Marketplace Fare ૱ۖாᕟ ૱ᩇአᕟ
5.Marketplace Data ● Mission: Empower teams throughout Uber to understand and improve the efficiency of the marketplace by providing real time data for our Marketplace levers, modeling and analysis ֵғԅ8EHUݱᕟ׀ਫහഝ૱ᬰᤈୌཛྷړຉ҅ᘒๅঅቘᥴදᬰ૱ᬩ֢පሲ ● Data sources: trip lifecycle, matching, rider, driver, fare, mobile across 100+ Apache Kafka topics හഝრғԙᬦᑕ҅܃ᯈ҅ԙਮ҅ݪ҅ᩇአ҅ᑏۖᦡ॓ᒵग़ᐿ.DINDԆ᷌ ● 24 Elasticsaerch clusters, 1000T data, 300+ million queries / week , 30+ customer services හഝ॔ଶᥢཛྷғইӤ
6.Marketplace Data Trip Flow Patterns ԙ᪠ᕚཛྷୗ Human Consumption – Incentives ᄶۜ Demand Patterns ԙਮᵱཛྷୗ Human Consumption - Regional Metrics ܄ऒຽ
9.Project Background ● Usage pattern ֵአཛྷୗ ● Issues ᳯ᷌ ● Solution ොໜ
10.Usage Pattern ● Metrics are aggregated on the fly from raw data. ຽ᮷ฎࣁ୮ڹᵱᥝጱ҅ײፗളܻতහഝሿᦇᓒ ● Metric definitions have a DAG structure. ຽਧԎํӞӻࢴਧጱ'$*ᕮ ● Metricsare usually aggregated with fixed pattern upon a limited set of dimensions, e.g., time, geo ຽ᭗ଉ᮷ฎํࢴਧጱཛྷୗचԭӞԶํᴴጱොᶎᕹᦇᦇᓒጱ
11.Issues ● Resource waste ᩒრၵᩇ ● Bad performance ֗ᚆ ● Inconsistency ● ...
12.Solution ● AnE2E platform Ӟӻᒒکᒒጱଘݣ ○ Defines metrics with a unified DSL for multiple datasources like Kafka, Elastic Search etc. चԭᕹӞጱ'6/ਧԎຽ҅ଚӬ꧋ᦜग़ᐿᬌفහഝᔄࣳ ○ Optimizes execution plan based on objectives including performance, isolation, and etc. ໑ഝӧݶጱፓຽҁᚆ҅ᵍᐶᒵ҂ս۸ಗᤈᦇښ ○ Generates final metrics, instead of generating an intermediate raw dataset ኞ౮๋ᕣአಁֵአጱຽ҅ᘒӧฎኞ౮ӾᳵܻහഝᦏአಁᬰӞྍᦇᓒ
14.Metric Definition ● Formula: a DSL describing an aggregation or arithmetic operation over aggregation لୗғਧԎই֜ᦇᓒຽ metric_A + metric_B * metric_C - avg((table_A where (exists(column_A) AND (column_B != 0))).column_C) ● Dimensions: over which this metric will be aggregated ᖌଶғਧԎຽฎٍ֛ࣁᮎԶᇙਧጱᖌଶᦇᓒ Time, geo, product type, and ... ● Sinks: physical storage of this metric ളݑғਧԎຽጱᇔቘਂؙ Elastic search, Cassandra, MySQL, and ...
15.Execution Optimizer 1. Parse all metric definitions ᥴຉಅํጱຽਧԎ 2. Build a DAG to link all metrics based on data operations ୌӞӻ'$*᭗ᬦහഝ֢ᬳളಅํጱຽᦇᓒ 3. Optimize the DAG to remove duplication, promote isolation and improve performance ຽᬰᤈړᕟ҅ս۸'$*ᘒ݄ᴻ᯿॔҅܋ᵍᐶզ݊දᬰᚆ 4. Generate metric group config files and feed them into execution plan generator ኞ౮ຽړᕟᯈᗝկ҅׀ᕳಗᤈᦇښኞ౮
16.Execution Plan Generator 1. Parse metric group config files ᥴຉຽړᕟᯈᗝկ 2. Given configs, build a DAG of stream operators. Within each stream operator, there could be also a DAG of event operators ୌၞ֢ '$*ٌ҅ӾྯӞӻ '$* ᜓᅩӾฎचԭԪկጱ '$* ᤒᬡلୗᦇᓒ̶ 3. Applied pre-defined operator templates and generate the final Flink execution graph ଫአڹਧԎጱ )OLQN ཛྷኞ౮ )OLQN ಗᤈࢶ
17.Additional Functionalities ● Given a specific set of metrics, export SQL for it. ਧӞᕟຽ҅ݢզڊଫጱ 64/ ● Convert the adhoc query to leverage existing data in platform ਖ਼ԁአಁԻጱັᧃᬰᤈഘ҅ᘒݢզڥአ૪ᕪᦇᓒঅጱຽପ ● Auto debugging to find root cause of data abnormality ᛔۖತԾኞӧྋଉහഝጱ໑ܻࢩ ● Metric discovery ຽറᔱ ● ….
18.Benefits Ø Avoid inconsistency and duplicate effort Ø عӧӞᛘզ݊᯿॔ጱૡ֢ Ø Avoid cluster resource waste Ø عᩒრၵᩇ Ø Improve the metrics definition Ø දᬰຽਧԎ Ø Decouple metrics definition and generation Ø ړᐶຽਧԎᬩᓒ techniques Ø ଘݣӨਂؙጱຂᘠݳ Ø Decouple platform from storage
19.Use Case Walkthrough every 4 seconds Back-end Driver status summary of one minute: चԭړᰦጱݪᇫாᕹᦇ service 1.Driver utilization per hexagon on 1 minute ܔᇿ KH[DJRQ ྯړᰦጱݪܛአఘ٭ Kafka 2.Driver utilization per city on 1 minute ܔӻउ૱ྯړᰦݪܛአఘ٭ 3.Driver total worked minutes per hour Gairos Ingestion ܔӻݪྯੜૡ֢ᳵ 4.Open cars of each hexagon at 1PM in San Francisco Cluster 1 Cluster 2 Cluster 3 ෯ᰂઊ૱ྯӻ KH[DJRQ Ӥᑮݪ
20.Use Case Walkthrough Dimension Value There are two solutions that support these three use cases: Hexagon id Hex_1 ౯ժݪلԏࣁਂڹӷᐿᥴ٬ොໜ Driver id Driver_1 1. Aggregate 1 minute summary into Elasticsearch and query on-the-fly Status Driving_client Others.. .. चԭӞړᰦᕹᦇݪጱᇫாଚਂ( کؙODVWLFVHDUFK ᯾ᶎ҅ᆐݸአಁ໑ഝӧݶᵱັᧃ 2. Each team calculate the exact data needed through streaming job ྯӻᕟֵአၞ॒ቘᦇᓒᵱᥝጱහഝ
21.Use Case Walkthrough Our new framework supports all these use cases but without any duplication or inconsistency. ౯ժෛጱଘݢݣզඪ೮ӤᶎಅํጱᵱଚӬݢզع᯿॔զ݊ӧӞᛘ
22.Challenges Complex topology ॔ጱಏ᭦ᬋ ● Hundreds of metrics ౮ጯӤ܉ጱຽ ● Cluster the tasks and split them into separate jobs connected via Apache Kafka. ਖ਼ಅํጱၞձۓᬰᤈᘸᔄ҅ᆐݸ᮱ᗟ౮ӧݶጱၞૡ֢҅ஂྌԏᳵ᭗ᬦ .DIND ᬳള
23.Challenges Unstable sinks ӧᑞਧጱහഝളݑ ● Metrics stored in different storages maintained by different teams with different SLAs ຽտਂکؙӧݶጱᇔቘਂؙ҅ଚӬտᤩӧݶጱᕟզӧݶጱ 6/$ ᖌಷ ● It’s desirable to separate the sink from the processor ౯ժᵱᥝහഝ॒ቘහഝਂؙጱᬦᑕړ
24.Challenges Metrics evolution ຽᄍ۸ ● Add new metrics and update the existing metric ीےᘏץදຽ ● Distinguish the golden sets from ad-hoc cases ړ܄໐ஞහഝᵞզ݊ԁֵአහഝ
25.Challenges Onboarding ᦏአಁֵአෛහഝ ● 15 teams and 20+ services query the existing platform ౯ժํӻզӤጱᕟզ݊ӻզӤጱᔮᕹࣁֵአ૪ํጱଘݣ ● Auto conversion through the query gateway ࣁັᧃ॒ݗفᬰᤈᛔۖഘ
26.Future Work ● Improve execution optimizer ද࠺౯ժጱಗᤈᦇښս۸ ● Improve tooling ද࠺౯ժጱૡٍ