Redis + Structured Streaming: A Perfect Combination to Scale Out Your Continuous Applications

“Continuous applications,” supported by Apache Spark’s Structured Streaming API, enable real-time decision making in areas such as IoT, AI, fraud mitigation, and personalized experiences. All continuous applications have one thing in common: they collect data from various sources (devices in IoT, for example), process it in real time (ETL, for example), and deliver it to a machine learning serving layer for decision making. Continuous applications face many challenges as they grow to production. Often, due to a rapid increase in the number of devices, end users, or other data sources, the size of the data set grows exponentially. This results in a backlog of data to be processed, and the data is no longer processed in near real time. Redis, the open-source, in-memory database, offers many options to handle this situation cost-effectively. First and foremost, you can insert Redis into an existing continuous application without disrupting its architecture, and with minimal code changes. Redis, being in-memory, supports over a million writes per second with sub-millisecond latency. The Redis Streams data structure enables you to collect both binary and text data in time-series format, and its consumer groups help you match the data processing rate of your continuous application to the rate at which data arrives from various sources. In this session, I will perform a live demonstration of how to integrate a continuous application using Apache Spark’s Structured Streaming API with open-source Redis. I will also walk through the code and run a live IoT continuous application.

1.WiFi SSID: SparkAISummit | Password: UnifiedAnalytics

2.Redis + Structured Streaming: A Perfect Combination to Scale-out Your Continuous Applications Roshan Kumar, Redis Labs @roshankumar #UnifiedAnalytics #SparkAISummit

3.This Presentation is About… How to collect and process data streams in real time, at scale: IoT, user activity, messages

4.

5.http://bit.ly/spark-redis

6.Breaking up Our Solution into Functional Blocks
1. Data Ingest: record all clicks (click data)
2. Data Processing: count clicks in real time
3. Data Querying: query clicks by asset

7.The Actual Building Blocks of Our Solution (ClickAnalyzer)
1. Data Ingest: Redis Stream
2. Data Processing: Structured Stream Processing
3. Data Querying: Redis Hash + Spark SQL

8.1. Data Ingest

9.Data Ingest using Redis Streams
(Pipeline recap: Redis Stream → Structured Stream Processing (ClickAnalyzer) → Redis Hash + Spark SQL)

10.What is Redis Streams?

11.Redis Streams in its Simplest Form

12.Redis Streams Connects Many Producers and Consumers

13.Comparing Redis Streams with Redis Pub/Sub, Lists, Sorted Sets

Pub/Sub:
• Fire and forget
• No persistence
• No lookback queries

Lists:
• Tight coupling between producers and consumers
• Persistence for transient data only
• No lookback queries

Sorted Sets:
• Data ordering isn’t built-in; the producer controls the order
• No maximum limit
• The data structure is not designed to handle data streams

14.What is Redis Streams?
• It is like Pub/Sub, but with persistence
• It is like Lists, but decouples producers and consumers
• It is like Sorted Sets, but asynchronous

Plus:
• Lifecycle management of streaming data
• Built-in support for time-series data
• A rich choice of options for consumers to read streaming and static data
• Super-fast lookback queries powered by radix trees
• Automatic eviction of data based on the upper limit

15.Redis Streams Benefits: It enables asynchronous data exchange between producers and consumers, and historical range queries

16.Redis Streams Benefits With consumer groups, you can scale out and avoid backlogs
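The scale-out property of consumer groups is that each stream entry is delivered to exactly one consumer in the group, so adding consumers divides the work instead of duplicating it. As a conceptual sketch only (plain Scala, not the Redis implementation; all names here are hypothetical):

```scala
// Conceptual sketch of consumer-group delivery: each entry is handed to
// exactly one consumer, so N consumers share the backlog between them.
object ConsumerGroupSketch {
  def deliver(entries: Seq[String], consumers: Seq[String]): Map[String, Seq[String]] =
    entries.zipWithIndex
      .groupBy { case (_, i) => consumers(i % consumers.size) } // round-robin stand-in
      .map { case (c, es) => c -> es.map(_._1) }

  def main(args: Array[String]): Unit = {
    val assigned = deliver(Seq("e1", "e2", "e3", "e4"), Seq("c1", "c2"))
    println(assigned) // every entry appears under exactly one consumer
  }
}
```

In real Redis, delivery is pull-based (each consumer asks the group for the next unread entries via XREADGROUP) rather than round-robin, but the exactly-one-consumer-per-entry property is the same.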

17.Redis Streams Benefits: Simplify data collection, processing and distribution to support complex scenarios

18.Data Ingest Solution

Command:
    xadd clickstream * img [image_id]

Sample data:
    127.0.0.1:6379> xrange clickstream - +
    1) 1) "1553536458910-0"
       2) 1) "image_1"
          2) "1"
    2) 1) "1553536469080-0"
       2) 1) "image_3"
          2) "1"
    3) 1) "1553536489620-0"
       2) 1) "image_3"
          2) "1"
    ...
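The entry IDs in the xrange output above ("1553536458910-0") are what make a Redis Stream time-series data: each ID is a millisecond Unix timestamp plus a sequence number for entries that arrive within the same millisecond. A small Scala helper (hypothetical, not part of spark-redis) makes that ordering explicit:

```scala
// Redis stream entry IDs have the form <millisecondsTime>-<sequenceNumber>,
// so XRANGE naturally returns entries in arrival order.
object StreamId {
  def parse(id: String): (Long, Long) = {
    val Array(ms, seq) = id.split("-", 2)
    (ms.toLong, seq.toLong)
  }

  def main(args: Array[String]): Unit = {
    println(parse("1553536458910-0")) // (1553536458910,0)
  }
}
```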

19.2. Data Processing

20.Data Processing using Spark’s Structured Streaming
(Pipeline recap: Redis Stream → Structured Stream Processing (ClickAnalyzer) → Redis Hash + Spark SQL)

21.What is Structured Streaming?

22. Definition: “Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.”

23. How Does Structured Streaming Work?

Source (data stream) → Micro-batches as DataFrames (tables) → DataFrame operations → Output sink

DataFrame operations:
• Selection: df.select("xyz").where("a > 10")
• Filtering: df.filter(_.a > 10).map(_.b)
• Aggregation: df.groupBy("xyz").count()
• Windowing: df.groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word").count()
• Deduplication: df.dropDuplicates("guid")
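Spark aside, the aggregation used throughout this talk (groupBy on the image column, then count) has simple semantics that can be sketched over a plain Scala collection, as a toy stand-in for one micro-batch:

```scala
// Toy stand-in for one micro-batch: count clicks per image,
// mirroring clickstream.groupBy("img").count from the walkthrough.
object MicroBatchSketch {
  def countByImg(batch: Seq[String]): Map[String, Int] =
    batch.groupBy(identity).map { case (img, hits) => img -> hits.size }

  def main(args: Array[String]): Unit = {
    println(countByImg(Seq("image_1", "image_3", "image_3")))
  }
}
```

In Structured Streaming with output mode "update", Spark effectively re-emits such per-key counts for every micro-batch in which a key's count changed.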

24.Spark-Redis Library
• Redis Streams as data source; Redis as data sink
• Developed using Scala
• Compatible with Spark 2.3 and higher
• Supports RDD, DataFrames, and Structured Streaming

25.Redis Streams as Data Source 1. Connect to the Redis instance 2. Map Redis Stream to Structured Streaming schema 3. Create the query object 4. Run the query

26.Code Walkthrough: Redis Streams as Data Source
Step 1: Connect to the Redis instance

val spark = SparkSession.builder()
    .appName("redis-df")
    .master("local[*]")
    .config("spark.redis.host", "localhost")
    .config("spark.redis.port", "6379")
    .getOrCreate()

val clickstream = spark.readStream
    .format("redis")
    .option("stream.keys", "clickstream")
    .schema(StructType(Array(
      StructField("img", StringType)
    )))
    .load()

val queryByImg = clickstream.groupBy("img").count

27.Code Walkthrough: Redis Streams as Data Source
Step 2: Map the Redis Stream to a Structured Streaming schema
(produced by: xadd clickstream * img [image_id])

val spark = SparkSession.builder()
    .appName("redis-df")
    .master("local[*]")
    .config("spark.redis.host", "localhost")
    .config("spark.redis.port", "6379")
    .getOrCreate()

val clickstream = spark.readStream
    .format("redis")
    .option("stream.keys", "clickstream")
    .schema(StructType(Array(
      StructField("img", StringType)
    )))
    .load()

val queryByImg = clickstream.groupBy("img").count

28.Code Walkthrough: Redis Streams as Data Source
Step 3: Create the query object

val spark = SparkSession.builder()
    .appName("redis-df")
    .master("local[*]")
    .config("spark.redis.host", "localhost")
    .config("spark.redis.port", "6379")
    .getOrCreate()

val clickstream = spark.readStream
    .format("redis")
    .option("stream.keys", "clickstream")
    .schema(StructType(Array(
      StructField("img", StringType)
    )))
    .load()

val queryByImg = clickstream.groupBy("img").count

29.Code Walkthrough: Redis Streams as Data Source
Step 4: Run the query (with a custom output sink)

val clickstream = spark.readStream
    .format("redis")
    .option("stream.keys", "clickstream")
    .schema(StructType(Array(
      StructField("img", StringType)
    )))
    .load()

val queryByImg = clickstream.groupBy("img").count

val clickWriter: ClickForeachWriter = new ClickForeachWriter("localhost", "6379")

val query = queryByImg.writeStream
    .outputMode("update")
    .foreach(clickWriter)
    .start()

query.awaitTermination()
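The slide instantiates ClickForeachWriter but does not show its source. As a sketch only, assuming spark-sql and the Jedis client on the classpath (the class used in the live demo may differ), such a sink would implement Spark's ForeachWriter and write each updated count into a Redis Hash, where Spark SQL can later query it via spark-redis:

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.Jedis

// Sketch: writes each aggregated row (img, count) into a Redis Hash.
// Key layout (an assumption for illustration): click:<image_id> -> { count: N }
class ClickForeachWriter(host: String, port: String) extends ForeachWriter[Row] {
  var jedis: Jedis = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    jedis = new Jedis(host, port.toInt) // one connection per partition/epoch
    true                                // true = process this partition
  }

  override def process(row: Row): Unit = {
    val img = row.getString(row.fieldIndex("img"))
    val count = row.getLong(row.fieldIndex("count")) // groupBy().count yields a long "count" column
    jedis.hset("click:" + img, "count", count.toString)
  }

  override def close(errorCause: Throwable): Unit = jedis.close()
}
```

With output mode "update", process is called only for rows whose counts changed in the latest micro-batch, so the Hashes in Redis always hold the current click totals.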