When Apache Spark Meets TiDB

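Over the past 10 years, the big data storage layer has focused mainly on analytical use cases. For those use cases, users typically load data into a Hadoop cluster and run queries against HDFS files. They then struggle with handling modifications on append-only storage and with maintaining fragile ETL pipelines.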

1.When Apache Spark meets TiDB => TiSpark maxiaoyu@pingcap.com

2.Who am I
● Shawn Ma@PingCAP
● Tech Lead of OLAP Team
● Working on OLAP related products and features
● Previously tech lead of Big Data infra team@Netease
● Focus on SQL on Hadoop and Big Data related stuff

3.Agenda
● A little bit about TiDB / TiKV
● What is TiSpark
● Architecture
● Benefit
● What’s Next

4.What’s TiDB
● Open source distributed RDBMS
● Inspired by Google Spanner
● Horizontal Scalability
● ACID Transaction
● High Availability
● Auto-Failover
● SQL at scale
● Widely used in different industries, including Internet, Gaming, Banking, Finance, Manufacturing and so on (200+ users)

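5.A little bit about TiDB and TiKV
[Architecture diagram: a stateless SQL layer of TiDB nodes sits on top of a distributed storage layer of TiKV nodes. TiDB talks to TiKV over gRPC and sends metadata / timestamp requests to the Placement Driver (PD) over gRPC. PD drives the control flow (balance / failover) for the TiKV nodes, which replicate data among themselves via Raft.]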

6.TiKV: The whole picture
[Diagram: a client and a Placement Driver cluster (PD 1, PD 2, PD 3) talk over RPC to four stores (Store 1 to Store 4, one per TiKV node 1 to 4). Regions 1 to 5 each appear three times across the stores, and the replicas of a Region form a Raft group.]
TiKV is powered by RocksDB

7.What is TiSpark
● TiSpark = Spark SQL on TiKV
  ○ Spark SQL directly on top of a distributed database storage
● Hybrid Transactional/Analytical Processing (HTAP) rocks
  ○ Provides strong OLAP capacity together with TiDB

8.What is TiSpark
● Complex Calculation Pushdown
● Key Range pruning
● Index support
  ○ Clustered index / Non-Clustered index
  ○ Index Only Query
● Cost Based Optimization
  ○ Histogram
  ○ Pick the right Access Path

9.Architecture
[Architecture diagram: the Spark Driver hosts the TiSpark driver component, which retrieves data locations from the Placement Driver (PD) over gRPC. Each Spark Executor hosts a TiSpark component, which retrieves the real data over gRPC from the TiKV nodes of the distributed storage layer.]

10.Architecture
● On Spark Driver
  ○ Translate metadata from TiDB into Spark meta info
  ○ Transform the Spark SQL logical plan, pick up the elements that can be leveraged by storage (TiKV), and rewrite the plan
  ○ Locate data based on Region info from the Placement Driver and split partitions
● On Spark Executor
  ○ Encode the Spark SQL plan into TiKV’s coprocessor request
  ○ Decode the TiKV / coprocessor result and transform it into Spark SQL Rows

11.How everything is made possible
● Extension points in Spark SQL internals
● Extra Strategies allow us to inject our own physical executor, and that is what we leveraged for TiSpark
● We try our best to keep Spark internals untouched to avoid compatibility issues

12.How everything is made possible
● A fat Java client module, paying the price of bypassing TiDB
  ○ Parsing schema, type system, encoding / decoding, coprocessor
  ○ Almost full-featured TiKV client (without write support for now)
  ○ Predicates / Index - Key Range related logic
  ○ Aggregates pushdown related logic
  ○ Limit, Order, Stats related logic
● A thin layer inside Spark SQL
  ○ TiStrategy for Spark SQL plan transformation
  ○ Other utilities for mapping things from Spark SQL to the TiKV client library
  ○ Physical operators like IndexScan
  ○ Thin enough that compatibility with Spark SQL is rarely a concern

13.Too Abstract? Let’s get concrete
    select class, avg(score) from student
    WHERE school = 'engineering' and lottery(name) = 'picked'
      and studentId >= 8000 and studentId < 10100
    group by class;
● The query above runs against a TiDB table named student
● The table has a clustered index on studentId and a secondary index on the school column
● lottery is a Spark SQL UDF that takes a name and outputs 'picked' if a random number generator decides so

14.Predicates Processing
    WHERE school = 'engineering' and lottery(name) = 'picked' and studentId >= 8000 and studentId < 10100
Predicates are converted into key ranges based on indexes:
● studentId >= 8000 and studentId < 10100 become the key range [8000, 10100)
● school = 'engineering' stays as a predicate for the coprocessor
● lottery(name) = 'picked' is a UDF predicate left for Spark SQL
Construct tasks:
1. Append remaining predicates if supported by coprocessor
2. Push back whatever needs to be computed by Spark SQL, e.g. UDFs, prefix index predicates
3. Cut them into tasks according to Region/Range
4. Encode into coprocessor request
[Diagram: Regions 1, 2 and 3 hold StudentId ranges [0-5000), [5000-10000) and [10000-15000). Spark Task 1 sends a COP request for [8000, 10000) to Region 2 and Spark Task 2 sends a COP request for [10000, 10100) to Region 3, both over gRPC via the Spark worker.]
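The range conversion and task cutting on this slide can be sketched in a few lines of Python. This is a minimal illustration, not TiSpark's actual code: the `Region` class, `key_range`, and `split_by_region` are hypothetical names, and the Region layout is hard-coded from the slide rather than fetched from the Placement Driver.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Region:
    """Hypothetical stand-in for Region metadata (not a TiSpark class)."""
    region_id: int
    start: int  # inclusive
    end: int    # exclusive


# Region layout copied from the slide; a real client would ask PD for it.
REGIONS = [Region(1, 0, 5000), Region(2, 5000, 10000), Region(3, 10000, 15000)]


def key_range(predicates):
    """Fold '>=' and '<' predicates on the clustered key into [lo, hi)."""
    lo, hi = float("-inf"), float("inf")
    for op, value in predicates:
        if op == ">=":
            lo = max(lo, value)
        elif op == "<":
            hi = min(hi, value)
        else:
            raise ValueError(f"unsupported operator: {op}")
    return lo, hi


def split_by_region(lo, hi, regions):
    """Cut [lo, hi) at Region boundaries, one task per overlapping Region."""
    tasks = []
    for r in regions:
        start, end = max(lo, r.start), min(hi, r.end)
        if start < end:  # the Region overlaps the requested range
            tasks.append((r.region_id, start, end))
    return tasks


lo, hi = key_range([(">=", 8000), ("<", 10100)])
tasks = split_by_region(lo, hi, REGIONS)
print(tasks)  # [(2, 8000, 10000), (3, 10000, 10100)]
```

Region 1 is pruned entirely because its range does not overlap [8000, 10100), which is the key range pruning effect listed among TiSpark's features.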

15.Index Scan
    WHERE school = 'engineering' and lottery(name) = 'picked' and (studentId >= 8000 and studentId < 10100)
● A secondary index is encoded as key-value pairs
  ○ The key is the comparable bytes format of all index keys in defined order
  ○ The value is the row ID pointing to the table row data
● Reading data via a secondary index usually requires a double read
  ○ First, read the secondary index in range, just like reading primary keys in the previous slide
  ○ Shuffle row IDs according to Region
  ○ Sort all row IDs retrieved and combine them into ranges if possible
  ○ Encode row IDs into row keys for the table
  ○ Send those mini requests in batch concurrently
● Optimize away the second read operation
  ○ If all required columns are already covered by the index itself
[Diagram: executors batch-scan the index data for student_school according to the predicates' range and obtain row IDs 1,2,3,4,5,7,8,10,88. The row keys are sorted and cut into ranges according to the key range of each Region, shown as [1,5), then 5,7,9, then 10 and 88, and are used to read the row data for student from the TiKV Region data.]
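The "sort all row IDs and combine them into ranges" step can be illustrated with a small Python sketch. The function name `merge_row_ids` is hypothetical, and the sketch deliberately ignores Region boundaries and key encoding, both of which the real client also has to handle.

```python
def merge_row_ids(row_ids):
    """Sort row IDs and merge consecutive runs into half-open ranges [start, end)."""
    ranges = []
    for rid in sorted(set(row_ids)):
        if ranges and ranges[-1][1] == rid:
            ranges[-1][1] = rid + 1        # rid continues the current run
        else:
            ranges.append([rid, rid + 1])  # rid starts a new run
    return [tuple(r) for r in ranges]


# Row IDs as they might come back, unordered, from the index scan.
print(merge_row_ids([88, 3, 1, 2, 10, 4, 5, 7, 8]))
# [(1, 6), (7, 9), (10, 11), (88, 89)]
```

Nine point lookups collapse into four range reads here; the denser the matching row IDs, the fewer requests the second read needs.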

16.Index Selection
    WHERE school = 'engineering' and lottery(name) = 'picked' and (studentId >= 8000 and studentId < 10100) or studentId in (10323, 10327)
[Diagram: using the histogram, TiSpark estimates that the predicates on StudentId match 1K rows through the clustered index on StudentId and that the predicate on School matches 800 rows through the secondary index on School. It then compares the two access paths: 1K * clustered index access cost < 800 * secondary index access cost.]
● If the referenced columns are all covered by the index, then instead of retrieving actual rows, TiSpark applies an index-only query and the cost function is different
● If no histogram exists, TiSpark uses pseudo selection logic
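The comparison on this slide amounts to multiplying the estimated row count by a per-row access cost and picking the cheaper path. The sketch below shows that shape only; the per-row costs (1.0 and 2.0) are made-up numbers chosen to reflect that a secondary index needs a double read, and `pick_access_path` is a hypothetical name, not TiSpark's cost model.

```python
def pick_access_path(candidates):
    """Return the candidate with the lowest estimated_rows * cost_per_row."""
    return min(candidates, key=lambda c: c["rows"] * c["cost_per_row"])


candidates = [
    # Row estimates come from the slide; cost_per_row values are assumptions.
    {"name": "clustered index on StudentId", "rows": 1000, "cost_per_row": 1.0},
    {"name": "secondary index on School", "rows": 800, "cost_per_row": 2.0},
]

best = pick_access_path(candidates)
print(best["name"])  # clustered index on StudentId
```

With these assumed costs, 1000 * 1.0 is less than 800 * 2.0, so the clustered index wins even though it touches more rows.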

17.Aggregates Processing
    select class, avg(score) from student ……. group by class;
● Spark SQL plan received in TiStrategy: AVG(score) GROUP BY class
● AVG is rewritten into SUM and COUNT: SUM(score) / COUNT(score) GROUP BY class
● Construct schema
  ○ Transformation rules are needed because TiDB has a totally different type system and inference rules
  ○ Spark builds its schema by its own type inference rules: [SUM, COUNT, class]
  ○ TiKV schema to Spark schema: [groupBy keys as bytes, SUM as Decimal, COUNT as BigInt]
● After coprocessor preprocessing, TiSpark still relies on the normal Spark aggregation strategy
[Diagram: Map Task 1 and Map Task 2 read from the Regions (StudentId ranges [0-5000), [5000-10000) and [10000-15000)) over gRPC via the Spark worker and feed Reduce Task 1 and Reduce Task 2.]
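The reason for rewriting AVG into SUM and COUNT is that averages of partial results cannot be combined correctly, while sums and counts can. A minimal Python sketch of the two stages follows; `partial_aggregate` stands in for the per-Region coprocessor step and `final_average` for the Spark-side aggregation, and both names and the sample rows are invented for illustration.

```python
from collections import defaultdict


def partial_aggregate(rows):
    """Per-Region step: compute (SUM(score), COUNT(score)) for each class."""
    acc = defaultdict(lambda: [0, 0])
    for cls, score in rows:
        acc[cls][0] += score
        acc[cls][1] += 1
    return {cls: tuple(sc) for cls, sc in acc.items()}


def final_average(partials):
    """Spark-side step: merge partial sums and counts, then divide once."""
    merged = defaultdict(lambda: [0, 0])
    for partial in partials:
        for cls, (s, c) in partial.items():
            merged[cls][0] += s
            merged[cls][1] += c
    return {cls: s / c for cls, (s, c) in merged.items()}


# Invented sample rows (class, score) as two Regions might hold them.
region_2_rows = [("A", 90), ("A", 60), ("B", 70)]
region_3_rows = [("A", 30)]

result = final_average(
    [partial_aggregate(region_2_rows), partial_aggregate(region_3_rows)]
)
print(result)  # {'A': 60.0, 'B': 70.0}
```

Averaging the per-Region averages for class A would give (75 + 30) / 2 = 52.5, which is wrong; merging sums and counts gives 180 / 3 = 60.0.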

18.Benefit
● Analytical / transactional support all on one platform
  ○ No ETL needed; query data in real time
  ○ High throughput and consistent snapshot reads from the database
  ○ Simplify your platform and reduce maintenance cost
● Embrace Apache Spark and its ecosystem
  ○ Support for complex transformations and analytics beyond SQL
  ○ Cooperate with other projects in the ecosystem (like Apache Zeppelin)
  ○ Apache Spark bridges your data sources

19.Ease of Use
● Works on your existing Spark cluster
  ○ Just a single jar, like other Spark connectors
● Workable as a standalone application and in spark-shell, thrift-server, pyspark and R
● Works just like another data source
    val ti = new org.apache.spark.sql.TiContext(spark)
    // Map all TiDB tables from database sampleDB as Spark SQL tables
    ti.tidbMapDatabase("sampleDB")
    spark.sql("select count(*) from sampleTable").show

20.What’s Next
● Batch write support (writing directly in TiKV native format)
● JSON type support (since TiDB already supports it)
● Partition table support (both Range and Hash)
● Join optimization based on range and partition table
● (Maybe) join reorder with TiDB’s own histogram
● Another separate columnar storage project using Spark as its execution engine (not released yet)

21.Thanks!
Contact me: maxiaoyu@pingcap.com
www.pingcap.com
https://github.com/pingcap/tispark
https://github.com/pingcap/tidb
https://github.com/pingcap/tikv