Apache Flink - CERN Indico
1 .On Declarative Data Analysis and Data Independence in the Big Data Era. Volker Markl, http://www.user.tu-berlin.de/marklv/
2 .Data & Analysis: Increasingly Complex! Data: volume too large (Volume); rate too fast (Velocity); too heterogeneous (Variability); too uncertain (Veracity). Analysis: Reporting (aggregation, selection); Ad-Hoc Queries (SQL, XQuery); ETL/ELT (MapReduce); Data Mining (MATLAB, R, Python); Predictive/Prescriptive (MATLAB, R, Python). ML: algorithms. DM: scalability.
3 .„Big Data“ vs. „Traditional Data Management“: predictive, probabilistic, prescriptive; sampling and storage vs. curation; quantity vs. quality; statistics vs. logic; syntax vs. semantics.
4 .“Data Scientist” – “Jack of All Trades!” Data Science: Application; Scalable Data Management (DM); Machine Learning, Statistics, Data Analysis (ML). Topics: Control Flow, Iterative Algorithms, Error Estimation, Active Sampling, Sketches, Curse of Dimensionality, Decoupling, Convergence, Monte Carlo, Mathematical Programming, Linear Algebra, Stochastic Gradient Descent, Regression, Statistics, Hashing, Parallelization, Query Optimization, Fault Tolerance, Relational Algebra / SQL, Scalability, Data Analysis Language, Compiler, Memory Management, Memory Hierarchy, Data Flow, Hardware Adaptation, Indexing, Resource Management, NF2 / XQuery, Data Warehouse/OLAP, Real-Time. Domain Expertise (e.g., Industry 4.0, Medicine, Physics, Engineering, Energy, Logistics).
5 .Big Data Analytics Requires Systems Programming. R/Matlab: 3 million users; Hadoop: 100,000 users. Data Analysis: Statistics, Algebra, Optimization, Machine Learning, NLP, Signal Processing, Image Analysis, Audio/Video Analysis, Information Integration, Information Extraction, Data Value Chain, Data Analysis Process, Predictive Analytics. Indexing, Parallelization, Communication, Memory Management, Query Optimization, Efficient Algorithms, Resource Management, Fault Tolerance, Numerical Stability. Big Data is now where database systems were in the 70s (prior to relational algebra, query optimization and a SQL standard)! “We will soon have a huge skills shortage for data-related jobs.” Neelie Kroes (ICT 2013, Nov. 7, Vilnius). “Big Data’s Big Problem: Little Talent” Wall Street Journal. People with Big Data Analytics Skills. Declarative languages to the rescue!
6 .„How“: 486 lines of code, long development time, non-robust runtime. Tail of the Java program:

                }
            }
        }
        } catch (IOException e) {
            log.info("Exception occurred in loading clusters:", e);
            throw new RuntimeException(e);
        }
    }
    }

Declarative data analysis program with automatic optimization, parallelization and hardware adaptation (excerpt):

    object RunKMeans {
      def main(args: Array[String]) {
        val km = new KMeans
        if (args.size < 5) {
          println(km.getDescription)
          return
        }
        val plan = km.getScalaPlan(args(0).toInt, args(1), args(2), args(3), args(4).toInt)
        LocalExecutor.execute(plan)
        System.exit(0)
      }
    }

    class KMeans extends PlanAssembler with PlanAssemblerDescription with Serializable {
      override def getDescription() = {
        "Parameters: [numSubStasks] [dataPoints] [clusterCenters] [output] [numIterations]"
      }

      override def getPlan(args: String*) = {
        getScalaPlan(args(0).toInt, args(1), args(2), args(3), args(4).toInt)
      }

      case class Point(x: Double, y: Double, z: Double) {
        def computeEuclidianDistance(other: Point) = other match {
          case Point(x2, y2, z2) =>
            math.sqrt(math.pow(x - x2, 2) + math.pow(y - y2, 2) + math.pow(z - z2, 2))
        }
      }

      case class Distance(dataPoint: Point, clusterId: Int, distance: Double)

      def asPointSum = (pid: Int, dist: Distance) => dist.clusterId -> PointSum(1, dist.dataPoint)

      def sumPointSums = (dataPoints: Iterator[(Int, PointSum)]) => {
        dataPoints.reduce { (z, v) => z.copy(_2 = z._2 + v._2) }
      }

      case class PointSum(count: Int, pointSum: Point) {
        def +(that: PointSum) = that match {
          case PointSum(c, Point(x, y, z)) =>
            PointSum(count + c, Point(x + pointSum.x, y + pointSum.y, z + pointSum.z))
        }

        def toPoint() = Point(round(pointSum.x / count), round(pointSum.y / count), round(pointSum.z / count))

        private def round(d: Double) = math.round(d * 100.0) / 100.0
        // ... (the excerpt ends here in the source)
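To make the „What, not How“ contrast concrete, one k-means iteration can be written in a few lines of plain Scala collections. This is a minimal sketch, not the Flink or Stratosphere API: `KMeansSketch`, `nearest`, and `step` are hypothetical names, and everything runs on in-memory sequences with no parallelization or optimization.

```scala
object KMeansSketch {
  case class Point(x: Double, y: Double, z: Double) {
    def +(o: Point): Point = Point(x + o.x, y + o.y, z + o.z)
    def /(n: Int): Point = Point(x / n, y / n, z / n)
    def distanceTo(o: Point): Double =
      math.sqrt(math.pow(x - o.x, 2) + math.pow(y - o.y, 2) + math.pow(z - o.z, 2))
  }

  // Index of the center closest to p (assumes at least one center).
  def nearest(p: Point, centers: Seq[Point]): Int =
    centers.indices.minBy(i => p.distanceTo(centers(i)))

  // One k-means iteration: assign each point to its nearest center,
  // then replace each center by the mean of its assigned points.
  // A center that attracts no points is kept unchanged.
  def step(points: Seq[Point], centers: Seq[Point]): Seq[Point] = {
    val byCluster = points.groupBy(p => nearest(p, centers))
    centers.indices.map { i =>
      byCluster.get(i) match {
        case Some(ps) => ps.reduce(_ + _) / ps.size
        case None     => centers(i)
      }
    }
  }
}
```

The program states what is computed (group by nearest center, then average). How the grouping is executed is left to the collection library here, and would be left to the optimizer in a dataflow system.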
7 .X = Big Data Analytics – System Programming! („What“, not „How“). Description of „How“? (state of the art in scalable data analysis): Hadoop, MPI. Description of „What“? (declarative specification): Technology X. Benefits: larger human base of „data scientists“; reduction of „human“ latencies; cost reduction. [Diagram: Data Analyst, Technology X, Machine.]
8 .Deep Analysis of „Big Data“ is Key! [Diagram axes: Small Data vs. Big Data (3V); Simple Analysis vs. Deep Analytics.]
9 .Apache Flink: General Purpose Programming + Database Execution. Draws on Database Technology: Declarativity, Query optimization, Robust out-of-core. Draws on MapReduce Technology: Scalability, User-defined functions, Complex data types, Schema on read. Add: Iterations, Advanced Dataflows, General APIs.
10 .Introducing Apache Flink – A Success Story from Berlin. Project started under the name “Stratosphere” in late 2008 as a DFG-funded research unit, led by TU Berlin, in collaboration with HU Berlin and the Hasso Plattner Institute Potsdam. Apache Open Source Incubation since April 2014, Apache Top Level Project since December 2014. Fast-growing community of open source users and developers in Europe and worldwide, in academia (e.g., SICS/KTH, INRIA, ELTE) and companies (e.g., ResearchGate, Spotify, Amadeus). More information: http://flink.apache.org
11 .Rich Set of Operators: Map, Iterate, Project, Reduce, Delta Iterate, Aggregate, Join, Filter, Distinct, CoGroup, FlatMap, Vertex Update, Union, GroupReduce, Accumulators. [Diagram: example dataflow with Source, Map, Reduce, Join, Iterate, and Sink nodes.] Alexandrov et al.: “The Stratosphere Platform for Big Data Analytics,” VLDB Journal 5/2014
12 .Built-in vs. driver-based looping. Driver-based: loop outside the system, in the driver program; the iterative program looks like many independent jobs (the client submits step after step). Built-in (Flink): dataflows with feedback edges (map, join, reduce, join); the system is iteration-aware and can optimize the job.
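The difference between the two looping styles can be sketched in plain Scala. This is an illustration only, not Flink's iteration API: `IterationSketch`, `iterate`, `driverLoop`, and `propagate` are hypothetical names, and the example step is a simple min-label propagation for connected components.

```scala
object IterationSketch {
  // Built-in looping: the loop is owned by the "system". The output of the step
  // function is fed back as its input until nothing changes or the budget runs out.
  @annotation.tailrec
  def iterate[A](initial: A, maxIterations: Int)(step: A => A): A =
    if (maxIterations <= 0) initial
    else {
      val next = step(initial)
      if (next == initial) initial else iterate(next, maxIterations - 1)(step)
    }

  // Example step: every vertex takes the smallest label seen across its edges.
  // Assumes every vertex that appears in an edge has an entry in labels.
  def propagate(edges: Seq[(Int, Int)])(labels: Map[Int, Int]): Map[Int, Int] = {
    val undirected = edges ++ edges.map(_.swap)
    val candidates = undirected.map { case (src, dst) => dst -> labels(src) }
    candidates.foldLeft(labels) { case (acc, (v, l)) => acc.updated(v, math.min(acc(v), l)) }
  }

  // Driver-based looping: the client runs a fixed number of rounds and
  // submits each step as if it were an independent job.
  def driverLoop(edges: Seq[(Int, Int)], labels: Map[Int, Int], rounds: Int): Map[Int, Int] = {
    var current = labels
    for (_ <- 1 to rounds) current = propagate(edges)(current)
    current
  }
}
```

In the driver-based variant the caller has to pick the number of rounds. In the built-in variant the loop itself sees every step, which is what lets a system detect convergence or optimize across iterations.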
13 .Effect of optimization. Different runs (on a sample on the laptop; on large files on the cluster; a month later after the data evolved) get different execution plans (Execution Plan A, B, C). Optimizer choices: Hash vs. Sort; Partition vs. Broadcast; Caching; Reusing partition/sort.
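A toy cost model shows why the chosen plan can change as data sizes change. The sketch below is an assumption for illustration, not Flink's optimizer: `JoinStrategySketch` and `choose` are hypothetical names, and cost is simply the number of bytes shipped over the network (broadcasting sends one input to every parallel instance, repartitioning ships both inputs once).

```scala
object JoinStrategySketch {
  sealed trait Strategy
  case object Repartition extends Strategy
  case object BroadcastLeft extends Strategy
  case object BroadcastRight extends Strategy

  // Assumed cost model: bytes shipped over the network.
  // Ties are resolved in favour of the strategy listed first.
  def choose(leftBytes: Long, rightBytes: Long, parallelism: Int): Strategy = {
    val costs: Seq[(Strategy, Long)] = Seq(
      Repartition    -> (leftBytes + rightBytes),
      BroadcastLeft  -> leftBytes * parallelism,
      BroadcastRight -> rightBytes * parallelism
    )
    costs.minBy(_._2)._1
  }
}
```

With a small right input, `choose(1000L, 10L, 4)` picks `BroadcastRight`. Once both inputs are the same size, `choose(1000L, 1000L, 4)` picks `Repartition`, so the same program ends up with a different plan.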
14 .Why optimization? Do you want to hand-tune that?
15 .Optimizing iterative programs: caching loop-invariant data; pushing work „out of the loop“; maintaining state as index.
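Pushing loop-invariant work out of the loop can be sketched as follows. This is a hand-written illustration in plain Scala, not something Flink generates: `LoopInvariantSketch`, `buildIndex`, `naive`, and `hoisted` are hypothetical names, and the `indexBuilds` counter exists only to make the saved work visible.

```scala
object LoopInvariantSketch {
  // Counts how often the loop-invariant neighbour index is (re)built.
  var indexBuilds = 0

  // The edges never change between iterations, so this index is loop-invariant.
  def buildIndex(edges: Seq[(Int, Int)]): Map[Int, Seq[Int]] = {
    indexBuilds += 1
    edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }
  }

  // One step: every vertex adopts the smallest label among itself and its out-neighbours.
  def step(index: Map[Int, Seq[Int]], labels: Map[Int, Int]): Map[Int, Int] =
    labels.map { case (v, l) =>
      v -> (l +: index.getOrElse(v, Seq.empty[Int]).map(n => labels(n))).min
    }

  // Naive: the index is rebuilt inside the loop on every iteration.
  def naive(edges: Seq[(Int, Int)], labels: Map[Int, Int], n: Int): Map[Int, Int] =
    (1 to n).foldLeft(labels)((ls, _) => step(buildIndex(edges), ls))

  // Hoisted: the index is built once and reused (cached) by every iteration.
  def hoisted(edges: Seq[(Int, Int)], labels: Map[Int, Int], n: Int): Map[Int, Int] = {
    val index = buildIndex(edges)
    (1 to n).foldLeft(labels)((ls, _) => step(index, ls))
  }
}
```

Both variants return the same labels. Over three iterations the naive one builds the index three times and the hoisted one builds it once.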
16 .Streaming. Flink’s execution engine is pipelined (streaming): can implement true streaming & batch. Spark’s execution engine materializes intermediate results (batch): can only do micro-batch. [Diagram: Batch, Micro-Batch, Streaming.]
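The pipelined vs. materializing distinction can be illustrated with Scala's own collections: a strict `List` finishes each operator before the next starts, while an `Iterator` pushes each record through all operators before reading the next one. This models the order of execution only. It is not how either engine is implemented, and `PipeliningSketch` and its methods are hypothetical names.

```scala
object PipeliningSketch {
  import scala.collection.mutable.ArrayBuffer

  // Materializing ("batch") execution: the whole intermediate result of map
  // exists before filter sees its first record.
  def materialized(input: List[Int], trace: ArrayBuffer[String]): List[Int] = {
    val doubled = input.map { x => trace += s"map($x)"; x * 2 }
    doubled.filter { x => trace += s"filter($x)"; x > 2 }
  }

  // Pipelined ("streaming") execution: each record flows through map and filter
  // before the next record is read; nothing runs until the result is consumed.
  def pipelined(input: Iterator[Int], trace: ArrayBuffer[String]): Iterator[Int] =
    input
      .map { x => trace += s"map($x)"; x * 2 }
      .filter { x => trace += s"filter($x)"; x > 2 }
}
```

For the input 1, 2 the materialized trace is map(1), map(2), filter(2), filter(4), while the pipelined trace, once the iterator is consumed, is map(1), filter(2), map(2), filter(4).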
17 .Some Performance Numbers. [Figure: execution time (sec) of Flink (incl. Flink delta, Flink bulk) vs. Spark (incl. Spark no Cache, Spark mllib, Spark graphx) for TPCH 3, K-means (K=800, D=1000), and Connected Components; x-axes: dataset (50G, 100G, 200G; 320,000, 640,000, 1,280,000; flickr, dbpedia, twitter), cluster size (10, 20, 40), and memory size (5G, 8G, 11G, 14G).]
18 .Evolution of Big Data Platforms. 1G: Relational Databases. 2G: Hadoop (Scale-out, Map/Reduce, UDFs). 3G: Spark (In-memory Performance and Improved Programming Model). 4G: Flink (In-memory + Out of Core Performance, Declarativity, Optimisation, Iterative Algorithms, Streaming/Lambda).
19 .Contributors wanted: http://flink.apache.org
20 .Flink Program → Program Compiler → Data Flow → Flink Optimizer (picks data shipping and local strategies, operator order) → Execution Plan → Job Graph → Execution Graph → Parallel Runtime (task scheduling, network data transfers, resource allocation). Runtime: hash- and sort-based out-of-core operator implementations, memory management.
21 .Overview (Hadoop | Flink | Spark). Paradigm: MapReduce | Iterative Data Flows | Distributed Collections (RDD). Runtime: Batch Parallel Sort | Streaming, In-memory & Out of Core | Batch Processing in Memory. Compilation/Optimization: None | Holistic Planning for Data Exchange, Sort/Hash, Caching, ... | None.
22 .Data Model: Flink vs. Spark. Flink: arbitrary Java objects; tuples as first-class citizens; joins/grouping via field references (tuple position, selector function, field name). Spark: arbitrary Java objects; key/value pairs as first-class citizens; joins/grouping via key/value pairs.
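The two join styles can be contrasted in plain Scala. This is a sketch over in-memory sequences, not the Flink or Spark API, and `JoinKeySketch`, `joinBy`, and `joinPairs` are hypothetical names. In the field-reference style the join is told where the key lives inside each record. In the key/value style the records must first be reshaped into (key, value) pairs.

```scala
object JoinKeySketch {
  // Field-reference style: records keep their shape; key selector functions
  // tell the join where to find the key on each side.
  def joinBy[A, B, K](left: Seq[A], right: Seq[B])(leftKey: A => K, rightKey: B => K): Seq[(A, B)] = {
    val index = right.groupBy(rightKey)
    for (a <- left; b <- index.getOrElse(leftKey(a), Seq.empty[B])) yield (a, b)
  }

  // Key/value style: both inputs must already be (key, value) pairs.
  def joinPairs[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val index = right.groupBy(_._1)
    for ((k, v) <- left; (_, w) <- index.getOrElse(k, Seq.empty[(K, W)])) yield (k, (v, w))
  }
}
```

For example, with users as `(id, name)` and orders as `(orderId, userId)`, `joinBy(users, orders)(_._1, _._2)` joins them as they are, whereas `joinPairs` needs the orders re-keyed first, e.g. `orders.map(o => o._2 -> o._1)`.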