Uber genuinely needs to serve faster, fresher data to data consumers and products that run hundreds of thousands of analytical queries every day. Uber engineers will share the design, architecture, and use cases of the second generation of 'Hudi', a self-contained Apache Spark library for building large-scale analytical datasets designed to serve such needs and beyond. Hudi (formerly Hoodie) was created to efficiently manage petabytes of analytical data on distributed storage while supporting fast ingestion and queries. In this talk, we will discuss how Hudi is built using Spark as a general-purpose distributed execution engine, detailing the trade-offs and operational experience. We will also show how to ingest data into Hudi using the Spark Datasource / Streaming APIs and how to build notebooks/dashboards on top using Spark SQL.
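As a hypothetical illustration of the query side described above, the sketch below reads a Hudi dataset with the com.uber.hoodie Spark datasource and exposes it to Spark SQL for a notebook/dashboard query; the path, view, and column names are illustrative, and exact datasource behavior may vary by version.

// spark-shell style sketch (Scala)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-dashboard").getOrCreate()

// Read a Hudi dataset through the Spark datasource
// (some versions expect partition globs in the load path, e.g. basePath + "/*/*")
val tripsDF = spark.read.format("com.uber.hoodie").load("s3://tables/raw_trips")

// Register it for Spark SQL so notebooks / dashboards can query it directly
tripsDF.createOrReplaceTempView("raw_trips")
spark.sql("SELECT datestr, count(*) AS trips FROM raw_trips GROUP BY datestr").show()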

Footnotes


1. Session hashtag: #SAISEco10

10. [Diagram: Normal Table (Hive/Spark/Presto) vs. Changelog Dataset, both fed by a changelog]
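A rough, hypothetical sketch of the contrast slide 10 draws (spark-shell style; the users table, changelogDF, keys, and paths are illustrative, not from the deck): applying a changelog to a plain table means rewriting whole partitions, while a Hudi dataset upserts only the changed records.

import com.uber.hoodie.DataSourceWriteOptions._
import com.uber.hoodie.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

// Plain Hive/Spark/Presto table: recompute and overwrite every partition the changelog touches
spark.sql(
  """INSERT OVERWRITE TABLE users PARTITION (datestr)
    |SELECT * FROM staged_users_with_changes""".stripMargin)

// Hudi dataset: upsert only the changed records, keyed by record key
changelogDF.write.format("com.uber.hoodie")   // changelogDF: hypothetical DataFrame of changed rows
  .option(HoodieWriteConfig.TABLE_NAME, "users")
  .option(RECORDKEY_FIELD_OPT_KEY, "userID")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "datestr")
  .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save("/path/to/users")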

12. [Architecture diagram: Hudi dataset on Hadoop FS (data files, timeline metadata, index, storage types and views); writes via Spark DAGs; queries from Hive, Presto, and the Spark DataSource]

13. [Diagram: cost vs. latency trade-off between the REALTIME and READ OPTIMIZED views]
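A hedged sketch of that trade-off: a merge-on-read Hudi dataset synced to Hive can be queried through a read-optimized table and a realtime table over the same files. The table names below (trips and trips_rt) are an assumption for illustration, not taken from the slides.

// Read-optimized view: scans only compacted columnar base files (lower query cost, higher data latency)
spark.sql("SELECT count(*) FROM trips WHERE datestr = '2018-10-01'").show()

// Realtime view: merges base files with pending delta logs at query time (fresher data, higher query cost)
spark.sql("SELECT count(*) FROM trips_rt WHERE datestr = '2018-10-01'").show()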

18. // Command to extract incrementals using sqoop
bin/sqoop import \
  -Dmapreduce.job.user.classpath.first=true \
  --connect jdbc:mysql://localhost/users \
  --username root \
  --password ******* \
  --table users \
  --as-avrodatafile \
  --target-dir s3:///tmp/sqoop/import-1/users

// Spark Datasource
import com.uber.hoodie.DataSourceWriteOptions._

// Use Spark datasource to read avro
Dataset<Row> inputDataset = spark.read.avro("s3://tmp/sqoop/import-1/users/*");

// save it as a Hoodie dataset
inputDataset.write.format("com.uber.hoodie")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie.users")
  .option(RECORDKEY_FIELD_OPT_KEY(), "userID")
  .option(PARTITIONPATH_FIELD_OPT_KEY(), "country")
  .option(PRECOMBINE_FIELD_OPT_KEY(), "last_mod")
  .option(OPERATION_OPT_KEY(), UPSERT_OPERATION_OPT_VAL())
  .mode(SaveMode.Append)
  .save("/path/on/dfs")
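A possible follow-up to the slide above (illustrative, not part of the deck): since the write uses UPSERT keyed on userID, re-running the sqoop extract plus the same datasource write applies later changes in place, and the result can be read back with the same datasource (read-path semantics may vary by version).

// Read back the upserted users dataset and inspect one (hypothetical) record
val usersDF = spark.read.format("com.uber.hoodie").load("/path/on/dfs")
usersDF.filter("userID = 42").select("userID", "country", "last_mod").show()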

19. // Deltastreamer command to ingest sqoop incrementals
spark-submit \
  --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hoodie-utilities-*-SNAPSHOT.jar \
  --props s3://path/to/dfs-source.properties \
  --schemaprovider-class com.uber.hoodie.utilities.schema.FilebasedSchemaProvider \
  --source-class com.uber.hoodie.utilities.sources.AvroDFSSource \
  --source-ordering-field last_mod \
  --target-base-path s3:///path/on/dfs \
  --target-table uber.employees \
  --op UPSERT

// dfs-source.properties
include=base.properties
# Key generator props
hoodie.datasource.write.recordkey.field=_userID
hoodie.datasource.write.partitionpath.field=country
# Schema provider props
hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=s3:///path/to/users.avsc
# DFS Source
hoodie.deltastreamer.source.dfs.root=s3:///tmp/sqoop

22. // Deltastreamer command to ingest kafka events, dedupe, ingest
spark-submit \
  --class com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer \
  /path/to/hoodie-utilities-*-SNAPSHOT.jar \
  --props s3://path/to/kafka-source.properties \
  --schemaprovider-class com.uber.hoodie.utilities.schema.SchemaRegistryProvider \
  --source-class com.uber.hoodie.utilities.sources.AvroKafkaSource \
  --source-ordering-field time \
  --target-base-path s3:///hoodie-deltastreamer/impressions \
  --target-table uber.impressions \
  --op BULK_INSERT \
  --filter-dupes

// kafka-source.properties
include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=datestr
# Schema provider configs
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
# Kafka source
hoodie.deltastreamer.source.kafka.topic=impressions
# Kafka props
metadata.broker.list=localhost:9092
auto.offset.reset=smallest
schema.registry.url=http://localhost:8081

24. [Diagram: incremental ETL; new data and changes from a source table propagate as updated data through ETL table A and ETL table B, leaving unaffected data untouched]

25. // Spark Datasource
import com.uber.hoodie.DataSourceWriteOptions._
import com.uber.hoodie.DataSourceReadOptions

// Use Spark datasource to read the incremental view since a given commit
Dataset<Row> hoodieIncViewDF = spark.read().format("com.uber.hoodie")
  .option(VIEW_TYPE_OPT_KEY(), VIEW_TYPE_INCREMENTAL_OPT_VAL())
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantFor8AM)
  .load("s3://tables/raw_trips");

Dataset<Row> stdDF = standardize_fare(hoodieIncViewDF)

// save the standardized result as a Hoodie dataset
stdDF.write.format("com.uber.hoodie")
  .option(HoodieWriteConfig.TABLE_NAME, "hoodie.std_trips")
  .option(RECORDKEY_FIELD_OPT_KEY(), "id")
  .option(PARTITIONPATH_FIELD_OPT_KEY(), "datestr")
  .option(PRECOMBINE_FIELD_OPT_KEY(), "time")
  .option(OPERATION_OPT_KEY(), UPSERT_OPERATION_OPT_VAL())
  .mode(SaveMode.Append)
  .save("/path/on/dfs")

28. Questions?

Initiated by Apache Spark PMC members & committers, dedicated to publishing and disseminating Apache Spark + AI technology, ecosystem, best practices, and the latest developments.
