Optimizations in Blink Runtime for Global Shopping Festival at Alibaba

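On last year's Double 11 alone, Alibaba's shopping festival reached US$25.3 billion in sales. On that day, Alibaba's real-time computing engine Blink (Alibaba's internal version of Flink) processed 472 million data records per second at peak. This deck reveals the key optimizations behind that, including the credit-based network stack, dynamic load balancing, and checkpointing optimizations for very large jobs.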

1. Optimizations in Blink Runtime for Global Shopping Festival at Alibaba
Feng Wang @ Alibaba, Senior Staff Engineer
April 2018

2. Agenda

3. Alibaba Global Shopping Festival (Part 1)

4. About Alibaba Global Shopping Festival
✓ The History of Alibaba Global Shopping Festival
  • Began in 2009 as an event for merchants and consumers to raise awareness of the value of online shopping
  • Held on Nov. 11th (Double-Eleven) every year
✓ The Economic Scale of Alibaba Global Shopping Festival Last Year
  • More than 140,000 participating brands and merchants across 225 countries and regions
  • Total GMV settled through Alipay was US$25.3 billion (RMB 168.2 billion)
  • Alibaba Cloud processed 325,000 orders per second at peak
  • Alipay processed 1.48 billion payment transactions in total

5. Alibaba Blink (Part 2)

6. Blink: Alibaba's Version of Flink

7. Blink Computing Platform
• Data Applications: Search, Recommendation, Ads, BI, Security, Monitor
• Developing Platform: Bayes
• High-Level API: Table API, SQL
• Core API: DataStream API, DataSet API
• Runtime: Distributed Streaming Dataflow
• Hadoop Cluster: YARN (Resource Management), HDFS (FileSystem)

8. Blink Architecture
Blink runs on Hadoop YARN and Hadoop HDFS. Components shown in the diagram: Bayes, Dispatcher, YARN Resource Manager, YARN AM, JobManager, and TaskManagers that run tasks with their StateBackends.

9. Blink at Alibaba Global Shopping Festival
• Business areas: Search, Rec, Ads, BI, Security
• The largest job: thousands of subtasks and tens of TBs of state
• Thousands of jobs
• More than 5k nodes
• More than 500k CPU cores

10. Real-Time GMV at Alibaba Global Shopping Festival

11. Optimizations in Blink Runtime (Part 3)

12. Runtime Optimizations
✓ Network Stack
  • Credit-Based Flow Control
  • Barrier Alignment Without Spilling to Disk
✓ Load Balance
  • Self-Adaptive Rebalance Shuffle
  • Dynamic Balance for KeyedStream
✓ Incremental Checkpointing
  • Defragmentation for Frequent Checkpointing

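13. Flow-Control Problem
Two TaskManagers exchange data over a single TCP channel (Netty server on the sending side, Netty client on the receiving side) that multiplexes the logical channels between subtasks. SubTask1 and SubTask2 write through their RecordWriters into result subpartitions; SubTask3 and SubTask4 read through their RecordReaders from input channels. When SubTask3 backpressures and its local buffer pool is exhausted, the buffers destined for it stay blocked on the wire, which wastes memory. Because the TCP channel is shared, SubTask4 is blocked as well, and the upstream RecordWriter ends up with all channels blocked.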
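The head-of-line blocking described above can be illustrated with a minimal Python sketch (not Blink code). It assumes one shared connection whose frames must be consumed strictly in order and a fixed number of free receive buffers per downstream channel; the function `deliver_multiplexed` and its data model are hypothetical.

```python
from collections import deque


def deliver_multiplexed(wire, free_buffers):
    """Deliver frames from one shared TCP connection to per-channel buffers.

    wire: deque of (channel, record) pairs in arrival order (hypothetical model).
    free_buffers: dict mapping channel name -> number of free receive buffers.
    Without credits, the receiver has to stop reading the connection as soon as
    the frame at its head targets a channel that has no free buffer.
    """
    delivered = {channel: [] for channel in free_buffers}
    while wire:
        channel, record = wire[0]
        if free_buffers[channel] == 0:
            break  # head-of-line blocking: every frame behind this one waits too
        wire.popleft()
        free_buffers[channel] -= 1
        delivered[channel].append(record)
    return delivered, list(wire)


# SubTask3 has a single free buffer; SubTask4 has plenty but is still stuck.
frames = deque([("SubTask3", "a"), ("SubTask4", "x"), ("SubTask3", "b"), ("SubTask4", "y")])
delivered, stuck = deliver_multiplexed(frames, {"SubTask3": 1, "SubTask4": 5})
print(delivered)  # {'SubTask3': ['a'], 'SubTask4': ['x']}
print(stuck)      # [('SubTask3', 'b'), ('SubTask4', 'y')]
```

Record `y` never reaches SubTask4 even though SubTask4 has four free buffers, because it sits behind a frame for the backpressured SubTask3.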

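14. Credit-Based Flow Control
Each downstream input channel announces a credit to its upstream result sub-partition: the number of buffers it has available for receiving. The upstream subtask sends at most as many buffers as it has credit and reports its backlog, the number of result buffers still waiting to be sent, so that the downstream side can request more buffers from its local buffer pool. Data is only sent when the receiver has room for it, so no buffers are blocked on the wire.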
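The credit/backlog exchange can be sketched in Python for a single channel. This is a simplified model under stated assumptions, not Blink's implementation: the class names are hypothetical, each received buffer is assumed to be processed and recycled immediately, and the split into "exclusive" per-channel buffers plus "floating" buffers from a shared pool mirrors the terminology of Flink's credit-based network stack.

```python
from collections import deque


class LocalBufferPool:
    """Floating buffers shared by the input channels of one receiving task (simplified)."""

    def __init__(self, size):
        self.available = size

    def request(self, wanted):
        granted = min(wanted, self.available)
        self.available -= granted
        return granted


class ResultSubpartition:
    """Sender side: buffers wait here until the receiver grants credit."""

    def __init__(self, buffers):
        self.queue = deque(buffers)
        self.credit = 0

    def add_credit(self, credit):
        self.credit += credit

    def send(self):
        # Never send more buffers than the receiver announced room for,
        # so nothing is left blocked on the wire.
        sent = []
        while self.credit > 0 and self.queue:
            self.credit -= 1
            sent.append(self.queue.popleft())
        return sent, len(self.queue)  # data plus the remaining backlog


class InputChannel:
    """Receiver side: credit = number of buffers available for receiving."""

    def __init__(self, exclusive_buffers, pool):
        self.unannounced = exclusive_buffers
        self.pool = pool
        self.received = []

    def announce_credit(self):
        credit, self.unannounced = self.unannounced, 0
        return credit

    def on_data(self, buffers, backlog):
        self.received.extend(buffers)
        # Simplification: each buffer is processed and recycled right away.
        self.unannounced += len(buffers)
        # Use the sender's backlog to ask the local pool for floating buffers.
        self.unannounced += self.pool.request(max(0, backlog - self.unannounced))


def transfer(num_buffers, exclusive=2, floating=4):
    assert exclusive >= 1, "the channel needs at least one buffer to make progress"
    pool = LocalBufferPool(floating)
    sender = ResultSubpartition(range(num_buffers))
    receiver = InputChannel(exclusive, pool)
    rounds = 0
    while sender.queue:
        sender.add_credit(receiver.announce_credit())
        sent, backlog = sender.send()
        receiver.on_data(sent, backlog)
        rounds += 1
    return receiver.received, rounds


received, rounds = transfer(10)
print(received == list(range(10)), rounds)  # True 3
```

In the first round only the two exclusive buffers are credited; the reported backlog of 8 lets the receiver pull in the 4 floating buffers, so later rounds move 6 buffers at a time.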

15. Barrier Alignment with Spilling Data to Disk
Steps: 1. begin aligning; 2. aligning; 3. aligned (emit barrier); 4. consume spilled data.
While the operator waits for the barrier on its remaining inputs, records arriving on the inputs that have already delivered the barrier are spilled to disk, and they are replayed once alignment completes. Problems:
1. Unnecessary disk I/O cost
2. Slow alignment, and therefore slow checkpointing
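The spill-and-replay behavior can be sketched as follows. This is a toy Python model, not Blink code: it assumes exactly one barrier per input channel, reads channels round-robin, and uses an in-memory list as a stand-in for the on-disk spill file; `align_with_spilling` is a hypothetical name.

```python
from collections import deque

BARRIER = "|"  # stands in for a checkpoint barrier


def align_with_spilling(channels):
    """Read input channels round-robin; each channel holds exactly one BARRIER.

    Records arriving on a channel that already delivered the barrier are
    "spilled" (here: kept in a list standing in for a disk file) and
    replayed right after the barrier has been emitted downstream.
    """
    queues = [deque(channel) for channel in channels]
    barrier_seen = set()
    spill_file = []
    spilled_total = 0
    output = []
    while any(queues):
        for i, queue in enumerate(queues):
            if not queue:
                continue
            item = queue.popleft()
            if item == BARRIER:
                barrier_seen.add(i)
                if len(barrier_seen) == len(queues):
                    output.append(BARRIER)     # 3. aligned: emit the barrier
                    output.extend(spill_file)  # 4. consume the spilled data
                    spill_file.clear()
            elif i in barrier_seen and len(barrier_seen) < len(queues):
                spill_file.append(item)  # 2. aligning: spill instead of processing
                spilled_total += 1
            else:
                output.append(item)
    return output, spilled_total


out, spilled = align_with_spilling([["a", "b", BARRIER, "c", "d"],
                                    ["1", "2", "3", BARRIER, "4"]])
print(out)      # ['a', '1', 'b', '2', '3', '|', 'c', 'd', '4']
print(spilled)  # 1
```

Record `c` is read from the channel, written to the spill file, and read back again, which is exactly the extra I/O the slide lists as a problem.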

16. Barrier Alignment Without Spilling (Based on Credit-Based Flow Control)
Steps: 1. begin aligning; 2. aligning; 3. aligned (emit barrier).
An input channel that has already delivered the barrier is blocked by granting it no credit, so its data waits at the upstream sender instead of being spilled to disk on the receiving side.

17. Online Job Example
The diagram shows a job reading PV Log and Transaction Log, with Custom operators, several KeyBy shuffles, HBase and a Queue; operator parallelisms range from 64x to 256x.

18. Throughput with Credit-Based Flow Control

19. Barrier Alignment Without Spilling


21. Problem of Round-Robin Rebalance Shuffle
• Processing costs vary between records
• Subtasks may therefore be under different workloads
• The subtask with the heaviest workload backpressures first, and because round-robin keeps sending to it in turn, the upstream subtask gets blocked

22. What Is the Best Rebalance Shuffle Policy?
No upstream subtask should be blocked as long as some downstream subtask is able to receive and process data.

23. Self-Adaptive Rebalance
The upstream RecordWriter keeps one subpartition queue per downstream subtask and knows each downstream input channel's credit and its own backlog:
• downstream subtask1: credit=2, backlog=4
• downstream subtask2: credit=1, backlog=6
• downstream subtask3: credit=3, backlog=1
Less load means more credit and less backlog waiting in the queue, so the RecordWriter sends to the subpartition with the minimum backlog size.
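The selection rule can be sketched in a few lines of Python. This is a hypothetical illustration using the credit and backlog values from the slide: the tie-break on credit is an added assumption, and only the queueing side is modeled (buffers are never drained, so credit stays fixed).

```python
from dataclasses import dataclass


@dataclass
class Subpartition:
    name: str
    credit: int   # buffers the downstream input channel can still receive
    backlog: int  # buffers queued upstream, waiting to be sent


def choose_target(subpartitions):
    # Prefer the smallest backlog; on a tie, prefer the channel with more credit.
    return min(subpartitions, key=lambda s: (s.backlog, -s.credit))


def emit(subpartitions, num_buffers):
    counts = {}
    for _ in range(num_buffers):
        target = choose_target(subpartitions)
        target.backlog += 1  # the new buffer joins that subpartition's queue
        counts[target.name] = counts.get(target.name, 0) + 1
    return counts


queues = [
    Subpartition("subtask1", credit=2, backlog=4),
    Subpartition("subtask2", credit=1, backlog=6),
    Subpartition("subtask3", credit=3, backlog=1),
]
print(emit(queues, 5))  # {'subtask3': 4, 'subtask1': 1}
```

Unlike round-robin, the heavily loaded subtask2 receives nothing until the other queues have grown past its backlog.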

24. Throughput of Self-Adaptive Rebalance Shuffle
Example job: HBase and Search Engine operators, with parallelisms of 2304x and 4480x, connected by a rebalance shuffle.

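25. Data Skew in KeyedStream
An upstream subtask routes key groups kg0 to kg8 to three downstream subtasks running on different TaskManagers. Because key groups carry different amounts of traffic, a static key-group distribution leaves the downstream subtasks unevenly loaded. What we expect is a distribution that spreads the load evenly across them.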
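A short Python sketch shows how skew arises with a static assignment. The index formula reproduces, as far as we know, the arithmetic of Flink's `KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup` (each subtask owns one contiguous key-group range); the per-key-group TPS values are hypothetical.

```python
def operator_index_for_key_group(key_group, parallelism, max_parallelism):
    # Contiguous ranges: key groups [0, max/p) go to subtask 0, and so on.
    return key_group * parallelism // max_parallelism


def load_per_subtask(tps_per_key_group, parallelism):
    max_parallelism = len(tps_per_key_group)  # one entry per key group
    load = [0] * parallelism
    for kg, tps in enumerate(tps_per_key_group):
        load[operator_index_for_key_group(kg, parallelism, max_parallelism)] += tps
    return load


# Hypothetical TPS for kg0..kg8, with kg3 much hotter than the rest.
tps = [100, 100, 100, 900, 100, 100, 100, 100, 100]
print(load_per_subtask(tps, 3))  # [300, 1100, 300]
```

The subtask owning kg3 to kg5 carries more than three times the load of its neighbours, even though every subtask owns the same number of key groups.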

26. Dynamic Rebalance for KeyedStream
The JobManager (JM) rebalances key groups across the downstream subtasks in five steps:
1. Query the TPS of all key groups.
2. Create a new key-group distribution plan.
3. Trigger a checkpoint.
4. After barrier alignment, switch the upstream KeyGroup Partitioner to the new key-group distribution.
5. After barrier alignment, reload the new state partitions in the downstream subtasks.
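Steps 2, 4 and 5 can be sketched from the partitioner's point of view. This is a hypothetical Python model, not Blink's partitioner: the plan is a plain key-group to subtask table, staging and switching are assumed to be separate calls, and the new plan is an illustrative contiguous redistribution of the skewed example from the previous slide.

```python
class KeyGroupPartitioner:
    """Hypothetical upstream partitioner driven by a key-group -> subtask table."""

    def __init__(self, assignment):
        self.assignment = list(assignment)  # assignment[kg] = downstream subtask
        self.pending = None

    def stage(self, new_assignment):
        # Step 2: the JobManager's new plan is staged but not used yet.
        self.pending = list(new_assignment)

    def on_barrier(self):
        # Step 4: switch at the checkpoint barrier, so records before the
        # barrier follow the old plan and records after it follow the new one.
        if self.pending is not None:
            self.assignment, self.pending = self.pending, None

    def route(self, key_group):
        return self.assignment[key_group]


def moved_key_groups(old, new):
    # Step 5: the state of these key groups must be reloaded by its new owner.
    return [kg for kg, (before, after) in enumerate(zip(old, new)) if before != after]


old_plan = [0, 0, 0, 1, 1, 1, 2, 2, 2]
new_plan = [0, 0, 0, 1, 2, 2, 2, 2, 2]  # hypothetical: subtask 1 keeps only hot kg3
partitioner = KeyGroupPartitioner(old_plan)
partitioner.stage(new_plan)
print(partitioner.route(4))  # 1 (old plan still active)
partitioner.on_barrier()
print(partitioner.route(4))  # 2
print(moved_key_groups(old_plan, new_plan))  # [4, 5]
```

Tying the switch to the barrier means the checkpoint taken in step 3 cleanly separates records routed under the old plan from those routed under the new one.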

27. Rebalance Algorithm
Split an array of N elements into K parts:
• N: key group count
• K: subtask count
• Element value: TPS of each key group (KG1-TPS, KG2-TPS, ..., KGn-TPS)
Dynamic programming time complexity: O(N^2 * K)
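The slide does not state the objective, so the following Python sketch assumes the classic one: split the key groups into K contiguous ranges (matching the contiguous key-group ranges each subtask owns) so that the busiest range has the smallest possible total TPS. It is an illustration of an O(N^2 * K) dynamic program, not Blink's code; `partition_keygroups` is a hypothetical name.

```python
def partition_keygroups(tps, k):
    """Split per-key-group TPS into k contiguous ranges, minimizing the busiest range.

    best[j][i] = smallest possible maximum range load when the first i key groups
    are spread over j subtasks. Three nested loops give O(N^2 * K) time.
    """
    n = len(tps)
    if not 1 <= k <= n:
        raise ValueError("need 1 <= k <= number of key groups")
    prefix = [0]
    for value in tps:
        prefix.append(prefix[-1] + value)
    inf = float("inf")
    best = [[inf] * (n + 1) for _ in range(k + 1)]
    cut = [[0] * (n + 1) for _ in range(k + 1)]
    best[0][0] = 0
    for j in range(1, k + 1):
        for i in range(j, n + 1):
            for p in range(j - 1, i):  # last range covers key groups p .. i-1
                cost = max(best[j - 1][p], prefix[i] - prefix[p])
                if cost < best[j][i]:
                    best[j][i], cut[j][i] = cost, p
    ranges, i = [], n
    for j in range(k, 0, -1):  # walk the stored cut points backwards
        ranges.append((cut[j][i], i))
        i = cut[j][i]
    ranges.reverse()
    return best[k][n], ranges


# Hypothetical TPS for kg0..kg8 with one hot key group (kg3).
tps = [100, 100, 100, 900, 100, 100, 100, 100, 100]
print(partition_keygroups(tps, 3))  # (900, [(0, 3), (3, 4), (4, 9)])
```

Ranges are half-open key-group intervals, so the hot kg3 ends up alone on one subtask and the maximum load drops to 900, the TPS of kg3 itself.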

28. Rebalance Algorithm (continued)
• N = 100,000 (key groups)
• K = 1,000 (subtasks)
• O(N^2 * K) = 10k * 1G (CPU speed): the complexity is too large to be processed
• Instead, split the array into sub-arrays with n < 4096 elements each
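The cost figure on this slide can be checked with a short calculation. The first two lines only restate the slide's numbers, taking "1G" as 10^9 simple operations per second. The last line is a hypothetical estimate that assumes each sub-array of n key groups is given a proportional share Kn/N of the subtasks, which the slide does not specify.

```latex
\begin{aligned}
N^2 \cdot K &= (10^5)^2 \cdot 10^3 = 10^{13}\ \text{operations} \\
\frac{10^{13}\ \text{operations}}{10^9\ \text{operations/s}} &= 10^4\ \text{s} \approx 2.8\ \text{hours} \\
\frac{N}{n} \cdot n^2 \cdot \frac{K n}{N} = n^2 K &\le 4096^2 \cdot 10^3 \approx 1.7 \times 10^{10}\ \text{operations} \approx 17\ \text{s}
\end{aligned}
```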

29. File Fragmentation Caused by Incremental Checkpointing
As the TaskManager (RocksDB StateBackend) checkpoints more frequently, each incremental checkpoint adds a few new SST files:
• Checkpoint1: 1.sst, 2.sst, 3.sst
• Checkpoint2: 3.sst, 4.sst, 5.sst
• Checkpoint3: 5.sst, 6.sst, 7.sst
On HDFS the files are shared between checkpoints and reference-counted by the JobManager (Checkpoint Coordinator) for Completed Checkpoint1 to Completed Checkpoint3: 3.sst and 5.sst have ref:2, the others ref:1. More frequent checkpoints produce more small files (between 1MB and 8MB in the example).
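The reference counting shown on this slide can be sketched in Python. The idea of sharing SST files between incremental checkpoints and deleting them once unreferenced is similar in spirit to Flink's shared state registry, but the class `SstRegistry` and its methods here are hypothetical and ignore file sizes and upload details.

```python
from collections import Counter


class SstRegistry:
    """Hypothetical reference counter for SST files shared by incremental checkpoints."""

    def __init__(self):
        self.refs = Counter()

    def register(self, checkpoint_files):
        # A completed checkpoint references every SST file it needs,
        # including files first uploaded by earlier checkpoints.
        for name in checkpoint_files:
            self.refs[name] += 1

    def release(self, checkpoint_files):
        # A subsumed checkpoint drops its references; files that no
        # checkpoint uses any more can be deleted from HDFS.
        deleted = []
        for name in checkpoint_files:
            self.refs[name] -= 1
            if self.refs[name] == 0:
                del self.refs[name]
                deleted.append(name)
        return deleted


checkpoint1 = ["1.sst", "2.sst", "3.sst"]
checkpoint2 = ["3.sst", "4.sst", "5.sst"]
checkpoint3 = ["5.sst", "6.sst", "7.sst"]
registry = SstRegistry()
for files in (checkpoint1, checkpoint2, checkpoint3):
    registry.register(files)
print(registry.refs["3.sst"], registry.refs["5.sst"])  # 2 2
print(registry.release(checkpoint1))  # ['1.sst', '2.sst']
print(registry.refs["3.sst"])  # 1
```

Each checkpoint keeps only a handful of small files alive and older ones disappear as checkpoints are subsumed, so at a high checkpoint frequency HDFS holds many small files at any moment.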