Supporting Over a Thousand Custom Hive User Defined Functions
1. Supporting Over a Thousand Custom Hive User Defined Functions
By Sergey Makagonov and Xin Yao, Facebook
2. Agenda
• Introduction to User Defined Functions
• Hive UDFs at Facebook
• Major challenges and improvements
• Partial aggregations
3. What Are "User Defined Functions"?
4. User Defined Functions
• UDFs are used to add custom code logic when built-in functions cannot achieve the desired result:

SELECT substr(description, 1, 100) AS first_100,
       count(*) AS cnt
FROM tmp_table
GROUP BY 1;
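For illustration, here is a minimal sketch of what such a custom UDF looks like under Hive's simple UDF API. The class TruncateUDF and its logic are invented for this example, not one of the functions from the talk:

```scala
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.io.Text

// Illustrative custom UDF: truncate a string to its first `len` characters.
// Hive locates the evaluate method via reflection.
class TruncateUDF extends UDF {
  def evaluate(input: Text, len: Int): Text = {
    if (input == null) null
    else new Text(input.toString.take(len))
  }
}
```

Once packaged into a jar and registered (e.g. via CREATE TEMPORARY FUNCTION), the class can be called from SQL like any built-in function.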
5. Types of Hive functions
• Regular user-defined functions (UDFs): work on a single row in a table and produce a single output for one or more inputs
• User-defined table functions (UDTFs): for every row in a table, can return multiple values as output
• Aggregate functions (UDAFs): work on one or more rows in a table and produce a single output
6. Types of Hive functions: Regular UDFs

Input table (dim_two_rows):
  arr1              arr2
  ['a', 'b', 'c']   ['d', 'e', 'f']
  ['foo', 'bar']    ['baz', 'spam']

SELECT FB_ARRAY_CONCAT(arr1, arr2) AS zipped
FROM dim_two_rows;

Output:
  ["a","b","c","d","e","f"]
  ["foo","bar","baz","spam"]
7. Types of Hive functions: UDTFs

Input table (dim_one_row):
  id
  123

SELECT id, idx
FROM dim_one_row
LATERAL VIEW STACK(3, 1, 2, 3) tmp AS idx;

Output:
  id    idx
  123   1
  123   2
  123   3
8. Types of Hive functions: UDAFs

Input table (dim_three_rows):
  id
  123
  124
  125

SELECT COLLECT_SET(id) AS all_ids
FROM dim_three_rows;

Output:
  [123, 124, 125]
9. How Hive UDFs work in Spark
• Most Hive data types (Java types and derivatives of the ObjectInspector class) can be converted to Spark's data types, and vice versa
• Instances of Hive's GenericUDF, SimpleGenericUDAF, and GenericUDTF are called via wrapper classes extending Spark's Expression, ImperativeAggregate, and Generator classes respectively
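From the user's side this wrapping is transparent. A minimal sketch, assuming a Hive UDF class com.example.udf.FbArrayConcat is on the classpath: registering it in Spark SQL causes Spark to wrap it internally in its Expression-based wrapper (HiveGenericUDF).

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-udf-example")
  .enableHiveSupport()  // needed so the Hive UDF class can be resolved and wrapped
  .getOrCreate()

// Spark wraps the Hive class in HiveGenericUDF (an Expression) internally.
spark.sql("CREATE TEMPORARY FUNCTION fb_array_concat AS 'com.example.udf.FbArrayConcat'")
spark.sql("SELECT fb_array_concat(arr1, arr2) AS zipped FROM dim_two_rows").show()
```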
10. How Hive UDFs work in Spark

11. UDFs at Facebook
12. UDFs at Facebook
• Hive was the primary query engine until we started to migrate jobs to Spark and Presto
• Over the course of several years, over a thousand custom User Defined Functions were built
• Hive queries that used UDFs accounted for over 70% of CPU time
• Supporting Hive UDFs in Spark is important for the migration
13. Identifying Baseline
• At the beginning of the Hive-to-Spark migration, the level of UDF support was unclear
14. UDFs testing framework
• Most UDFs were already covered with basic tests from the Hive days
• We also had a testing framework built for running those tests in Hive
15. UDFs testing framework
• The framework was extended further to allow running queries against Spark
• A temporary Scala file is created for each UDF class, containing code to run SQL queries using the DataFrame API (a sketch of such a file follows below)
• A spark-shell subprocess is spawned to run the Scala file:
  spark-shell --conf spark.ui.enabled=false … -i /tmp/spark-hive-udf-1139336654093084343.scala
• Output is parsed and compared to the expected result
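Here is a sketch of what such a generated temporary Scala file might contain (the real framework's output is not shown in the slides; function and table names are illustrative). Because spark-shell runs the file with -i, the spark session is already in scope:

```scala
// Register the UDF under test (class name is illustrative).
spark.sql("CREATE TEMPORARY FUNCTION fb_array_concat AS 'com.example.udf.FbArrayConcat'")

// Run the test query through the DataFrame API.
val result = spark.sql("SELECT fb_array_concat(array('a'), array('b')) AS zipped")

// Print rows with a marker so the parent process can parse stdout
// and compare it against the expected output.
result.collect().foreach(row => println(s"UDF_TEST_RESULT: $row"))

// Terminate the spark-shell subprocess instead of dropping into the REPL.
sys.exit(0)
```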
16. UDFs testing framework
• With test coverage in place, baseline support of UDFs, measured by query count and CPU days, was identified: 58%
• Failed tests helped to identify the common issues
17. Major challenges
18. Unsupported APIs
• getRequiredJars and getRequiredFiles: functions to automatically include additional resources required by the UDF.
• initialize(StructObjectInspector) in GenericUDTF: Spark SQL uses the deprecated interface initialize(ObjectInspector[]) only.
• configure (GenericUDF, GenericUDTF, and GenericUDAFEvaluator): a function to initialize functions with MapredContext, which is inapplicable to Spark.
• close (GenericUDF and GenericUDAFEvaluator): a function to release associated resources. Spark SQL does not call this function when tasks finish.
• reset (GenericUDAFEvaluator): a function to re-initialize aggregation for reusing the same aggregation. Spark SQL currently does not support the reuse of aggregation.
• getWindowingEvaluator (GenericUDAFEvaluator): a function to optimize aggregation by evaluating an aggregate over a fixed window.
20. getRequiredFiles and getRequiredJars
• Functions to automatically include additional resources required by the UDF
• UDF code can assume that the file is present in the executor's working directory
21. Supporting required files/jars (SPARK-27543)

Driver:
• During initialization, for each UDF:
  - Identify required files and jars
  - Register files for distribution: SparkContext.addFile(…), SparkContext.addJar(…)

Executor:
• The Executor fetches files added to the SparkContext from the Driver
• During initialization, for each UDF:
  - If a required file is in the working dir: do nothing (it was distributed)
  - If a file is missing: try to create a symlink to its absolute path
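SPARK-27543 splits the work between driver and executor. Below is a minimal sketch of the driver-side half, assuming a GenericUDF instance is already in hand (both getters return null by default, hence the Option wrapping); the executor-side symlinking is handled inside Spark:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.spark.SparkContext

// Distribute the resources a Hive UDF declares via
// getRequiredFiles/getRequiredJars to all executors.
def registerUdfResources(sc: SparkContext, udf: GenericUDF): Unit = {
  Option(udf.getRequiredFiles).toSeq.flatten.foreach(sc.addFile(_))
  Option(udf.getRequiredJars).toSeq.flatten.foreach(sc.addJar(_))
}
```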
22. UDFs and Thread Safety
• The majority of Hive UDFs are written without concurrency in mind
• Hive runs each task in its own JVM process
• Spark runs a separate JVM process per Executor, and an Executor can run multiple tasks concurrently

(diagram: one Executor running Task 1 and Task 2, each with its own UDF instance)
23. Thread-unsafe UDF Example
• Consider that we have 2 tasks, and hence 2 instances of the UDF: "instance 1" and "instance 2"
• The evaluate method is called for each row; both instances could pass the null-check inside evaluate at the same time
• Once "instance 1" finishes initialization first, it will call evaluate for the next row
• If "instance 2" is still in the middle of initializing the mapping, it could overwrite the data that "instance 1" relied on, which could lead to data corruption or an exception
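The slides describe this pattern in prose only; the sketch below shows what such a thread-unsafe UDF can look like. The CountryNameUDF class and its mapping are hypothetical. The map lives in a companion object, i.e. it is static and shared by every task in the Executor JVM:

```scala
import org.apache.hadoop.hive.ql.exec.UDF

object CountryNameUDF {
  // Static (JVM-wide) state: shared by all UDF instances in the Executor.
  var mapping: java.util.Map[String, String] = null
}

class CountryNameUDF extends UDF {
  def evaluate(code: String): String = {
    if (CountryNameUDF.mapping == null) {  // two tasks can both pass this check
      CountryNameUDF.mapping = new java.util.HashMap[String, String]()
      // Multi-step population: a second task re-entering here resets the map
      // while the first task may already be reading from it.
      CountryNameUDF.mapping.put("US", "United States")
      CountryNameUDF.mapping.put("GB", "United Kingdom")
    }
    CountryNameUDF.mapping.get(code)
  }
}
```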
24. Approach 1: Introduce Synchronization
• Introduce locking (synchronization) on the UDF class when initializing the mapping

Cons:
• Synchronization is computationally expensive
• Requires manual, careful refactoring of code, which does not scale to hundreds of UDFs
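A sketch of Approach 1 applied to the hypothetical CountryNameUDF above, here written as double-checked locking on the shared companion object, with a volatile field for safe publication:

```scala
import org.apache.hadoop.hive.ql.exec.UDF

object CountryNameUDF {
  @volatile var mapping: java.util.Map[String, String] = null
}

class CountryNameUDF extends UDF {
  def evaluate(code: String): String = {
    if (CountryNameUDF.mapping == null) {
      CountryNameUDF.synchronized {            // only one task initializes
        if (CountryNameUDF.mapping == null) {
          val m = new java.util.HashMap[String, String]()
          m.put("US", "United States")
          m.put("GB", "United Kingdom")
          CountryNameUDF.mapping = m           // publish only the fully built map
        }
      }
    }
    CountryNameUDF.mapping.get(code)
  }
}
```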
25. Approach 2: Make the Field Non-static
• Turn the static variable into an instance variable

Cons:
• Adds more pressure on memory (instances cannot share complex data)

Pros:
• Minimal changes in the code, which can also be codemodded across all other UDFs that use static fields of non-primitive types
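And a sketch of Approach 2 for the same hypothetical UDF: the map becomes an instance field, so each task's UDF instance owns a private copy and no locking is needed:

```scala
import org.apache.hadoop.hive.ql.exec.UDF

class CountryNameUDF extends UDF {
  // Instance field: each task has its own UDF instance, hence its own map.
  private var mapping: java.util.Map[String, String] = null

  def evaluate(code: String): String = {
    if (mapping == null) {  // safe: only this task's thread touches this field
      val m = new java.util.HashMap[String, String]()
      m.put("US", "United States")
      m.put("GB", "United Kingdom")
      mapping = m
    }
    mapping.get(code)
  }
}
```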
26. Kryo serialization/deserialization
• In Spark, UDF objects are initialized on the Driver, serialized, and later deserialized on executors
• Some classes cannot be deserialized out of the box
• Example: guava's ImmutableSet. Kryo can successfully serialize the objects on the driver, but fails to deserialize them on executors
27. Solving the Kryo serde problem
• Catch serde issues by running Hive UDF tests in cluster mode
• For commonly used classes, write custom serializers or import existing ones
• Mark problematic instance variables as transient
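As a sketch of the second bullet, assuming the community kryo-serializers library (de.javakaffee), which ships serializers for guava's immutable collections; the registrator class name is illustrative:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Register serializers for guava's immutable collections so that
// UDF fields of these types survive the driver-to-executor round trip.
class UdfKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    de.javakaffee.kryoserializers.guava.ImmutableSetSerializer.registerSerializers(kryo)
  }
}
```

The registrator is then enabled via the spark.kryo.registrator configuration. For the third bullet, a problematic field can instead be declared @transient and rebuilt lazily on the executor.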
28. Hive UDFs performance
• Hive UDFs don't support Spark's data types out of the box
• Similarly, Spark cannot work with Hive's object inspectors
• For each UDF call, Spark's data types are wrapped into Hive's inspectors and Java types
• Likewise for the results: Java types are converted back into Spark's data types
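To make the overhead concrete, here is a simplified sketch of the conversions involved in calling a Hive GenericUDF on a single string argument. This is not Spark's actual wrapper code, and initialize is normally called once per query rather than per call:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

def callHiveUdfOnce(udf: GenericUDF, arg: String): Any = {
  // 1. Describe the argument type with a Hive ObjectInspector.
  val argOIs = Array[ObjectInspector](
    PrimitiveObjectInspectorFactory.javaStringObjectInspector)
  udf.initialize(argOIs)

  // 2. Wrap the Spark-side value in Hive's deferred-object API and invoke the UDF.
  val deferred = Array[GenericUDF.DeferredObject](new GenericUDF.DeferredJavaObject(arg))
  val javaResult = udf.evaluate(deferred)

  // 3. Spark must then convert javaResult back into its own representation.
  //    Steps 2 and 3 repeat for every row, which is where the overhead lies.
  javaResult
}
```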
29. Hive UDFs performance
• This wrapping/unwrapping overhead can lead to up to 2x CPU time spent in a UDF compared to a Spark-native implementation
• UDFs that work with complex types suffer the most