Powering Custom Apps at Facebook using Spark Script Transformation

Script Transformation is an important and growing use-case for Apache Spark at Facebook. Spark’s script transforms allow users to run custom scripts and binaries directly from SQL, and they serve as an important means of stitching Facebook’s custom business logic into existing data pipelines.

Along with Spark SQL and UDFs, a growing number of our custom pipelines leverage Spark’s script transform operator to run user-provided binaries for applications such as indexing, parallel training, and inference at scale. Spawning custom processes from Spark executors introduces new production challenges, ranging from external resource allocation and management to structured data serialization and external process monitoring.

In this session, we will talk about the improvements to Spark SQL (and the resource manager) that support running reliable and performant script transformation pipelines. These include:
1) cgroup v2 containers for CPU, memory, and IO enforcement,
2) a transform jail for process namespace management,
3) support for complex types in the ROW FORMAT DELIMITED SerDe, and
4) Protocol Buffers for fast and efficient structured data serialization.
Finally, we will conclude by sharing our results, lessons learned, and future directions (e.g., resource over-subscription for transform pipelines).
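As a rough illustration of what CPU, memory, and IO enforcement with cgroup v2 involves, the sketch below formats the values a resource manager could write into a transform container's `cpu.max`, `memory.max`, and `io.max` interface files. The file formats follow the Linux kernel's cgroup v2 documentation. The function names are hypothetical, and this is not Facebook's implementation. The helpers only build strings and never touch `/sys/fs/cgroup`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helpers that format cgroup v2 interface-file values for one
// transform container; they only build strings and never write to cgroupfs.

// cpu.max holds "$MAX $PERIOD": at most MAX microseconds of CPU time per
// PERIOD microseconds, so cores = MAX / PERIOD (100000 us is the default period).
std::string cpuMaxValue(double cores, std::uint64_t periodUs = 100000) {
  const auto quotaUs = static_cast<std::uint64_t>(cores * periodUs);
  return std::to_string(quotaUs) + " " + std::to_string(periodUs);
}

// memory.max holds a hard limit in bytes; if usage hits it and cannot be
// reclaimed, the OOM killer is invoked inside the group.
std::string memoryMaxValue(std::uint64_t bytes) {
  return std::to_string(bytes);
}

// io.max holds one line per block device: "MAJ:MIN rbps=N wbps=N"
// (bytes per second; riops/wiops keys also exist).
std::string ioMaxValue(unsigned major, unsigned minor,
                       std::uint64_t readBps, std::uint64_t writeBps) {
  return std::to_string(major) + ":" + std::to_string(minor) +
         " rbps=" + std::to_string(readBps) +
         " wbps=" + std::to_string(writeBps);
}
```

For example, a transform granted 2.5 cores would get `cpu.max` set to `250000 100000`. Expressing CPU as a quota per period, rather than pinning cores, lets several transform processes share a host.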

1.WIFI SSID:Spark+AISummit | Password: UnifiedDataAnalytics

2. Powering Custom Apps at Facebook using Spark Script Transformation. Abdulrahman Alfozan, Spark Summit Europe

3. Agenda
1. Intro to Spark Script Transforms
2. Spark Transforms at Facebook
3. Core Engine Improvements
4. Efficiency Analysis and Results
5. Transforms Execution Model
6. Future Plans

4. Spark at Facebook
Milestones, 2015 to 2019:
• Small-scale experiments
• A few pipelines in production
• Running 60TB+ shuffle pipelines
• Successor to Apache Hive at Facebook
• Full-production deployment
• Largest compute engine at Facebook by CPU
• Scaling Spark
Reliability and efficiency are our top priority.

8. Script Transforms
SQL query:
SELECT TRANSFORM (inputs)
  USING 'script'
  AS (outputs)
FROM src_tbl;
Query plan: ScriptTransformation (inputs, script, outputs) on top of TableScan (src_tbl).
Execution: Spark passes the input table's rows (inputs) to an external process, and the process's results (outputs) form the output table.
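To make the data flow concrete, here is a minimal sketch of what "script" could be: a C++ program that reads rows on stdin and writes rows on stdout. It assumes Spark's default text protocol for TRANSFORM when no ROW FORMAT is given (tab-separated fields, newline-separated rows). The function name and the (id, text) to (id, length) logic are hypothetical.

```cpp
#include <cassert>
#include <iostream>
#include <sstream>
#include <string>

// Minimal transform "script": reads tab-separated rows (id, text) and emits
// (id, length of text). It works on generic streams, so a real binary would
// pass std::cin/std::cout and a test can pass string streams.
void lengthTransform(std::istream& in, std::ostream& out) {
  std::string line;
  while (std::getline(in, line)) {            // one input row per line
    const auto tab = line.find('\t');
    const std::string id = line.substr(0, tab);  // whole line if no tab
    const std::string text =
        tab == std::string::npos ? "" : line.substr(tab + 1);
    out << id << '\t' << text.size() << '\n';   // one output row per line
  }
}
```

A real binary's `main` would simply call `lengthTransform(std::cin, std::cout)`. A query such as `SELECT TRANSFORM (id, text) USING 'length_transform' AS (id, len) FROM src_tbl;` would then invoke it, where `length_transform`, `text`, and `len` are illustrative names. The process never sees Spark internals, only a byte stream.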

11. Why Script Transforms?
1. Flexibility: unlike UDFs, transforms can run arbitrary scripts and binaries, so they support an open-ended range of use-cases.
2. Efficiency: most transformers are written in C++.
Transforms provide custom data processing while relying on Spark for ETL, data partitioning, distributed execution, and fault-tolerance. For example, Spark is optimized for ETL, while PyTorch is optimized for model serving.

13. Transform Pipelines Usage
[Figure: transform pipelines' share of overall CPU usage; y-axis from 0% to 15%]

14. Transform Pipelines Usage Comparison
Pipeline type       | Share of query count | Share of CPU
Pure SQL            | 72%                  | 54%
Transforms & UDFs   | 20%                  | 45%
DataFrames          | 8%                   | 1%

15. Use-case 1: Batch Inference
SQL query (slide annotations shown as comments):
ADD FILES inference_engine, model.md;                      -- transform resources
SELECT TRANSFORM (id INT, metadata STRING, image STRING)   -- input columns
  ROW FORMAT SERDE 'JSONSimpleSerDe'                       -- input format
  USING 'inference_engine --model=model.md'
  AS labels MAP<STRING, DOUBLE>                            -- output: category -> confidence
  ROW FORMAT SERDE 'JSONSimpleSerDe'                       -- output format
FROM tlb_images;

16. Use-case 1: Batch Inference, transform main.cpp
#include "spark/Transformer.h"  // transform lib
...
while (transformer.readRow(input)) {  // row iterator
  // data processing
  auto prediction = predict(input);
  // write output map
  transformer.writeRow(prediction);
}
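`spark/Transformer.h` is internal, so its real interface is not shown here. The self-contained sketch below is a hypothetical stand-in that only mimics the `readRow`/`writeRow` shape of the loop above. It assumes tab-delimited text rows, whereas the production library speaks the configured SerDe.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for the row iterator in "spark/Transformer.h":
// each row is one line of '\t'-separated fields.
class Transformer {
 public:
  Transformer(std::istream& in, std::ostream& out) : in_(in), out_(out) {}

  // Reads the next row into `row`; returns false at end of input.
  bool readRow(std::vector<std::string>& row) {
    std::string line;
    if (!std::getline(in_, line)) return false;
    row.clear();
    std::size_t start = 0;
    while (true) {
      const std::size_t tab = line.find('\t', start);
      row.push_back(line.substr(start, tab - start));  // empty fields kept
      if (tab == std::string::npos) break;
      start = tab + 1;
    }
    return true;
  }

  // Writes one row as '\t'-separated fields followed by a newline.
  void writeRow(const std::vector<std::string>& row) {
    for (std::size_t i = 0; i < row.size(); ++i) {
      if (i > 0) out_ << '\t';
      out_ << row[i];
    }
    out_ << '\n';
  }

 private:
  std::istream& in_;
  std::ostream& out_;
};
```

Constructed as `Transformer transformer(std::cin, std::cout);`, it supports the same `while (transformer.readRow(input)) { ... }` loop, with the user's code confined to the body of that loop.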

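17. Use-case 1: Batch Inference, execution
Components: a PyTorch runtime container, a Spark executor running the Spark task, and the transform process (a self-contained executable).
• Input path: the Spark task serializes each InternalRow into JSON, e.g. {id:1, metadata:…, image:…}, and writes it to the transform process's stdin, where it is deserialized into C++ objects and fed to the model.
• Output path: the transform process serializes the model's C++ output objects into JSON, e.g. {label_1: score, label_2: score}, on stdout, and the Spark task deserializes that JSON back into InternalRow.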
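The output path above ends with the transform turning a C++ label-to-score map into a JSON line. A minimal sketch of that step follows, with string escaping per RFC 8259 section 7. The function names are hypothetical, and it is an assumption that `JSONSimpleSerDe` expects exactly one JSON object per line shaped like the slide's `{label_1: score, ...}` example.

```cpp
#include <cassert>
#include <cstdio>
#include <map>
#include <sstream>
#include <string>

// Encodes `s` as a JSON string literal (RFC 8259, section 7): quote,
// backslash, and control characters below U+0020 must be escaped.
std::string jsonString(const std::string& s) {
  std::string out = "\"";
  for (unsigned char c : s) {
    switch (c) {
      case '"':  out += "\\\""; break;
      case '\\': out += "\\\\"; break;
      case '\n': out += "\\n"; break;
      case '\r': out += "\\r"; break;
      case '\t': out += "\\t"; break;
      default:
        if (c < 0x20) {
          char buf[7];  // "\u00XX" plus terminator
          std::snprintf(buf, sizeof buf, "\\u%04x", static_cast<unsigned>(c));
          out += buf;
        } else {
          out += static_cast<char>(c);
        }
    }
  }
  return out + "\"";
}

// Serializes model output (label -> confidence) as one JSON object per line.
// Assumes finite scores: NaN/Inf have no JSON representation.
std::string labelsToJsonLine(const std::map<std::string, double>& labels) {
  std::ostringstream out;
  out << '{';
  bool first = true;
  for (const auto& [label, score] : labels) {
    if (!first) out << ',';
    first = false;
    out << jsonString(label) << ':' << score;
  }
  out << "}\n";
  return out.str();
}
```

Writing `labelsToJsonLine(prediction)` to stdout for each input row would produce the output stream that Spark then deserializes into the `labels MAP<STRING, DOUBLE>` column.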

18. Use-case 2: Batch Indexing
SQL query (slide annotations shown as comments):
ADD FILES indexer;                              -- transform resources
SELECT TRANSFORM (shard_id INT, data STRING)    -- input columns
  ROW FORMAT SERDE 'RowFormatDelimited'         -- input format
  USING 'indexer --schema=data<STRING>'
FROM src_tbl
CLUSTER BY shard_id;                            -- partition operator
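In Spark SQL, `CLUSTER BY shard_id` is shorthand for `DISTRIBUTE BY shard_id SORT BY shard_id`. Rows are hash-partitioned on the key, then sorted within each partition. The sketch below models that with hypothetical names. It uses `std::hash` as a stand-in for Spark's Murmur3-based partitioning, so actual partition numbers will differ, but the guarantee is the same: every row of a shard lands in one partition, contiguously.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Row = std::pair<int, std::string>;  // (shard_id, data)

// Simplified model of CLUSTER BY shard_id: hash-partition rows across
// reducers, then sort each partition by the key. numReducers must be > 0.
std::vector<std::vector<Row>> clusterBy(const std::vector<Row>& rows,
                                        std::size_t numReducers) {
  std::vector<std::vector<Row>> partitions(numReducers);
  for (const Row& r : rows) {
    const std::size_t p = std::hash<int>{}(r.first) % numReducers;
    partitions[p].push_back(r);
  }
  for (auto& part : partitions) {
    std::stable_sort(part.begin(), part.end(),
                     [](const Row& a, const Row& b) { return a.first < b.first; });
  }
  return partitions;
}
```

Partitioning alone would keep each shard on a single reducer. The added sort is what lets the indexer process a shard as one contiguous run of rows.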

19. Use-case 2: Batch Indexing, execution
Mappers emit (shard_id, data) rows, and the shuffle routes all rows with the same shard_id to the same reducer. In the slide's example, Reducer 1 receives every shard 1 row and Reducer 2 every shard 2 row. In each reducer, the Spark task streams its rows through stdin to an indexer transform process.
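Because each reducer's rows arrive clustered by shard_id, an indexer can detect shard boundaries in a single streaming pass. A shard is complete as soon as the key changes, which matters when one reducer receives several shards. In the sketch below, "building an index" is a placeholder that only counts rows. The `ShardSummary` type and `indexShards` function are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical per-shard result; a real indexer would emit index files.
struct ShardSummary {
  std::string shardId;
  std::size_t rows;
};

// Reads "shard_id\tdata" lines that are already clustered by shard_id and
// closes out each shard when the key changes, holding one shard at a time.
std::vector<ShardSummary> indexShards(std::istream& in) {
  std::vector<ShardSummary> done;
  std::string line, current;
  std::size_t count = 0;
  while (std::getline(in, line)) {
    const std::string shard = line.substr(0, line.find('\t'));
    if (count > 0 && shard != current) {  // key changed: previous shard is complete
      done.push_back({current, count});
      count = 0;
    }
    current = shard;
    ++count;
  }
  if (count > 0) done.push_back({current, count});  // flush the last shard
  return done;
}
```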

21. Core Engine Improvements: Operator (ScriptTransformationExec.scala)
• Direct process invocation
• Class IOSchema to handle SerDe schema and config
• MonitorThread to track transform process progress
• Transform process error handling and surfacing
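Stock Spark's `ScriptTransformationExec` launches the script through `/bin/bash -c` and feeds the process's stdin from a separate writer thread. Our reading of "direct process invocation" is that the binary is exec'd without the shell wrapper. The operator itself is Scala. The POSIX sketch below expresses the same idea in C++, with hypothetical names: exec the binary directly, write stdin and read stdout concurrently, and return the exit status so failures can be surfaced.

```cpp
#include <cassert>
#include <csignal>
#include <cstddef>
#include <string>
#include <thread>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical result: exit code (127 if exec failed, 128 + signal number if
// the child was killed) plus everything the child wrote to stdout.
struct ProcessResult {
  int exitCode;
  std::string stdoutData;
};

ProcessResult runTransform(const std::vector<std::string>& argv,
                           const std::string& input) {
  if (argv.empty()) return {-1, ""};
  std::signal(SIGPIPE, SIG_IGN);  // process-wide: an early-exiting child must not kill us
  std::vector<char*> args;        // built before fork(): no allocation in the child
  for (const auto& a : argv) args.push_back(const_cast<char*>(a.c_str()));
  args.push_back(nullptr);

  int toChild[2], fromChild[2];
  if (pipe(toChild) != 0) return {-1, ""};
  if (pipe(fromChild) != 0) {
    close(toChild[0]); close(toChild[1]);
    return {-1, ""};
  }
  const pid_t pid = fork();
  if (pid < 0) {
    for (int fd : {toChild[0], toChild[1], fromChild[0], fromChild[1]}) close(fd);
    return {-1, ""};
  }
  if (pid == 0) {  // child: pipes become stdin/stdout, then exec with no shell
    dup2(toChild[0], STDIN_FILENO);
    dup2(fromChild[1], STDOUT_FILENO);
    close(toChild[0]); close(toChild[1]);
    close(fromChild[0]); close(fromChild[1]);
    execvp(args[0], args.data());
    _exit(127);  // exec failed, e.g. binary not found
  }
  close(toChild[0]);
  close(fromChild[1]);
  // Write stdin and read stdout concurrently; doing them one after the other
  // can deadlock once the child's stdout pipe buffer fills up.
  std::thread writer([&] {
    std::size_t off = 0;
    while (off < input.size()) {
      const ssize_t n = write(toChild[1], input.data() + off, input.size() - off);
      if (n <= 0) break;  // e.g. EPIPE: the child stopped reading
      off += static_cast<std::size_t>(n);
    }
    close(toChild[1]);  // EOF on the child's stdin
  });
  ProcessResult result{0, ""};
  char buf[4096];
  ssize_t n;
  while ((n = read(fromChild[0], buf, sizeof buf)) > 0)
    result.stdoutData.append(buf, static_cast<std::size_t>(n));
  close(fromChild[0]);
  writer.join();
  int status = 0;
  waitpid(pid, &status, 0);
  result.exitCode = WIFEXITED(status) ? WEXITSTATUS(status) : 128 + WTERMSIG(status);
  return result;
}
```

A non-zero `exitCode` is the signal the operator can turn into a task failure with a descriptive error, instead of silently producing truncated output.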

22. Core Engine Improvements: SerDe support
• DelimitedJSONSerDe.scala: JSON format (standard RFC 8259)

23. Core Engine Improvements: SerDe support
• SimpleSerDe.scala: ROW FORMAT DELIMITED with configurable properties:
  FIELDS TERMINATED BY ','
  COLLECTION ITEMS TERMINATED BY '|'
  MAP KEYS TERMINATED BY ':'
  LINES TERMINATED BY '\n'
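The four delimiters above are what make complex types expressible in a delimited text row. Fields are split by `,`, array elements and map entries by `|`, and each map key from its value by `:`. The sketch below encodes a hypothetical `(id INT, tags ARRAY<STRING>, scores MAP<STRING, STRING>)` row and decodes a map field back. It assumes values never contain delimiter characters, since no escaping is done, and all names are illustrative.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Delimiters from the slide's ROW FORMAT DELIMITED example.
constexpr char kFieldSep = ',';  // FIELDS TERMINATED BY ','
constexpr char kItemSep = '|';   // COLLECTION ITEMS TERMINATED BY '|'
constexpr char kKeySep = ':';    // MAP KEYS TERMINATED BY ':'
constexpr char kLineSep = '\n';  // LINES TERMINATED BY '\n'

// Encodes one row; values must not contain any delimiter (no escaping).
std::string encodeRow(int id, const std::vector<std::string>& tags,
                      const std::map<std::string, std::string>& scores) {
  std::string out = std::to_string(id) + kFieldSep;
  for (std::size_t i = 0; i < tags.size(); ++i) {
    if (i > 0) out += kItemSep;
    out += tags[i];
  }
  out += kFieldSep;
  bool first = true;
  for (const auto& [k, v] : scores) {
    if (!first) out += kItemSep;
    first = false;
    out += k + kKeySep + v;
  }
  return out + kLineSep;
}

// Splits `s` on `sep`, keeping empty pieces.
std::vector<std::string> split(const std::string& s, char sep) {
  std::vector<std::string> parts;
  std::size_t start = 0;
  while (true) {
    const std::size_t pos = s.find(sep, start);
    parts.push_back(s.substr(start, pos - start));
    if (pos == std::string::npos) return parts;
    start = pos + 1;
  }
}

// Decodes a MAP field such as "x:1|y:2"; an empty field is an empty map.
std::map<std::string, std::string> decodeMap(const std::string& field) {
  std::map<std::string, std::string> m;
  if (field.empty()) return m;
  for (const auto& entry : split(field, kItemSep)) {
    const std::size_t pos = entry.find(kKeySep);
    m[entry.substr(0, pos)] = pos == std::string::npos ? "" : entry.substr(pos + 1);
  }
  return m;
}
```

The format stays human-readable and easy to produce from any language, which suits development. The price is that every value is re-parsed from text on both sides of the pipe.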

24. Core Engine Improvements: SerDe support
• Text-based (development): DelimitedJSONSerDe.scala, SimpleSerDe.scala
• Binary (production): ?

27. Core Engine Improvements: Production SerDe Requirements
• Binary format: text-based encoding is slow and less compact.
• Zero-copy: access to serialized data without parsing or unpacking (cf. "Improving Facebook’s performance on Android with FlatBuffers").
• Word-aligned data: allows for SIMD optimizations.

28. Binary SerDe Considerations
• LazyBinarySerDe (Apache Hive): neither zero-copy nor word-aligned; requires converters in Spark.
• Protocol Buffers / Thrift: not zero-copy; better suited for RPC.
• FlatBuffers / Cap’n Proto: require converters (to/from InternalRow) in Spark Core.
• Apache Arrow: a great future option.

29. Binary SerDe Considerations: chosen format, UnsafeRow
• Binary and word-aligned
• Zero-copy
• Already part of Spark core
• Converters to/from InternalRow already available
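To show why UnsafeRow meets the binary, word-aligned, and zero-copy requirements, here is a simplified C++ model of its layout as described in the class comment of Spark's `UnsafeRow`. That layout has a null bitset rounded up to 8-byte words, then one 8-byte slot per field, then a variable-length region. A string's slot packs its offset (relative to the row start) into the upper 32 bits and its length into the lower 32 bits. This model is not byte-compatible with Spark: it supports only int64, string, and NULL, and it addresses null bits per byte (which matches Spark's 64-bit words only on little-endian hosts). All names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>
#include <variant>
#include <vector>

// Simplified UnsafeRow-like layout:
//   [null bits, rounded up to 8-byte words][one 8-byte slot per field]
//   [variable-length data, each value padded to an 8-byte boundary]
using Field = std::variant<std::monostate, std::int64_t, std::string>;  // monostate = NULL

std::size_t bitsetBytes(std::size_t numFields) { return (numFields + 63) / 64 * 8; }

std::size_t slotOffset(std::size_t numFields, std::size_t i) {
  return bitsetBytes(numFields) + 8 * i;
}

std::vector<std::uint8_t> writeRow(const std::vector<Field>& fields) {
  const std::size_t n = fields.size();
  std::vector<std::uint8_t> buf(slotOffset(n, n), 0);  // bitset + all slots, zeroed
  for (std::size_t i = 0; i < n; ++i) {
    std::uint64_t slot = 0;
    if (std::holds_alternative<std::monostate>(fields[i])) {
      buf[i / 8] |= static_cast<std::uint8_t>(1u << (i % 8));  // mark NULL
    } else if (const auto* v = std::get_if<std::int64_t>(&fields[i])) {
      std::memcpy(&slot, v, sizeof slot);  // fixed-width value stored in place
    } else {
      const std::string& s = std::get<std::string>(fields[i]);  // assumes size < 2^32
      slot = (static_cast<std::uint64_t>(buf.size()) << 32) | s.size();
      buf.insert(buf.end(), s.begin(), s.end());
      buf.resize((buf.size() + 7) / 8 * 8, 0);  // keep the row word-aligned
    }
    std::memcpy(buf.data() + slotOffset(n, i), &slot, sizeof slot);
  }
  return buf;
}

// Zero-copy readers: they index straight into the buffer without parsing.
bool isNullAt(const std::uint8_t* row, std::size_t i) {
  return (row[i / 8] >> (i % 8)) & 1u;
}

std::int64_t getLong(const std::uint8_t* row, std::size_t n, std::size_t i) {
  std::int64_t v;
  std::memcpy(&v, row + slotOffset(n, i), sizeof v);
  return v;
}

std::string_view getString(const std::uint8_t* row, std::size_t n, std::size_t i) {
  std::uint64_t slot;
  std::memcpy(&slot, row + slotOffset(n, i), sizeof slot);
  const auto offset = static_cast<std::size_t>(slot >> 32);
  const auto length = static_cast<std::size_t>(slot & 0xFFFFFFFFu);
  return {reinterpret_cast<const char*>(row + offset), length};
}
```

Reading field `i` costs a fixed offset computation plus one 8-byte load. A string comes back as a view into the buffer rather than a copy, which is the property the text formats above lack.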