Why and How Flink SQL

From Flink PMC

1.WHY AND HOW TO LEVERAGE THE POWER AND SIMPLICITY OF SQL ON APACHE FLINK® - FABIAN HUESKE, SOFTWARE ENGINEER

2.ABOUT ME
• Apache Flink PMC member & ASF member
  ‒ Contributing since day 1 at TU Berlin
  ‒ Focusing on Flink’s relational APIs for ~2 years
• Co-author of “Stream Processing with Apache Flink”
  ‒ Work in progress…
• Co-founder & Software Engineer at data Artisans
© 2018 data Artisans

3.ABOUT DATA ARTISANS
• Original creators of open-source Apache Flink
• Apache Flink® + dA Application Manager

4.DA PLATFORM
data-artisans.com/download

5.WHAT IS APACHE FLINK?
Stateful Computations Over Data Streams
• Data Stream Processing: real-time results from data streams
• Batch Processing: process static and historic data
• Event-driven Applications: data-driven actions and services

6.WHAT IS APACHE FLINK?
Stateful computations over streams, real-time and historic: fast, scalable, fault tolerant, in-memory, event time, large state, exactly-once
[Diagram: Flink connected to streams, devices, applications, databases, and historic data in file / object storage, serving queries, applications, databases, and streams]

7.POWERED BY APACHE FLINK

8.FLINK’S POWERFUL ABSTRACTIONS
Layered abstractions to navigate simple to complex use cases
• High-level Analytics API: SQL / Table API (dynamic tables)
• Stream- & Batch Data Processing: DataStream API (streams, windows)
    val stats = stream
      .keyBy("sensor")
      .timeWindow(Time.seconds(5))
      .reduce((a, b) => a.add(b))
• Stateful Event-Driven Applications: Process Function (events, state, time)
    def processElement(event: MyEvent, ctx: Context, out: Collector[Result]): Unit = {
      // work with event and state
      (event, state.value) match { … }
      out.collect(…)   // emit events
      state.update(…)  // modify state
      // schedule a timer callback
      ctx.timerService.registerEventTimeTimer(event.timestamp + 500)
    }

9.APACHE FLINK’S RELATIONAL APIS
ANSI SQL:
    SELECT user, COUNT(url) AS cnt
    FROM clicks
    GROUP BY user
LINQ-style Table API:
    tableEnvironment
      .scan("clicks")
      .groupBy('user)
      .select('user, 'url.count as 'cnt)
Unified APIs for batch & streaming data
A query specifies exactly the same result regardless of whether its input is static batch data or streaming data.

10.QUERY TRANSLATION
The same query is translated for either kind of input:
    Table API: tableEnvironment.scan("clicks").groupBy('user).select('user, 'url.count as 'cnt)
    SQL:       SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user
• Input data is bounded (batch)
• Input data is unbounded (streaming)

11.WHAT IF “CLICKS” IS A FILE?
Input data is read at once; the result is produced at once.
Clicks:
    user | cTime    | url
    Mary | 12:00:00 | https://…
    Bob  | 12:00:00 | https://…
    Mary | 12:00:02 | https://…
    Liz  | 12:00:03 | https://…
Query:
    SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user
Result:
    user | cnt
    Mary | 2
    Bob  | 1
    Liz  | 1
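To make the batch semantics concrete, here is a minimal plain-Scala sketch (no Flink APIs; the `Click` case class and the `BatchClickCount` object are hypothetical names) that evaluates the slide's query over the four rows at once. The URLs stay elided, as on the slide.

```scala
// Plain-Scala sketch (no Flink APIs) of evaluating
//   SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user
// over bounded input that is read at once.
case class Click(user: String, cTime: String, url: String)

object BatchClickCount {
  // The four rows from the slide; the URLs are elided there as well.
  val clicks: Seq[Click] = Seq(
    Click("Mary", "12:00:00", "https://…"),
    Click("Bob", "12:00:00", "https://…"),
    Click("Mary", "12:00:02", "https://…"),
    Click("Liz", "12:00:03", "https://…")
  )

  // GROUP BY user + COUNT(url): the complete result is produced once,
  // after all input rows have been read. COUNT(url) skips NULL URLs.
  def countPerUser(input: Seq[Click]): Map[String, Long] =
    input
      .groupBy(_.user)
      .map { case (user, rows) => user -> rows.count(_.url != null).toLong }
}
```

Calling `BatchClickCount.countPerUser(BatchClickCount.clicks)` yields Mary → 2, Bob → 1, Liz → 1, matching the result table of this slide.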

12.WHAT IF “CLICKS” IS A STREAM?
Input data is continuously read; the result is continuously updated.
Clicks:
    user | cTime    | url
    Mary | 12:00:00 | https://…
    Bob  | 12:00:00 | https://…
    Mary | 12:00:02 | https://…
    Liz  | 12:00:03 | https://…
Query:
    SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user
Result:
    user | cnt
    Mary | 2   (updated from 1 when the second Mary click arrives)
    Bob  | 1
    Liz  | 1
The result is the same!
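The streaming case can be sketched the same way: process the clicks one by one, keep the per-user counts as state, and emit an updated result row after each click. This is a simplified model of incremental evaluation, not Flink's actual operator, and `ContinuousClickCount` is a hypothetical name.

```scala
// Plain-Scala sketch (no Flink APIs) of continuously evaluating
//   SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user
// over input that arrives one row at a time.
case class Click(user: String, cTime: String, url: String)

object ContinuousClickCount {
  val clicks: Seq[Click] = Seq(
    Click("Mary", "12:00:00", "https://…"),
    Click("Bob", "12:00:00", "https://…"),
    Click("Mary", "12:00:02", "https://…"),
    Click("Liz", "12:00:03", "https://…")
  )

  // Returns the emitted (user, cnt) updates and the final counts (the state).
  // For each click, the user's count is incremented in the state and the
  // updated result row for that user is emitted.
  def run(input: Seq[Click]): (Vector[(String, Long)], Map[String, Long]) =
    input.foldLeft((Vector.empty[(String, Long)], Map.empty[String, Long])) {
      case ((updates, counts), click) =>
        val cnt = counts.getOrElse(click.user, 0L) + 1L
        (updates :+ (click.user -> cnt), counts.updated(click.user, cnt))
    }
}
```

The emitted updates are (Mary, 1), (Bob, 1), (Mary, 2), (Liz, 1). The second Mary row replaces the first, so once the last click is processed the state equals the batch result.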

13.WHY IS STREAM-BATCH UNIFICATION IMPORTANT?
• Usability
  ‒ ANSI SQL syntax: no custom “StreamSQL” syntax
  ‒ ANSI SQL semantics: no stream-specific results
• Portability
  ‒ Run the same query on bounded and unbounded data
  ‒ Run the same query on recorded and real-time data
  [Figure: timeline (start of the stream, past, now, future) with bounded queries over finite ranges and unbounded queries that run into the future]
• How can we achieve SQL semantics on streams?

14.DATABASE SYSTEMS RUN QUERIES ON STREAMS
• Materialized views (MVs) are similar to regular views, but persisted to disk or memory
  ‒ Used to speed up analytical queries
  ‒ MVs need to be updated when the base tables change
• MV maintenance is very similar to SQL on streams
  ‒ Base table updates are a stream of DML statements
  ‒ The MV definition query is evaluated on that stream
  ‒ The MV is the query result and is continuously updated
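A minimal sketch of incremental MV maintenance, assuming a hypothetical DML model (`Insert`, `Delete`, `Update`) over a `clicks` base table and the view `SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user`. Each statement adjusts only the affected groups instead of recomputing the view; this illustrates the idea on the slide and is not how any particular database implements it.

```scala
// Plain-Scala sketch of incremental materialized-view maintenance for
//   SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user
// The DML classes below are hypothetical and only model row-level changes.
case class ClickRow(user: String, url: String)

sealed trait Dml
case class Insert(row: ClickRow) extends Dml
case class Delete(row: ClickRow) extends Dml
case class Update(before: ClickRow, after: ClickRow) extends Dml

object MaterializedViewMaintenance {
  type View = Map[String, Long]

  // Adjust one group's count; a group whose count reaches zero leaves the view.
  private def adjust(view: View, user: String, delta: Long): View = {
    val cnt = view.getOrElse(user, 0L) + delta
    if (cnt == 0L) view - user else view.updated(user, cnt)
  }

  // Apply a single DML statement to the view instead of recomputing it.
  // Assumes a consistent DML stream (no deletes of rows never inserted).
  def applyDml(view: View, stmt: Dml): View = stmt match {
    case Insert(row)           => adjust(view, row.user, 1L)
    case Delete(row)           => adjust(view, row.user, -1L)
    case Update(before, after) => adjust(adjust(view, before.user, -1L), after.user, 1L)
  }

  // Evaluate the view definition on the whole stream of DML statements.
  def maintain(stmts: Seq[Dml]): View =
    stmts.foldLeft(Map.empty[String, Long]) { (view, stmt) => applyDml(view, stmt) }
}
```

An `Update` is treated as a retraction from the old group followed by an addition to the new group, which is the same pattern the dynamic-table changelog on the following slides relies on.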

15.CONTINUOUS QUERIES IN FLINK
• The core concept is a “Dynamic Table”
  ‒ Dynamic tables change over time
• Queries on dynamic tables
  ‒ produce new dynamic tables (which are updated based on the input)
  ‒ do not terminate
• Stream ↔ dynamic table conversions

16.STREAM ↔ DYNAMIC TABLE CONVERSIONS
• Append conversions
  ‒ Records are only inserted/appended
• Upsert conversions
  ‒ Records are inserted/updated/deleted
  ‒ Records have a (composite) unique key
• Changelog conversions
  ‒ Records are inserted/updated/deleted
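The three conversions can be illustrated with plain Scala collections. The message encodings are assumptions chosen for this sketch: upsert records are `(key, Some(value))`, or `(key, None)` for a delete, and changelog records are `(isInsert, row)` pairs, similar in spirit to the add/retract flag of Flink's retract streams. `TableConversions` is a hypothetical helper, not a Flink API.

```scala
// Plain-Scala sketch of the three stream -> dynamic table conversions.
// The message encodings are hypothetical and only illustrate the semantics.
object TableConversions {
  // Append: every stream record is a new row; rows are never changed.
  def fromAppendStream[R](records: Seq[R]): Vector[R] = records.toVector

  // Upsert: each record carries a unique key; Some(v) inserts or updates
  // the row with that key, None deletes it.
  def fromUpsertStream[K, V](records: Seq[(K, Option[V])]): Map[K, V] =
    records.foldLeft(Map.empty[K, V]) {
      case (table, (key, Some(value))) => table.updated(key, value)
      case (table, (key, None))        => table - key
    }

  // Changelog: each record inserts (true) or deletes (false) a full row.
  // No unique key is required, so the table is a multiset (row -> count),
  // and an update is encoded as a delete of the old row plus an insert.
  def fromChangelogStream[R](records: Seq[(Boolean, R)]): Map[R, Int] =
    records.foldLeft(Map.empty[R, Int]) { case (table, (isInsert, row)) =>
      val n = table.getOrElse(row, 0) + (if (isInsert) 1 else -1)
      if (n <= 0) table - row else table.updated(row, n)
    }
}
```

The upsert form needs the unique key to locate the row to change; the changelog form works without a key because every change names the full row it adds or removes.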

17.SQL FEATURE SET IN FLINK 1.5.0
• SELECT FROM WHERE
• GROUP BY / HAVING
  ‒ Non-windowed, TUMBLE, HOP, SESSION windows
• JOIN
  ‒ Windowed INNER, LEFT / RIGHT / FULL OUTER JOIN
  ‒ Non-windowed INNER JOIN
• Scalar, aggregation, table-valued UDFs
• SQL CLI Client (beta)
• [streaming only] OVER / WINDOW
  ‒ UNBOUNDED / BOUNDED PRECEDING
• [batch only] UNION / INTERSECT / EXCEPT / IN / ORDER BY

18.WHAT CAN I BUILD WITH THIS?
• Data Pipelines
  ‒ Transform, aggregate, and move events in real time
• Low-latency ETL
  ‒ Convert and write streams to file systems, DBMS, K-V stores, indexes, …
  ‒ Ingest newly appearing files to produce streams
• Stream & Batch Analytics
  ‒ Run analytical queries over bounded and unbounded data
  ‒ Query and compare historic and real-time data
• Power Live Dashboards
  ‒ Compute and update data to visualize in real time

19.THE NEW YORK TAXI RIDES DATA SET
• The New York City Taxi & Limousine Commission provides a public data set about past taxi rides in New York City
• We can derive a streaming table from the data
• Table: TaxiRides
    rideId:  BIGINT     // ID of the taxi ride
    isStart: BOOLEAN    // flag for pick-up (true) or drop-off (false) event
    lon:     DOUBLE     // longitude of pick-up or drop-off location
    lat:     DOUBLE     // latitude of pick-up or drop-off location
    rowtime: TIMESTAMP  // time of pick-up or drop-off event
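The queries on the next slides call `toCellId(lon, lat)`, a user-defined function whose definition the slides do not show. Below is one hypothetical way to write it: a rectangular grid anchored at an assumed north-west corner of New York City. The grid constants, the `TaxiRide` case class, and storing `rowtime` as epoch milliseconds are all assumptions made for illustration.

```scala
// Hypothetical implementation of the toCellId(lon, lat) UDF used in the
// queries: a rectangular grid of square cells. The grid constants below
// are assumptions, not part of the original talk.
object CellGrid {
  val LonWest: Double = -74.05   // assumed western edge of the grid
  val LatNorth: Double = 41.0    // assumed northern edge of the grid
  val CellSizeDeg: Double = 0.01 // assumed cell edge length in degrees
  val NumColumns: Int = 100      // assumed number of cells per grid row

  // Maps a coordinate to row * NumColumns + column, counting from the
  // north-west corner. Points outside the grid are not handled here.
  def toCellId(lon: Double, lat: Double): Int = {
    val column = math.floor((lon - LonWest) / CellSizeDeg).toInt
    val row = math.floor((LatNorth - lat) / CellSizeDeg).toInt
    row * NumColumns + column
  }
}

// One row of the TaxiRides table; rowtime is simplified to epoch millis.
case class TaxiRide(rideId: Long, isStart: Boolean, lon: Double, lat: Double, rowtime: Long) {
  def cell: Int = CellGrid.toCellId(lon, lat)
}
```

Any deterministic mapping from coordinates to a cell ID would work for the queries; a fixed grid simply makes nearby pick-ups and drop-offs share a group key.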

20.IDENTIFY POPULAR PICK-UP / DROP-OFF LOCATIONS
▪ Every 5 minutes, compute for each location the number of taxis departing and arriving in the last 15 minutes.
    SELECT cell, isStart,
           HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS hopEnd,
           COUNT(*) AS cnt
    FROM (SELECT rowtime, isStart, toCellId(lon, lat) AS cell
          FROM TaxiRides)
    GROUP BY cell, isStart,
             HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)
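`HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)` defines 15-minute windows that start every 5 minutes, so each event falls into 15 / 5 = 3 windows. Below is a minimal sketch of that assignment and of the per-(cell, isStart, hopEnd) count, assuming timestamps in whole minutes and windows aligned to time 0. `HopWindows` is a hypothetical name; this is not Flink's implementation.

```scala
// Sketch of sliding-window assignment as in HOP(rowtime, slide, size):
// each window covers [end - size, end), and window ends are multiples of
// `slide` (aligned to time 0, as assumed here). Times are whole minutes.
object HopWindows {
  // All window ends whose window contains timestamp ts:
  // every multiple of `slide` in the interval (ts, ts + size].
  def windowEnds(ts: Long, slide: Long, size: Long): Seq[Long] = {
    require(ts >= 0 && slide > 0 && size > 0)
    val firstEnd = (ts / slide + 1) * slide // smallest multiple of slide > ts
    (firstEnd to (ts + size) by slide).toVector
  }

  // GROUP BY cell, isStart, HOP(...) with COUNT(*), keyed by hopEnd.
  def countPerWindow(
      events: Seq[(Int, Boolean, Long)], // (cell, isStart, rowtime)
      slide: Long,
      size: Long): Map[(Int, Boolean, Long), Long] =
    events
      .flatMap { case (cell, isStart, ts) =>
        windowEnds(ts, slide, size).map(end => (cell, isStart, end))
      }
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size.toLong }
}
```

For example, `HopWindows.windowEnds(7L, 5L, 15L)` returns the ends 10, 15, and 20: an event at minute 7 is counted in the windows [-5, 10), [0, 15), and [5, 20).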

21.AVERAGE RIDE DURATION PER PICK-UP LOCATION
▪ Join start and end ride events on rideId and compute the average ride duration per pick-up location.
    SELECT pickUpCell,
           AVG(TIMESTAMPDIFF(MINUTE, s.rowtime, e.rowtime)) AS avgDuration
    FROM (SELECT rideId, rowtime, toCellId(lon, lat) AS pickUpCell
          FROM TaxiRides
          WHERE isStart) s
    JOIN (SELECT rideId, rowtime
          FROM TaxiRides
          WHERE NOT isStart) e
      ON s.rideId = e.rideId AND
         e.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR
    GROUP BY pickUpCell
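A plain-Scala sketch of the same computation, assuming start and end events are already split into two collections and timestamps are whole minutes: an interval join on `rideId` that keeps only end events within one hour after the start, followed by an average per pick-up cell. `Start`, `End`, and `RideDurations` are hypothetical names, and the averages are computed as `Double`. Unlike Flink's streaming join, this sketch sees all events at once.

```scala
// Plain-Scala sketch of the interval join + AVG query. Assumes start and
// end events are already separated and timestamps are whole minutes.
case class Start(rideId: Long, pickUpCell: Int, time: Long)
case class End(rideId: Long, time: Long)

object RideDurations {
  // Corresponds to INTERVAL '1' HOUR in the join predicate.
  val MaxRideMinutes: Long = 60L

  def avgDurationPerCell(starts: Seq[Start], ends: Seq[End]): Map[Int, Double] = {
    val endsById: Map[Long, Seq[End]] = ends.groupBy(_.rideId)
    // Join on rideId, keeping only end events in [s.time, s.time + 1 hour].
    val durations: Seq[(Int, Long)] = for {
      s <- starts
      e <- endsById.getOrElse(s.rideId, Seq.empty[End])
      if e.time >= s.time && e.time <= s.time + MaxRideMinutes
    } yield (s.pickUpCell, e.time - s.time) // TIMESTAMPDIFF(MINUTE, s, e)
    // GROUP BY pickUpCell with AVG over the joined durations.
    durations.groupBy(_._1).map { case (cell, rows) =>
      cell -> rows.map(_._2).sum.toDouble / rows.size
    }
  }
}
```

The one-hour bound is what makes the streaming version practical: a start event only has to be kept until its hour has passed, rather than forever.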

22.BUILDING A DASHBOARD
    SELECT cell, isStart,
           HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS hopEnd,
           COUNT(*) AS cnt
    FROM (SELECT rowtime, isStart, toCellId(lon, lat) AS cell
          FROM TaxiRides)
    GROUP BY cell, isStart,
             HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE)
[Diagram: the query reads the TaxiRides stream from Kafka and writes its results to Elasticsearch]

23.SOUNDS GREAT! HOW CAN I USE IT?
• At the moment, SQL queries must be embedded in Java/Scala code
  ‒ Tight integration with DataStream and DataSet APIs
• Community focused on internals (until Flink 1.4.0)
  ‒ Operators, types, built-in functions, extensibility (UDFs, external catalog)
  ‒ Proven at scale by Alibaba, Huawei, and Uber
  ‒ All built their own submission system & connectors library
• Community neglected user interfaces
  ‒ No query submission client, no CLI
  ‒ No integration with common catalog services
  ‒ Limited set of TableSources and TableSinks

24.COMING IN FLINK 1.5.0 - SQL CLI
Demo Time!
That’s a nice toy, but … can I use it for anything serious?

25.FLIP-24 – A SQL QUERY SERVICE
• REST service to submit & manage SQL queries
  ‒ SELECT …
  ‒ INSERT INTO SELECT …
  ‒ CREATE MATERIALIZED VIEW …
• Serve results of “SELECT …” queries
• Provide a table catalog (integrated with external catalogs)
• Use cases
  ‒ Data exploration with notebooks like Apache Zeppelin
  ‒ Access to real-time data from applications
  ‒ Easy data routing / ETL from management consoles

26.CHALLENGE: SERVE DYNAMIC TABLES
Unbounded input yields unbounded results (serving bounded results is easy)
Append-only table:
    SELECT user, url FROM clicks WHERE url LIKE '%xyz.com'
  • Result rows are never changed
  • Consume, buffer, or drop rows
Continuously updating table:
    SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user
  • Result rows can be updated or deleted
  • Consume changelog or periodically query result table
  • Result table must be maintained somewhere
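A minimal sketch of the two serving strategies on this slide, under assumptions that are not part of FLIP-24: an append-only result is kept in a bounded buffer that drops the oldest rows, and an updating result is maintained as a keyed table from which clients read snapshots. Both class names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical result stores for serving query results; these are
// illustrations, not the FLIP-24 design.

// Append-only result: rows never change, so a client-facing buffer can keep
// the most recent `capacity` rows and drop older ones.
final class AppendResultBuffer[R](capacity: Int) {
  require(capacity > 0)
  private val rows = mutable.Queue.empty[R]
  private var dropped = 0L

  def add(row: R): Unit = {
    if (rows.size == capacity) {
      rows.dequeue() // drop the oldest row
      dropped += 1
    }
    rows.enqueue(row)
  }

  def snapshot: Vector[R] = rows.toVector
  def droppedCount: Long = dropped
}

// Continuously updating result: rows are upserted or deleted by key, so the
// full result table must be maintained; clients read periodic snapshots.
final class UpdatingResultTable[K, V] {
  private val table = mutable.LinkedHashMap.empty[K, V]

  def upsert(key: K, value: V): Unit = table.update(key, value)
  def delete(key: K): Unit = table.remove(key)
  def snapshot: Map[K, V] = table.toMap
}
```

The asymmetry matches the slide: dropping rows is acceptable for an append-only result, but an updating result cannot be truncated, because any stored row may still be changed or deleted.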

27.FLIP-24 – A SQL QUERY SERVICE
[Diagram: an application submits a query (SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user) to the Query Service via REST; the Query Service (catalog, optimizer, result server) uses an external catalog (Schema Registry, HCatalog, …), submits a job over event logs, databases, or HDFS, and serves the query results to the application via REST]
Results are served by the Query Service via REST
+ Application does not need a special client
+ Works well in many network configurations
− Query service can become a bottleneck

28.FLIP-24 – A SQL QUERY SERVICE
[Diagram: variant of the previous architecture in which the Query Service returns a result handle via REST and the application retrieves results through a serving library]

29.WE WANT YOUR FEEDBACK!
• The design of the SQL Query Service is not final yet.
• Check out FLIP-24 and FLINK-7594
• Share your ideas and feedback and discuss on JIRA or dev@flink.apache.org.