GE Aviation: Experience Porting Analytics into PySpark ML Pipelines

GE is a global leader in commercial jet engine manufacturing and supplies products for many of the best-selling commercial airframes. With more than 33,000 engines in service, GE Aviation has a long history of developing analytics to monitor its commercial engine fleet. In recent years, GE Aviation Digital has developed advanced analytic solutions for engine monitoring, with the goal of improving detection and reducing false alerts compared with traditional analytic methods. The advanced analytics are implemented in a real-time monitoring system that informs GE's fleet support teams on a per-flight basis, and they are developed and validated against large historical datasets. Until recently this work used tools such as SQL Server and MATLAB; GE's data has now been moved into an Apache Spark environment, so our advanced analytics are being migrated to Spark, where performance should also benefit from the larger datasets available. We share our experience of converting our advanced algorithms into custom Spark ML pipelines and outline several case studies.

1. Experience Porting Analytics into PySpark ML Pipelines
4 Oct 2018
Prof Honor Powrie, Dr Peter Knight, GE Aviation Digital
Session hashtag: #SAISExp12

2. Outline
• GE Aviation: commercial engines, data and analytics overview
• Historic analytic development process
• Converting analytics to PySpark ML Pipelines
• Plotting ML Pipelines
• The full analytic lifecycle in Spark
• Conclusions

3. General Electric - Aviation
• 40k employees
• $27.4B revenue (2017)
• >33k commercial engines
"Every two seconds, an aircraft powered by GE technology takes off somewhere in the world"

4.
HISTORICAL: 1 KB / flight, 30 parameters, 3 snapshots / flight, < 50 GB per year
TODAY: 200 KB / flight, 1000 parameters, 20 snapshots / flight, 100K flights per day, 10 TB per year
TOMORROW: 1 GB / flight, 1000 parameters @ 10 Hz for 3.5 hr / flight, 100K flights per day, 100 TB per day, 50 PB per year

5. ML and Analytic Applications - GE Commercial Engines Operational Lifecycle
• Shop Visit Forecast
• Enterprise Workscope
• Material Prediction
• Supply Chain Management
• Time on Wing Optimisation
• Fleet Monitoring
• Fleet Management
• Borescope Imaging
• Digital Twin Models

6. About
• GE Aviation's custom ML library, based on Probabilistic Graphical Models
• Developed to tackle some of the key challenges of real-world data
• Used extensively in ML applications for GE commercial engines
• Being recoded in C++ and Python and integrated with Spark
Functional elements (examples):
• Fleet Segmentation
• Multivariate Models and Anomaly Detection
• Diagnostic Reasoning

7. Historic Analytic Development and Deployment Process
Develop (Data Scientists):
• Data: Greenplum to SQL Server
• MATLAB data exploration
• MATLAB analytics
• MATLAB generate metrics
• Toll Gate Reviews
Hand-off package:
• Functional spec
• Model files
• Configuration file
• Test cases
Deploy (Software Engineers):
• Analytic recoded in Java
• XML configuration
• Windows Server environment
• Oracle Database
• Predix run-time
• Test in QA environment
• Deploy to pre-production
• Deploy to production
• Monitor in production (Spotfire)
Aim: convert the entire model building & deployment workflow to ML Pipelines

8. Why we like Spark ML Pipelines
• Easy to string together pre-processing, ML and alerting stages
• Same pipeline – and code – for analytic development (model building), evaluation and deployment (run time)
• Extensive library of existing analytics and data manipulation tools
• Extensible for our own analytics using Python, in a standard framework
• Self-describing: explainParams() shows you how to use it (see the sketch below)
• Scales to big data
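As a minimal illustration of these points (not GE's actual analytic), the sketch below strings standard stages into a Pipeline and uses explainParams() to discover how a stage is configured. The column names and stages are assumptions chosen for the example, and an active SparkSession is assumed because PySpark ML stages create their JVM peers on construction.

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler, StandardScaler
    from pyspark.ml.classification import LogisticRegression

    # Hypothetical sensor columns; any numeric columns would do.
    assembler = VectorAssembler(inputCols=["egt", "n1", "n2"], outputCol="raw_features")
    scaler = StandardScaler(inputCol="raw_features", outputCol="features")
    lr = LogisticRegression(featuresCol="features", labelCol="label")

    # The same Pipeline object is used for model building; the fitted
    # PipelineModel is then used at run time via transform().
    pipeline = Pipeline(stages=[assembler, scaler, lr])

    # Every stage is self-describing: explainParams() lists each Param,
    # its description and its current/default value.
    print(scaler.explainParams())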

9. Converting our Workflow to Spark ML Pipelines
• The pipeline includes various custom modules before & after ML, e.g. normalisation & alerting
• Some analytics are more complex than just processing on a row-by-row basis
  - e.g. Median Trend: group the data by engine (and configurable other conditions) and use a sliding window (by date or records) that is sorted by date (see the window sketch below)
• For our ML algorithms we have overcome a number of challenges…
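A minimal sketch of the grouping-plus-sliding-window pattern described above. This illustrates the mechanism only, not GE's actual Median Trend analytic; the engine_id, flight_date and param column names are assumptions, df is an existing input DataFrame, and a rolling mean is used simply because avg() is available as a window function.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Sliding window per engine, ordered by date, covering the current row
    # and the 9 preceding records (a records-based window; a date-based
    # range window via rangeBetween is the other option mentioned above).
    w = Window.partitionBy("engine_id").orderBy("flight_date").rowsBetween(-9, 0)

    # Rolling statistic over the window, appended as a new column.
    trended = df.withColumn("param_trend", F.avg("param").over(w))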

10. Converting ProDAPS to Spark ML
Our ProDAPS ML analytic seems more complex than many already in Spark. For example:
• Once the model is built, many different types of inference can be configured
• Most existing ML analytics use Dense Vectors, but these have limitations, e.g. they can't handle null records
• We want to be able to filter the data between pipeline stages (see the sketch below)
• We are still on Spark 2.2, so had to write our own method for saving and loading custom stages
• No export/import commands for porting pipelines between Spark clusters
• Tedious to add Params - see next slide
• Our full wish-list is on JIRA at: SPARK-19498
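One possible workaround for the filtering point above, using only stock Spark API: SQLTransformer can act as a filter stage inside a Pipeline. This is a sketch with an assumed "value" column, and it does not address the other limitations listed.

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import SQLTransformer, VectorAssembler

    # SQLTransformer runs an arbitrary SELECT against the incoming DataFrame
    # (referenced as __THIS__), so it can drop null records before a
    # vector-based stage that cannot handle them.
    drop_nulls = SQLTransformer(statement="SELECT * FROM __THIS__ WHERE value IS NOT NULL")
    assembler = VectorAssembler(inputCols=["value"], outputCol="features")

    pipeline = Pipeline(stages=[drop_nulls, assembler])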

11. Example Custom Transformer Showing Verbose Param Code
Straight line: y = m x + c
• Currently, for each parameter, the default values are set 3 times and the parameter name is entered 9 times!
Current code required (~50 lines):

    from pyspark import keyword_only
    from pyspark.ml import Transformer
    from pyspark.ml.param.shared import Param, Params, TypeConverters

    class StraightLine(Transformer):

        @keyword_only
        def __init__(self, inputCol=None, outputCol=None, m=1.0, c=0.0):
            super(StraightLine, self).__init__()
            self._setDefault(inputCol=None, outputCol=None, m=1.0, c=0.0)
            kwargs = self._input_kwargs
            self.setParams(**kwargs)

        @keyword_only
        def setParams(self, inputCol=None, outputCol=None, m=1.0, c=0.0):
            kwargs = self._input_kwargs
            return self._set(**kwargs)

        inputCol = Param(Params._dummy(), "inputCol", "the input column name (your X). (string)",
                         typeConverter=TypeConverters.toString)

        def setInputCol(self, value):
            return self._set(inputCol=value)

        def getInputCol(self):
            return self.getOrDefault(self.inputCol)

        outputCol = Param(Params._dummy(), "outputCol", "the output column name (your Y). (string)",
                          typeConverter=TypeConverters.toString)

        def setOutputCol(self, value):
            return self._set(outputCol=value)

        def getOutputCol(self):
            return self.getOrDefault(self.outputCol)

        m = Param(Params._dummy(), "m", "the slope of the line. (float)",
                  typeConverter=TypeConverters.toFloat)

        def setM(self, value):
            return self._set(m=value)

        def getM(self):
            return self.getOrDefault(self.m)

        c = Param(Params._dummy(), "c", "the y offset when x = 0. (float)",
                  typeConverter=TypeConverters.toFloat)

        def setC(self, value):
            return self._set(c=value)

        def getC(self):
            return self.getOrDefault(self.c)

        def _transform(self, dataset):
            input_col = self.getInputCol()
            if not input_col:
                raise Exception("inputCol not supplied")
            output_col = self.getOutputCol()
            if not output_col:
                raise Exception("outputCol not supplied")
            return dataset.selectExpr("*",
                str(self.getM()) + " * " + input_col + " + " + str(self.getC()) + " AS " + output_col)

12. Example Custom Transformer Showing Verbose Param Code
Straight line: y = m x + c
Current code required (~50 lines): as shown on the previous slide.
• Currently, for each parameter, the default values are set 3 times and the parameter name is entered 9 times!
• We propose adding a method that generates all this boilerplate code in one call: specify the param name, description, datatype, default value and required flag
• Ideally explainParams() should also show the data types
Proposed code (~10 lines, clearer & easier to maintain):

    # Note: addParam is the API proposed here; it does not exist in Spark today.
    from pyspark import keyword_only
    from pyspark.ml import Transformer
    from pyspark.ml.param.shared import Param, Params, TypeConverters, addParam

    class StraightLine(Transformer):
        addParam("inputCol", "specify the input column name (your X).", String, None)
        addParam("outputCol", "specify the output column name (your Y).", String, None)
        addParam("m", "specify m - the slope of the line.", Float, 1.0)
        addParam("c", "specify c - the y offset when x = 0.", Float, 0.0)

        def _transform(self, dataset):
            return dataset.selectExpr("*",
                str(self.getM()) + " * " + self.getInputCol() + " + " +
                str(self.getC()) + " AS " + self.getOutputCol())
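For context, a hedged usage sketch of the StraightLine transformer defined on slide 11, placed inside a Pipeline. The "x" column name and toy data are invented for illustration, and an active SparkSession named spark is assumed.

    from pyspark.ml import Pipeline

    # Toy data with an assumed column name "x"; requires the StraightLine
    # class from the previous slide and a running SparkSession.
    df = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ["x"])

    line = StraightLine(inputCol="x", outputCol="y", m=2.0, c=1.0)
    pipeline = Pipeline(stages=[line])

    # Transformer-only pipelines still go through fit(); the resulting
    # PipelineModel applies y = 2.0 * x + 1.0 to each row.
    pipeline.fit(df).transform(df).show()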

13. Display Pipeline
• We created code to plot any Spark ML pipeline using bokeh, showing Params on hover
• Available on GitHub at: https://github.com/GeneralElectric/SparkMLPipelineDisplay
• The following example is based on the example pipeline in the Spark documentation: https://spark.apache.org/docs/2.2.0/ml-pipeline.html (recapped in the sketch below)
[Figure: the example pipeline plotted before training and after training]
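For reference, the documentation example behind those plots is roughly the following (recapped here from the Spark ML Pipelines guide; the "before training" plot corresponds to the Pipeline object and the "after training" plot to the fitted PipelineModel). An active SparkSession named spark is assumed.

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer

    # Tiny training set from the Spark documentation example.
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0)
    ], ["id", "text", "label"])

    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10, regParam=0.001)

    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])   # "before training"
    model = pipeline.fit(training)                           # PipelineModel: "after training"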

14. We can now do the whole data science workflow in Spark in a notebook environment
1. Start from existing configuration file
2. Explore the data - e.g. interactive bokeh plot
3. Generate the ML Pipeline from config & display it
4. Build the model(s) and display
5. Calculate metrics - e.g. ROC curve (an evaluator sketch follows below)
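As an illustration of step 5 only (not GE's metric code), stock Spark provides an evaluator for ROC-based metrics. Here predictions is assumed to be the output of a fitted PipelineModel's transform() on held-out data, with label and rawPrediction columns.

    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    # areaUnderROC is the default metric; the raw prediction column comes
    # from the classifier stage of the fitted pipeline.
    evaluator = BinaryClassificationEvaluator(labelCol="label",
                                              rawPredictionCol="rawPrediction",
                                              metricName="areaUnderROC")
    auc = evaluator.evaluate(predictions)
    print("Area under ROC = %.3f" % auc)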

15. Conclusions
• GE Aviation uses analytics across the entire commercial engine operational lifecycle
• Over the past few decades we have seen an explosion in data; the trend is set to continue, and today we have discussed solutions to manage it
• Overcame real-life challenges of implementing custom Python ML Pipeline analytics
• Provided feedback on ways to make adding custom Python ML libraries easier
• Developed an ML Pipeline display utility and shared it with the community
• Completed the entire analytic development and deployment lifecycle in Spark
• Still working to port all analytics to Spark
• Production deployment environment not yet in Spark

16. Questions?