Running R at Scale with Apache Arrow on Spark

In this talk you will learn how to configure Apache Arrow with R on Apache Spark to gain speed improvements and expand the scope of your data science workflows, for instance by transferring data efficiently between your local environment and Apache Spark. The talk presents use cases for running R at scale on Apache Spark, then introduces the Apache Arrow project and the recent developments that enable running R with Apache Arrow on Apache Spark to significantly improve performance and efficiency. It closes with a discussion of performance and ongoing development in this space.


2. OVERVIEW Intro to R, R with Spark, Intro to Arrow, Arrow with R, Arrow on Spark.


4. R LANGUAGE R is a programming language for statistical computing that is: vectorized, columnar and flexible.
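As a rough illustration of those three properties, the base R sketch below uses only built-in functions. The variable names and sample values are illustrative and do not come from the talk.

```r
# Vectorized: arithmetic applies to a whole vector without an explicit loop.
weights_lb <- c(2620, 2875, 2320)
weights_kg <- weights_lb * 0.4536

# Columnar: a data frame is a list of equal-length column vectors,
# so an aggregate reads a single column at a time.
cars_df <- data.frame(mpg = c(21.0, 21.0, 22.8), wt = c(2.620, 2.875, 2.320))
mpg_mean <- mean(cars_df$mpg)

# Flexible: functions are ordinary values that can be passed to other functions.
column_means <- sapply(cars_df, mean)
```

Here `sapply()` receives `mean` as an argument and applies it column by column, which works only because the data frame is stored as a list of columns.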

5. R PACKAGES CRAN is R’s package repository, comparable to npm or Maven Central. Thousands of packages are available, and usage grows every year.

6. AN R PACKAGE One of many packages, rayrender: A ray tracer written in R using Rcpp.
    library(rayrender)
    generate_ground() %>%
      add_object(sphere()) %>%
      render_scene()


8. SPARKLYR 0.4 - INITIAL RELEASE Support to install, connect, analyze, model and extend Spark.
    library(sparklyr)                                 # Import sparklyr package
    spark_install()                                   # Install Apache Spark
    sc <- spark_connect(master = "local")             # Connect to Spark cluster
    cars <- spark_read_csv(sc, "cars", "mtcars/")     # Read data in Spark
    dplyr::summarize(cars, n = n())                   # Count records with dplyr
    DBI::dbGetQuery(sc, "SELECT count(*) FROM cars")  # Count records with DBI
    ml_linear_regression(cars, mpg ~ wt + cyl)        # Perform linear regression
    spark_context(sc) %>% invoke("version")           # Extend sparklyr with Scala

9. SPARKLYR 0.5 - CONNECTIONS Support for Apache Livy,
    sc <- spark_connect(master = "http://livy-server", method = "livy")  # Connect through Apache Livy
Databricks connections,
    sc <- spark_connect(method = "databricks")                           # Connect to Databricks cluster
dplyr improvements, and Cloudera certification.

10. SPARKLYR 0.6 - DISTRIBUTED R Distribute R computations to execute arbitrary R code over each partition using your favorite R packages:
    scene <- generate_ground() %>%
      add_object(sphere(z = -2)) %>%
      add_object(sphere(z = +2)) %>%
      add_object(sphere(x = -2))
    camera <- sdf_len(sc, 628, repartition = 628) %>%
      mutate(x = 12 * sin(id/100), z = 12 * cos(id/100))
    spark_apply(
      camera,
      function(cam, scene) {
        name <- sprintf("%04d.png", cam$id)
        rayrender::render_scene(
          scene, width = 1920, height = 1080,
          lookfrom = c(cam$x, 5, cam$z), filename = name)
        system2("hadoop", c("fs", "-put", name, "path"))
      },
      context = scene) %>% collect()

11. SPARKLYR 0.7 - PIPELINES AND MACHINE LEARNING Provide a uniform set of high-level APIs to help create, tune, and deploy machine learning pipelines at scale,
    pipeline <- ml_pipeline(sc) %>%     # Define Spark pipeline
      ft_r_formula(mpg ~ wt + cyl) %>%  # Add formula transformation
      ml_linear_regression()            # Add model to pipeline
    fitted <- ml_fit(pipeline, cars)    # Fit pipeline
and support for all MLlib algorithms.

12. SPARKLYR 0.8 - MLEAP AND GRAPHS MLeap allows you to use your Spark pipelines in any Java-enabled device or service,
    library(mleap)                           # Import MLeap package
    install_maven()                          # Install Maven
    install_mleap()                          # Install MLeap
    transformed <- ml_transform(fitted, cars)  # Transform dataset with fitted pipeline
    ml_write_bundle(fitted, transformed, "")   # Export model with MLeap
graphframes provides an interface to the GraphFrames Spark package.
    library(graphframes)                   # Import graphframes package
    g <- gf_graphframe(edges = edges_tbl)  # Map to graphframe
    gf_pagerank(g, tol = 0.01)             # Compute pagerank

13. SPARKLYR 0.9 - STREAMS Spark structured streams provide parallel and fault-tolerant data processing,
    stream_read_text(sc, "s3a://your-s3-bucket/") %>%  # Define input stream
      spark_apply(~webreadr::read_s3(.x$line)) %>%     # Transform with R
      group_by(uri) %>%                                # Group using dplyr
      summarize(n = n()) %>%                           # Count using dplyr
      arrange(desc(n)) %>%                             # Arrange using dplyr
      stream_write_memory("urls", mode = "complete")   # Define output stream
This release also adds support for Kubernetes and for properly interrupting long-running operations.
    sc <- spark_connect(config = spark_config_kubernetes("k8s://hostname:8443"))

14. SPARKLYR 1.0 - ARROW Arrow enables faster and larger data transfers between Spark and R. XGBoost enables training gradient boosting models over distributed datasets.
    library(sparkxgb)
    dplyr::mutate(cars, eff = mpg > 20) %>%
      xgboost_classifier(eff ~ ., num_class = 2)
TFRecords writes TensorFlow records from Spark to support deep learning workflows.
    library(sparktf)
    cars %>% spark_write_tfrecord("tfrecord")


16. WHAT IS ARROW? Apache Arrow is a cross-language development platform for in-memory data.

17. MEMORY LAYOUT Columnar memory layout allows applications to avoid unnecessary IO and accelerate analytical processing performance on modern CPUs and GPUs.
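To make the layout difference concrete, here is a toy sketch in base R. It only models the idea of row-oriented versus column-oriented storage with ordinary lists and vectors; it is not Arrow's actual format, and the sample records are made up for illustration.

```r
# Row-oriented: one record per element, so reading a single field
# means visiting every record and skipping the fields that are not needed.
rows <- list(
  list(id = 1L, mpg = 21.0, model = "Mazda RX4"),
  list(id = 2L, mpg = 22.8, model = "Datsun 710"),
  list(id = 3L, mpg = 18.7, model = "Hornet Sportabout")
)
row_total <- sum(vapply(rows, function(r) r$mpg, numeric(1)))

# Column-oriented: each field is one contiguous vector, so an aggregate
# over mpg reads only that vector and never touches id or model.
columns <- list(
  id    = c(1L, 2L, 3L),
  mpg   = c(21.0, 22.8, 18.7),
  model = c("Mazda RX4", "Datsun 710", "Hornet Sportabout")
)
col_total <- sum(columns$mpg)

# Both layouts hold the same data and give the same answer.
stopifnot(isTRUE(all.equal(row_total, col_total)))
```

The column-oriented version is the shape that analytical queries benefit from, since most of them scan a few fields across many records.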


19. FEATHER PACKAGE A lightweight binary columnar data store designed for maximum speed, based on Arrow’s memory layout. In R:
    library(feather)                       # Import feather package
    write_feather(mtcars, "cars.feather")  # Write feather file in R
    read_feather("cars.feather")           # Read feather file in R
In Python:
    import feather                               # Import feather package
    df = feather.read_dataframe("cars.feather")  # Read feather file in Python
    feather.write_dataframe(df, "cars.feather")  # Write feather file in Python

20. ARROW PACKAGE Currently, install from GitHub:
    devtools::install_github("apache/arrow", subdir = "r", ref = "apache-arrow-0.13.0")
The R arrow package supports Feather, Parquet, streams, and more.
    library(arrow)                # Import arrow package
    read_feather("cars.feather")  # Can still read feather file
    read_parquet("cars.parquet")  # Can also read parquet files
    write_arrow(mtcars, raw())    # Can efficiently serialize
    [1] 44 02 00 00 10 00 00 00 00 00 0a 00


22. REQUIREMENTS To use Arrow with Spark and R you’ll need: a Spark 2.3.0+ cluster; Arrow 0.13+ installed on every node (Arrow 0.11+ is usable); R 3.5+ (the next version is likely to support R 3.1+); sparklyr 1.0+.
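A minimal sketch of how the R-side requirements might be checked on one machine. `meets_minimum()` and `check_arrow_requirements()` are hypothetical helpers, not part of sparklyr or arrow, and treating the R arrow package version as a proxy for the installed Arrow version is an assumption. The check covers only the local R session; it says nothing about the Spark version or the other nodes in the cluster.

```r
# Hypothetical helper: TRUE when an installed version meets a minimum version.
meets_minimum <- function(installed, minimum) {
  package_version(installed) >= package_version(minimum)
}

# Hypothetical helper: check R itself and, when installed, sparklyr and arrow.
# A package that is not installed is reported as FALSE.
check_arrow_requirements <- function() {
  minimums <- c(sparklyr = "1.0", arrow = "0.11")
  pkg_ok <- vapply(names(minimums), function(pkg) {
    requireNamespace(pkg, quietly = TRUE) &&
      meets_minimum(as.character(utils::packageVersion(pkg)), minimums[[pkg]])
  }, logical(1))
  c(R = meets_minimum(as.character(getRversion()), "3.5"), pkg_ok)
}

# Named logical vector with entries R, sparklyr and arrow.
requirements <- check_arrow_requirements()
```

Running the same function on each worker node would be one way to confirm that every node satisfies the list, though how to reach the workers depends on the cluster setup.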

23. IMPLEMENTATION Diagram: R transformations in Spark without and with Arrow.

24. COPY WITH ARROW Copy datasets that are 10x larger, 3x faster, with Arrow and Spark.
    library(arrow)
    copy_to(sc, data.frame(y = 1:10^6))

25. COLLECT WITH ARROW Collect datasets that are 5x larger, 3x faster, with Arrow and Spark.
    library(arrow)
    sdf_len(sc, 10^7) %>% collect()

26. TRANSFORM WITH ARROW Transform datasets 40x faster with R, Arrow and Spark.
    library(arrow)
    sdf_len(sc, 10^5) %>%
      spark_apply(~.x/2) %>%
      count()
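One way such speedups could be measured is to time the same operation with and without the arrow package attached. The sketch below assumes, as the `library(arrow)` lines on these slides suggest, that sparklyr uses Arrow whenever the package is attached. `time_collect()` is a hypothetical helper, not a sparklyr function, and it needs a live connection `sc` to run.

```r
# Hypothetical helper: time collecting `rows` rows from Spark into R,
# with the arrow package either attached or detached for the session.
time_collect <- function(sc, rows, with_arrow) {
  arrow_attached <- "package:arrow" %in% search()
  if (with_arrow && !arrow_attached) {
    library(arrow)            # Attach arrow (assumed to enable Arrow transfers)
  } else if (!with_arrow && arrow_attached) {
    detach("package:arrow")   # Detach arrow to fall back to the default path
  }
  # Elapsed wall-clock seconds for generating and collecting the rows.
  system.time(
    dplyr::collect(sparklyr::sdf_len(sc, rows))
  )[["elapsed"]]
}

# Example usage against a local cluster (not run here):
# sc <- sparklyr::spark_connect(master = "local")
# time_collect(sc, 10^6, with_arrow = FALSE)
# time_collect(sc, 10^6, with_arrow = TRUE)
```

A single `system.time()` call is noisy, so repeated runs or a benchmarking package would give more trustworthy numbers than this sketch.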


