Apache Flink - CERN Indico


1. On Declarative Data Analysis and Data Independence in the Big Data Era
Volker Markl, http://www.user.tu-berlin.de/marklv/

2. Data & Analysis: Increasingly Complex!
- Data: volume (data volume too large), velocity (data rate too fast), variability (data too heterogeneous), veracity (data too uncertain)
- Analysis: data reporting (aggregation, selection), ad-hoc queries (SQL, XQuery), ETL/ELT (MapReduce), data mining (MATLAB, R, Python), predictive/prescriptive analytics (MATLAB, R, Python), ML algorithms
- Scalability of data mining (DM) and ML algorithms

3. "Big Data" vs. "Traditional Data Management"
- predictive, probabilistic, prescriptive
- sampling and storage vs. curation
- quantity vs. quality
- statistics vs. logic
- syntax vs. semantics

4. Data Science
The "Data Scientist" is expected to be a "Jack of All Trades!", combining application domain expertise (e.g., Industry 4.0, medicine, physics, engineering, energy, logistics), scalable data management, and machine learning, statistics and data analysis.
Topics spanning these areas: control flow, iterative algorithms, error estimation, active sampling, sketches, curse of dimensionality, decoupling, convergence, Monte Carlo, mathematical programming, linear algebra, stochastic gradient descent, regression, statistics, hashing, parallelization, query optimization, fault tolerance, relational algebra/SQL, scalability, data analysis language, compiler, memory management, memory hierarchy, data flow, hardware adaptation, indexing, resource management, NF²/XQuery, data warehouse/OLAP, ML, DM, real-time.

5. Big Data Analytics Requires Systems Programming
- People with Big Data analytics skills: R/Matlab: 3 million users; Hadoop: 100,000 users
- Data analysis: statistics, algebra, optimization, machine learning, NLP, signal processing, image analysis, audio/video analysis, information integration, information extraction, data value chain, data analysis process, predictive analytics
- Systems programming: indexing, parallelization, communication, memory management, query optimization, efficient algorithms, resource management, fault tolerance, numerical stability
Big Data is now where database systems were in the 70s (prior to relational algebra, query optimization and a SQL standard)!
"We will soon have a huge skills shortage for data-related jobs." Neelie Kroes (ICT 2013, Nov. 7, Vilnius)
"Big Data's Big Problem: Little Talent", Wall Street Journal
Declarative languages to the rescue!

6. "What", not "how". Example: k-Means Clustering

Hand-optimized code (data-, load- and system-dependent): Mahout k-means implementation (Hadoop)

Cluster.java
```java
public class Cluster {

  public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.kmeans.measure";
  public static final String CLUSTER_PATH_KEY = "org.apache.mahout.clustering.kmeans.path";
  public static final String CLUSTER_CONVERGENCE_KEY = "org.apache.mahout.clustering.kmeans.convergence";

  private static int nextClusterId = 0;
  private final int clusterId;
  private Vector center = new SparseVector(0);
  private Vector centroid = null;
  private int numPoints = 0;
  private Vector pointTotal = null;
  private boolean converged = false;
  private static DistanceMeasure measure;
  private static double convergenceDelta = 0;

  public static String formatCluster(Cluster cluster) {
    return cluster.getIdentifier() + ": " + cluster.computeCentroid().asFormatString();
  }

  public static Cluster decodeCluster(String formattedString) {
    int beginIndex = formattedString.indexOf('[');
    String id = formattedString.substring(0, beginIndex);
    String center = formattedString.substring(beginIndex);
    char firstChar = id.charAt(0);
    boolean startsWithV = firstChar == 'V';
    if (firstChar == 'C' || startsWithV) {
      int clusterId = Integer.parseInt(formattedString.substring(1, beginIndex - 2));
      Vector clusterCenter = AbstractVector.decodeVector(center);
      Cluster cluster = new Cluster(clusterCenter, clusterId);
      cluster.converged = startsWithV;
      return cluster;
    }
    return null;
  }

  public static void configure(JobConf job) {
    try {
      ClassLoader ccl = Thread.currentThread().getContextClassLoader();
      Class<?> cl = ccl.loadClass(job.get(DISTANCE_MEASURE_KEY));
      measure = (DistanceMeasure) cl.newInstance();
      measure.configure(job);
      convergenceDelta = Double.parseDouble(job.get(CLUSTER_CONVERGENCE_KEY));
      nextClusterId = 0;
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    } catch (InstantiationException e) {
      throw new RuntimeException(e);
    }
  }

  public static void config(DistanceMeasure aMeasure, double aConvergenceDelta) {
    measure = aMeasure;
    convergenceDelta = aConvergenceDelta;
    nextClusterId = 0;
  }

  public static void emitPointToNearestCluster(Vector point, List<Cluster> clusters, Text values,
      OutputCollector<Text, Text> output) throws IOException {
    Cluster nearestCluster = null;
    double nearestDistance = Double.MAX_VALUE;
    for (Cluster cluster : clusters) {
      double distance = measure.distance(point, cluster.getCenter());
      if (nearestCluster == null || distance < nearestDistance) {
        nearestCluster = cluster;
        nearestDistance = distance;
      }
    }
    String outKey = nearestCluster.getIdentifier();
    String value = "1 " + values.toString();
    output.collect(new Text(outKey), new Text(value));
  }

  public static void outputPointWithClusterInfo(String key, Vector point, List<Cluster> clusters,
      Text values, OutputCollector<Text, Text> output) throws IOException {
    Cluster nearestCluster = null;
    double nearestDistance = Double.MAX_VALUE;
    for (Cluster cluster : clusters) {
      double distance = measure.distance(point, cluster.getCenter());
      if (nearestCluster == null || distance < nearestDistance) {
        nearestCluster = cluster;
        nearestDistance = distance;
      }
    }
    output.collect(new Text(key), new Text(Integer.toString(nearestCluster.clusterId)));
  }

  private Vector computeCentroid() {
    if (numPoints == 0)
      return pointTotal;
    else if (centroid == null) {
      // lazy compute new centroid
      centroid = pointTotal.divide(numPoints);
    }
    return centroid;
  }

  public Cluster(Vector center) {
    super();
    this.clusterId = nextClusterId++;
    this.center = center;
    this.numPoints = 0;
    this.pointTotal = center.like();
  }

  public Cluster(Vector center, int clusterId) {
    super();
    this.clusterId = clusterId;
    this.center = center;
    this.numPoints = 0;
    this.pointTotal = center.like();
  }

  public Cluster(String clusterId) {
    this.clusterId = Integer.parseInt((clusterId.substring(1)));
    this.numPoints = 0;
    this.converged = clusterId.startsWith("V");
  }

  @Override
  public String toString() {
    return getIdentifier() + " - " + center.asFormatString();
  }

  public String getIdentifier() {
    if (converged)
      return "V" + clusterId;
    else
      return "C" + clusterId;
  }

  public void addPoint(Vector point) {
    addPoints(1, point);
  }

  public void addPoints(int count, Vector delta) {
    centroid = null;
    numPoints += count;
    if (pointTotal == null)
      pointTotal = delta.copy();
    else
      pointTotal = pointTotal.plus(delta);
  }

  public Vector getCenter() {
    return center;
  }

  public int getNumPoints() {
    return numPoints;
  }

  public void recomputeCenter() {
    center = computeCentroid();
    numPoints = 0;
    pointTotal = center.like();
  }

  public boolean computeConvergence() {
    Vector centroid = computeCentroid();
    converged = measure.distance(centroid, center) <= convergenceDelta;
    return converged;
  }

  public Vector getPointTotal() {
    return pointTotal;
  }

  public boolean isConverged() {
    return converged;
  }
}
```

KMeansClusterMapper.java
```java
public class KMeansClusterMapper extends KMeansMapper {
  @Override
  public void map(WritableComparable<?> key, Text values, OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
    Vector point = AbstractVector.decodeVector(values.toString());
    Cluster.outputPointWithClusterInfo(values.toString(), point, clusters, values, output);
  }
}
```

KMeansCombiner.java
```java
public class KMeansCombiner extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
  @Override
  public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
    Cluster cluster = new Cluster(key.toString());
    while (values.hasNext()) {
      String[] numPointnValue = values.next().toString().split(" ");
      cluster.addPoints(Integer.parseInt(numPointnValue[0].trim()),
          AbstractVector.decodeVector(numPointnValue[1].trim()));
    }
    output.collect(key, new Text(cluster.getNumPoints() + " " + cluster.getPointTotal().asFormatString()));
  }

  @Override
  public void configure(JobConf job) {
    super.configure(job);
    Cluster.configure(job);
  }
}
```

KMeansDriver.java
```java
public class KMeansDriver {
  private static final Logger log = LoggerFactory.getLogger(KMeansDriver.class);

  private KMeansDriver() {
  }

  public static void main(String[] args) {
    String input = args[0];
    String clusters = args[1];
    String output = args[2];
    String measureClass = args[3];
    double convergenceDelta = Double.parseDouble(args[4]);
    int maxIterations = Integer.parseInt(args[5]);
    runJob(input, clusters, output, measureClass, convergenceDelta, maxIterations, 2);
  }

  public static void runJob(String input, String clustersIn, String output, String measureClass,
      double convergenceDelta, int maxIterations, int numCentroids) {
    boolean converged = false;
    int iteration = 0;
    String delta = Double.toString(convergenceDelta);
    while (!converged && iteration < maxIterations) {
      log.info("Iteration {}", iteration);
      String clustersOut = output + "/clusters-" + iteration;
      converged = runIteration(input, clustersIn, clustersOut, measureClass, delta, numCentroids);
      clustersIn = output + "/clusters-" + iteration;
      iteration++;
    }
    log.info("Clustering ");
    runClustering(input, clustersIn, output + "/points", measureClass, delta);
  }

  private static boolean runIteration(String input, String clustersIn, String clustersOut,
      String measureClass, String convergenceDelta, int numReduceTasks) {
    JobClient client = new JobClient();
    JobConf conf = new JobConf(KMeansDriver.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(conf, new Path(input));
    Path outPath = new Path(clustersOut);
    FileOutputFormat.setOutputPath(conf, outPath);
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    conf.setMapperClass(KMeansMapper.class);
    conf.setCombinerClass(KMeansCombiner.class);
    conf.setReducerClass(KMeansReducer.class);
    conf.setNumReduceTasks(numReduceTasks);
    conf.set(Cluster.CLUSTER_PATH_KEY, clustersIn);
    conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
    conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
    client.setConf(conf);
    try {
      JobClient.runJob(conf);
      FileSystem fs = FileSystem.get(conf);
      return isConverged(clustersOut + "/part-00000", conf, fs);
    } catch (IOException e) {
      log.warn(e.toString(), e);
      return true;
    }
  }

  private static void runClustering(String input, String clustersIn, String output,
      String measureClass, String convergenceDelta) {
    JobClient client = new JobClient();
    JobConf conf = new JobConf(KMeansDriver.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(conf, new Path(input));
    Path outPath = new Path(output);
    FileOutputFormat.setOutputPath(conf, outPath);
    conf.setMapperClass(KMeansClusterMapper.class);
    conf.setNumReduceTasks(0);
    conf.set(Cluster.CLUSTER_PATH_KEY, clustersIn);
    conf.set(Cluster.DISTANCE_MEASURE_KEY, measureClass);
    conf.set(Cluster.CLUSTER_CONVERGENCE_KEY, convergenceDelta);
    client.setConf(conf);
    try {
      JobClient.runJob(conf);
    } catch (IOException e) {
      log.warn(e.toString(), e);
    }
  }

  private static boolean isConverged(String filePath, JobConf conf, FileSystem fs) throws IOException {
    Path outPart = new Path(filePath);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, outPart, conf);
    Text key = new Text();
    Text value = new Text();
    boolean converged = true;
    while (converged && reader.next(key, value)) {
      converged = value.toString().charAt(0) == 'V';
    }
    return converged;
  }
}
```

KMeansJob.java
```java
public class KMeansJob {

  private KMeansJob() {
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 7) {
      System.err.println("Expected number of arguments 7 and received:" + args.length);
      System.err.println("Usage:input clustersIn output measureClass convergenceDelta maxIterations numCentroids");
      throw new IllegalArgumentException();
    }
    int index = 0;
    String input = args[index++];
    String clusters = args[index++];
    String output = args[index++];
    String measureClass = args[index++];
    double convergenceDelta = Double.parseDouble(args[index++]);
    int maxIterations = Integer.parseInt(args[index++]);
    int numCentroids = Integer.parseInt(args[index++]);
    runJob(input, clusters, output, measureClass, convergenceDelta, maxIterations, numCentroids);
  }

  public static void runJob(String input, String clustersIn, String output, String measureClass,
      double convergenceDelta, int maxIterations, int numCentroids) throws IOException {
    // delete the output directory
    JobConf conf = new JobConf(KMeansJob.class);
    Path outPath = new Path(output);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outPath)) {
      fs.delete(outPath, true);
    }
    fs.mkdirs(outPath);
    KMeansDriver.runJob(input, clustersIn, output, measureClass, convergenceDelta, maxIterations, numCentroids);
  }
}
```

KMeansMapper.java
```java
public class KMeansMapper extends MapReduceBase implements Mapper<WritableComparable<?>, Text, Text, Text> {

  protected List<Cluster> clusters;

  @Override
  public void map(WritableComparable<?> key, Text values, OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
    Vector point = AbstractVector.decodeVector(values.toString());
    Cluster.emitPointToNearestCluster(point, clusters, values, output);
  }

  void config(List<Cluster> clusters) {
    this.clusters = clusters;
  }

  @Override
  public void configure(JobConf job) {
    super.configure(job);
    Cluster.configure(job);
    clusters = new ArrayList<Cluster>();
    KMeansUtil.configureWithClusterInfo(job.get(Cluster.CLUSTER_PATH_KEY), clusters);
    if (clusters.isEmpty())
      throw new NullPointerException("Cluster is empty!!!");
  }
}
```

KMeansReducer.java
```java
public class KMeansReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

  protected Map<String, Cluster> clusterMap;

  @Override
  public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
    Cluster cluster = clusterMap.get(key.toString());
    while (values.hasNext()) {
      String value = values.next().toString();
      String[] numNValue = value.split(" ");
      cluster.addPoints(Integer.parseInt(numNValue[0].trim()),
          AbstractVector.decodeVector(numNValue[1].trim()));
    }
    cluster.computeConvergence();
    output.collect(new Text(cluster.getIdentifier()), new Text(Cluster.formatCluster(cluster)));
  }

  @Override
  public void configure(JobConf job) {
    super.configure(job);
    Cluster.configure(job);
    clusterMap = new HashMap<String, Cluster>();
    List<Cluster> clusters = new ArrayList<Cluster>();
    KMeansUtil.configureWithClusterInfo(job.get(Cluster.CLUSTER_PATH_KEY), clusters);
    setClusterMap(clusters);
    if (clusterMap.isEmpty())
      throw new NullPointerException("Cluster is empty!!!");
  }

  private void setClusterMap(List<Cluster> clusters) {
    clusterMap = new HashMap<String, Cluster>();
    for (Cluster cluster : clusters) {
      clusterMap.put(cluster.getIdentifier(), cluster);
    }
    clusters.clear();
  }

  public void config(List<Cluster> clusters) {
    setClusterMap(clusters);
  }
}
```

KMeansUtil.java
```java
public final class KMeansUtil {
  private static final Logger log = LoggerFactory.getLogger(KMeansUtil.class);

  private KMeansUtil() {
  }

  public static void configureWithClusterInfo(String clusterPathStr, List<Cluster> clusters) {
    JobConf job = new JobConf(KMeansUtil.class);
    Path clusterPath = new Path(clusterPathStr);
    List<Path> result = new ArrayList<Path>();
    PathFilter clusterFileFilter = new PathFilter() {
      @Override
      public boolean accept(Path path) {
        return path.getName().startsWith("part");
      }
    };
    try {
      FileSystem fs = clusterPath.getFileSystem(job);
      FileStatus[] matches = fs.listStatus(
          FileUtil.stat2Paths(fs.globStatus(clusterPath, clusterFileFilter)), clusterFileFilter);
      for (FileStatus match : matches) {
        result.add(fs.makeQualified(match.getPath()));
      }
      for (Path path : result) {
        SequenceFile.Reader reader = null;
        try {
          reader = new SequenceFile.Reader(fs, path, job);
          Text key = new Text();
          Text value = new Text();
          while (reader.next(key, value)) {
            Cluster cluster = Cluster.decodeCluster(value.toString());
            clusters.add(cluster);
          }
        } finally {
          if (reader != null) {
            reader.close();
          }
        }
      }
    } catch (IOException e) {
      log.info("Exception occurred in loading clusters:", e);
      throw new RuntimeException(e);
    }
  }
}
```

486 lines of code, long development time, non-robust runtime: "How"

Declarative data analysis program with automatic optimization, parallelization and hardware adaptation (Technology X prototype, Scala frontend)

```scala
object RunKMeans {
  def main(args: Array[String]) {
    val km = new KMeans
    if (args.size < 5) {
      println(km.getDescription)
      return
    }
    val plan = km.getScalaPlan(args(0).toInt, args(1), args(2), args(3), args(4).toInt)
    LocalExecutor.execute(plan)
    System.exit(0)
  }
}

class KMeans extends PlanAssembler with PlanAssemblerDescription with Serializable {
  override def getDescription() = {
    "Parameters: [numSubTasks] [dataPoints] [clusterCenters] [output] [numIterations]"
  }

  override def getPlan(args: String*) = {
    getScalaPlan(args(0).toInt, args(1), args(2), args(3), args(4).toInt)
  }

  case class Point(x: Double, y: Double, z: Double) {
    def computeEuclidianDistance(other: Point) = other match {
      case Point(x2, y2, z2) => math.sqrt(math.pow(x - x2, 2) + math.pow(y - y2, 2) + math.pow(z - z2, 2))
    }
  }

  case class Distance(dataPoint: Point, clusterId: Int, distance: Double)

  def asPointSum = (pid: Int, dist: Distance) => dist.clusterId -> PointSum(1, dist.dataPoint)

  def sumPointSums = (dataPoints: Iterator[(Int, PointSum)]) => {
    dataPoints.reduce { (z, v) => z.copy(_2 = z._2 + v._2) }
  }

  case class PointSum(count: Int, pointSum: Point) {
    def +(that: PointSum) = that match {
      case PointSum(c, Point(x, y, z)) =>
        PointSum(count + c, Point(x + pointSum.x, y + pointSum.y, z + pointSum.z))
    }

    def toPoint() = Point(round(pointSum.x / count), round(pointSum.y / count), round(pointSum.z / count))

    private def round(d: Double) = math.round(d * 100.0) / 100.0
  }

  def parseInput = (line: String) => {
    // the optional minus sign is captured together with each coordinate
    val PointInputPattern = """(\d+)\|(-?\d+\.\d+)\|(-?\d+\.\d+)\|(-?\d+\.\d+)\|""".r
    val PointInputPattern(id, x, y, z) = line
    (id.toInt, Point(x.toDouble, y.toDouble, z.toDouble))
  }

  def formatOutput = (cid: Int, p: Point) => "%d|%.2f|%.2f|%.2f|".format(cid, p.x, p.y, p.z)

  def computeDistance(p: (Int, Point), c: (Int, Point)) = {
    val ((pid, dataPoint), (cid, clusterPoint)) = (p, c)
    val distToCluster = dataPoint.computeEuclidianDistance(clusterPoint)
    pid -> Distance(dataPoint, cid, distToCluster)
  }

  def getScalaPlan(numSubTasks: Int, dataPointInput: String, clusterInput: String,
                   clusterOutput: String, numIterations: Int) = {
    val dataPoints = DataSource(dataPointInput, DelimitedInputFormat(parseInput))
    val clusterPoints = DataSource(clusterInput, DelimitedInputFormat(parseInput))

    def computeNewCenters(centers: DataSet[(Int, Point)]) = {
      val distances = dataPoints cross centers map computeDistance
      val nearestCenters = distances groupBy { case (pid, _) => pid } reduceGroup { ds => ds.minBy(_._2.distance) } map asPointSum.tupled
      val newCenters = nearestCenters groupBy { case (cid, _) => cid } reduceGroup sumPointSums map { case (cid, pSum) => cid -> pSum.toPoint() }
      newCenters
    }

    val finalCenters = clusterPoints.iterate(numIterations, computeNewCenters)
    val output = finalCenters.write(clusterOutput, DelimitedOutputFormat(formatOutput.tupled))
    val plan = new ScalaPlan(Seq(output), "KMeans Iteration (Immutable)")
    plan.setDefaultParallelism(numSubTasks)
    plan
  }
}
```

65 lines of code, short development time, robust runtime: "What"
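To make the "what" concrete without Flink or Stratosphere, here is a minimal local sketch in plain Java of one k-means step as the Scala plan expresses it: assign every point to its nearest center, group by center, and average each group. It assumes 2-D points held in memory and no parallelism; the class and method names are hypothetical, and this is not the Stratosphere or Flink API.

```java
import java.util.*;
import java.util.stream.*;

/** Hypothetical sketch: one k-means step as "map to nearest center" + "group and average". */
final class KMeansStep {
    // Index of the nearest center; squared Euclidean distance gives the same ordering as the true distance.
    static int nearest(double[] p, double[][] centers) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int c = 0; c < centers.length; c++) {
            double dx = p[0] - centers[c][0];
            double dy = p[1] - centers[c][1];
            double d = dx * dx + dy * dy;
            if (d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return best;
    }

    // Returns the new centers keyed by the index of the old center they replace.
    static Map<Integer, double[]> step(List<double[]> points, double[][] centers) {
        // "map": tag each point with its nearest center and group the points by that tag
        Map<Integer, List<double[]>> groups = points.stream()
            .collect(Collectors.groupingBy(p -> nearest(p, centers)));
        // "reduce": average the points of each group into a new center
        Map<Integer, double[]> newCenters = new HashMap<>();
        groups.forEach((c, group) -> {
            double sx = 0, sy = 0;
            for (double[] q : group) {
                sx += q[0];
                sy += q[1];
            }
            newCenters.put(c, new double[] {sx / group.size(), sy / group.size()});
        });
        return newCenters;
    }
}
```

As in the Scala plan, a center that attracts no points simply disappears from the result; a full run would call `step` repeatedly, which is what `iterate` does declaratively.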

7. X = Big Data Analytics – System Programming! ("What", not "How")
- Description of "How"? (State of the art in scalable data analysis): Hadoop, MPI
- Description of "What"? (Declarative specification): Technology X, bridging the data analyst and the machine
- Benefits: larger human base of "data scientists", reduction of "human" latencies, cost reduction

8. Deep Analysis of "Big Data" is Key!
[Chart: small data vs. big data (3V) on one axis, simple analysis vs. deep analytics on the other.]

9. Apache Flink: General-Purpose Programming + Database Execution
- Draws on database technology: declarativity, query optimization, robust out-of-core execution
- Draws on MapReduce technology: scalability, user-defined functions, complex data types, schema on read
- Adds: iterations, advanced dataflows, general APIs

10. Introducing Apache Flink – A Success Story from Berlin
The project started under the name "Stratosphere" in late 2008 as a DFG-funded research unit, led by TU Berlin in collaboration with HU Berlin and the Hasso Plattner Institute Potsdam. Apache open-source incubation since April 2014; Apache top-level project since December 2014. Fast-growing community of open-source users and developers in Europe and worldwide, in academia (e.g., SICS/KTH, INRIA, ELTE) and companies (e.g., ResearchGate, Spotify, Amadeus). More information: http://flink.apache.org

11. Rich Set of Operators
Source, Sink, Map, FlatMap, Filter, Project, Reduce, GroupReduce, Aggregate, Distinct, Join, CoGroup, Union, Iterate, Delta Iterate, Vertex Update, Accumulators (shown combined in an example dataflow graph).
Alexandrov et al.: "The Stratosphere Platform for Big Data Analytics," VLDB Journal 5/2014
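A minimal local sketch of two of these operators, Join and GroupReduce, on in-memory Java lists. It only illustrates their semantics on a single machine: the helper names are hypothetical, the join is a simple hash join that builds a table on the left input and probes it with the right one, and none of Flink's distribution or spilling is modeled.

```java
import java.util.*;
import java.util.function.*;

/** Hypothetical local sketch of two dataflow operators: an equi-Join and GroupReduce. */
final class Operators {
    // Join: build a hash table on the left input's key, then probe it with every right element.
    static <L, R, K, O> List<O> join(List<L> left, Function<L, K> leftKey,
                                     List<R> right, Function<R, K> rightKey,
                                     BiFunction<L, R, O> combine) {
        Map<K, List<L>> table = new HashMap<>();
        for (L l : left) {
            table.computeIfAbsent(leftKey.apply(l), k -> new ArrayList<>()).add(l);
        }
        List<O> out = new ArrayList<>();
        for (R r : right) {
            for (L l : table.getOrDefault(rightKey.apply(r), Collections.emptyList())) {
                out.add(combine.apply(l, r));
            }
        }
        return out;
    }

    // GroupReduce: all elements sharing a key are handed to one user function as a group.
    static <T, K, O> List<O> groupReduce(List<T> input, Function<T, K> key, Function<List<T>, O> reducer) {
        Map<K, List<T>> groups = new LinkedHashMap<>(); // keeps groups in first-seen key order
        for (T t : input) {
            groups.computeIfAbsent(key.apply(t), k -> new ArrayList<>()).add(t);
        }
        List<O> out = new ArrayList<>();
        for (List<T> g : groups.values()) {
            out.add(reducer.apply(g));
        }
        return out;
    }
}
```

Which input to build the hash table on is exactly the kind of decision a cost-based optimizer can take instead of the programmer.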

12. Built-in vs. driver-based looping
- Driver-based: the loop runs outside the system, in a driver program on the client; the iterative program looks like many independent jobs (client → step → step → ...).
- Built-in (Flink): dataflows with feedback edges (e.g., map → join → reduce → join, looping back); the system is iteration-aware and can optimize the job.
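For contrast with a driver loop such as Mahout's `KMeansDriver.runJob`, which submits a new job per iteration, here is a rough sketch of a built-in bulk-iteration operator from the program's point of view: one call takes the initial partial solution, a step function, and termination criteria, much like `clusterPoints.iterate(numIterations, computeNewCenters)` in the Scala k-means plan. The class name and signature are hypothetical and the loop runs locally; in an iteration-aware system the step would be part of the dataflow, closed by a feedback edge.

```java
import java.util.function.*;

/** Hypothetical sketch of a bulk "iterate" operator: the loop belongs to the program, not to a driver. */
final class BulkIteration {
    /**
     * Applies step to the partial solution until converged(previous, next) holds
     * or maxIterations steps have run, and returns the last partial solution.
     */
    static <S> S iterate(S initial, UnaryOperator<S> step, BiPredicate<S, S> converged, int maxIterations) {
        S current = initial;
        for (int i = 0; i < maxIterations; i++) {
            S next = step.apply(current);
            boolean done = converged.test(current, next);
            current = next;
            if (done) {
                break;
            }
        }
        return current;
    }
}
```

For example, Newton's method for the square root of 2 is `iterate(1.0, x -> (x + 2 / x) / 2, (a, b) -> Math.abs(a - b) < 1e-12, 100)`; because the whole loop is one call, a system could see it as a single job and, for instance, keep loop-invariant inputs cached.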

13. Effect of optimization
The same program is run on a sample on the laptop, on large files on the cluster, and a month later after the data evolved; each run gets its own execution plan (Execution Plans A, B and C), with the optimizer choosing hash vs. sort, partition vs. broadcast, caching, and reusing partitioning/sort order.
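The "partition vs. broadcast" choice can be illustrated with a deliberately simplified network-cost model, assumed here rather than taken from Flink's optimizer: broadcasting one join input ships it to every parallel instance (about parallelism × its size), while repartitioning ships each record of both inputs roughly once (about the sum of the two sizes). The class, the enum and the sizes in the example are hypothetical.

```java
/** Hypothetical, simplified cost heuristic for choosing a join's data-shipping strategy. */
final class ShipStrategy {
    enum Strategy { BROADCAST_LEFT, BROADCAST_RIGHT, REPARTITION_BOTH }

    // Sizes in bytes; parallelism = number of parallel join instances.
    static Strategy choose(long leftBytes, long rightBytes, int parallelism) {
        long broadcastLeft = leftBytes * parallelism;   // left input copied to every instance
        long broadcastRight = rightBytes * parallelism; // right input copied to every instance
        long repartition = leftBytes + rightBytes;      // every record shipped about once, by key hash
        if (broadcastLeft <= broadcastRight && broadcastLeft < repartition) {
            return Strategy.BROADCAST_LEFT;
        }
        if (broadcastRight < repartition) {
            return Strategy.BROADCAST_RIGHT;
        }
        return Strategy.REPARTITION_BOTH;
    }
}
```

With a 1 KB input joined against a 1 MB input on 4 instances the model picks broadcasting the small side, while 500 MB against 1 GB on 100 instances flips it to repartitioning both: the same program, different plans.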

14. Why optimization? Do you want to hand-tune that?

15. Optimizing iterative programs
- Caching loop-invariant data
- Pushing work "out of the loop"
- Maintaining state as an index
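A single-machine sketch of how these three techniques fit together in a delta-style iteration, using connected components by minimum-label propagation as an assumed example: the adjacency lists are loop-invariant and built once before the loop, the solution set is kept as an index (a map from vertex to component label) updated in place, and each round revisits only the workset of vertices whose label just changed. The class name is hypothetical and this is not Flink's delta-iteration API.

```java
import java.util.*;

/** Hypothetical single-machine sketch of a delta iteration: connected components by min-label propagation. */
final class DeltaConnectedComponents {
    static Map<Integer, Integer> run(int numVertices, int[][] edges) {
        // Loop-invariant data, prepared once outside the loop: undirected adjacency lists.
        List<List<Integer>> adj = new ArrayList<>();
        for (int v = 0; v < numVertices; v++) adj.add(new ArrayList<>());
        for (int[] e : edges) {
            adj.get(e[0]).add(e[1]);
            adj.get(e[1]).add(e[0]);
        }

        // Solution set kept as an index: vertex -> current component label (initially its own id).
        Map<Integer, Integer> component = new HashMap<>();
        for (int v = 0; v < numVertices; v++) component.put(v, v);

        // Workset: vertices whose label changed and must propagate it to their neighbors.
        Set<Integer> workset = new HashSet<>(component.keySet());
        while (!workset.isEmpty()) {
            Set<Integer> next = new HashSet<>();
            for (int v : workset) {
                int label = component.get(v);
                for (int n : adj.get(v)) {
                    if (label < component.get(n)) {
                        component.put(n, label); // point update into the index
                        next.add(n);
                    }
                }
            }
            workset = next;
        }
        return component;
    }
}
```

A bulk iteration would recompute every vertex's label in every round; the workset lets later rounds touch only the shrinking set of vertices that still change.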

16. Streaming
- Flink's execution engine is pipelined (streaming): it can implement true streaming and batch.
- Spark's execution engine materializes intermediate results (batch): it can only do micro-batching.
[Diagram: batch, micro-batch, streaming]
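A loose analogy in plain Java, assumed for illustration only: count how many records the `map` function has processed by the time the first result is available. A lazy `java.util.stream` pipeline passes records through one at a time (pipelined), whereas collecting the whole intermediate result first (materialized) or collecting fixed-size chunks (micro-batch) delays the first output. Class and method names are hypothetical; real engines add parallelism, buffering and network transfer.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.*;

/** Hypothetical analogy: records processed by map() before the first result is available. */
final class FirstResultLatency {
    // Pipelined: java.util.stream is lazy, so findFirst() pulls a single record through map() and stops.
    static int pipelined(int n) {
        AtomicInteger calls = new AtomicInteger();
        IntStream.range(0, n).map(x -> { calls.incrementAndGet(); return x * 2; }).findFirst();
        return calls.get();
    }

    // Materialized: the full intermediate result is collected before its first element can be read.
    static int materialized(int n) {
        AtomicInteger calls = new AtomicInteger();
        List<Integer> intermediate = IntStream.range(0, n)
            .map(x -> { calls.incrementAndGet(); return x * 2; })
            .boxed()
            .collect(Collectors.toList());
        return calls.get(); // only at this point could intermediate.get(0) be emitted
    }

    // Micro-batch: records are materialized in chunks; the first chunk must be complete before any output.
    static int microBatch(int n, int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        AtomicInteger calls = new AtomicInteger();
        for (int start = 0; start < n; start += batchSize) {
            List<Integer> batch = IntStream.range(start, Math.min(n, start + batchSize))
                .map(x -> { calls.incrementAndGet(); return x * 2; })
                .boxed()
                .collect(Collectors.toList());
            if (!batch.isEmpty()) return calls.get(); // first batch complete: its results can be emitted
        }
        return calls.get();
    }
}
```

For 1,000 records, the pipelined variant has processed one record when its first result appears, the micro-batch variant with batches of 100 has processed 100, and the materialized variant all 1,000.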

17. Some Performance Numbers
[Charts of execution time (sec), Flink vs. Spark. Panels: K-means (K=800, D=1000) on datasets of 320,000, 640,000 and 1,280,000 (Flink, Spark, Spark without cache, Spark MLlib); TPC-H 3 on 50G, 100G and 200G datasets (Flink, Spark); Connected Components on the flickr, dbpedia and twitter datasets (Flink delta, Flink bulk, Spark, Spark GraphX); execution time vs. cluster size (10, 20, 40; Flink, Spark); two panels of execution time vs. memory size (5G, 8G, 11G, 14G).]

18. Evolution of Big Data Platforms
- 1G: Relational databases
- 2G: Hadoop: scale-out, Map/Reduce, UDFs
- 3G: Spark: in-memory performance and improved programming model
- 4G: Flink: in-memory + out-of-core performance, declarativity, optimisation, iterative algorithms, streaming/Lambda

19. flink.apache.org: Contributors wanted! http://flink.apache.org

20. Data Flow
- Flink program → program compiler → Flink optimizer (picks data shipping and local strategies, operator order) → execution plan → job graph → execution graph → parallel runtime
- Parallel runtime: task scheduling, network data transfers, resource allocation
- Runtime operators: hash- and sort-based out-of-core operator implementations, memory management
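The runtime's hash- and sort-based operator implementations can be illustrated with two local strategies for the same grouped count. This is an in-memory sketch with hypothetical names; the out-of-core spilling and memory management of Flink's runtime are not modeled.

```java
import java.util.*;

/** Hypothetical in-memory sketch of two local strategies for the same grouped count. */
final class LocalStrategies {
    // Hash strategy: one pass with a hash table keyed by the grouping key; output order is unspecified.
    static Map<String, Integer> hashCount(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>();
        for (String k : keys) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts;
    }

    // Sort strategy: sort by key, then count runs of equal adjacent keys; output comes out ordered by key.
    static Map<String, Integer> sortCount(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        Collections.sort(sorted);
        Map<String, Integer> counts = new LinkedHashMap<>(); // preserves the sorted order
        int i = 0;
        while (i < sorted.size()) {
            int j = i;
            while (j < sorted.size() && sorted.get(j).equals(sorted.get(i))) {
                j++;
            }
            counts.put(sorted.get(i), j - i);
            i = j;
        }
        return counts;
    }
}
```

Both produce the same counts; the sort-based variant additionally leaves its output ordered by key, a property an optimizer can reuse for a downstream operator instead of sorting again.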

21. Overview

| | Hadoop MapReduce | Flink | Spark |
|---|---|---|---|
| Paradigm | MapReduce | Iterative data flows | Distributed collections (RDD) |
| Runtime | Batch, parallel sort | Streaming, in-memory & out-of-core | Batch processing in memory |
| Compilation/Optimization | None | Holistic planning for data exchange, sort/hash, caching, ... | None |

22. Data Model: Flink vs. Spark

| Flink | Spark |
|---|---|
| Arbitrary Java objects | Arbitrary Java objects |
| Tuples as first-class citizens | Key/value pairs as first-class citizens |
| Joins/grouping via field references (tuple position, selector function, field name) | Joins/grouping via key/value pairs |
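A small Java sketch of the contrast, with hypothetical helper names (this is neither Flink's nor Spark's API): with tuples as first-class citizens the grouping key is named by a field position or a selector function on the record itself, whereas in a key/value model each record is first reshaped into a (key, value) pair and then grouped by the pair's key.

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

/** Hypothetical sketch contrasting two ways to name the grouping key of a record. */
final class KeySelection {
    // Tuple style: records stay unchanged; the key is a field position...
    static Map<Object, Long> countByPosition(List<Object[]> tuples, int position) {
        return countBySelector(tuples, t -> t[position]);
    }

    // ...or, more generally, any selector function applied to the record.
    static <T, K> Map<K, Long> countBySelector(List<T> records, Function<T, K> selector) {
        return records.stream().collect(Collectors.groupingBy(selector, Collectors.counting()));
    }

    // Key/value style: each record is first wrapped into a (key, record) pair, then grouped by the pair's key.
    static <T, K> Map<K, Long> countByPairs(List<T> records, Function<T, K> keyOf) {
        Stream<Map.Entry<K, T>> pairs = records.stream()
            .<Map.Entry<K, T>>map(r -> new AbstractMap.SimpleEntry<>(keyOf.apply(r), r));
        return pairs.collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
    }
}
```

Both styles produce the same groups; the difference is whether the program must reshape its data into pairs before every join or grouping.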