Apache Beam provides a unified programming abstraction for big data and supports multiple execution engines such as Spark, Flink, and Storm. This deck surveys the common programming paradigms in the big data space (streaming and batch) and shows how Beam's API arrives at a single, unified programming model.

Published by Xianliang on 2018/10/30 20:06


1. Apache Beam: portable and evolutive data-intensive applications. Ismaël Mejía (@iemejia), Talend

2. Who am I? @iemejia: Software Engineer; Apache Beam PMC / Committer; ASF member. Integration software, Big Data / real-time, open source / enterprise.

3. New products. We are hiring!

4. Introduction: Big data state of affairs

5. Before Big Data (early 2000s). The web pushed data analysis / infrastructure boundaries:
● Huge data analysis needs (Google, Yahoo, etc.)
● Scaling DBs for the web (most companies)
DBs (and in particular RDBMSs) had too many constraints and were hard to operate at scale. Solution: go back to basics, but in a distributed fashion.

6. MapReduce, Distributed Filesystems and Hadoop
● Use distributed file systems (HDFS) to scale data storage horizontally
● Use MapReduce to execute tasks in parallel (performance)
● Ignore the strict model (keep the representation loose to ease scaling, e.g. KV stores)
Phases: (Prepare) Map (Shuffle) Reduce (Produce)
Great for huge dataset analysis / transformation, but…
● Too low-level for many tasks (early frameworks)
● Not suited for latency-dependent analysis
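
To make "too low-level" concrete, here is the classic Hadoop MapReduce WordCount pair, a sketch using the standard org.apache.hadoop.mapreduce API (job wiring omitted): even a trivial per-key count takes two classes of boilerplate.

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Map phase: tokenize each line and emit (word, 1).
    class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
      private static final IntWritable ONE = new IntWritable(1);
      private final Text word = new Text();
      @Override
      protected void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
          word.set(tokens.nextToken());
          context.write(word, ONE); // shuffled by key before the reduce phase
        }
      }
    }

    // Reduce phase: sum the 1s emitted for each word.
    class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) sum += v.get();
        context.write(key, new IntWritable(sum));
      }
    }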

7. The distributed database Cambrian explosion… and MANY others, all of them with different properties, utilities and APIs.

8. Distributed databases API cycle: NoSQL ("let's reinvent our own thing, because SQL is too limited") then NewSQL ("SQL is back, because it is awesome"). (Yes, it is an over-simplification, but you get it.)

9. The fundamental problems are still the same, or worse (because of heterogeneity)…
● Data analysis / processing across systems with different semantics
● Data integration from heterogeneous sources
● Data infrastructure operational issues
Good old Extract-Transform-Load (ETL) is still an important need.

10. The fundamental problems are still the same. "Data preparation accounts for about 80% of the work of data scientists" [1] [2]
[1] Cleaning Big Data: Most Time-Consuming, Least Enjoyable Data Science Task
[2] Sculley et al.: Hidden Technical Debt in Machine Learning Systems

11. ...and evolution continues
● Latency needs: pseudo-real-time needs, distributed logs
● Multiple platforms: on-premise, cloud, cloud-native (also multi-cloud)
● Multiple languages and ecosystems: to integrate with ML tools
Software issues: new APIs, new clusters, different semantics… and of course MORE data stores!

12. Apache Beam

13. Apache Beam origin: Google's internal systems (MapReduce, Colossus, BigTable, PubSub, Dremel, Spanner, Megastore, Millwheel, Flume) fed into Google Cloud Dataflow, from which Apache Beam was born.

14.What is Apache Beam? Apache Beam is a unified programming model designed to provide efficient and portable data processing pipelines

15. Beam Model: Generations Beyond MapReduce. Improved abstractions let you focus on your application logic. Batch and stream processing are both first-class citizens, no need to choose. Clearly separates event time from processing time.

16. Streaming: late data. [Figure: several events with event time 8:00 arrive at processing times ranging from 9:00 to 14:00, i.e. increasingly late.]

17. Processing Time vs. Event Time

18. Beam Model: Asking the Right Questions. What results are calculated? Where in event time are results calculated? When in processing time are results materialized? How do refinements of results relate?

19. Beam Pipelines: a pipeline is a graph of PTransforms (operations) connected by PCollections (datasets).
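
As a minimal sketch of these two concepts, assuming only the core Beam Java SDK: a pipeline is built by applying PTransforms to PCollections and then run.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class SkeletonPipeline {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // A PCollection is an immutable (possibly unbounded) dataset;
        // each apply() step is a PTransform producing a new PCollection.
        PCollection<String> words = pipeline.apply(Create.of("hello", "beam"));
        PCollection<Integer> lengths = words.apply(
            MapElements.into(TypeDescriptors.integers())
                .via((String word) -> word.length()));

        pipeline.run().waitUntilFinish();
      }
    }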

20. The Beam Model: What is Being Computed?

    PCollection<KV<String, Integer>> scores = input
        .apply(Sum.integersPerKey());

    scores = (input | Sum.integersPerKey())

21. The Beam Model: What is Being Computed? Event Time: the timestamp when the event happened. Processing Time: absolute program time (wall clock).
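
Event time has to travel with the elements themselves. A minimal sketch using the SDK's WithTimestamps transform (the LogLine type and its getEventMillis() accessor are hypothetical here):

    import org.apache.beam.sdk.transforms.WithTimestamps;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Instant;

    // Stamp each element with its event time (when it happened), so that
    // windowing reasons about event time rather than wall-clock arrival time.
    PCollection<LogLine> stamped = logs.apply(
        WithTimestamps.of((LogLine line) -> new Instant(line.getEventMillis())));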

22. The Beam Model: Where in Event Time?

    PCollection<KV<String, Integer>> scores = input
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))))
        .apply(Sum.integersPerKey());

    scores = (input
              | beam.WindowInto(FixedWindows(2 * 60))
              | Sum.integersPerKey())

23. The Beam Model: Where in Event Time? ● Split infinite data into finite chunks. [Figure: input elements arriving between 12:00 and 12:10 in processing time are assigned to two-minute event-time windows in the output.]

24.The Beam Model: Where in Event Time?

25. The Beam Model: When in Processing Time?

    PCollection<KV<String, Integer>> scores = input
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
            .triggering(AtWatermark()))
        .apply(Sum.integersPerKey());

    scores = (input
              | beam.WindowInto(FixedWindows(2 * 60)
                  .triggering(AtWatermark()))
              | Sum.integersPerKey())

26.The Beam Model: When in Processing Time?

27. The Beam Model: How Do Refinements Relate?

    PCollection<KV<String, Integer>> scores = input
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
            .triggering(AtWatermark()
                .withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
                .withLateFirings(AtCount(1)))
            .accumulatingFiredPanes())
        .apply(Sum.integersPerKey());

    scores = (input
              | beam.WindowInto(FixedWindows(2 * 60)
                  .triggering(AtWatermark()
                      .withEarlyFirings(AtPeriod(1 * 60))
                      .withLateFirings(AtCount(1)))
                  .accumulatingFiredPanes())
              | Sum.integersPerKey())

28.The Beam Model: How Do Refinements Relate?

29. Customizing What / Where / When / How: 1. Classic Batch; 2. Windowed Batch; 3. Streaming; 4. Streaming + Accumulation.

30. Apache Beam - Programming Model
Element-wise: ParDo -> DoFn, MapElements, FlatMapElements, Filter, WithKeys, Keys, Values
Grouping: GroupByKey, CoGroupByKey, Combine -> Reduce, Sum, Count, Min / Max, Mean, ...
Windowing/Triggers: Windows (FixedWindows, GlobalWindows, SlidingWindows, Sessions); Triggers (AfterWatermark, AfterProcessingTime, Repeatedly)
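
For instance, ParDo, the element-wise workhorse, wraps a DoFn holding the per-element logic. A sketch, assuming `lines` is a PCollection<String> of raw log lines in the space-separated layout used later in this deck:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // Extract the visited URL (4th field) from each raw log line,
    // silently dropping lines that do not parse.
    PCollection<String> urls = lines.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String[] fields = c.element().split(" ");
        if (fields.length >= 4) {
          c.output(fields[3]);
        }
      }
    }));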

31. The Apache Beam Vision
1. End users: who want to write pipelines in a language that's familiar.
2. Library / IO connectors: who want to create generic transforms.
3. SDK writers: who want to make Beam concepts available in new languages.
4. Runner writers: who have a distributed processing environment and want to support Beam pipelines.
[Diagram: SDKs (Beam Java, Beam Python, other languages) sit on the Beam Model for pipeline construction; runners (Apache Flink, Cloud Dataflow, Apache Spark) execute through the Beam Model Fn / Runner APIs.]

32. Runners. Runners "translate" the code into the target runtime: Direct Runner, Apache Apex, Apache Spark, Apache Flink, Apache Gearpump, Apache Samza, Google Cloud Dataflow, IBM Streams, and (work in progress) Apache Storm, Alibaba JStorm and Hadoop MapReduce. Same code, different runners & runtimes.
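
"Same code, different runners" works because the runner is just a pipeline option chosen at launch time; a minimal sketch using the standard PipelineOptionsFactory:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Nothing in the pipeline code names a runtime; pass e.g.
    // --runner=SparkRunner or --runner=FlinkRunner on the command line
    // (with the matching runner artifact on the classpath).
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .create();
    Pipeline pipeline = Pipeline.create(options);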

33. Beam IO (data store connectors)
Filesystems: Google Cloud Storage, Hadoop FileSystem, AWS S3, Azure Storage (in progress)
File support: Text, Avro, Parquet, TensorFlow
Cloud databases: Google BigQuery, BigTable, DataStore, Spanner, AWS Redshift (in progress)
Messaging: Google Pubsub, Kafka, JMS, AMQP, MQTT, AWS Kinesis, AWS SNS, AWS SQS
Cache: Redis, Memcached (in progress)
Databases: Apache HBase, Cassandra, Hive (HCatalog), Mongo, JDBC
Indexing: Apache Solr, Elasticsearch
And other nice ecosystem tools / libraries:
Scio: Scala API by Spotify
Euphoria: alternative Java API closer to Java 8 collections
Extensions: joins, sorting, probabilistic data structures, etc.

34. A simple evolution example

35. A simple log analysis example. Logs are rotated, stored in HDFS, and analyzed daily to measure user engagement, running on an on-premise Hadoop cluster with Spark.
Data:
64.242.88.10 user01 07/Mar/2018:16:05:49 /news/abfg6f
64.242.88.10 user01 07/Mar/2018:16:05:49 /news/de0aff
...
Output: user01, 32 urls, 2018/03/07

36. A simple log analysis example

    PCollection<KV<User, Long>> numVisits = pipeline
        .apply(TextIO.read().from("hdfs://..."))
        .apply(MapElements.via(new ParseLog()))
        .apply(Count.perKey());

    $ mvn exec:java -Dexec.mainClass=beam.example.loganalysis.Main \
        -Pspark-runner \
        -Dexec.args="--runner=SparkRunner --master=tbd-bench"
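
The deck does not show ParseLog; here is a plausible sketch that fits the MapElements.via call above (hypothetical, with the User key simplified to a String), so that Count.perKey() yields visit counts per user:

    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;

    // Parse "64.242.88.10 user01 07/Mar/2018:16:05:49 /news/abfg6f"
    // into (user01, /news/abfg6f).
    class ParseLog extends SimpleFunction<String, KV<String, String>> {
      @Override
      public KV<String, String> apply(String line) {
        String[] fields = line.split(" ");
        return KV.of(fields[1], fields[3]);
      }
    }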

37. A simple log analysis example. Remember the software engineering maxim: requirements always change. Now we want to identify user sessions and calculate the number of URL visits per session; we need quicker updates, from a different source (a Kafka topic); and we will run this on a new Flink cluster. * Session = a sustained burst of activity

38. A simple log analysis example

    PCollection<KV<User, Long>> numVisitsPerSession = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("hostname")
            .withTopic("visits"))
        .apply(Values.create())
        .apply(MapElements.via(new ParseLog()))
        .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(10))))
        .apply(Count.perKey());

    $ mvn exec:java -Dexec.mainClass=beam.example.loganalysis.Main \
        -Pflink-runner \
        -Dexec.args="--runner=FlinkRunner --master=realtime-cluster-master"

39. Apache Beam Summary. Expresses data-parallel batch and streaming algorithms with one unified API. Cleanly separates data processing logic from runtime requirements. Supports execution on multiple distributed processing runtime environments. Integrates with the larger data processing ecosystem.

40. Current status and upcoming features

41. Beam is evolving too...
● Streaming SQL support via Apache Calcite (see the sketch below)
● Schema-aware PCollections: friendlier APIs
● Composable IO connectors: Splittable DoFn (SDF), a new API
● Portability: open-source runner support for language portability
● Go SDK: gophers finally become first-class citizens in Big Data
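
As a taste of the SQL item above, a sketch only: it assumes a schema-aware PCollection<Row> named `visits` with `userId` and `url` fields, and the SqlTransform that appeared around Beam 2.6.

    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Calcite parses and plans the query; PCOLLECTION names the single input.
    PCollection<Row> counts = visits.apply(
        SqlTransform.query(
            "SELECT userId, COUNT(*) AS visits FROM PCOLLECTION GROUP BY userId"));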

42. IO connector APIs are too strict. The classic "Source" -> "Transform" -> "Sink" shape (InputFormat / Receiver / SourceFunction / … on one end, OutputFormat / Sink / SinkFunction / … on the other) only takes fixed configuration: a filepattern, query string or topic name on the source side; a directory, table name or topic name on the sink side.

43. SDF: enable composable IO APIs. Narrow APIs are not hackable; users want things like: "My filenames come on a Kafka topic", "I have a table per client plus a table of clients", "I want to know which records failed to write", "I want to kick off another transform after writing".

44. Splittable DoFn (SDF): partial work via restrictions. Element: what work to do. Restriction: what part of the work. An (Element, Restriction) pair is dynamically splittable. SDF design: s.apache.org/splittable-do-fn. * More details in the video by Eugene Kirpichov (Google Cloud Platform).
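
A sketch of the shape of an SDF under the Beam 2.6-era API (the sizeOf and readLine helpers are hypothetical): the element says what to read, the OffsetRange restriction says which part of it, and the tracker lets the runner split the range while work is in flight.

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

    class ReadFileFn extends DoFn<String, String> {
      // Restriction: which offsets of the file this instance is responsible for.
      @GetInitialRestriction
      public OffsetRange getInitialRestriction(String file) {
        return new OffsetRange(0, sizeOf(file)); // sizeOf: hypothetical helper
      }

      @ProcessElement
      public void process(ProcessContext c, OffsetRangeTracker tracker) {
        // tryClaim fails once the runner has split the rest of the range away.
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
          c.output(readLine(c.element(), i)); // readLine: hypothetical helper
        }
      }
    }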

45. Language portability
● If I run a Beam Python pipeline on the Spark runner, is it translated to PySpark?
● Wait, can I execute Python on a Java-based runner?
● Can I use the Python TensorFlow transform from a Java pipeline?
● I want to connect to Kafka from Python but there is no connector; can I use the Java one?
No. (Not yet; hence the portability work on the next slides.)
[Diagram: the same SDK / Beam Model / runner stack as the vision slide.]

46. How do Java-based runners work today? The client builds the pipeline with the SDK and submits the job to the runner's job master on the cluster; workers run executors that execute the user code (UDFs) in-process.

47. Portability Framework. The client submits the pipeline as protobuf to a Job Server, staging artifacts to a DFS location. On each worker, the runner's executor drives an SDK Harness (a Docker container running the user code) over the Fn API services: Provision, Artifact Retrieval, Control, Data, State and Logging.

48. Language portability advantages
● Isolation of user code
● Isolated configuration of the user environment
● Multiple-language execution: mix user code in different languages
● Makes creating new SDKs easier (homogeneous)
Issues:
● Performance overhead (15% in early evaluations), due to the extra RPC hop + container
● Extra component (Docker)
A bit more complex, but that is the price of reuse and consistent environments.

49. Go SDK: the first user SDK completely based on the Portability API.

    func main() {
        p := beam.NewPipeline()
        s := p.Root()
        lines := textio.Read(s, *input)
        counted := CountWords(s, lines)
        formatted := beam.ParDo(s, formatFn, counted)
        textio.Write(s, *output, formatted)
        if err := beamx.Run(context.Background(), p); err != nil {
            log.Fatalf("Failed to execute job: %v", err)
        }
    }

50. Contribute. A vibrant community of contributors + companies: Google, data Artisans, Lyft, Talend, yours?
● Try it and help us report (and fix) issues.
● Multiple JIRA issues need to be taken care of.
● New feature requests, new ideas, more documentation.
● More SDKs (more languages): .NET anyone? etc.
● More runners: improve the existing ones, or a native Go one, maybe?
Beam is in perfect shape to jump in. First stable release: 2.0.0, with an API stability contract (May 2017). Current: 2.6.0.

51. Learn More!
Apache Beam: https://beam.apache.org
The World Beyond Batch: Streaming 101 & 102:
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
Join the mailing lists: user-subscribe@beam.apache.org and dev-subscribe@beam.apache.org
Follow @ApacheBeam on Twitter.
* The nice slides with animations were created by Tyler Akidau and Frances Perry and are used with authorization. Special thanks also to Eugene Kirpichov, Dan Halperin and Alexey Romanenko for ideas for this presentation.

52. Thanks


