Apache Spark – Berkeley Data Analytics Stack (BDAS)
CS162 Operating Systems and Systems Programming
Lecture 25: Apache Spark – Berkeley Data Analytics Stack (BDAS)
November 27th, 2017
Prof. Ion Stoica
http://cs162.eecs.Berkeley.edu

A Short History
• Started at UC Berkeley in 2009
• Open source: 2010
• Apache project: 2013
• Today: the most popular big data processing engine

What Is Spark?
• Parallel execution engine for big data processing
• Easy to use: 2-5x less code than Hadoop MapReduce
  – High-level APIs in Python, Java, and Scala
• Fast: up to 100x faster than Hadoop MapReduce
  – Can exploit in-memory data when available
  – Low-overhead scheduling, optimized engine
• General: supports multiple computation models

General
• Unifies batch and interactive computations
• Stack so far: Spark SQL on top of Spark Core
General
• Unifies batch, interactive, and streaming computations
• Easy to build sophisticated applications
  – Supports iterative and graph-parallel algorithms
  – Powerful APIs in Scala, Python, Java
• Full stack: Spark SQL, Spark Streaming, MLlib, and GraphX on top of Spark Core

Easy to Write Code
• WordCount in 3 lines of Spark (see the sketch after this page)
• WordCount in 50+ lines of Java MapReduce

Large-Scale Usage
• Largest cluster: 8,000 nodes
• Largest single job: 1 petabyte
• Top streaming intake: 1 TB/hour
• 2014 on-disk sort record
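To make the "WordCount in 3 lines" claim concrete, here is a minimal PySpark sketch; it is not from the slides, and the input/output paths are placeholders.

  from pyspark import SparkContext

  sc = SparkContext(appName="WordCount")
  counts = (sc.textFile("input.txt")                      # placeholder input path
              .flatMap(lambda line: line.split(" "))      # line -> words
              .map(lambda word: (word, 1))                # word -> (word, 1)
              .reduceByKey(lambda a, b: a + b))           # sum counts per word
  counts.saveAsTextFile("wordcounts")                     # placeholder output directory

The body of the job is the three chained operations; everything else is setup and I/O.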
Fast: Time to Sort 100 TB
• 2013 record (Hadoop): 2,100 machines, 72 minutes
• 2014 record (Spark): 207 machines, 23 minutes
• Spark also sorted 1 PB in 4 hours
• Source: Daytona GraySort benchmark, sortbenchmark.org

RDD: Core Abstraction
• Write programs in terms of distributed datasets and operations on them
• Resilient Distributed Datasets (RDDs)
  – Collections of objects distributed across a cluster, stored in RAM or on disk
  – Built through parallel transformations
  – Automatically rebuilt on failure
• Operations
  – Transformations (e.g., map, filter, groupBy)
  – Actions (e.g., count, collect, save)

Operations on RDDs
• Transformations: f(RDD) => RDD
  – Lazy (not computed immediately)
  – E.g., map
• Actions
  – Trigger computation
  – E.g., count, saveAsTextFile
• (a short PySpark sketch of the distinction follows after this page)

Working With RDDs
  textFile = sc.textFile("SomeFile.txt")
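A small sketch of the lazy-transformation vs. action distinction, in PySpark; the local collection and the numbers are invented for illustration.

  from pyspark import SparkContext

  sc = SparkContext(appName="LazyDemo")
  nums = sc.parallelize([1, 2, 3, 4, 5])           # RDD built from a local collection
  squares = nums.map(lambda x: x * x)              # transformation: lazy, nothing runs yet
  evens = squares.filter(lambda x: x % 2 == 0)     # another lazy transformation
  print(evens.collect())                           # action: triggers execution -> [4, 16]
  print(evens.count())                             # action: -> 2

Nothing touches the cluster until the first action; the two transformations are only recorded as lineage.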
Working With RDDs
  textFile = sc.textFile("SomeFile.txt")
  linesWithSpark = textFile.filter(lambda line: "Spark" in line)   # transformation

  linesWithSpark.count()    # action -> value: 74
  linesWithSpark.first()    # action -> value: "# Apache Spark"

Example: Log Mining
• Load error messages from a log into memory, then interactively search for various patterns
• Setup: a driver program coordinating a set of workers
Example: Log Mining (continued)
• Build the base RDD and a transformed RDD on the driver; nothing executes on the workers yet:

  lines = spark.textFile("hdfs://...")                      # base RDD
  errors = lines.filter(lambda s: s.startswith("ERROR"))    # transformed RDD
Example: Log Mining (continued)
• Add the remaining transformations, mark the result for caching, and run the first action; the action makes the driver ship tasks to the workers holding the HDFS blocks (Block 1, 2, 3):

  messages = errors.map(lambda s: s.split("\t")[2])
  messages.cache()
  messages.filter(lambda s: "mysql" in s).count()           # action
Example: Log Mining (continued)
• Each worker reads its HDFS block, processes it, and caches the resulting partition (Cache 1, 2, 3)
• Partial results are sent back to the driver
• A second query can then reuse the cached data:

  messages.filter(lambda s: "php" in s).count()
Example: Log Mining (continued)
• For the second action the driver again ships tasks, but the workers now process directly from their caches and return results without touching HDFS
• Cache your data => faster results
  – Full-text search of Wikipedia: 60 GB on 20 EC2 machines
  – 0.5 sec from memory vs. 20 s on disk
• (the whole example is gathered into one runnable sketch below)
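The log-mining example from the preceding slides, gathered into one runnable PySpark sketch. The HDFS path is left elided as on the slides, and the tab-separated log format with the message in the third field is the slides' assumption.

  from pyspark import SparkContext

  sc = SparkContext(appName="LogMining")

  lines = sc.textFile("hdfs://...")                               # base RDD
  errors = lines.filter(lambda s: s.startswith("ERROR"))          # keep only error lines
  messages = errors.map(lambda s: s.split("\t")[2])               # keep the message field
  messages.cache()                                                # keep partitions in memory after first use

  print(messages.filter(lambda s: "mysql" in s).count())          # first action: reads HDFS, fills the cache
  print(messages.filter(lambda s: "php" in s).count())            # second action: served from the cache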
Language Support
• Standalone programs: Python, Scala, and Java
• Interactive shells: Python and Scala
• Performance: Java and Scala are faster thanks to static typing, but Python is often fine

  Python:
    lines = sc.textFile(...)
    lines.filter(lambda s: "ERROR" in s).count()

  Scala:
    val lines = sc.textFile(...)
    lines.filter(x => x.contains("ERROR")).count()

  Java:
    JavaRDD<String> lines = sc.textFile(...);
    lines.filter(new Function<String, Boolean>() {
      Boolean call(String s) { return s.contains("ERROR"); }
    }).count();

Expressive API
• map, reduce, sample, filter, count, take, groupBy, fold, first, sort, reduceByKey, union, groupByKey, partitionBy, join, cogroup, mapWith, leftOuterJoin, cross, pipe, rightOuterJoin, zip, save, ...

Fault Recovery
• RDDs track lineage information that can be used to efficiently reconstruct lost partitions
Fault Recovery Example
• Two-partition RDD A = {A1, A2} stored on disk
  1) filter and cache -> RDD B
  2) join -> RDD C
  3) aggregate -> RDD D
• Suppose partition C1 is lost to a node failure before the aggregation finishes
• Using the lineage A1 -> filter -> B1 -> join -> C1, Spark reconstructs C1, eventually, on a different node, and the job completes
• (a PySpark sketch of this lineage chain follows after this page)

Spark Streaming: Motivation
• Many important apps must process large data streams at second-scale latencies
  – Site statistics, intrusion detection, online ML
• To build and scale these apps, users want:
  – Integration with the offline analytical stack
  – Fault tolerance, both for crashes and for stragglers
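A hedged PySpark sketch of the lineage chain in the fault-recovery example. The data, the keys, and the second RDD used for the join are invented; the point is only that each RDD records how it was derived from its parents, so a lost partition can be recomputed from them.

  from pyspark import SparkContext

  sc = SparkContext(appName="LineageExample")

  A = sc.parallelize([("k1", 1), ("k2", -2), ("k3", 3), ("k4", 4)], 2)   # two partitions, like A = {A1, A2}
  B = A.filter(lambda kv: kv[1] > 0).cache()                             # 1) filter and cache -> RDD B
  other = sc.parallelize([("k1", 10), ("k3", 30), ("k4", 40)], 2)        # invented join partner
  C = B.join(other)                                                      # 2) join -> RDD C
  D = C.map(lambda kv: kv[1][0] + kv[1][1]).reduce(lambda a, b: a + b)   # 3) aggregate -> D

  # If a partition of C is lost, Spark reruns only the filter/join steps that
  # produced it, on another node, using the lineage recorded above.
  print(D)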
How Does It Work?
• Data streams are chopped into batches
  – A batch is an RDD holding a few hundred milliseconds' worth of data
• Each batch is processed in Spark
• Results are pushed out in batches
• Flow: data streams -> receivers -> batches -> Spark -> results

Streaming Word Count (Scala)

  object NetworkWordCount {
    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setAppName("NetworkWordCount")
      val context = new StreamingContext(sparkConf, Seconds(1))

      // create a DStream from data arriving over a socket
      val lines = context.socketTextStream("localhost", 9999)
      // split lines into words
      val words = lines.flatMap(_.split(" "))
      // count the words
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      // print some counts on screen
      wordCounts.print()

      // start processing the stream
      context.start()
      context.awaitTermination()
    }
  }
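The same streaming word count in Python, using the (now legacy) PySpark DStream API; this version is not from the slides, but the host, port, and one-second batch interval match the Scala example above.

  from pyspark import SparkContext
  from pyspark.streaming import StreamingContext

  sc = SparkContext(appName="NetworkWordCount")
  ssc = StreamingContext(sc, 1)                          # 1-second batches

  lines = ssc.socketTextStream("localhost", 9999)        # DStream from a socket
  words = lines.flatMap(lambda line: line.split(" "))    # split lines into words
  wordCounts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
  wordCounts.pprint()                                    # print some counts each batch

  ssc.start()                                            # start processing the stream
  ssc.awaitTermination()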
Word Count: Spark Streaming vs. Storm
• The short Spark Streaming program above does the same job as a Storm topology of roughly 70 lines of Java:

  public class WordCountTopology {
    public static class SplitSentence extends ShellBolt implements IRichBolt {
      public SplitSentence() { super("python", "splitsentence.py"); }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
      }

      @Override
      public Map<String, Object> getComponentConfiguration() { return null; }
    }

    public static class WordCount extends BaseBasicBolt {
      Map<String, Integer> counts = new HashMap<String, Integer>();

      @Override
      public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
      }
    }

    public static void main(String[] args) throws Exception {
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("spout", new RandomSentenceSpout(), 5);
      builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
      builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

      Config conf = new Config();
      conf.setDebug(true);
      if (args != null && args.length > 0) {
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
      } else {
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
      }
    }
  }

Fault Recovery Example
• [Chart: iteration time (s) over 10 iterations of a streaming job; most iterations take about 56-59 s, while the iterations around the point where a failure happens spike to about 81 s and 119 s before returning to normal]

Administrivia (1/2)
• Midterm 3 coming up on Wed 11/29, 6:30-8 PM
  – All topics up to and including Lecture 24
    » Focus will be on Lectures 17-24 and associated readings, and Project 3
    » But expect 20-30% of questions from material in Lectures 1-16
  – Closed book
  – 2 sides of hand-written notes

Administrivia (2/2)
• Exam location assigned based on Cal student ID number:
  – Ends in 0, 1, 2, 3, 4: Li Ka Shing 245
  – Ends in 5, 6, 7: GPB 100
  – Ends in 8, 9: Kroeber 160
• Rest of today:
  – 7:45 pm: HKN class survey
  – 8-9:30 pm: Review (VLSB 2050)
From RDDs to DataFrames

(BREAK)

Spark Early Adopters
• Early adopters: data engineers and data scientists who understand MapReduce and functional APIs
• A much broader set of users: statisticians, R users, the PyData community, ...

DataFrames in Spark
• Distributed collection of data grouped into named columns (i.e., an RDD with a schema)
• Domain-specific functions designed for common tasks
  – Metadata
  – Sampling
  – Projection, filter, aggregation, join, ...
  – UDFs
• Available in Python, Scala, Java, and R
• (a short PySpark sketch of these operations follows after this page)
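A small PySpark sketch of the DataFrame operations listed above (projection, filter, aggregation, join), using the SparkSession API from Spark 2.x; the tables and their contents are invented for illustration.

  from pyspark.sql import SparkSession
  from pyspark.sql import functions as F

  spark = SparkSession.builder.appName("DataFrameDemo").getOrCreate()

  users = spark.createDataFrame(
      [(1, "alice", "US"), (2, "bob", "DE")], ["userid", "name", "country"])
  events = spark.createDataFrame(
      [(1, 120), (1, 80), (2, 200)], ["userid", "latency"])

  (users.join(events, "userid")                      # join on a named column
        .filter(F.col("latency") < 150)              # filter
        .groupBy("country")                          # aggregation
        .agg(F.avg("latency").alias("avg_latency"))
        .select("country", "avg_latency")            # projection
        .show())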
Spark DataFrame
• Similar APIs to single-node tools like Pandas and R, i.e., easy to learn
• An example in R (a PySpark version follows after this page):

  > head(filter(df, df$waiting < 50))
  ## eruptions waiting
  ## 1   1.750      47
  ## 2   1.750      47
  ## 3   1.867      48

  * Old Faithful geyser data from http://www.stat.cmu.edu/~larry/all-of-statistics/=data/faithful.dat

Spark RDD Execution
• User code is opaque closures (user-defined functions)
• The Java/Scala frontend runs on the JVM backend; the Python frontend needs its own Python backend

Spark DataFrame Execution
• The Python, Java/Scala, and R DataFrame frontends are simple wrappers that build a logical plan
• Logical plan: the intermediate representation of the computation
• The Catalyst optimizer turns the logical plan into physical execution
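The same Old Faithful query written against the PySpark DataFrame API; the CSV copy of the data, its path, and schema inference are assumptions, not from the slides.

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.appName("Faithful").getOrCreate()

  df = spark.read.csv("faithful.csv", header=True, inferSchema=True)   # columns: eruptions, waiting
  df.filter(df.waiting < 50).show(3)     # analogous to head(filter(df, df$waiting < 50)) in R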
Performance
• Benefit of the logical plan: performance parity across languages
• [Chart: runtime of an example aggregation workload (seconds); the DataFrame version in SQL, R, Python, and Java/Scala all run in roughly the same time, while the Python RDD version is several times slower than the Java/Scala RDD version]

Further Optimizations
• Whole-stage code generation
  – Removes expensive iterator calls
  – Fuses across multiple operators
  – Spark 1.6: 14M rows/s -> Spark 2.0: 125M rows/s
• Optimized input/output
  – Parquet + built-in cache
  – Parquet in 1.6: 11M rows/s -> Parquet in 2.0: 90M rows/s
• Automatically applies to SQL, DataFrames, Datasets
• (a sketch of how to inspect these plans follows after this page)

TPC-DS: Spark 2.0 vs. 1.6 – Lower is Better
• [Chart: per-query runtime in seconds across the TPC-DS queries (q1-q99); Spark 2.0 is consistently faster than Spark 1.6]
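To see the logical plan and the Catalyst output described above, explain() prints the parsed, analyzed, optimized, and physical plans for a DataFrame query; operators prefixed with "*" in the physical plan come from whole-stage code generation. A minimal sketch, with an invented query:

  from pyspark.sql import SparkSession
  from pyspark.sql import functions as F

  spark = SparkSession.builder.appName("ExplainDemo").getOrCreate()

  df = spark.range(0, 1000000)                        # one-column DataFrame (id)
  query = df.filter(F.col("id") % 2 == 0).groupBy().count()
  query.explain(True)                                 # True -> show all plan stages, not just the physical plan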
From Streaming to Structured Streaming
• The simplest way to perform streaming analytics is not having to reason about streaming
• What makes streaming hard:
  – Data: late arrival, varying distribution over time, ...
  – Processing: business logic changes and new operations (windows, sessions)
  – Output: how do we define output over time, and correctness?

Structured Streaming
• High-level streaming API built on DataFrames
  – Event time, windowing, sessions, sources and sinks
• Also supports interactive and batch queries
  – Aggregate data in a stream, then serve it using JDBC
  – Change queries at runtime
  – Build and apply ML models
• Not just streaming, but "continuous applications"
• Spark 1.3: static DataFrames; Spark 2.0: infinite DataFrames – a single API!
Example: Batch Aggregation

  logs = ctx.read.format("json").open("s3://logs")

  logs.groupBy("userid", "hour").avg("latency")
      .write.format("jdbc")
      .save("jdbc:mysql//...")

Example: Continuous Aggregation (the released Structured Streaming API is sketched after this page)

  logs = ctx.read.format("json").stream("s3://logs")

  logs.groupBy("userid", "hour").avg("latency")
      .write.format("jdbc")
      .startStream("jdbc:mysql//...")

Example: End-to-End Continuous Applications
• Traditional streaming covers only part of a real pipeline; the rest involves other processing types: ad-hoc queries, ETL from Kafka into a database, reporting, applications, and serving an ML model
• Goal: end-to-end continuous applications

Apache Spark Today
• > 1,500 contributors
• > 400K meetup members worldwide
• > 300K students trained worldwide
• 1,000s of deployments in production
  – Virtually every large enterprise
  – Available in all clouds (e.g., AWS, Google Compute Engine, MS Azure, IBM, ...)
  – Distributed by IBM, Cloudera, Hortonworks, MapR, Oracle, ...
• Databricks: a startup to commercialize Apache Spark
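The batch/continuous examples above use the API as it was previewed before Spark 2.0 shipped (open/stream/startStream). In the released Structured Streaming API the continuous aggregation looks roughly like the sketch below; the schema, the console sink, and the output mode are illustrative assumptions, and the S3 path is the one from the slide.

  from pyspark.sql import SparkSession
  from pyspark.sql import functions as F
  from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

  spark = SparkSession.builder.appName("ContinuousAgg").getOrCreate()

  schema = StructType([                               # streaming file sources need an explicit schema
      StructField("userid", StringType()),
      StructField("hour", IntegerType()),
      StructField("latency", DoubleType()),
  ])

  logs = spark.readStream.format("json").schema(schema).load("s3://logs")

  agg = logs.groupBy("userid", "hour").agg(F.avg("latency").alias("avg_latency"))

  query = (agg.writeStream
              .outputMode("complete")                 # emit the full updated aggregate on each trigger
              .format("console")                      # stand-in sink; JDBC output would go through foreachBatch
              .start())
  query.awaitTermination()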
Spark: A Unified Engine
• Unified engine across data sources, workloads, and environments
• Libraries: DataFrames and ML Pipelines on top of Spark SQL, Spark Streaming, MLlib, and GraphX, all on Spark Core
• Data sources (e.g., {JSON}, ...)

Summary
• Server -> Datacenter
• OS -> Datacenter OS (e.g., Apache Mesos)
• Applications -> Big data / ML applications (e.g., Apache Spark)
• AMPLab
  – Massive success in industry, ...
  – and in academia:
    » Faculty at MIT, Stanford, CMU, Cornell, etc.
    » Two ACM Dissertation Awards
• New lab starting: RISELab

RISELab (Real-time Intelligent Secure Execution)
• AMPLab: from batch data to advanced analytics
• RISELab: from live data to real-time decisions
RISE Stack
• Secure analytics and ML: Ray, Clipper, Tegra
• RISE μkernel: machine-time scheduler, object store, optimizer
• Ground (data context service)
• SGX (hardware security)