Flink real-time analysis in CloudStream Service of Huawei Cloud

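CloudStream on Huawei Cloud is a fully managed service module that supports the following features: pay-per-use billing, easy-to-use StreamSQL, real-time StreamSQL testing, multi-tenancy, security isolation, and more. CloudStream uses Apache Flink as its underlying stream processing engine, so Flink jobs can run on YARN, Mesos, or Kubernetes. We have also extended parts of Flink so that it can meet the needs of IoT application scenarios. This presentation introduces the basic features and architecture of CloudStream.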

1. Flink real-time analysis in Cloud Stream Service. Huawei Cloud. Jinkui Shi, Radu Tudoran. 2018/04

2. Speakers
Jinkui Shi, Principal Engineer @ Huawei Cloud, shijinkui@huawei.com
Radu Tudoran, Staff Engineer @ Huawei Cloud, Radu.Tudoran@huawei.com

3. Background about Huawei Cloud ❖ Cloud BU ❖ Founded in 2017/06 ❖ Huawei Cloud ❖ HUAWEI CLOUD services let enterprises use ICT services in the same way as they use water and electricity utilities.

4. Why choose Flink ❖ Graceful runtime framework ❖ Rich Stream SQL functions ❖ Lightweight async checkpoints ❖ Truly low latency and high throughput ❖ Extensibility: ML, Graph, Edge

5. Cloud Stream Service ❖ Cloud Stream Service (CS): real-time big data stream analysis service on Huawei Cloud. Compatible with Apache Flink and Spark APIs, CS also provides fully managed computing clusters. Users just focus on StreamSQL or UDFs and run jobs in real time. ❖ CS is the first public cloud-native service in the world to choose Flink as its runtime computing engine. https://www.huaweicloud.com/en-us/product/cs.html

6. CS Overview
- Industrial IoT - Internet of Vehicles - Exchanges (BitCoin/stock) - Banking/insurance industry - E-commerce …
Make computing easier - Unified batch and stream - SQL and job visualization - Streaming monitoring
Connect everything - Open-source sources/sinks - Cloud service sources/sinks

7. Features: easy to use, serverless, fully managed, secure, cost-effective

8. Cost Comparison (Reference)
Item | Offline Environment Buildup | CS | Saved Cost
Hardware cost | 80,000 x 3 = 240,000 CNY (the hardware cost of a single physical machine is 80 thousand CNY; the cost is for reference only) | 0.5 x 20 x 24 x 30 x 12 x 3 = 259,000 CNY (users are charged 0.5 CNY per hour for a single SPU; 20 SPUs are purchased) |
O&M manpower cost | 200,000 CNY/man-year | 0 |
Water/electricity/DC maintenance | 76,300 CNY/year | 0 |
Total | 516,300 CNY | 259,000 CNY | 42.9%
To achieve the same computing capability, CS saves 42.9% of the cost.
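The arithmetic behind the comparison can be re-derived with a short script. This is only a sketch that takes the figures as listed on the slide (3 machines at 80,000 CNY, one man-year of O&M, one year of water/electricity/DC maintenance, and 20 SPUs at 0.5 CNY per hour over 3 years); the variable names are illustrative. Note that the plain ratio of the two totals comes out near 50%, so the slide's 42.9% presumably rests on a basis that is not shown here.

```python
# Offline build-up, using the figures listed on the slide (CNY).
hardware = 80_000 * 3            # three physical machines
om_manpower = 200_000            # one man-year of O&M
utilities = 76_300               # water/electricity/DC maintenance, one year
offline_total = hardware + om_manpower + utilities

# Cloud Stream Service: 0.5 CNY per SPU-hour, 20 SPUs, 24 h x 30 d x 12 m x 3 y.
cs_total = 0.5 * 20 * 24 * 30 * 12 * 3

# Fraction saved when the two totals are compared directly.
saved_fraction = 1 - cs_total / offline_total

print(f"offline: {offline_total} CNY, CS: {cs_total:.0f} CNY, "
      f"saved: {saved_fraction:.1%}")
```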

9. Job types ❖ Flink SQL: first-class citizen, for ease of use ❖ Flink Jar job: FlinkML, Gelly, CEP, SQL ❖ Spark Streaming and Structured Streaming Jar job ❖ PySpark Jar job ❖ Edge computing job: currently in beta

10. Connect to Ecosystem ❖ Open-source connectors (Flink connectors and Bahir Flink) ❖ Connectors to cloud-native services in Huawei Cloud. Open problems for a connection API adapter: 1. Define a unified connector API shared by Flink and Spark, e.g. for the Kafka and JDBC connectors. 2. Define a general connector API for cloud services, e.g. for object bucket storage. Apache Bahir needs more contributions.

11. Online Stream SQL editor. SPU (Stream Processing Unit): 1 core and 4 GB memory. https://console.huaweicloud.com/cs

12. Visualization[vɪʒʊəlɪ'zeʃən] ❖ runtime monitoring ❖ for dev: editor, notebook ❖ for prod: pipeline, DSL

13. Flink Benchmark - "chicken ribs" (of little value, yet a pity to discard) ❖ Problems with standard benchmarks: ❖ they focus only on performance and on assumed use cases ❖ they cannot cover every API and feature ❖ performance numbers show only the best case, never the worst ❖ Enterprises care more about reliability and best practices

14. Flink Reliability benchmark
Automatically run large-scale tests to find cases where Flink may hit runtime memory overflow, incorrect calculation results, or other runtime reliability problems, and collect metrics on backpressure, latency, throughput, memory, CPU, and rate to analyze the causes of each reliability problem.
❖ Test metric dimensions for every API:
  ❖ overall source generating rate: fixed rate, rapid rate, index rate
  ❖ data skew and backpressure:
    ❖ Job.ratio = max{Vertex.ratio | Vertex ∈ Job}
    ❖ Vertex.ratio = max{SubTask.ratio | SubTask ∈ Vertex}
  ❖ latency:
    ❖ job latency: source generating rate versus job processing rate
    ❖ event latency: the time elapsed between source and sink
  ❖ throughput and GC …
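The two backpressure formulas above roll per-subtask ratios up to a single job-level number by taking the maximum at each level. A minimal sketch, assuming the ratios have already been collected into a plain dictionary (the vertex names and values below are made up for illustration, not taken from a real Flink job):

```python
def vertex_ratio(subtask_ratios):
    """Vertex.ratio = max{SubTask.ratio | SubTask in Vertex}."""
    return max(subtask_ratios)


def job_ratio(job):
    """Job.ratio = max{Vertex.ratio | Vertex in Job}.

    `job` maps a vertex name to the list of its subtask backpressure ratios.
    """
    return max(vertex_ratio(ratios) for ratios in job.values())


# Hypothetical sample: one skewed subtask in the "map" vertex dominates.
sample_job = {
    "source": [0.0, 0.1],
    "map": [0.2, 0.9, 0.3],
    "sink": [0.05],
}
print(job_ratio(sample_job))
```

Taking the maximum rather than the mean means that a single skewed subtask is enough to flag the whole job, which fits the slide's pairing of data skew with backpressure.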

15. Flink ReliabilityBench project ❖ The generated report covers all APIs ❖ In the next half year, we will publish the Flink reliability bench and the standard benchmark to Cloud Stream Service ❖ Users just set the required resources; the bench then runs automatically and generates a final report for tuning and a best-practice guide. Everyone in the Flink community is welcome to try it then.

16. Some problems ❖ In SQL, how to express JSON, OpenTSDB, and other data formats? ❖ SQL WITH clause: ❖ how to make a general and extensible rule that supports all connectors? ❖ how to support general and extensible cloud standards, such as object bucket storage? ❖ API server? ❖ manage job lifetime and metrics ❖ for a job: input the source data, …, output the sink data with the Streaming API ❖ Sink reliability needs support from an external write-ahead log framework: ❖ source1 - processing - sink1 - source2 - processing - sink - source2 - …: data may be lost

17. Intelligent Streaming Computing ❖ Open-source frameworks ❖ Streaming+ML: Spark MLlib, PySpark, Flink ML ❖ Streaming+Graph: Spark GraphX, Flink Gelly ❖ SQL: bonding the above together through UDFs. Stream analysis is not enough; an intelligent framework is needed. If we put in less effort, we may quickly be surpassed by others. Stay hungry.

18. Scenario 1: streaming trading analysis
BitCoin trading pain points: 1. Out-of-order stream data for K-line charts of 5 min, 15 min, 30 min, and 60 min 2. Aggregating streaming data over windows 3. Low latency
[Figure: an example K-line chart, for illustration only; taken from the Sohu site.]
[Diagram: the Huawei Cloud solution uses DIS, Cloud Stream, Cloud Table, and DCS (Redis); the open-source components shown alongside are Kafka, Flink, Spark, HBase, and OpenTSDB.]
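The first two pain points can be illustrated with a small K-line (open/high/low/close) aggregator that tolerates out-of-order trades. This is a plain-Python sketch of the idea, not the CS or Flink implementation: the `Candle` class and `update_candles` function are hypothetical names, timestamps are assumed to be integer seconds, and there is no watermark or state eviction, so every window stays in memory.

```python
from dataclasses import dataclass


@dataclass
class Candle:
    open_ts: int     # event time of the earliest trade seen so far
    open: float
    high: float
    low: float
    close_ts: int    # event time of the latest trade seen so far
    close: float


def update_candles(candles, ts, price, window_s=300):
    """Fold one trade into its tumbling window (5 minutes by default)."""
    start = ts - ts % window_s           # window start in event time
    c = candles.get(start)
    if c is None:
        candles[start] = Candle(ts, price, price, price, ts, price)
        return
    c.high = max(c.high, price)
    c.low = min(c.low, price)
    if ts < c.open_ts:                   # a late trade that is earlier in event time
        c.open_ts, c.open = ts, price
    if ts >= c.close_ts:                 # a trade that is later in event time
        c.close_ts, c.close = ts, price


# Trades arrive out of order: (event time in seconds, price).
trades = [(10, 100.0), (200, 105.0), (5, 99.0), (299, 101.0), (150, 110.0), (300, 102.0)]
candles = {}
for ts, price in trades:
    update_candles(candles, ts, price)
print(candles)
```

Because open and close are chosen by event time rather than arrival order, the result is the same for any permutation of trades with distinct timestamps; the 15, 30, and 60 minute charts follow by changing `window_s`.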

19. Scenario 3: Stream Analysis and ETL. CS uses jobs of the Flink SQL, Flink, and Spark Streaming types to perform exception detection, real-time alarm reporting, and CEP-based processing on stream data. Feedback/decision-making/monitoring: based on the feedback and monitoring information gathered while the service is running, CS provides guidance for product optimization, stop-loss, quantization, and visualization.

20. Enhanced Statistics and ML Features Extraction: Design Principles • Incremental computation • Fixed-size memory • Constant to sub-linear time complexity

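21. Enhanced Statistics and ML Features Extraction: Online Linear Regression Learner
In general:
$S^2 = \sum_i \left( y_i - f(x_i, \beta_1, \beta_2, \beta_3, \dots, \beta_n) \right)^2$
For the linear fit:
$S^2 = \sum_i \left( y_i - (\beta_1 + \beta_2 x_i) \right)^2$
Incremental mean:
$\bar{x}_t = \bar{x}_{t-1} + \frac{1}{t} (x_t - \bar{x}_{t-1})$
Incremental variance (2nd central moment):
$m_{2,t} = m_{2,t-1} + (x_t - \bar{x}_{t-1})(x_t - \bar{x}_t)$
Incremental covariance:
$s_{xy,t} = \frac{t-2}{t-1} s_{xy,t-1} + \frac{1}{t} (x_t - \bar{x}_{t-1})(y_t - \bar{y}_{t-1})$
Regression parameters:
$\beta_2 = \frac{s_{xy,t}}{m_{2,t}}$, $\beta_1 = \bar{y}_t - \beta_2 \bar{x}_t$
Here $m_{2,t}$ is taken on the same scale as $s_{xy,t}$: the covariance recursion yields the sample covariance (normalized by $t-1$), so the accumulated second moment is divided by $t-1$ before it is used in $\beta_2$.
[Figures: throughput analysis (axes: throughput (ev), execution time (s)); latency analysis (axes: events, time range (ms)).]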
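The recursions on this slide fit the design principles of the previous one: each event updates a handful of numbers in constant time and fixed memory. A minimal sketch of such an online learner, assuming a single feature x and target y (the class and method names are illustrative, not a CS or Flink API):

```python
class OnlineLinearRegression:
    """Fits y = beta1 + beta2 * x incrementally, one event at a time."""

    def __init__(self):
        self.t = 0
        self.mean_x = 0.0
        self.mean_y = 0.0
        self.m2_x = 0.0   # running sum of squared deviations of x
        self.s_xy = 0.0   # running sample covariance of x and y

    def update(self, x, y):
        self.t += 1
        t = self.t
        dx = x - self.mean_x             # x_t - mean_x at t-1
        dy = y - self.mean_y             # y_t - mean_y at t-1
        self.mean_x += dx / t            # incremental mean
        self.mean_y += dy / t
        self.m2_x += dx * (x - self.mean_x)   # incremental 2nd central moment
        if t >= 2:                       # incremental covariance
            self.s_xy = (t - 2) / (t - 1) * self.s_xy + dx * dy / t

    def coefficients(self):
        """Return (beta1, beta2), or None while the fit is undetermined."""
        if self.t < 2 or self.m2_x == 0.0:
            return None
        var_x = self.m2_x / (self.t - 1)      # same normalization as s_xy
        beta2 = self.s_xy / var_x
        beta1 = self.mean_y - beta2 * self.mean_x
        return beta1, beta2


model = OnlineLinearRegression()
for x in range(10):
    model.update(float(x), 2.0 * x + 1.0)    # noiseless line y = 2x + 1
print(model.coefficients())
```

The state is five floats and a counter regardless of how many events have been seen, which is what makes this form suitable for keyed state in a streaming operator.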

22. GeoSpatial
• Huawei offers complete coverage of the geospatial standard plus extra time-based functions
• DDL for time geospatial: ST_Point, ST_Line, ST_Polygon
• SQL geospatial scalar functions: ST_CONTAINS, ST_DISTANCE, ST_COVERS, ST_PERIMETER, ST_DISJOINT, ST_AREA (polygon), ST_BUFFER, ST_OVERLAPS, ST_INTERSECTION, ST_INTERSECTS, ST_ENVELOPE, ST_WITHIN, …
• SQL time geospatial functions: AGG_DISTANCE, AVG_SPEED, …
  • on HOP/TUMBLE/OVER/SESSION windows
  • on count/time windows
  • on rowtime/proctime windows
• Stream IoT operators: Window Tumble Count/Time, Window Hop Count/Time, Window Session Count/Time, Process Function, Map, FlatMap, …
[Diagram: realtime IoT analytics over continuous data streams. Stream SQL IoT and Stream SQL time geospatial analytics queries, together with user-defined functions, pass through Submit, Translation, Optimization, Deploy, and Execute on the Flink IoT Stream Engine, which draws on the geospatial function geometry engine, the SQL IoT function library, and the stream IoT operator library.]

23. GeoSpatial Examples
Select if cars deviate from road:
    SELECT carId
    FROM CarStream
    WHERE ST_WITHIN(
      ST_POINT(car.lat, car.lon),
      ST_BUFFER(ST_ROAD_FROM_FILE(file), 2.0))
Filter by region:
    SELECT timestampr, lat, lon, speed
    FROM CarStream
    WHERE ST_WITHIN(
      ST_POINT(lat, lon),
      ST_POLYGON(ARRAY[
        ST_POINT(53.454326, 7.334517),
        ST_POINT(53.682480, 13.906822),
        ST_POINT(47.761194, 12.607594),
        ST_POINT(47.722358, 7.601213),
        ST_POINT(53.454326, 7.334517)]))
Compute time aggregates over spatial data:
    SELECT timestampa, lat, lon,
      AGG_DISTANCE(ST_POINT(lat, lon)) OVER (
        PARTITION BY carid ORDER BY proctime
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW),
      AVG_SPEED(ST_POINT(lat, lon)) OVER (
        PARTITION BY carid ORDER BY proctime
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
    FROM CarStream
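To make the "filter by region" query concrete, the sketch below shows one common way a point-in-polygon test such as ST_WITHIN can be evaluated: the ray-casting (even-odd) rule. It is an illustration under simplifying assumptions, not the CS geometry engine: latitude and longitude are treated as planar coordinates, points on the boundary are not handled specially, and the function name is hypothetical.

```python
def point_in_polygon(lat, lon, polygon):
    """Even-odd ray casting; `polygon` is a list of (lat, lon) vertices."""
    inside = False
    n = len(polygon)
    for i in range(n):
        lat_i, lon_i = polygon[i]
        lat_j, lon_j = polygon[(i + 1) % n]
        # Does this edge straddle the horizontal line through the point?
        if (lat_i > lat) != (lat_j > lat):
            # Longitude at which the edge crosses that line.
            cross_lon = lon_i + (lon_j - lon_i) * (lat - lat_i) / (lat_j - lat_i)
            if lon < cross_lon:
                inside = not inside
    return inside


# The region from the slide (the repeated closing vertex is omitted).
region = [
    (53.454326, 7.334517),
    (53.682480, 13.906822),
    (47.761194, 12.607594),
    (47.722358, 7.601213),
]

print(point_in_polygon(50.0, 10.0, region))   # a point inside the region
print(point_in_polygon(40.0, 10.0, region))   # a point south of the region
```

In a streaming job the same predicate would be applied per event, so its cost is linear in the number of polygon vertices and independent of the stream length.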

24. Flink CEP on SQL
• Enhanced SQL CEP syntax
• Define pattern matching computation
• Offer complete syntax coverage for real-time CEP analytics
Syntax:
    SELECT * FROM stream...
    MATCH_RECOGNIZE (
      [row_pattern_partition_by]
      [row_pattern_order_by]
      [row_pattern_measures]
      [row_pattern_rows_per_match]
      [row_pattern_skip_to]
      PATTERN (row_pattern)
      [with_in clause]
      [duration clause]
      [row_pattern_subset_clause]
      DEFINE row_pattern_definition_list
    )
Example:
    SELECT * FROM Ticker
    MATCH_RECOGNIZE (
      PARTITION BY symbol
      MEASURES
        FINAL FIRST(A.price) AS firstAPrice,
        FINAL FIRST(B.price) AS firstBPrice,
        FINAL FIRST(C.price) AS firstCPrice,
        FINAL LAST(A.price) AS lastAPrice,
        FINAL LAST(B.price) AS lastBPrice,
        FINAL LAST(C.price) AS lastCPrice
      ONE ROW PER MATCH
      AFTER MATCH SKIP PAST LAST ROW
      PATTERN ((A B C){2})
      DEFINE
        A AS A.price < 50,
        B AS B.price < 30,
        C AS C.price < 70
    )
Results: # Events: ~2.5M; # Matched events: ~100K; # Stocks: 7; Average latency: ~27.13 ms
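What the example query computes can be traced by hand with a small matcher. The sketch below is a simplified stand-in for the pattern `(A B C){2}` on the prices of a single symbol (one partition), with one output row per match and skipping past the last matched row; it is not how Flink CEP evaluates patterns, and the function name is hypothetical.

```python
def match_abc_twice(prices):
    """Find non-overlapping runs of six rows matching A B C A B C."""
    conditions = [
        lambda p: p < 50,   # A
        lambda p: p < 30,   # B
        lambda p: p < 70,   # C
    ] * 2                   # the {2} quantifier
    width = len(conditions)
    matches = []
    i = 0
    while i + width <= len(prices):
        window = prices[i:i + width]
        if all(cond(p) for cond, p in zip(conditions, window)):
            a, b, c = window[0::3], window[1::3], window[2::3]
            matches.append({
                "firstAPrice": a[0], "firstBPrice": b[0], "firstCPrice": c[0],
                "lastAPrice": a[-1], "lastBPrice": b[-1], "lastCPrice": c[-1],
            })
            i += width      # AFTER MATCH SKIP PAST LAST ROW
        else:
            i += 1          # try the next starting row
    return matches


# Hypothetical price sequence for one symbol.
prices = [40, 20, 60, 45, 25, 65, 80, 10, 10, 10, 10, 10, 10]
for row in match_abc_twice(prices):
    print(row)
```

Because every pattern variable here matches exactly one row, a fixed-width window check is sufficient; patterns with variable-length quantifiers need a state machine instead.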