Whirlpools in the Stream

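In the past, we used Spark to detect fraud. More recently, we have started using machine learning models for real-time fraud detection. One of Capital One's fraud detection microservices was an early adopter of Structured Streaming. As part of that implementation, the microservice ran into several obstacles. In this talk, we describe those obstacles and how we worked around them.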

1. Whirlpools in the Stream: Misadventures in Structured Streaming
Jayesh Lalwani, Lead Software Engineer, Quantum, Capital One

2. What is Structured Streaming

3. Dunderheaded Data stores!

4. Look Ma! No lookups!
[Diagram labels: Process B, Events, Process A, State]
• Spark allows you to join streaming data with batch data
• BUT...
• Spark doesn't support pushdown of join predicates. This means that the State is loaded into Spark memory for every microbatch
• Caching a batch DataFrame "freezes" it: later updates to the State are not picked up

5. Solutions
• Cache the DataFrame and restart the streaming application
• Cache the DataFrame and restart the streaming query
• Cache the data outside of Spark
• Look up batch data inside a map/flatMap operation
• Stream updates to Process B and do a stream-to-stream join (2.3+ only; works for limited use cases)
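The "cache outside of Spark" and "look up inside a map/flatMap" options can be combined into a lookup that refreshes itself on a timer. The following is a minimal sketch in plain Python, not the speaker's code: `RefreshingLookup`, `enrich_partition`, the `account_id` field and the 60-second TTL are all hypothetical names and values chosen for illustration. In a real job the `loader` would query the actual state store, and `enrich_partition` would be the body of a mapPartitions-style call.

```python
import time
from typing import Callable, Dict, Iterable, Iterator, Optional


class RefreshingLookup:
    """Holds a snapshot of reference data and reloads it once it is older than the TTL."""

    def __init__(self, loader: Callable[[], Dict[str, dict]], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._loader = loader          # hypothetical: would query the real state store
        self._ttl = ttl_seconds
        self._clock = clock            # injectable so the refresh logic can be tested
        self._snapshot: Dict[str, dict] = {}
        self._loaded_at: Optional[float] = None

    def get(self, key: str) -> Optional[dict]:
        now = self._clock()
        # Reload on first use, or when the snapshot has reached the TTL.
        if self._loaded_at is None or now - self._loaded_at >= self._ttl:
            self._snapshot = self._loader()
            self._loaded_at = now
        return self._snapshot.get(key)


def enrich_partition(events: Iterable[dict], lookup: RefreshingLookup) -> Iterator[dict]:
    """Attach the looked-up state to each event (None when the key is unknown)."""
    for event in events:
        yield {**event, "state": lookup.get(event["account_id"])}


if __name__ == "__main__":
    store = {"a1": {"risk": "low"}}    # stands in for the external data store
    lookup = RefreshingLookup(lambda: dict(store), ttl_seconds=60)
    print(list(enrich_partition([{"account_id": "a1", "amount": 12.5}], lookup)))
```

The trade-off is bounded staleness: the stream never sees state older than the TTL, and no restart is needed to "unfreeze" the data, but every executor process holds its own copy of the snapshot.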

6. Data store? We don't need no stinkin' data stores
• Outputs supported in Batch Mode: JSON, CSV, Parquet, ORC, Text, JDBC
• Outputs supported in Streaming Mode: JSON, CSV, Parquet, Text
• WHERE IS JDBC?

7. My implementation of JDBC sink
• Features:
  • Can write streaming data to JDBC data stores
  • Uses the same code that is used by Batch JDBC sinks
  • Modes: Overwrite, Append
  • Supports At-Least-Once out of the box
  • Supports Exactly-Once with some configuration
• Fork: https://github.com/GaalDornick/spark
• Implementation: https://github.com/GaalDornick/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/JdbcSink.scala
• Unit test: https://github.com/GaalDornick/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/JDBCSinkSuite.scala
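The slide does not say what configuration turns At-Least-Once into Exactly-Once, so the sketch below does not describe the linked `JdbcSink.scala`. It shows one common, generic way to get there: record the microbatch ID in the same transaction as the rows, so a replayed batch is rejected instead of written twice. It uses Python's built-in `sqlite3` as a stand-in for a JDBC store; the table names `scores` and `committed_batches` and the function names are assumptions made for this example.

```python
import sqlite3
from typing import Iterable, Tuple


def init_schema(conn: sqlite3.Connection) -> None:
    """Create the hypothetical output table and the batch-ID ledger."""
    conn.execute("CREATE TABLE IF NOT EXISTS scores (txn_id TEXT, score REAL)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS committed_batches (batch_id INTEGER PRIMARY KEY)"
    )
    conn.commit()


def write_batch(conn: sqlite3.Connection, batch_id: int,
                rows: Iterable[Tuple[str, float]]) -> bool:
    """Write one microbatch atomically; return False if it was already committed."""
    try:
        with conn:  # one transaction: commits on success, rolls back on error
            # The primary key makes a replayed batch_id fail before any row is written.
            conn.execute(
                "INSERT INTO committed_batches (batch_id) VALUES (?)", (batch_id,)
            )
            conn.executemany(
                "INSERT INTO scores (txn_id, score) VALUES (?, ?)", rows
            )
    except sqlite3.IntegrityError:
        return False
    return True


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    init_schema(conn)
    batch = [("t1", 0.9), ("t2", 0.1)]
    print(write_batch(conn, 7, batch))   # first delivery of batch 7
    print(write_batch(conn, 7, batch))   # replay of batch 7 after a restart
```

Because the ledger insert and the data insert share one transaction, a crash between them leaves neither behind, and the retry that At-Least-Once delivery guarantees will then succeed exactly once.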

8. Clumsy footed Checkpoints

9. Missing the point with Checkpoints
• Checkpoint stores state
• Allows the streaming application to restart on failure
• Checkpoint should be stored on a location that is
  • Resilient to failure
  • Shared between executors and drivers (read-write many)
  • Immediately consistent
• S3 IS NOT IMMEDIATELY CONSISTENT

10. Solution
• NFS
  • We use EFS on AWS
• On Kubernetes, you need a PersistentVolume (PV) that supports the ReadWriteMany (RWX) access mode
  • NFS
  • Ceph
  • GlusterFS

11. Checkpoints had a great fall
• https://issues.apache.org/jira/browse/SPARK-21696
• Leads to corrupt checkpoints
• Worked around by deleting the checkpoint once a day
  • This means we lose data
• Fixed in 2.2.1

12. Anthropophagic Aggregations!

13. Aggregations? What Aggregations?
• Structured Streaming allows you to aggregate a DataFrame
• But you cannot aggregate an aggregated DataFrame
• PARTITION BY clause is supported only for time windows
Solutions
• If possible, aggregate batch data before joining with the stream
• Implement aggregations without Spark SQL using groupByKey..flatMapGroups
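The last solution works because the function applied to each group is ordinary code, so it can aggregate and then aggregate again in one pass. The sketch below mimics that shape in plain Python rather than Spark: `group_by_key` and `flat_map_groups` are hypothetical stand-ins for the Dataset operations, and the "busiest merchant per account" question is an invented example. It ignores how state is carried across microbatches.

```python
from collections import Counter, defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Event = Tuple[str, str]            # (account_id, merchant), hypothetical schema
Result = Tuple[str, str, int]      # (account_id, merchant, transaction count)


def group_by_key(events: Iterable[Event]) -> Dict[str, List[Event]]:
    """Collect events per account, standing in for a group-by-key step."""
    groups: Dict[str, List[Event]] = defaultdict(list)
    for event in events:
        groups[event[0]].append(event)
    return groups


def busiest_merchant(account_id: str, events: Iterable[Event]) -> Iterator[Result]:
    """Count per merchant (first aggregation), then keep the maximum (second)."""
    counts = Counter(merchant for _, merchant in events)
    if counts:
        # Ties are broken by merchant name so the result is deterministic.
        merchant, n = max(counts.items(), key=lambda kv: (kv[1], kv[0]))
        yield (account_id, merchant, n)


def flat_map_groups(groups: Dict[str, List[Event]],
                    fn: Callable[[str, Iterable[Event]], Iterator[Result]]
                    ) -> Iterator[Result]:
    """Apply fn to every group and flatten whatever it yields."""
    for key, group_events in groups.items():
        yield from fn(key, group_events)


if __name__ == "__main__":
    events = [("a", "x"), ("a", "y"), ("a", "x"), ("b", "z")]
    print(sorted(flat_map_groups(group_by_key(events), busiest_merchant)))
```

The cost of this approach is that the per-group logic is opaque to the query optimizer, which is the price of stepping outside Spark SQL.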

14. Jayesh Lalwani, Lead Software Engineer, Team Heartbeat
Jayesh.Lalwani@capitalone.com