eBay - Migrating from MPP to Spark



2. About Us • DSS (Data Services & Solutions) team at eBay. • Focus on big data development and optimization on MPP RDBMS, Spark, and Hadoop systems, plus data modeling and data services. • Now spending more time on the migration from MPP to Spark.

3. Talks from our team • Experience of Optimizing Spark SQL When Migrating from MPP Database, Yucai Yu & Yuming Wang, Spark Summit 2018, London. • Analytical DBMS to Apache Spark Auto Migration Framework, Edward Zhang, Spark Summit 2018, London.

4. Agenda • Background • Use Cases and Best Practices • Auto Migration Deep Dive

5. Why Migrate to Spark • More complex use cases • Big data processing needs • Streaming, graph computation, machine learning • Extreme performance optimization need

6. Spark as DW Processing Engine [Architecture diagram: DS (Data Science) layer — model, RT data service, batch service, metadata service; DW (Data Warehouse) layer — knowledge graph, metadata, integrated data layer, ODS layer; DI (Data Infrastructure) layer — ZETA, compute/storage.]

7. Spark Cluster Environment • Spark 2.1.0/2.3.1 • Hadoop 2.7.1 • Hive 1.2.1 • 1900 nodes • 460 TB memory

8. Agenda • Background • Use Cases and Best Practices • Auto Migration Deep Dive

9. Migration Steps Overview • Table schema translation • SQL conversion • Historical data copy • SQL run on YARN cluster • Post data quality check • Logging and error parsing

10. Table Schema Translation • Single partitioned table is not enough • Column name is case sensitive • Column type mapping tips

11. Single Partitioned Table Is Not Enough • Spark fails with “Cannot overwrite a path that is also being read from,” regardless of whether different partitions are involved (SPARK-18107). Instead, create two tables: TableX and TableX_Merge. TableX_Merge is the partitioned table that keeps daily snapshots, while TableX points to the latest snapshot for downstream reference.

12. Table Design Sample

CREATE TABLE Table_X_Merge (
  ...,
  dt string
)
USING parquet
OPTIONS (path 'hdfs://hercules/table_x/snapshot/')
PARTITIONED BY (dt);

CREATE TABLE Table_X (
  ...
)
USING parquet
OPTIONS (path 'hdfs://hercules/table_x/snapshot/dt=20190311');  -- points to the latest partition

13. Column Name Is Case Sensitive • Lowercase the column names for Hive/Spark Parquet file interoperation; otherwise you may see NULL fields, wrong results, or errors (SPARK-25132).

14. Spark 2.3.1 returns a wrong result silently.

15. Spark 2.1.0 throws an error: “Caused by: java.lang.IllegalArgumentException: Column [id] was not found in schema!”
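A minimal sketch of the mismatch (table and column names here are hypothetical): Parquet files written with a mixed-case physical column, read through a lowercase metastore schema.

-- Upstream writer produced Parquet files whose physical column is "ID" (uppercase),
-- but the table is declared with a lowercase schema:
CREATE TABLE t_case (id INT) USING parquet OPTIONS (path '/tmp/t_case');
-- Spark 2.3.1: the pushed-down filter finds no column "id" in the Parquet
-- schema and silently returns NULLs / wrong results:
SELECT * FROM t_case WHERE id = 1;
-- Spark 2.1.0: the same query throws
--   java.lang.IllegalArgumentException: Column [id] was not found in schema!
-- Fix: lowercase the column names when writing the Parquet files.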

16. Column Type Mapping Tips • Map decimal-typed integers to Integer, so Parquet filter pushdown can accelerate file scans.

17. Sample: Parquet filter pushdown to accelerate file scans (SPARK-24549).
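A minimal sketch of the mapping, assuming a hypothetical table lstg whose MPP column was SITE_ID DECIMAL(4,0):

-- MPP-style schema keeps the decimal type; before SPARK-24549 such a filter
-- is not pushed down to the Parquet reader:
--   CREATE TABLE lstg (site_id DECIMAL(4,0)) USING parquet;
--   SELECT * FROM lstg WHERE site_id = 0;   -- full file scan
-- Declaring the column as INT lets Parquet skip row groups on the filter:
CREATE TABLE lstg (site_id INT) USING parquet;
SELECT * FROM lstg WHERE site_id = 0;        -- filter pushed down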

18. Query Improvements – Predicate Pushdown • [SPARK-25419] Improve Parquet predicate pushdown • [SPARK-23727] Support Date type • [SPARK-24549] Support Decimal type • [SPARK-24718] Support Timestamp type • [SPARK-24706] Support Byte and Short types • [SPARK-24638] Support StringStartsWith predicate • [SPARK-17091] Support IN predicate
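A sketch of the filter shapes these patches let Spark push down to Parquet (table and column names are hypothetical):

SELECT * FROM lstg WHERE dt = DATE '2019-03-12';   -- Date type (SPARK-23727)
SELECT * FROM lstg WHERE title LIKE 'iphone%';     -- StringStartsWith (SPARK-24638)
SELECT * FROM lstg WHERE site_id IN (0, 2, 3);     -- IN predicate (SPARK-17091)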

19. SQL Conversion • Update & Delete Conversion • Insert Conversion • Number Expression • String Expression • Recursive Query Conversion

20. SQL Conversion – Update/Delete • Spark SQL does not support UPDATE/DELETE yet. Transform the UPDATE/DELETE into INSERT or INSERT OVERWRITE.

21. MPP Use Case

update tgt
from database.tableX tgt, database.Delta ods
set AUCT_END_DT = ods.AUCT_END_DT
where tgt.LSTG_ID = ods.LSTG_ID;

insert into database.tableX (LSTG_ID, AUCT_END_DT)
select LSTG_ID, AUCT_END_DT
from database.Delta ods
left outer join database.tableX tgt
  on tgt.LSTG_ID = ods.LSTG_ID
where tgt.LSTG_ID is null;

[Diagram: yesterday’s full data merged with the delta]

22. Spark-SQL Sample

insert overwrite table TableX_merge partition (dt='20190312')
select coalesce(tgt.LSTG_ID, ods.LSTG_ID) as LSTG_ID,
       if(ods.LSTG_ID is not null, ods.AUCT_END_DT, tgt.AUCT_END_DT) as AUCT_END_DT
from TableX as tgt
full outer join Delta ods
  on tgt.LSTG_ID = ods.LSTG_ID;

alter table TableX set location 'xxxx/dt=20190312';
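The slides show only the UPDATE translation; a sketch of the DELETE counterpart, under the same hypothetical tables and assuming Delta holds the keys to remove, keeps only the surviving rows via an anti-join:

-- MPP: delete from database.tableX tgt
--      where tgt.LSTG_ID in (select LSTG_ID from database.Delta);
insert overwrite table TableX_merge partition (dt='20190312')
select tgt.LSTG_ID, tgt.AUCT_END_DT
from TableX tgt
left anti join Delta ods
  on tgt.LSTG_ID = ods.LSTG_ID;

alter table TableX set location 'xxxx/dt=20190312';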

23. SQL Conversion – Insert • An MPP DB implicitly dedupes data when inserting into a SET table (the default for new tables). For such cases, a GROUP BY or DISTINCT is necessary in Spark SQL.

24. MPP Use Case (TableY is defined as a SET table)

insert into TableY (LSTG_ID, AUCT_END_DT)
select LSTG_ID, AUCT_END_DT
from ods_tableY tgt;

25. Spark-SQL Sample

insert overwrite table TableY_merge partition (dt='20190312')
select distinct * from (
  select LSTG_ID, AUCT_END_DT from TableY tgt
  union all
  select LSTG_ID, AUCT_END_DT from ODS_TableY
) tmp;
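Note on the design choice: taking DISTINCT over the UNION ALL of the existing target rows and the new delta reproduces the SET-table dedupe across both old and new data; applying DISTINCT to the delta alone would miss rows that already exist in TableY.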

26. SQL Conversion – Number Expression • Rounding behavior: MPP DWs round with the HALF_EVEN rule by default, but Spark SQL uses HALF_UP. Use bround instead of a direct cast.

27. MPP Sample

select cast(2.5 as decimal(4,0)) as result;  -- returns 2
select cast(3.5 as decimal(4,0)) as result;  -- returns 4

28. Spark-SQL Result

spark-sql> select cast(2.5 as decimal(4,0));
3
spark-sql> select bround(2.5, 0) as col1;
2
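To mirror the MPP sample above, a quick check (hypothetical session) that bround follows HALF_EVEN on both values:

spark-sql> select bround(2.5, 0), bround(3.5, 0);
2	4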

29. SQL Conversion – Number Expression • Number division result: MPP returns the closest integer for integer division, while Spark always returns a double. Explicitly cast the division result to integer.
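A minimal sketch of the difference and the explicit cast (values are illustrative):

-- MPP: integer / integer yields an integer
--   select 7/2;   -- returns 3
-- Spark SQL: '/' always returns a double
spark-sql> select 7/2;
3.5
spark-sql> select cast(7/2 as int);
3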
