Large-Scale, Near Real-Time Pipelines at Uber
1 .Session hashtag: #SAISEco10
10 . (Diagram: the same data consumed two ways: as a normal table queried via Hive/Spark/Presto, and as a changelog dataset that exposes record-level changes)
12 . (Architecture diagram: a Hudi dataset on Hadoop FS made up of data files, an index, and timeline metadata; Spark DAGs store/index and read data through the Hudi DataSource, while Hive and Presto run queries against the storage views exposed for each dataset/storage type)
13 . (Diagram: trade-off between the REALTIME view and the READ OPTIMIZED view, balancing query cost against data latency)
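Read clients pick one of these views per query. Below is a minimal sketch, assuming a pre-Apache com.uber.hoodie build, a SparkSession `spark` in scope (e.g. spark-shell), and a hypothetical table path, of selecting the view type through the Spark datasource; depending on the version, the realtime view may instead be reachable only through the Hive-registered realtime table.

// Sketch only: choosing a storage view when reading a Hudi dataset with Spark.
// Constants come from com.uber.hoodie.DataSourceReadOptions; the table path is hypothetical.
import com.uber.hoodie.DataSourceReadOptions._

// READ OPTIMIZED view: cheapest queries, sees data as of the last compaction
val roDF = spark.read.format("com.uber.hoodie")
  .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load("s3://tables/raw_trips")

// REALTIME view: lowest data latency, merges base files with the newest delta logs at query time
// (may require querying the Hive-registered realtime table instead, depending on Hudi version)
val rtDF = spark.read.format("com.uber.hoodie")
  .option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_REALTIME_OPT_VAL)
  .load("s3://tables/raw_trips")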
18 .
// Command to extract incrementals using sqoop
bin/sqoop import \
  -Dmapreduce.job.user.classpath.first=true \
  --connect jdbc:mysql://localhost/users \
  --username root \
  --password ******* \
  --table users \
  --as-avrodatafile \
  --target-dir s3:///tmp/sqoop/import-1/users

// Spark Datasource
import com.uber.hoodie.DataSourceWriteOptions._
import com.uber.hoodie.config.HoodieWriteConfig

// Use Spark datasource to read avro
Dataset<Row> inputDataset =
    spark.read.avro("s3://tmp/sqoop/import-1/users/*");

// save it as a Hoodie dataset
inputDataset.write.format("com.uber.hoodie")
    .option(HoodieWriteConfig.TABLE_NAME, "hoodie.users")
    .option(RECORDKEY_FIELD_OPT_KEY(), "userID")
    .option(PARTITIONPATH_FIELD_OPT_KEY(), "country")
    .option(PRECOMBINE_FIELD_OPT_KEY(), "last_mod")
    .option(OPERATION_OPT_KEY(), UPSERT_OPERATION_OPT_VAL())
    .mode(SaveMode.Append)
    .save("/path/on/dfs");
19 .
// Deltastreamer command to ingest sqoop incrementals
spark-submit \
  --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hoodie-utilities-*-SNAPSHOT.jar \
  --props s3://path/to/dfs-source.properties \
  --schemaprovider-class com.uber.hoodie.utilities.schema.FilebasedSchemaProvider \
  --source-class com.uber.hoodie.utilities.sources.AvroDFSSource \
  --source-ordering-field last_mod \
  --target-base-path s3:///path/on/dfs \
  --target-table uber.employees \
  --op UPSERT

// dfs-source.properties
include=base.properties
# Key generator props
hoodie.datasource.write.recordkey.field=_userID
hoodie.datasource.write.partitionpath.field=country
# Schema provider props
hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=s3:///path/to/users.avsc
# DFS source
hoodie.deltastreamer.source.dfs.root=s3:///tmp/sqoop
22 .
// Deltastreamer command to ingest kafka events, dedupe, ingest
spark-submit \
  --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hoodie-utilities-*-SNAPSHOT.jar \
  --props s3://path/to/kafka-source.properties \
  --schemaprovider-class com.uber.hoodie.utilities.schema.SchemaRegistryProvider \
  --source-class com.uber.hoodie.utilities.sources.AvroKafkaSource \
  --source-ordering-field time \
  --target-base-path s3:///hoodie-deltastreamer/impressions \
  --target-table uber.impressions \
  --op BULK_INSERT \
  --filter-dupes

// kafka-source.properties
include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=datestr
# Schema provider configs
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
# Kafka source
hoodie.deltastreamer.source.kafka.topic=impressions
# Kafka props
metadata.broker.list=localhost:9092
auto.offset.reset=smallest
schema.registry.url=http://localhost:8081
24 . (Diagram: incremental ETL chain; new data lands in the source table, only the changed records propagate as updates into ETL table A and then ETL table B, while unaffected data is left untouched)
25 .
// Spark Datasource
import com.uber.hoodie.DataSourceWriteOptions._
import com.uber.hoodie.DataSourceReadOptions._
import com.uber.hoodie.config.HoodieWriteConfig

// Incrementally pull only the records changed since the given commit
Dataset<Row> hoodieIncViewDF = spark.read().format("com.uber.hoodie")
    .option(VIEW_TYPE_OPT_KEY(), VIEW_TYPE_INCREMENTAL_OPT_VAL())
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantFor8AM)
    .load("s3://tables/raw_trips");

Dataset<Row> stdDF = standardize_fare(hoodieIncViewDF);

// save it as a Hoodie dataset
stdDF.write.format("com.uber.hoodie")
    .option(HoodieWriteConfig.TABLE_NAME, "hoodie.std_trips")
    .option(RECORDKEY_FIELD_OPT_KEY(), "id")
    .option(PARTITIONPATH_FIELD_OPT_KEY(), "datestr")
    .option(PRECOMBINE_FIELD_OPT_KEY(), "time")
    .option(OPERATION_OPT_KEY(), UPSERT_OPERATION_OPT_VAL())
    .mode(SaveMode.Append)
    .save("/path/on/dfs");
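The begin instant (commitInstantFor8AM above) would typically come from the dataset's timeline. Below is a minimal sketch, assuming the HoodieDataSourceHelpers utility from the hoodie-spark module and a SparkSession `spark` in scope; loadCheckpoint/saveCheckpoint are hypothetical placeholders for whatever bookkeeping the chained ETL job already does.

// Sketch only: deriving the begin instant for the incremental pull from the Hudi timeline.
import org.apache.hadoop.fs.{FileSystem, Path}
import com.uber.hoodie.HoodieDataSourceHelpers

val basePath = "s3://tables/raw_trips"
val fs = FileSystem.get(new Path(basePath).toUri, spark.sparkContext.hadoopConfiguration)

def loadCheckpoint(): String = "000"           // hypothetical: instant processed by the previous run
def saveCheckpoint(instant: String): Unit = () // hypothetical: persist instant for the next run

val lastProcessedInstant = loadCheckpoint()
if (HoodieDataSourceHelpers.hasNewCommits(fs, basePath, lastProcessedInstant)) {
  // Everything committed after this instant is picked up by the incremental view read
  val commitInstantFor8AM = lastProcessedInstant
  // ... run the incremental read + upsert shown above ...
  // then checkpoint the latest completed commit so the next run continues from there
  saveCheckpoint(HoodieDataSourceHelpers.latestCommit(fs, basePath))
}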
28 . Questions?