Divide and you will conquer Apache Spark. It’s quite common to find a papyrus-style script where people try to initialize Spark, read paths, execute all the logic and write the result. We have even found scripts where all the Spark transformations are done in a single method with tons of lines. That means the code is difficult to test, to maintain and to read. Well, that means bad code. We built a set of tools and libraries that lets developers build their pipelines by joining pieces together. These pieces comprise Readers, Writers, Transformers, Aliases, etc. Moreover, it comes with enriched SparkSuites using spark-testing-base from Holden Karau. Recently, we started using junit4git (github.com/rpau/junit4git) in our tests, allowing us to execute only the Spark tests that matter by skipping tests that are not affected by the latest code changes. This translates into faster builds and fewer coffees. By allowing developers to define each piece on its own, we enable testing small pieces before assembling the full set of them. It also lets code be reused across multiple pipelines, speeding up their development and improving code quality. The power of the “transform” method combined with currying creates a powerful tool for fragmenting all the Spark logic. This talk is oriented to developers who are being introduced to the Spark world, and shows how developing iteration by iteration, in small steps, can help them produce great code with less effort.
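As a taste of the “transform + currying” idea, here is a minimal, hypothetical sketch (the function and column names below are illustrative, not part of the talk’s libraries):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lower}

// A curried transformation: configuration goes in the first parameter list,
// the DataFrame in the second, so the partially applied function can be
// handed to Dataset.transform and unit-tested on its own.
def normalizeColumn(colName: String)(df: DataFrame): DataFrame =
  df.withColumn(colName, lower(col(colName)))

// Usage, assuming rawDF is some input DataFrame:
// val cleanedDF = rawDF.transform(normalizeColumn("city"))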

Published by the Spark open source community on 2019/05/20


1. Modular Apache Spark: Transform Your Code into Pieces. Albert Franzi (@FranziCros), Alpha Health

2. Slides available at: http://bit.ly/SparkAI2019-afranzi

3. About me (career timeline: 2013, 2014, 2015, 2017, 2018, 2019)

4. Modular Apache Spark: Transform Your Code into Pieces

5. We will learn:
- How we simplified our Spark code by modularizing it
- How we increased our test coverage in our Spark code by using spark-testing-base, provided by Holden Karau
- How we reduced test execution time by skipping unaffected tests --> fewer coffees

6. We will learn:
- How we simplified our Spark code by modularizing it
- How we increased our test coverage in our Spark code by using spark-testing-base, provided by Holden Karau
- How we reduced test execution time by skipping unaffected tests --> fewer coffees

7. Did you play with duplicated code across your Spark jobs? Have you ever experienced the joy of code-reviewing a never-ending stream of Spark chaos? (Image: The Shining, 1980)

8. Please! Don’t ever play with duplicated code! (Image: The Shining, 1980)

9. Fragment the Spark Job: Spark Readers, Transformations, Task Context, Spark Writers, Aliases, Joins, Formatters, ...

10. Readers / Writers (Spark Readers & Spark Writers)
● Enforce schemas
● Use schemas to read only the fields you are going to use
● Provide Readers per Dataset & attach its sources to it
● Share schemas & sources between Readers & Writers
● GDPR compliant by design
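To make the “enforce schemas” point concrete, here is a minimal plain-Spark sketch (not the ReaderBuilder shown on the following slides); the schema fields are illustrative assumptions:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

val spark: SparkSession = SparkSession.builder().getOrCreate()

// Declaring the schema up front avoids schema inference and, with a columnar
// format like Parquet, only the declared columns are read from storage.
val userBehaviourSchema = StructType(Seq(
  StructField("user_id", StringType),
  StructField("event_type", StringType),
  StructField("event_time", TimestampType)
))

val df: DataFrame = spark.read
  .schema(userBehaviourSchema)
  .parquet("s3://<bucket>/user_behaviour/")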

11. Readers

val userBehaviourSchema: StructType = ???
val userBehaviourPath = Path("s3://<bucket>/user_behaviour/year=2018/month=10/day=03/hour=12/gen=27/")

val userBehaviourReader = ReaderBuilder(PARQUET)
  .withSchema(userBehaviourSchema)
  .withPath(userBehaviourPath)
  .buildReader()

val df: DataFrame = userBehaviourReader.read()

12. Readers

val userBehaviourSchema: StructType = ???

// Path structure - s3://<bucket>/user_behaviour/[year]/[month]/[day]/[hour]/[gen]/
val userBehaviourBasePath = Path("s3://<bucket>/user_behaviour/")

val startDate: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC)
val halfDay: Duration = Duration.ofHours(12)

val userBehaviourPaths: Seq[Path] = PathBuilder
  .latestGenHourlyPaths(userBehaviourBasePath, startDate, halfDay)

val userBehaviourReader = ReaderBuilder(PARQUET)
  .withSchema(userBehaviourSchema)
  .withPath(userBehaviourPaths: _*)
  .buildReader()

val df: DataFrame = userBehaviourReader.read()
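PathBuilder.latestGenHourlyPaths belongs to the talk’s internal tooling and its implementation is not shown; the sketch below is a rough guess at the hourly-path part, assuming one partition path per hour in the window (resolving the latest gen=<n> folder would also require listing the filesystem, which is omitted here):

import java.time.{Duration, ZonedDateTime}
import java.time.temporal.ChronoUnit

// Hypothetical stand-in for the talk's Path type.
final case class Path(value: String)

object HourlyPathSketch {
  // One partition path per hour, newest first, matching the
  // year=/month=/day=/hour= layout from the previous slide.
  def hourlyPaths(base: Path, startDate: ZonedDateTime, window: Duration): Seq[Path] =
    (0L until window.toHours).map { h =>
      val ts = startDate.minusHours(h).truncatedTo(ChronoUnit.HOURS)
      Path(f"${base.value}year=${ts.getYear}/month=${ts.getMonthValue}%02d/" +
        f"day=${ts.getDayOfMonth}%02d/hour=${ts.getHour}%02d/")
    }
}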

13. Readers

val userBehaviourSchema: StructType = ???
val userBehaviourBasePath = Path("s3://<bucket>/user_behaviour/")

val startDate: ZonedDateTime = ZonedDateTime.now(ZoneOffset.UTC)
val halfDay: Duration = Duration.ofHours(12)

val userBehaviourReader = ReaderBuilder(PARQUET)
  .withSchema(userBehaviourSchema)
  .withHourlyPathBuilder(userBehaviourBasePath, startDate, halfDay)
  .buildReader()

val df: DataFrame = userBehaviourReader.read()

14. Readers

val df: DataFrame = UserBehaviourReader.read(startDate, halfDay)

15. Transforms

def transform[U](t: (Dataset[T]) ⇒ Dataset[U]): Dataset[U]

Dataset[T] ⇒ magic ⇒ Dataset[U]

16. Transforms

def withGreeting(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

def extractFromJson(colName: String, outputColName: String,
                    jsonSchema: StructType)(df: DataFrame): DataFrame = {
  df.withColumn(outputColName, from_json(col(colName), jsonSchema))
}
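As a quick illustration of how the curried signature above plugs into Dataset.transform (the "payload" column and its schema are made-up names; df and the two functions come from the slides above):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val payloadSchema = StructType(Seq(StructField("city", StringType)))

val enrichedDF = df
  .transform(withGreeting)
  .transform(extractFromJson("payload", "payload_struct", payloadSchema))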

17. Transforms

def onlyClassifiedAds(df: DataFrame): DataFrame = {
  df.filter(col("event_type") === "View")
    .filter(col("object_type") === "ClassifiedAd")
}

def dropDuplicates(df: DataFrame): DataFrame = {
  df.dropDuplicates()
}

def cleanedCity(df: DataFrame): DataFrame = {
  df.withColumn("city", getCityUdf(col("object.location.address")))
}

val cleanupTransformations: Seq[DataFrame => DataFrame] = Seq(
  dropDuplicates,
  cleanedCity,
  onlyClassifiedAds
)

val df: DataFrame = UserBehaviourReader.read(startDate, halfDay)
val classifiedAdsDF = df.transforms(cleanupTransformations: _*)

18. Transforms

val cleanupTransformations: Seq[DataFrame => DataFrame] = Seq(
  dropDuplicates,
  cleanedCity,
  onlyClassifiedAds
)

val df: DataFrame = UserBehaviourReader.read(startDate, halfDay)
val classifiedAdsDF = df.transforms(cleanupTransformations: _*)

“As a data consumer, I only need to pick which transformations I would like to apply, instead of coding them from scratch.”

“It’s like cooking: engineers provide manufactured ingredients (transformations) and Data Scientists use the required ones for a successful recipe.”
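Note that transforms(...) is not part of Spark’s Dataset API; a minimal sketch of how such a varargs extension could be written, assuming an implicit class, looks like this:

import org.apache.spark.sql.DataFrame

object DataFrameSyntax {
  implicit class DataFrameTransformOps(df: DataFrame) {
    // Apply a sequence of DataFrame => DataFrame functions from left to right.
    def transforms(ts: (DataFrame => DataFrame)*): DataFrame =
      ts.foldLeft(df)((acc, t) => t(acc))
  }
}

// import DataFrameSyntax._
// val classifiedAdsDF = df.transforms(cleanupTransformations: _*)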

19. Transforms - Links of Interest

github.com/MrPowers/spark-daria
“Spark helper methods to maximize developer productivity.”
“DataFrame transformations can be defined with arguments so they don’t make assumptions about the schema of the underlying DataFrame.” - by Matthew Powers

bit.ly/Spark-ChainingTransformations
bit.ly/Spark-SchemaIndependentTransformations

20. We will learn:
- How we simplified our Spark code by modularizing it
- How we increased our test coverage in our Spark code by using spark-testing-base, provided by Holden Karau
- How we reduced test execution time by skipping unaffected tests --> fewer coffees

21. Did you put untested Spark jobs into production? “Mars Climate Orbiter destroyed because of a Metric System Mixup (1999)”

22. Testing with Holden Karau

github.com/holdenk/spark-testing-base
“Base classes to use when writing tests with Spark.”
● Share Spark Context between tests
● Provide methods to make tests easier
  ○ Fixture Readers
  ○ Json to DF converters
  ○ Extra validators

github.com/MrPowers/spark-fast-tests
“An alternative to spark-testing-base to run tests in parallel without restarting Spark Session after each test file.”

23. Testing - SharedSparkContext

package com.holdenkarau.spark.testing

import java.util.Date
import org.apache.spark._
import org.scalatest.{BeforeAndAfterAll, Suite}

/**
 * Shares a local `SparkContext` between all tests in a suite
 * and closes it at the end. You can share between suites by enabling
 * reuseContextIfPossible.
 */
trait SharedSparkContext extends BeforeAndAfterAll with SparkContextProvider {
  self: Suite =>
  ...
  protected implicit def reuseContextIfPossible: Boolean = false
  ...
}

24. Testing

package com.alpha.data.test

trait SparkSuite extends DataFrameSuiteBase {
  self: Suite =>

  override def reuseContextIfPossible: Boolean = true

  protected def createDF(data: Seq[Row], schema: StructType): DataFrame = {
    spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
  }

  protected def jsonFixtureToDF(fileName: String, schema: Option[StructType] = None): DataFrame = {
    val fixtureContent = readFixtureContent(fileName)
    val fixtureJson = fixtureContentToJson(fixtureContent)
    jsonToDF(fixtureJson, schema)
  }

  protected def checkSchemas(inputSchema: StructType, expectedSchema: StructType): Unit = {
    assert(inputSchema.fields.sortBy(_.name).deep == expectedSchema.fields.sortBy(_.name).deep)
  }
  ...
}
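To show how these helpers combine with the transformations from slide 17, here is a hypothetical test sketch; it assumes SparkSuite is mixed into a ScalaTest FunSuite (as DataFrameSuiteBase expects), and it repeats onlyClassifiedAds locally so the example is self-contained:

package com.alpha.data.test

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.scalatest.FunSuite

class OnlyClassifiedAdsSpec extends FunSuite with SparkSuite {

  // The transformation under test (copied from slide 17 for self-containment).
  private def onlyClassifiedAds(df: DataFrame): DataFrame =
    df.filter(col("event_type") === "View")
      .filter(col("object_type") === "ClassifiedAd")

  private val schema = StructType(Seq(
    StructField("event_type", StringType),
    StructField("object_type", StringType)
  ))

  test("onlyClassifiedAds keeps only ClassifiedAd view events") {
    val df = createDF(Seq(
      Row("View", "ClassifiedAd"),
      Row("Click", "ClassifiedAd"),
      Row("View", "Banner")
    ), schema)

    assert(df.transform(onlyClassifiedAds).count() == 1)
  }
}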

25. Testing (Spark Readers, Transformation, Task Context, Spark Writers). Testing each piece independently helps testing all together.

26. We will learn:
- How we simplified our Spark code by modularizing it
- How we increased our test coverage in our Spark code by using spark-testing-base, provided by Holden Karau
- How we reduced test execution time by skipping unaffected tests --> fewer coffees

27. Are tests taking too long to execute?

28. Junit4Git by Raquel Pau

github.com/rpau/junit4git
“Junit Extensions for Test Impact Analysis.”
“This is a JUnit extension that ignores those tests that are not related with your last changes in your Git repository.”

29. Junit4Git - Gradle conf

configurations {
  agent
}

dependencies {
  testCompile("org.walkmod:scalatest4git_2.11:${version}")
  agent "org.walkmod:junit4git-agent:${version}"
}

test.doFirst {
  jvmArgs "-javaagent:${configurations.agent.singleFile}"
}

Test suites are annotated with @RunWith(classOf[ScalaGitRunner]).
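For completeness, a sketch of how a suite could be wired to the runner named above; the exact import path for ScalaGitRunner is an assumption (it ships with the scalatest4git dependency declared in the Gradle config), so check the junit4git README for the real package:

import org.junit.runner.RunWith
import org.scalatest.FunSuite
// import ...ScalaGitRunner  // provided by org.walkmod:scalatest4git; exact package not shown in the slides

@RunWith(classOf[ScalaGitRunner])
class CleanupTransformationsSpec extends FunSuite with SparkSuite {
  // Tests here are executed only when the latest Git changes affect the code they cover.
}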