Bridging the Gap Between Data Scientists and Software Engineers
Deploying legacy Python algorithms to Apache Spark with minimum pain.
• About GE Aviation
• The problem
• Starting point
• Approach taken
• Some code
• Challenges
• Benefits
• Conclusions and recommendations
1. Bridging the Gap Between Data Scientists and Software Engineers: Deploying legacy Python algorithms to Apache Spark with minimum pain
Dr Lucas Partridge and Dr Peter Knight, GE Aviation Digital
17 Oct 2019 | #UnifiedDataAnalytics #SparkAISummit
2. About us
• Peter Knight (Data Scientist): predicts wear on aircraft engines to minimize unplanned downtime.
• Lucas Partridge (Software Engineer): helps the data scientists scale up their algorithms for big data.
3. Outline
• About GE Aviation
• The problem
• Starting point
• Approach taken
• Some code
• Challenges
• Benefits
• Conclusions and recommendations
4. General Electric - Aviation
• 48k employees
• $30.6B revenue (2018)
• >33k commercial engines
"Every two seconds, an aircraft powered by GE technology takes off somewhere in the world"
5. General problem
• GE Aviation has 100s of data scientists and engineers developing Python algorithms.
• But most develop and test their algorithms on their local machines and don't have the time to learn Spark.
• Spark is a good candidate for making these algorithms scale as the engine fleet grows.
• But how do we deploy these legacy algorithms to Spark as quickly as possible?
6. Specific problem
• Forecasting when aircraft engines should be removed for maintenance.
• So we can predict what engine parts will be needed, where, and when.
• A 'Digital Twin' model exists for each important part to be tracked.
• Tens of engine lines, tens of tracked parts → 100s of algorithms to scale!
7. Starting point – a typical legacy Python algorithm

    def execute(input_data):     # input_data is a Pandas DataFrame!
        # Calculate results from input data
        # …
        return results           # results is also a Pandas DataFrame!
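For context, a minimal sketch of what one of these legacy algorithms might look like; the column names ('esn', 'egt_margin') and the calculation itself are purely illustrative, not GE's actual code.

    import pandas as pd

    def execute(input_data):
        # input_data: a plain Pandas DataFrame of records for one or more engines.
        results = (input_data
                   .groupby('esn', as_index=False)['egt_margin']
                   .mean()
                   .rename(columns={'egt_margin': 'mean_egt_margin'}))
        return results  # also a plain Pandas DataFrame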
8. The legacy Python algorithms
• Used Pandas DataFrames.
• Were run on laptops. Didn't exploit Spark.
• Each algorithm was run independently.
• Each fetched its own data and read from, and wrote to, csv files.
• Some Java Hadoop Map-Reduce and R ones too - not covered here.
9. The legacy Python algorithms
• Often failed at runtime.
• Typically processed data for more than one asset (engine) at a time; they often tried to process all engines!
• All the data would be read into a single Pandas DataFrame → ran out of memory! Bang!
10. The legacy Python algorithms
• Weren't consistently written:
  • function arguments vs globals
  • using different names for the same data column.
• Had complex config in JSON – hard to do what-if runs.
• Other considerations:
  • The problem domain suggested the need for a pipeline of algorithms.
  • Few data scientists and software engineers know about Spark, much less about ML Pipelines!
11. Working towards a solution
• Studied representative legacy algorithms:
  • structure
  • how they process data – columns required, sorting of data rows
  • are any tests available?! E.g., csv files of inputs and expected outputs.
• Assumed we couldn't alter the legacy code at all:
  • so decided to wrap rather than port them to PySpark
  • i.e., the legacy Python algorithm is called in parallel across the cluster.
12. To wrap or to port a legacy algorithm?
Wrap when…
• You wish to retain the ability to run, test and update the algorithm outside Spark (e.g., on a laptop, or in other Big Data frameworks).
• An auto-code generation tool is available for generating all the necessary wrapper code.
Port when…
• Performance is critical.
• The algorithm is small, simple and easy to test on Spark.
• The algorithm's creator is comfortable working directly with Spark.
• Spark skills are available for the foreseeable future.
13. Initially tried wrapping with RDD.mapPartitions()…
• Call it after repartitioning the input data by engine id (a rough sketch follows below).
This worked but…
• Could get unexpected key skew effects unless you experiment with the way your data is partitioned.
• The data for more than one asset (engine) at a time could be passed into the wrapped algorithm:
  • ok if the algorithm can handle data for more than one asset; otherwise not.
• We really wanted to use @pandas_udf if Spark 2.3+ was available, since its 'grouped map' usage means that the data for only one asset gets passed to the algorithm.
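A rough sketch of that first mapPartitions()-based attempt, assuming the asset id column is called 'esn' and that the legacy execute() function is passed in; this is illustrative rather than the production wrapper.

    import pandas as pd

    def run_with_map_partitions(spark_df, num_partitions, legacy_execute):
        def run_partition(rows):
            rows = list(rows)
            if not rows:
                return
            # One partition may still contain rows for several engines.
            pdf = pd.DataFrame(rows, columns=rows[0].__fields__)
            results = legacy_execute(pdf)
            for record in results.to_dict('records'):
                yield record

        # Repartition by engine id so each engine's rows stay together.
        repartitioned = spark_df.repartition(num_partitions, 'esn')
        return repartitioned.rdd.mapPartitions(run_partition)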
14. …so then we switched to RDD.groupByKey()
• Where key = asset (engine) id.
• So the data for only one asset gets passed to the algorithm.
• This more closely mirrors the behaviour of @pandas_udf, so this code should be easier to convert to use @pandas_udf later on.
• And it will work with algorithms that can only cope with the data for one asset at a time.
15. Forecasting engine removals – solution components
• Multiple Digital Twin models – each one models the wear on a single engine part.
• Input Data Predictor – asks all the digital twin models what input data they need, and then predicts those values n years into the future.
• Aggregator – compares all the predictions to estimate when a given engine should be removed due to the wear of a particular part.
• → All of these were made into ML Pipeline Transformers (see the sketch below).
[Diagram: Historic data → PipelineModel (Input Data Predictor) → PipelineModel (Digital Twin models → Aggregator) → Persist results]
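A hedged sketch of how such a pipeline might be assembled; the stage class names follow the diagram, but the constructor arguments are invented for illustration, and everything is shown in one Pipeline here rather than the two PipelineModels on the slide.

    from pyspark.ml import Pipeline

    # historic_df: a Spark DataFrame of historic engine data.
    pipeline = Pipeline(stages=[
        InputDataPredictor(yearsAhead=5),                 # predict future input values
        DigitalTwinEnginePartXEngineTypeP(esnCol='esn'),  # one transformer per tracked part
        DigitalTwinEnginePartYEngineTypeP(esnCol='esn'),
        Aggregator(),                                     # combine part-level predictions
    ])
    model = pipeline.fit(historic_df)       # all stages are transformers, so fit() is cheap
    results_df = model.transform(historic_df)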
16. Strategy taken
• Passed data to algorithms rather than have each algorithm fetch its own data:
  • an algorithm shouldn't have to know where the data comes from.
• Got representative digital twin models working in isolation, using a temporary table of predicted input data as input.
• Prototyped in a notebook environment (Apache Zeppelin).
• Eventually incorporated the pipeline into a spark-submit job using a new hierarchy of classes…
17. Class hierarchy and sample mixin classes
[Class diagram: Params mixin classes HasEsnCol (esnCol), HasDatetimeCol (datetimeCol) and HasFleetCol (fleetCol); the abstract pyspark.ml classes Transformer and Estimator; abstract GE classes GeAnalytic, EngineWearModel, GroupByKeyEngineWearModel and an abstract Hadoop Map-Reduce variant; and the concrete classes you write, e.g. DigitalTwinEnginePartXEngineTypeP, DigitalTwinEnginePartYEngineTypeP and AnEstimator. A sketch of one mixin follows below.]
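As an example of the mixin layer, a minimal sketch of what HasEsnCol might look like using the standard pyspark.ml.param machinery; the docstring wording is illustrative.

    from pyspark.ml.param import Param, Params, TypeConverters

    class HasEsnCol(Params):
        esnCol = Param(Params._dummy(), "esnCol",
                       "Name of the engine serial number (asset id) column.",
                       typeConverter=TypeConverters.toString)

        def setEsnCol(self, value):
            return self._set(esnCol=value)

        def getEsnCol(self):
            return self.getOrDefault(self.esnCol)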
18. Zooming in…
• EngineWearModel (abstract) – abstract methods: _transform(), _handleMissingData(), _processInputColumns(), _processResultsColumns() (a skeleton is sketched below).
• GroupByKeyEngineWearModel (abstract) – concrete methods: _transform(), _runAnalyticForOneAsset(), _convertRddOfPandasDataFramesToSparkDataFrame()
• DigitalTwinEnginePartXEngineTypeP – concrete methods: _handleMissingData(), _processInputColumns(), _processResultsColumns()
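A possible skeleton for that abstract layer, assuming Python's abc module; the real hierarchy also pulls in GeAnalytic and the other Params mixins, omitted here for brevity.

    from abc import abstractmethod
    from pyspark.ml import Transformer

    class EngineWearModel(Transformer, HasEsnCol):   # HasEsnCol as sketched above
        """Abstract base class for all wrapped engine-wear algorithms."""

        @abstractmethod
        def _handleMissingData(self, dataset):
            """Drop or impute rows the legacy algorithm cannot cope with."""

        @abstractmethod
        def _processInputColumns(self, dataset):
            """Rename or derive the columns the legacy algorithm expects."""

        @abstractmethod
        def _processResultsColumns(self, results_df):
            """Rename or cast the columns returned by the legacy algorithm."""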
19. GroupByKeyEngineWearModel – the _transform() method

    def _transform(self, dataset):
        no_nulls_data = self._handleMissingData(dataset)
        data_with_processed_input_columns = self._processInputColumns(no_nulls_data)
        asset_col = self.getEsnCol()
        # Group all rows for one asset (engine) under its id:
        grouped_input_rdd = data_with_processed_input_columns\
            .rdd.map(lambda row: (row[asset_col], row)).groupByKey().mapValues(list)
        # Run the wrapped legacy algorithm once per asset:
        results_rdd = grouped_input_rdd.mapValues(
            self._runAnalyticForOneAsset(self.getFailFast(), asset_col))
        results_df = self._convertRddOfPandasDataFramesToSparkDataFrame(results_rdd)
        processed_results = self._processResultsColumns(results_df)
        output_df = dataset.join(processed_results, asset_col, 'left_outer')
        return output_df

Note: implement _handleMissingData(), _processInputColumns() and _processResultsColumns() in each DigitalTwinXXX class.
20. GroupByKeyEngineWearModel – invoking the legacy Pandas-based algorithm

    def _runAnalyticForOneAsset(self, failFast, assetCol):
        # Import the named legacy algorithm:
        pandas_based_analytic_module = importlib.import_module(
            self.getExecuteModuleName())  # A param set by each digital twin class.

        def _assetExecute(assetData):
            # Convert the row data for this asset into a Pandas DataFrame:
            rows = list(assetData)
            column_names = rows[0].__fields__
            input_data = pd.DataFrame(rows, columns=column_names)  # a Pandas DataFrame
            try:
                results = pandas_based_analytic_module.execute(input_data)  # Call legacy algorithm.
            except Exception as e:
                asset_id = input_data[assetCol].iloc[0]
                ex = Exception("Encountered %s whilst processing asset id '%s'"
                               % (e.__class__.__name__, asset_id), e.args[0])
                if failFast:
                    raise ex  # Fail immediately, report error to driver node.
                else:
                    # Log error message silently in the Spark executor's logs:
                    error_msg = "Silently ignoring this error: %s" % ex
                    print(datetime.now().strftime("%y/%m/%d %H:%M:%S : ") + error_msg)
                    return error_msg
            return results  # also a Pandas DataFrame

        return _assetExecute
22. GroupByKeyEngineWearModel – converting the results back into a Spark DataFrame

    def _convertRddOfPandasDataFramesToSparkDataFrame(self, resultsRdd):
        errors_rdd = resultsRdd.filter(lambda results: not isinstance(results[1], pd.DataFrame))
        if not errors_rdd.isEmpty():
            print("Possible errors: %s" % errors_rdd.collect())
        valid_results_rdd = resultsRdd.filter(lambda results: isinstance(results[1], pd.DataFrame))
        if valid_results_rdd.isEmpty():
            raise RuntimeError("ABORT! No valid results were obtained!")
        # Convert the Pandas dataframes into lists and flatten into one list.
        flattened_results_rdd = valid_results_rdd.flatMapValues(
            lambda pdf: (r.tolist() for r in pdf.to_records(index=False))).values()
        # Create a Spark DataFrame, using a schema made from that of the first Pandas DataFrame.
        spark = SparkSession.builder.getOrCreate()
        first_pdf = valid_results_rdd.first()[1]  # a Pandas DataFrame
        first_pdf_schema = spark.createDataFrame(first_pdf).schema
        return spark.createDataFrame(flattened_results_rdd, first_pdf_schema)
23. Algorithms before and after wrapping (Before: standalone legacy algorithm; After: same algorithm wrapped for a PySpark ML Pipeline)
• Hosting location – Before: on a single node or laptop. After: in a platform that runs spark-submit jobs on a schedule.
• Configuration – Before: held in a separate JSON config file for each algorithm. After: stored in the params of the ML Pipeline, which can be saved and loaded from disk for the whole pipeline; the config is part of the pipeline itself.
• Acquisition of input data – Before: each algorithm fetched its own input data (made a separate Hive query, wrote its input data to csv, then read it into a single in-memory Pandas DataFrame for all applicable engines). After: one PySpark spark.sql("SELECT …") statement for the data required by all the algorithms in the pipeline, passed as a Spark DataFrame into the transform() method for the whole pipeline.
• All asset (engine) data – Before: held in memory on a single machine. After: spread across the executors of the Spark cluster.
• Writing of results – Before: each algorithm wrote its output to csv, which was then loaded into Hive as a separate table. After: each algorithm appends a column of output to the Spark DataFrame that's passed from one transform() to the next in the pipeline.
• Programming paradigm – Before: written as an execute() function which called other functions. After: inherits from a specialised pyspark.ml.Transformer class.
24. But it wasn't all a bed of roses! Challenges…
• The pipeline wasn't really a simple linear pipeline:
  • Digital twin models operate independently – so could really be run in parallel.
  • Many digital twins need to query data that's in a different shape to the data that's passed into the transform() method for the whole ML Pipeline.
• Converting the Pandas DataFrames back into Spark DataFrames without hitting data-type conversion issues at runtime was tricky!
[Diagram: historic data and other data → PipelineModel (Input Data Predictor → Digital Twin models → Aggregator) → Persist results]
25. More challenges
• Debugging can be tricky! Tips:
  • failFast flag (usage illustrated below):
    – True: stop processing if any asset throws an exception. Useful when debugging.
    – False: silently log an error message for any asset that throws an exception, but continue processing for the other assets. Useful in production.
  • Run with fewer engines and/or fleets when testing; gradually expand out.
• Even simple things have to be encoded as extra transformers in the pipeline or added as extra params:
  • e.g., persisting data, when required, between different stages in the pipeline.
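For illustration, this is how the flag might be toggled on one of the digital twin transformers, assuming a setter that follows the usual PySpark set<Param> convention (the class and column names come from the earlier slides):

    twin = DigitalTwinEnginePartXEngineTypeP(esnCol='esn')
    twin.setFailFast(True)     # debugging: abort on the first asset that throws
    # twin.setFailFast(False)  # production: log the error and carry on with the other assets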
26. Benefits of this approach
• Much more reliable – we don't run out of memory any more!
• Will scale with the number of assets as the engine fleet grows.
• The whole forecasting scenario runs as a single ML PipelineModel – one per engine type/config.
• Consistent approach (and column names!) across the algorithms.
27. Key benefit
Data scientists who know little or nothing about Spark…
• can still develop and test their algorithm outside Spark on their own laptop, and…
• yet still have it deployed to Spark to scale with Big Data ☺
You don't have to rewrite each algorithm in PySpark to use the power of Spark.
28. Potential next steps
• Auto-generate the wrapper code for new Pandas-based algorithms, e.g., from a Data Science Workbench UI. Or, at the very least, create formal templates that encapsulate the lessons learned.
• Allow the same test data csv files used on a laptop to be used unaltered for testing in the deployed Spark environment. We need to verify that the wrapped algorithms actually work!
• Switch to using @pandas_udf on later versions of Spark (a sketch follows below).
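A hedged sketch of what that grouped-map @pandas_udf route could look like on Spark 2.3/2.4; the module name, result schema and 'esn' column are illustrative only.

    import importlib
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # input_df: Spark DataFrame of input data; 'digital_twin_part_x' is a hypothetical module name.
    legacy = importlib.import_module('digital_twin_part_x')
    result_schema = "esn string, part string, predicted_removal_date date"  # illustrative

    @pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
    def run_legacy_for_one_asset(asset_pdf):
        # asset_pdf already holds the rows for exactly one engine as a Pandas DataFrame.
        return legacy.execute(asset_pdf)

    results_df = input_df.groupby('esn').apply(run_legacy_for_one_asset)

On Spark 3.x the equivalent is input_df.groupby('esn').applyInPandas(func, schema), which avoids the now-deprecated PandasUDFType.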
29. Potential next steps
• Look to optimize the entire pipeline, e.g., by removing Spark actions where possible, such as persisting intermediate results.
• Many existing 'algorithms' – especially the digital twin models – are themselves really codified workflows or pipelines of lower-level algorithms:
  • so you could convert each algorithm into a pipeline of lower-level algorithms;
  • what are different algorithms now would simply become different pipelines, or even the same pipeline of transformers that's just configured for a different engine part.