Challenges and Practice of Building a Unified Data Processing Engine on Streaming, Part 2


1. 01 Dynamic Table — Theoretical Basis: Duality of Streams and Tables, Dynamic Table. https://flink.apache.org/news/2017/04/04/dynamic-tables.html
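The stream-table duality behind dynamic tables can be illustrated with a minimal sketch (this is not Flink's API; the function and record shapes here are invented for illustration): a changelog stream of upsert/delete records and the table it materializes are two views of the same data.

```python
# Sketch of stream-table duality: folding a changelog stream of
# (op, key, value) records into the table it represents.
# The record format and function names are illustrative, not Flink's.

def materialize(changelog):
    """Fold a changelog stream into its current table state."""
    table = {}
    for op, key, value in changelog:
        if op == "upsert":
            table[key] = value          # insert or update the row
        elif op == "delete":
            table.pop(key, None)        # retract the row
    return table

changelog = [
    ("upsert", "user1", 10),
    ("upsert", "user2", 20),
    ("upsert", "user1", 15),            # an update replaces the earlier row
    ("delete", "user2", None),
]
print(materialize(changelog))           # {'user1': 15}
```

Going the other direction (table to stream) is symmetric: every mutation of the table emits one changelog record, which is what lets a continuous SQL query over a stream be defined as an ordinary query over the evolving table.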

2. 02 Improve Architecture
   Before: two separate stacks — Table API & SQL (Relational) over the DataStream API (Stream Processing runtime) on one side, and over the DataSet API (Batch Processing runtime) on the other.
   After: one unified stack — Table API & SQL (Relational) over a single Query Processor (Query Optimizer & Query Executor) on a single Runtime (Distributed Streaming Dataflow; DAG API & Stream Operators).
   Deployment options in both: Local (Single JVM), Cluster (Standalone, YARN), Cloud (GCE, EC2).

3. 02.1 Unified Operator Framework
   Unified operator abstraction over the StreamGraph:
   - Driver => StreamOperator
   - Operators can choose inputs
   - Flexible chaining
   @dev [DISCUSS] Unified Core API for Streaming and Batch: https://goo.gl/CvfKuZ
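What "operators can choose inputs" buys can be sketched as follows (an illustrative class, not Flink's real `StreamOperator` interface): a two-input operator tells the framework which input it wants to read next, so a batch-style hash join can fully drain its build side before touching the probe side while still living in the streaming operator framework.

```python
# Illustrative sketch of a unified two-input operator with input selection.
# Class and method names are invented for this example.

class HashJoinOperator:
    def __init__(self):
        self.table = {}               # build-side hash table: key -> rows
        self.build_done = False

    def next_input(self):
        # "Operators can choose inputs": keep asking for the build side
        # until it is exhausted, then switch to the probe side.
        return "probe" if self.build_done else "build"

    def process_build(self, key, row):
        self.table.setdefault(key, []).append(row)

    def end_build(self):
        self.build_done = True

    def process_probe(self, key, row):
        # Emit one joined pair per matching build-side row.
        return [(row, b) for b in self.table.get(key, [])]

op = HashJoinOperator()
for k, r in [(1, "b1"), (2, "b2")]:
    assert op.next_input() == "build"
    op.process_build(k, r)
op.end_build()
matches = op.process_probe(1, "p1")   # [("p1", "b1")]
```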

4. 02 Improve Architecture (the before/after architecture diagram from slide 2, revisited)

5. 02.2 Unified Query Processing
   SQL & Table API → Logical Plan → Optimizer → Physical Plan → DAG → Execution
   Between stream and batch, the stages are respectively: totally the same, mostly shared, and partially shared.

6. 03 Unification of the Optimizer
   Most optimizer rules (80+) are shared between batch and stream; the batch-specific rules are mainly sort-related, and the stream-specific rules are mainly state-related.

7. 03.1 Push Aggregate past Join — 6 hours -> 14 min (25x) in stream
   Simplified TPC-H Q13:
   SELECT c.c_custkey, COUNT(o.o_orderkey)
   FROM customer c LEFT JOIN orders o
     ON c.c_custkey = o.o_custkey
   GROUP BY c.c_custkey
   Before optimization: Join (150 million customer rows x 1.5 billion orders rows) -> Agg -> Calc.
   After optimization: the aggregate is pushed below the join, shrinking the orders side from 1.5 billion rows to 100 million before the join.
   Batch and stream both use this rule.
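A toy illustration of why the rewrite helps (this simulates the two plan shapes on tiny in-memory lists, it is not the optimizer code): pre-aggregating orders by customer key before the join reduces the join's input from one row per order to one row per distinct customer key, while producing the same result.

```python
# Toy simulation of "push Aggregate past Join" on the simplified Q13.
# Data and names are illustrative.

from collections import Counter

customers = ["c1", "c2", "c3"]
orders = [("o1", "c1"), ("o2", "c1"), ("o3", "c2")]  # (o_orderkey, o_custkey)

def count_after_join(customers, orders):
    """Plan before optimization: join first, then aggregate."""
    joined = [(c, o) for c in customers for o, cust in orders if cust == c]
    counts = Counter(c for c, _ in joined)
    return {c: counts.get(c, 0) for c in customers}  # LEFT JOIN keeps c3

def count_with_pushdown(customers, orders):
    """Plan after optimization: pre-aggregate orders, then join."""
    pre = Counter(cust for _, cust in orders)        # one row per customer key
    return {c: pre.get(c, 0) for c in customers}

assert count_after_join(customers, orders) == count_with_pushdown(customers, orders)
```

At TPC-H scale the same equivalence is what shrinks the join input from 1.5 billion to 100 million rows.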

8. 04 Unification of Basic Data Structure
   Old row format: Row — an Object[] holding boxed values such as Integer(321), String("awesome"), String("flink"):
   - High space overhead of Java objects
   - Boxing & unboxing for primitive types
   - Expensive hashCode() and (de)serialization

9. 04 Unification of Basic Data Structure
   New row format: BinaryRow
   - Reduces a lot of (de)serialization cost
   - Tight integration with memory management
   - CPU cache friendly
   Layout: null bits, then a fixed-length part (primitives stored inline, plus offsets and lengths pointing into the variable-length data), then a variable-length part (e.g. "awesome", "flink").
   It not only performs well in batch, but also brings a 1x throughput improvement in streaming cases.
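The layout above can be sketched with a few lines of byte packing (a simplified illustration, not Flink's actual BinaryRow code: null bits are omitted and the slot encoding here is invented): a fixed-length part holds primitives inline and (offset, length) pointers for strings, so fields are read by offset arithmetic with no object allocation or deserialization.

```python
# Sketch of a BinaryRow-style layout: fixed-length slots first,
# variable-length string bytes appended at the end.
# Encoding details are illustrative, not Flink's.

import struct

def encode_row(int_val, s1, s2):
    b1, b2 = s1.encode(), s2.encode()
    fixed_len = 8 * 3                  # 3 eight-byte slots (null bits omitted)
    off1 = fixed_len                   # strings start right after fixed part
    off2 = fixed_len + len(b1)
    fixed = struct.pack("<q", int_val)          # slot 0: the int, inline
    fixed += struct.pack("<ii", off1, len(b1))  # slot 1: (offset, length)
    fixed += struct.pack("<ii", off2, len(b2))  # slot 2: (offset, length)
    return fixed + b1 + b2

def read_int(row):
    return struct.unpack_from("<q", row, 0)[0]

def read_string(row, slot):
    # Jump straight to the slot, then to the bytes: no boxing, no parsing.
    off, ln = struct.unpack_from("<ii", row, 8 * slot)
    return row[off:off + ln].decode()

row = encode_row(321, "awesome", "flink")
print(read_int(row), read_string(row, 1), read_string(row, 2))
```

Because a row is one contiguous byte region, hashing and serialization are plain byte operations and field access stays cache-friendly, which is the point of the slide.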

10. 05 Sharing of Runtime Implementation
   Shared components: TableSource, TableSink, DimensionTable, Correlate, Calc, CodeGeneration, Memory Management, ...
   Two examples:
   - Joining a dimension table
   - Memory management for micro-batch

11. 05.1 Join a Dimension Table
   (Diagram: probe records 2, 12, 5, 1, 21, ... joined against the Products dimension table.)
   Batch directly reuses the streaming implementation of this operator, which pays off when the dimension table is huge: a per-key lookup is cheaper than a full scan.
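The lookup-vs-scan trade-off can be sketched as follows (an illustrative simulation; the function names and in-memory "dimension table" stand in for a real external store): a lookup join issues one point query per probe-side key, so when the dimension table is huge but only a few keys are touched, it avoids reading the table at all.

```python
# Sketch of a lookup-style dimension table join. The dict stands in for
# an external store (e.g. a key-value database); names are illustrative.

def lookup_join(facts, lookup_fn):
    """For each fact record, fetch the matching dimension row by key."""
    out = []
    for key, payload in facts:
        dim = lookup_fn(key)                 # point query, not a scan
        if dim is not None:                  # inner-join semantics
            out.append((key, payload, dim))
    return out

# A "huge" dimension table: one million products.
dimension = {i: f"product-{i}" for i in range(1_000_000)}

facts = [(2, "a"), (12, "b"), (5, "c")]
rows = lookup_join(facts, dimension.get)     # touches only 3 of 1M rows
```

A scan-based join would have to read all one million dimension rows up front; the lookup version touches exactly as many as the probe side needs.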

12. 05.2 Micro-Batch for Streaming
   A micro-batch event (implemented as a watermark) carries the time when the events occurred and triggers processing in the AggregateOperator.
   Streaming directly reuses batch's compact binary hash map implementation, yielding a 10x throughput improvement in most streaming cases that use the RocksDB state backend.
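The mechanism can be sketched as follows (an illustrative model, not Flink internals: a plain dict stands in for the compact binary hash map, and another dict for the RocksDB state backend): records are pre-aggregated in memory within each micro-batch, so the expensive state backend is touched once per key per batch instead of once per record.

```python
# Sketch of micro-batch aggregation: buffer and pre-aggregate in an
# in-memory hash map, flush to the state backend on the micro-batch
# trigger. All names here are illustrative.

class MicroBatchSumAggregator:
    def __init__(self, state_backend):
        self.state = state_backend    # stands in for RocksDB state
        self.buffer = {}              # stands in for the compact hash map
        self.state_accesses = 0

    def process(self, key, value):
        # Per-record work stays in memory; no state-backend access here.
        self.buffer[key] = self.buffer.get(key, 0) + value

    def flush(self):
        # Triggered by the micro-batch event (the watermark): merge the
        # partial aggregates into durable state, one access per key.
        for key, partial in self.buffer.items():
            self.state_accesses += 1
            self.state[key] = self.state.get(key, 0) + partial
        self.buffer.clear()

state = {}
agg = MicroBatchSumAggregator(state)
for key, v in [("a", 1), ("a", 2), ("b", 5), ("a", 3)]:
    agg.process(key, v)
agg.flush()
# 4 input records, but only 2 state-backend writes (keys "a" and "b")
```

With a backend like RocksDB, where each access involves serialization and possibly disk I/O, collapsing per-record accesses into per-batch accesses is where the reported throughput gain comes from.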

13. Agenda: Why / What / How / Achievement / Future

14. Achievement
   (Chart: TPC-H results for batch, Blink vs. Flink 1.6.0 across queries q1-q22; lower is better.)

15. Achievement
   01 Batch outperforms the community Flink release by 10x.
   02 Streaming also successfully conquers TPC-H, an industry first.
   03 Streaming peaked at 1.7 billion records/sec during the 2018 Alibaba Global Shopping Festival (Tmall Double 11).

16. Future Plan
   01 Open Source
   02 Hybrid of stream and batch, blurring the boundary between the two modes.