Flink in Alibaba
New Flink API Stack For Unified Processing
Flink Runtime Improvements
Future Plans



1.Runtime Improvements For Flink As A Unified Engine

2.Outline: Flink in Alibaba; New Flink API Stack For Unified Processing; Flink Runtime Improvements; Future Plans

3.• Flink in Alibaba

4.Flink In Alibaba: Cluster: 10K nodes; State: petabytes; Events: trillions per day; TPS: 472M/sec

5.Flink Computing Platform in Alibaba (architecture diagram). Products and applications: AOP, Bahamut, Dataphin, Streaming, …, Recommendation, Search, BI. Service layer: FOAS (Flink Open Api Service). APIs: Table and SQL (relational), DataStream (streaming process). Engine: Flink Runtime (Distributed Streaming DataFlow). Cluster management: Yarn. Storage: HDFS.

6.• New Flink API Stack For Unified Processing

7.Current Flink API Stack (diagram). Table API & SQL (relational) sits on both the DataStream API (streaming process) and the DataSet API (batch process). DataStream programs go through StreamTransform and StreamGraph as an operator tree; DataSet programs go through a batch plan and optimization plan. Both become a JobGraph that runs on the Runtime (Distributed Streaming DataFlow) as StreamTask & OP or BatchTask & Driver. Deployment: Local (single JVM), Cluster (Standalone/Yarn), Cloud (EC2/GCE).

8.Proposed New Flink API Stack (diagram, OLD vs NEW). OLD: Table API & SQL (relational) on top of the DataStream API (streaming process) and the DataSet API (batch process), on the Runtime (Distributed Streaming DataFlow). NEW: Table & SQL (relational), DataStream (streaming) and DataSet (batch) on a Unified Core API (DAG & StreamOperator), on the same Runtime. Deployment in both: Local (single JVM), Cluster (Standalone/Yarn), Cloud (EC2/GCE).

9.Proposed New StreamOperator API (example: a hash join that 1. builds from Input1, then 2. probes from Input2). New: enum InputSelection. New: InputSelection firstInputSelection(), which answers "which input side to read first?". Changed: void processElement1(StreamRecord) and void processElement2(StreamRecord) (old) become InputSelection processElement1(StreamRecord) and InputSelection processElement2(StreamRecord) (new), which answer "which input side to process next?". New: InputSelection endInput1() and InputSelection endInput2(), through which the operator is notified when an input is finished.
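
To make the build-then-probe idea concrete, here is a minimal sketch of a hash join written against an interface shaped like the one on this slide. It is an illustration only: the interface name SelectableTwoInputOperator, the KeyedRecord stand-in for StreamRecord, the InputSelection constants and the toy driver are all assumptions, not the actual Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Which input to read next. The name follows the slide; the constants are assumed. */
enum InputSelection { FIRST, SECOND, ANY }

/** Hypothetical stand-in for StreamRecord: a join key plus a payload. */
final class KeyedRecord {
    final String key;
    final String value;
    KeyedRecord(String key, String value) { this.key = key; this.value = value; }
}

/** Two-input operator shaped like the slide's proposal; not the real Flink interface. */
interface SelectableTwoInputOperator {
    InputSelection firstInputSelection();
    InputSelection processElement1(KeyedRecord record);
    InputSelection processElement2(KeyedRecord record);
    InputSelection endInput1();
    InputSelection endInput2();
}

/** Hash join: build a table from input 1, then probe it with input 2. */
final class HashJoinSketch implements SelectableTwoInputOperator {
    private final Map<String, List<String>> buildTable = new HashMap<>();
    final List<String> output = new ArrayList<>();

    @Override public InputSelection firstInputSelection() { return InputSelection.FIRST; }

    @Override public InputSelection processElement1(KeyedRecord r) {
        buildTable.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.value);
        return InputSelection.FIRST; // keep reading the build side
    }

    @Override public InputSelection endInput1() { return InputSelection.SECOND; } // build done, start probing

    @Override public InputSelection processElement2(KeyedRecord r) {
        List<String> matches = buildTable.get(r.key);
        if (matches != null) {
            for (String built : matches) {
                output.add(r.key + ":" + built + "," + r.value);
            }
        }
        return InputSelection.SECOND;
    }

    @Override public InputSelection endInput2() { return InputSelection.ANY; }
}

/** Toy driver that honors the operator's selections; stands in for the task's input loop. */
final class SelectionDriver {
    static void run(SelectableTwoInputOperator op, List<KeyedRecord> in1, List<KeyedRecord> in2) {
        Iterator<KeyedRecord> it1 = in1.iterator();
        Iterator<KeyedRecord> it2 = in2.iterator();
        boolean ended1 = false;
        boolean ended2 = false;
        InputSelection next = op.firstInputSelection();
        while (!ended1 || !ended2) {
            // Read input 1 unless it has ended or the operator asked for input 2.
            boolean readFirst = !ended1 && (next != InputSelection.SECOND || ended2);
            if (readFirst) {
                if (it1.hasNext()) { next = op.processElement1(it1.next()); }
                else { ended1 = true; next = op.endInput1(); }
            } else {
                if (it2.hasNext()) { next = op.processElement2(it2.next()); }
                else { ended2 = true; next = op.endInput2(); }
            }
        }
    }
}
```

Because the operator itself says which side to read, the probe side is never consumed before the build table is complete, which the old void-returning methods could not express.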

10.Flink Runtime Improvements: Flink Architecture; Improvements For Job Scheduler; Pluggable Shuffle Service Architecture; Improvements For Task Execution; Improvements For Job FailOver

11.• Flink Architecture

12.Flink Architecture (diagram: Client, Dispatcher, JobMaster, ResourceManager, Cluster, and Task Managers with data exchange between them). 1. Submit Job 2. Start Job 3. Request Slot 4. Allocate Container 5. Launch TM 6. Schedule Task

13.• Job Scheduler

14.Current Schedule Mode

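15.Pluggable Job Schedule Framework (diagram). Components in the JobMaster: a pluggable DAGManager (driven by DagEvent), the ExecutionGraph (ScheduleTask, UpdateState) and the TaskScheduler. The JobMaster deploys tasks to the TaskManagers, and the TaskManagers report task status back.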

16.• Pluggable Shuffle Service

17.Current Shuffle Service (diagram). On each worker machine the TMs act as both shuffle server and shuffle client, on top of the FileSystem. The tasks of JOB1 have finished and their slots are free, yet JOB2 finds no resource available.

18.External Yarn Shuffle Service (diagram). Tasks in the TMs use an External Writer and an External Reader to exchange data through a YarnShuffle Service on each worker machine, on top of the FileSystem. JOB1 has finished and JOB2 runs its tasks.

19.Hard to Extend New Shuffle Service (diagram). Tasks in the TMs are tied directly to the External Writer and External Reader of the YarnShuffle Service and the FileSystem, which makes a new shuffle service hard to add.

20.Pluggable Shuffle Architecture (diagram). The JobMaster deploys the tasks of the JobGraph to TMs on the worker machines. Each TM holds a ShuffleManager, for example new YarnShuffleManager() or new RDMAManager(), and the ShuffleManager creates the Writer and Reader used by each task. The FileSystem sits below.
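
The plug-in point can be sketched as a small set of interfaces. This is a hedged illustration, not Flink's actual shuffle API: the method names, the string-keyed factory and the in-memory stand-in (used here in place of real Yarn and RDMA implementations) are all assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

interface ShuffleWriter { void write(int partition, String record); }
interface ShuffleReader { List<String> read(int partition); }

/** Plug-in point: each implementation creates its own writer and reader. */
interface ShuffleManager {
    String name();
    ShuffleWriter createWriter();
    ShuffleReader createReader();
}

/** In-memory stand-in; a real plug-in would talk to a Yarn shuffle service or RDMA. */
final class InMemoryShuffleManager implements ShuffleManager {
    private final String name;
    private final Map<Integer, List<String>> partitions = new HashMap<>();

    InMemoryShuffleManager(String name) { this.name = name; }

    @Override public String name() { return name; }

    @Override public ShuffleWriter createWriter() {
        return (partition, record) ->
            partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    @Override public ShuffleReader createReader() {
        return partition -> {
            List<String> stored = partitions.get(partition);
            return stored == null ? new ArrayList<String>() : new ArrayList<String>(stored);
        };
    }
}

/** Hypothetical factory: picks the plug-in from a configuration value. */
final class ShuffleManagers {
    static ShuffleManager create(String kind) {
        switch (kind) {
            case "yarn": return new InMemoryShuffleManager("yarn"); // stand-in for a Yarn-backed manager
            case "rdma": return new InMemoryShuffleManager("rdma"); // stand-in for an RDMA-backed manager
            default: throw new IllegalArgumentException("unknown shuffle service: " + kind);
        }
    }
}
```

Task code only ever sees ShuffleManager, ShuffleWriter and ShuffleReader, so a new shuffle service means one new implementation plus one factory entry.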

21.• Task Execution

22.StreamTask Based on OperatorTree (diagram: JobVertex and Operator layout after chaining; the chain policy is shown across Forward and KeyBy edges)

23.StreamTask Based on OperatorDag (diagram: JobVertex and Operator layout after chaining; the chain policy is shown across Forward and KeyBy edges)

24.• Job FailOver

25.Current JM FailOver - Restart All

26.JM FailOver - Take Over

27.• Failover Improvements

28.Failover Improvements • Some failures are non-recoverable, such as a divide-by-zero error; in that case the task can fail immediately. (diagram: ExecutionJobVertex)

29.Failover Improvements • Some machines have hardware issues that we can detect, so that we avoid scheduling tasks onto those machines. (diagram: JobMaster; machines running TaskManagers and tasks)

30.Failover Improvements • We could rerun the upstream vertex to regenerate data for missing task inputs.

31.Failover Improvements • Umbrella JIRA: FLINK-10288 - Classify exceptions into different categories for different strategies. [FLINK-10289] - Enable per-job level failover strategy. [FLINK-10572] - Support task revocation. [FLINK-10573] - Pluggable failover strategy: can handle machine hardware issues
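
As a rough illustration of classifying exceptions into categories with a different strategy for each, the sketch below maps a failure to one of four categories and then to an action. The category names, the action names and the two marker exception classes are hypothetical; they are not taken from the JIRA issues above.

```java
/** Hypothetical failure categories, following the failure cases on the failover slides. */
enum FailureCategory { NON_RECOVERABLE, MACHINE_FAULT, MISSING_INPUT, RECOVERABLE }

/** Hypothetical actions a failover strategy could take. */
enum FailoverAction { FAIL_JOB, RESTART_ON_OTHER_MACHINE, RERUN_UPSTREAM, RESTART_TASK }

/** Hypothetical marker for a detected hardware problem. */
final class MachineFaultException extends RuntimeException {
    MachineFaultException(String message) { super(message); }
}

/** Hypothetical marker for a lost upstream result. */
final class MissingInputException extends RuntimeException {
    MissingInputException(String message) { super(message); }
}

final class FailureClassifier {
    /** Maps a task failure to a category. */
    static FailureCategory classify(Throwable failure) {
        if (failure instanceof ArithmeticException) {
            return FailureCategory.NON_RECOVERABLE; // e.g. integer divide by zero: a retry fails again
        }
        if (failure instanceof MachineFaultException) {
            return FailureCategory.MACHINE_FAULT;
        }
        if (failure instanceof MissingInputException) {
            return FailureCategory.MISSING_INPUT;
        }
        return FailureCategory.RECOVERABLE;
    }

    /** Picks a strategy per category. */
    static FailoverAction actionFor(FailureCategory category) {
        switch (category) {
            case NON_RECOVERABLE: return FailoverAction.FAIL_JOB;
            case MACHINE_FAULT:   return FailoverAction.RESTART_ON_OTHER_MACHINE;
            case MISSING_INPUT:   return FailoverAction.RERUN_UPSTREAM;
            case RECOVERABLE:     return FailoverAction.RESTART_TASK;
            default: throw new IllegalStateException("unhandled category: " + category);
        }
    }
}
```

Keeping classification and action selection separate is one way to leave the strategy pluggable: a per-job strategy could swap actionFor without touching classify.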

32.• Speculative Execution

33.Speculative Execution • Hardware problems such as occasional busy I/O or high CPU load can turn running tasks into long-tail tasks, which slows down the whole job. (chart: TaskRunTime per subtask)

34.Speculative Execution • We propose speculative execution to handle this problem. The basic idea is to run a copy of the task on another machine and take the output of whichever attempt finishes first. [FLINK-10644] (chart: original attempts 1, 2, 3 and speculative attempts 1’, 2’)
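
The "first attempt to finish wins" idea can be shown in miniature with plain java.util.concurrent. This is only an analogy running inside one JVM, not how Flink schedules attempts across machines; the class and method names are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SpeculativeRunSketch {
    /** Runs all attempts concurrently and returns the result of the first one to succeed. */
    static <T> T firstToFinish(List<Callable<T>> attempts) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(attempts.size());
        try {
            // invokeAny returns one successful result and cancels the attempts still running.
            return pool.invokeAny(attempts);
        } finally {
            pool.shutdownNow();
        }
    }

    /** A stuck original attempt loses to its speculative copy. */
    static String demo() throws Exception {
        CountDownLatch neverReleased = new CountDownLatch(1);
        Callable<String> original = () -> {
            neverReleased.await(); // simulates a long-tail task; ends only when interrupted
            return "original";
        };
        Callable<String> speculative = () -> "speculative";
        return firstToFinish(Arrays.asList(original, speculative));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

In a distributed setting the hard part is the commit step: only one attempt's output may become visible downstream, which is why the workflow ends with an explicit commit of output data.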

35.Speculative Execution • Workflow - TaskExecutors collect and report running statistics. - JobMaster detects long tail task. - Schedule speculative execution attempt. - Commit output data. (diagram: the original execution and the speculative execution each run on a TaskExecutor and produce data; the ExecutionGraph tracks both)

36.Speculative Execution • Collect running statistics - TaskExecutor accumulates the total input size and the currently processed data size in the InputGate across all InputChannels, and reports them to the JobMaster in the heartbeat payload. - ExecutionGraph extends ExecutionVertex to request resources and maintain multiple attempts.
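
A minimal sketch of the accumulation step, assuming each input channel knows its total input size and counts the bytes it has processed. The class, its methods and the two-element payload are hypothetical and only mirror the description above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-task progress tracker; names are illustrative, not Flink classes. */
final class InputProgress {
    // channel index -> {totalBytes, processedBytes}
    private final Map<Integer, long[]> perChannel = new HashMap<>();

    void setTotal(int channel, long totalBytes) { entry(channel)[0] = totalBytes; }

    void addProcessed(int channel, long bytes) { entry(channel)[1] += bytes; }

    private long[] entry(int channel) {
        return perChannel.computeIfAbsent(channel, c -> new long[2]);
    }

    /** Sums over all channels; this pair is what would ride along on the heartbeat. */
    long[] heartbeatPayload() {
        long total = 0;
        long processed = 0;
        for (long[] e : perChannel.values()) {
            total += e[0];
            processed += e[1];
        }
        return new long[] { total, processed };
    }
}
```

With the total and processed sizes plus elapsed time, the JobMaster side can derive a throughput and a remaining-work estimate per attempt.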

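37.Speculative Execution • Decide the long tail task - slow processing throughput: a task is long-tail when its current processing speed is clearly lower than the average speed of the other tasks that have already finished. [formula] The definition uses the average throughput of terminated tasks and a configurable penalty factor (>=1).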
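
The exact formula is not shown here, so the sketch below assumes one plausible form: a running task is flagged as long-tail when its throughput multiplied by the penalty factor is still below the average throughput of terminated tasks. Both that inequality and the class name are assumptions.

```java
import java.util.List;

/** Sketch of a long-tail check; the inequality used is an assumed form of the rule. */
final class LongTailDetector {
    private final double penaltyFactor;

    LongTailDetector(double penaltyFactor) {
        if (penaltyFactor < 1.0) {
            throw new IllegalArgumentException("penalty factor must be >= 1");
        }
        this.penaltyFactor = penaltyFactor;
    }

    /** Returns true when the running task is clearly slower than finished tasks on average. */
    boolean isLongTail(double taskThroughput, List<Double> terminatedThroughputs) {
        if (terminatedThroughputs.isEmpty()) {
            return false; // nothing to compare against yet
        }
        double sum = 0.0;
        for (double t : terminatedThroughputs) {
            sum += t;
        }
        double average = sum / terminatedThroughputs.size();
        // Assumed rule: even after scaling by the penalty factor, the task is below average.
        return taskThroughput * penaltyFactor < average;
    }
}
```

A larger penalty factor makes the check more conservative, so fewer speculative attempts are launched.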

38.Speculative Execution (figure: comparison of Speculation = true and Speculation = false)

39.• DAG Bubble Scheduling

40.DAG Bubble Scheduling • Motivation - flexibility, efficiency, reliability. • Prototype Design - divide the DAG into several sub-graphs according to an edge cost function. - edges inside a sub-graph are pipelined, edges between sub-graphs are blocking
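
One simple way to divide a DAG by an edge cost function is to merge the endpoints of every edge whose cost is at or below a threshold and treat the remaining edges as blocking. The sketch below does this with a union-find structure; the threshold rule, its direction and all names are assumptions, not the prototype's actual algorithm.

```java
/** Sketch: group vertices into sub-graphs ("bubbles") by merging across cheap edges. */
final class BubblePartitioner {
    /**
     * @param vertexCount      number of vertices, identified as 0..vertexCount-1
     * @param edges            edges[i] = {source, target}
     * @param costs            costs[i] = cost of edges[i] under some edge cost function
     * @param maxPipelinedCost edges at or below this cost stay pipelined (assumed rule)
     * @return a bubble id per vertex; vertices with the same id share a bubble
     */
    static int[] partition(int vertexCount, int[][] edges, double[] costs, double maxPipelinedCost) {
        int[] parent = new int[vertexCount];
        for (int v = 0; v < vertexCount; v++) {
            parent[v] = v;
        }
        for (int i = 0; i < edges.length; i++) {
            if (costs[i] <= maxPipelinedCost) {
                // Union the two endpoints: they will run in the same bubble.
                parent[find(parent, edges[i][0])] = find(parent, edges[i][1]);
            }
        }
        int[] bubble = new int[vertexCount];
        for (int v = 0; v < vertexCount; v++) {
            bubble[v] = find(parent, v);
        }
        return bubble;
    }

    /** Finds the representative of v, halving the path as it goes. */
    private static int find(int[] parent, int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]];
            v = parent[v];
        }
        return v;
    }
}
```

For a chain 0 -> 1 -> 2 -> 3 with costs 1, 10, 1 and a threshold of 5, vertices 0 and 1 share one bubble and vertices 2 and 3 share another. A real design also has to cope with an expensive edge whose endpoints end up merged through other cheap edges, which this sketch does not address.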

41.DAG Bubble Scheduling • Prototype Test - TPC-H queries run to completion under a limited resource budget, whereas they may hang in LAZY mode. - For TPC-H Q9 on 10G of data, 50 slots (half the resource budget of LAZY mode) reach 70% of the performance of LAZY mode.

42.DAG Bubble Scheduling • Bubble Generation

43.• Future Plans

44.Future Plans • Deployment - Unified Elastic Session For Flip6 • Job Schedule - Dynamic Update of JobGraph for Batch Job - HotUpdate of JobGraph for Streaming Job • Network Stack - More kinds of external Shuffle Service - RDMA Shuffle • Checkpoint & StateBackend - New kind of Heap-Based StateBackend - Checkpoint for Batch Processing