Testing Stateful Streaming Applications

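When programs that were originally batch jobs are migrated to a real-time data processing environment, maintaining their existing complex business logic becomes difficult. Flink offers two API layers, high-level and low-level, for writing different kinds of stateful real-time data processing programs. Once a program becomes complex, however, it is hard to understand and debug. This talk discusses best practices for testing stream processing programs.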

1.Testing Stateful Streaming Applications Seth Wiesman April 10, 2018

2.Basic Stateful Word Count
    val words: DataStream[String] = ???
    words
      .keyBy(word ⇒ word)
      .map(new CountingMap)
"hello" → ("hello", 1)
"hello" → ("hello", 2)
"world" → ("world", 1)
"hello" → ("hello", 3)
©2018 MediaMath Inc.

3.vs Stateless Jobs
• Stateless jobs are easy to reason about
• The same input always results in the same output
• Referentially transparent
• That doesn't mean no state at all!
• Stateless :: (IN, STATE) → (OUT, STATE)
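The signature (IN, STATE) → (OUT, STATE) can be illustrated with the word count from the previous slide. The following is a minimal sketch that does not use Flink; the names `PureWordCount`, `step`, and `run` are hypothetical and exist only for this illustration.

```scala
object PureWordCount {
  // All counts seen so far, passed in and out explicitly.
  type State = Map[String, Int]

  // (IN, STATE) => (OUT, STATE): no hidden mutation, so the same
  // arguments always produce the same result.
  def step(word: String, state: State): ((String, Int), State) = {
    val count = state.getOrElse(word, 0) + 1
    ((word, count), state.updated(word, count))
  }

  // Drive the step function over a finite input, threading the state through.
  def run(words: Seq[String]): Seq[(String, Int)] = {
    val (out, _) =
      words.foldLeft((Vector.empty[(String, Int)], Map.empty[String, Int])) {
        case ((acc, state), word) =>
          val (o, next) = step(word, state)
          (acc :+ o, next)
      }
    out
  }
}
```

Because `step` receives its state as an argument, a test can start from any state it likes without replaying earlier input.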

4.Operator State

5.
    import java.util.{List ⇒ JList}
    class ResourceToStream(path: String)
        extends RichSourceFunction[String]
        with ListCheckpointed[Integer] {
      var numLines: Int = 0
      override def cancel(): Unit = ???
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = ???
      override def restoreState(state: JList[Integer]): Unit = ???
      override def snapshotState(checkpointId: Long, timestamp: Long): JList[Integer] = ???
    }

6.
    import java.util.{List ⇒ JList}
    class ResourceToStream(path: String)
        extends RichSourceFunction[String]
        with ListCheckpointed[Integer] {
      var numLines: Int = 0
      // Business Logic
      override def cancel(): Unit = ???
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = ???
      // State Management
      override def restoreState(state: JList[Integer]): Unit = ???
      override def snapshotState(checkpointId: Long, timestamp: Long): JList[Integer] = ???
    }

7.Validate Business Logic
    it should "process a resource file" in {
      val runner = new ResourceToStream("/testfile.txt")
      val ctx = new MockSourceContext
      runner.run(ctx)
      ctx.output.size should be (4)
      ctx.output should be (ListBuffer("hello", "world", "good", "day"))
    }

8.Consider State Management
    it should "restore from state" in {
      val runner = new ResourceToStream("/testfile.txt")
      val ctx = new MockSourceContext
      runner.restoreState(Arrays.asList(2))
      runner.run(ctx)
      ctx.output.size should be (2)
      ctx.output should be (ListBuffer("good", "day"))
    }
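The slides leave the bodies of `run`, `restoreState`, and `snapshotState` as `???`. One way they could fit together is sketched below without any Flink dependency. `LineSource` is a hypothetical stand-in for `ResourceToStream`: it takes its lines directly instead of reading a resource, and it receives a plain callback instead of a `SourceContext`.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, Flink-free stand-in for ResourceToStream.
class LineSource(lines: Seq[String]) {
  // Number of lines already emitted; the only state worth checkpointing.
  private var numLines: Int = 0

  // Business logic: emit every line that has not been emitted yet.
  def run(emit: String => Unit): Unit =
    lines.drop(numLines).foreach { line =>
      emit(line)
      numLines += 1
    }

  // State management: expose the offset as a list, mirroring ListCheckpointed.
  def snapshotState(): List[Int] = List(numLines)

  // State management: resume from a previously snapshotted offset.
  def restoreState(state: List[Int]): Unit = {
    numLines = state.headOption.getOrElse(0)
  }
}

object LineSourceExample {
  // Restore at offset 2, then collect whatever the source still emits.
  def resumeFromTwo(): List[String] = {
    val out = ListBuffer.empty[String]
    val source = new LineSource(Seq("hello", "world", "good", "day"))
    source.restoreState(List(2))
    source.run { line => out += line; () }
    out.toList
  }
}
```

With the offset as the only state, the restore test reduces to checking that the first `numLines` lines are skipped.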

9.Broadcast State
    val mainStream: DataStream[String] = ???
    val filterStream: DataStream[String] = env
      .addSource(new ResourceToStream(""))
    mainStream
      .connect(filterStream.broadcast)
      .flatMap(new BroadcastFilter)

10.
    class BroadcastFilter
        extends RichCoFlatMapFunction[String, String, String]
        with ??? {
      val filters = mutable.Set.empty[String]
      override def flatMap1(value: String, out: Collector[String]): Unit = {
        if (!filters.contains(value)) {
          out.collect(value)
        }
      }
      override def flatMap2(value: String, out: Collector[String]): Unit = {
        filters += value
      }

11.CheckpointedFunction
    override def initializeState(context: FunctionInitializationContext): Unit = {
      state = context.getOperatorStateStore.getUnionListState(
        new ListStateDescriptor[String]("filters", BasicTypeInfo.STRING_TYPE_INFO)
      )
      ???
    }

12.CheckpointedFunction
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
      state.clear()
      if (getRuntimeContext.getIndexOfThisSubtask == 0) {
        for (filter ← filters) {
          state.add(filter)
        }
      }
    }
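The subtask-index check makes sense given how union list state is redistributed: on restore, every subtask receives the concatenation of what all subtasks wrote. Since each subtask of a broadcast filter holds the same set, a single writer is enough. The sketch below simulates that behaviour in plain Scala; `UnionStateSketch` and its methods are hypothetical names, and no Flink API is involved.

```scala
object UnionStateSketch {
  // What one subtask writes at checkpoint time.
  def snapshot(subtaskIndex: Int, filters: Set[String], onlySubtaskZero: Boolean): List[String] =
    if (!onlySubtaskZero || subtaskIndex == 0) filters.toList.sorted else Nil

  // Union redistribution: each restored subtask sees every snapshot concatenated.
  def restore(snapshots: Seq[List[String]]): List[String] =
    snapshots.flatten.toList

  // Simulate a checkpoint and restore for a given parallelism.
  def roundTrip(parallelism: Int, filters: Set[String], onlySubtaskZero: Boolean): List[String] =
    restore((0 until parallelism).map(i => snapshot(i, filters, onlySubtaskZero)))
}
```

In this simulation, with parallelism 3 and two filters, the guarded version restores two entries while the unguarded version restores six, one copy per writing subtask.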

13.CheckpointedFunction
    it should "filter out blacklisted words" in {
      val filter = new BroadcastFilter
      val out = new MockCollector
      filter.flatMap2("hello", out)
      out.output.size should be (0)
      filter.flatMap1("hello", out)
      out.output.size should be (0)
      filter.flatMap1("world", out)
      out.output.size should be (1)
    }
• Testing business logic is still straightforward
• Testing checkpoint methods is less clear
• Mocking out operator state is not fun

14.OperatorTestHarness
• Used as part of Flink's internal test suite
• ⚠️ Not part of the public API ⚠️
• Not recommended as a first option
• Try to get by with integration tests first
• Be careful to test your code, not Flink's!
• Used sparingly, it can be very powerful

15.
    it should "restore from state" in {
      val initialFilter = new BroadcastFilter
      val initialOperator = new CoStreamFlatMap(initialFilter)
      val initialTestHarness = new TwoInputStreamOperatorTestHarness(initialOperator)
      initialTestHarness.initializeState(new OperatorStateHandles(0, null, null, null, null))
      initialTestHarness.open()
      initialTestHarness.processElement2(new StreamRecord[String]("hello"))
      val snapshot = initialTestHarness.snapshot(0L, 0L)
      initialTestHarness.close()

16.
      val restoreFilter = new BroadcastFilter
      val restoreOperator = new CoStreamFlatMap(restoreFilter)
      val restoreTestHarness = new TwoInputStreamOperatorTestHarness(restoreOperator)
      restoreTestHarness.initializeState(snapshot)
      restoreTestHarness.setup()
      restoreTestHarness.open()
      restoreTestHarness.processElement1(new StreamRecord[String]("hello"))
      restoreTestHarness.getOutput.size() should be (0)
      restoreTestHarness.close()
    }

17.Keyed State

18.Keyed State
• Many keyed state operations can be handled "statelessly"
    words
      .keyBy(word ⇒ word)
      .countWindow(3)
      .reduce((word, _) ⇒ word)
• Not always an option
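The effect of the windowed pipeline above can be approximated on an in-memory collection. The sketch below rests on the assumption that a count window of size 3 fires only once it holds three elements, so incomplete groups produce nothing. `CountWindowSketch` is a hypothetical name and the code does not use Flink.

```scala
object CountWindowSketch {
  // Per key: cut the occurrences into consecutive groups of `size`,
  // keep only the full groups, and reduce each with (word, _) => word.
  def everyFullWindow(words: Seq[String], size: Int): Map[String, Seq[String]] =
    words
      .groupBy(identity)
      .map { case (key, occurrences) =>
        key -> occurrences
          .grouped(size)
          .filter(_.size == size)
          .map(_.reduce((word, _) => word))
          .toList
      }
      .filter { case (_, emitted) => emitted.nonEmpty }
}
```

Here the function under test is just the reduce lambda, which carries no state of its own; the counting is done entirely by the window.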

19.
    class EveryNthWord(n: Int) extends RichFlatMapFunction[String, String] {
      // Boxed Integer, so that unset state reads as null instead of being unboxed to 0
      @transient private lazy val countDesc =
        new ValueStateDescriptor[Integer]("count", classOf[Integer])
      override def flatMap(value: String, out: Collector[String]): Unit = {
        val countState = getRuntimeContext.getState(countDesc)
        // The first occurrence of a key counts as 1
        val count = Option(countState.value()).map(_.intValue()).getOrElse(1)
        if (count == n) {
          countState.clear()
          out.collect(value)
        } else {
          countState.update(count + 1)
        }
      }
    }

20.FlatMapWithState
    object StatelessEveryNthWord {
      type StatefulFunction = (String, Option[Int]) ⇒ (TraversableOnce[String], Option[Int])
    }

21.FlatMapWithState
    class StatelessEveryNthWord(n: Int) extends StatelessEveryNthWord.StatefulFunction {
      override def apply(word: String, state: Option[Int]): (TraversableOnce[String], Option[Int]) = {
        state match {
          case Some(count) if count + 1 == n ⇒ (Some(word), None)
          case Some(count) ⇒ (None, Some(count + 1))
          case None ⇒ (None, Some(1))
        }
      }
    }

22.FlatMapWithState Test
    it should "initialize state the first time it sees a word" in {
      val function = new StatelessEveryNthWord(3)
      val (output, state) = function("hello", None)
      output should be (None)
      state should be (Some(1))
    }
    it should "modify state in the middle of a run" in {
      val function = new StatelessEveryNthWord(3)
      val (output, state) = function("hello", Some(1))
      output should be (None)
      state should be (Some(2))
    }
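The same function can also be exercised over a whole sequence by threading per-key state through a fold. The following is a self-contained sketch: `EveryNthSketch`, `everyNth`, and `run` are hypothetical names, and the output type is simplified to `Option[String]` rather than the `TraversableOnce[String]` used on the slides.

```scala
object EveryNthSketch {
  // Simplified variant of StatefulFunction with at most one output per input.
  type StatefulFunction = (String, Option[Int]) => (Option[String], Option[Int])

  // Same case analysis as StatelessEveryNthWord.apply.
  def everyNth(n: Int): StatefulFunction = {
    case (word, Some(count)) if count + 1 == n => (Some(word), None)
    case (_, Some(count))                      => (None, Some(count + 1))
    case (_, None)                             => (None, Some(1))
  }

  // Stand-in for the keyed runtime: a Map plays the role of keyed state.
  def run(f: StatefulFunction, words: Seq[String]): Seq[String] = {
    val (emitted, _) =
      words.foldLeft((Vector.empty[String], Map.empty[String, Int])) {
        case ((out, states), word) =>
          val (output, next) = f(word, states.get(word))
          val updated = next match {
            case Some(count) => states.updated(word, count) // keep counting
            case None        => states - word               // clear state
          }
          (out ++ output, updated)
      }
    emitted
  }
}
```

Running `everyNth(3)` over a sequence with six occurrences of one word emits that word on its third and sixth occurrence; occurrences of other words do not interfere, because the state is looked up per key.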

23.Possible FlatMapWithState Implementation
    override def flatMap(value: String, out: Collector[String]): Unit = {
      // initialize context
      val state = Option(getRuntimeContext.getState(countDesc).value())
      // react to results
      f(value, state) match {
        case (output, None) ⇒
          output.foreach(out.collect)
          getRuntimeContext.getState(countDesc).clear()
        case (output, Some(x)) ⇒
          output.foreach(out.collect)
          getRuntimeContext.getState(countDesc).update(x)
      }
    }

24.A more complex example
• Group all elements of the same key that occur within 30 seconds of each other
  • Similar to a session window
• Output the third instance of every word immediately
• When more than 30 seconds have passed since the last word:
  • Clear state
  • If more than 3 instances of a word have been observed, output to the heavy hitter side output
  • If fewer than 3 instances of a word have been observed, output to the infrequent side output

25.Specialized Context
    object Algebra {
      final case class Context(count: Long, watermark: Long)
      final case class Result(word: Option[String], timer: Long)
    }
    trait Algebra {
      def evaluateElem(word: String, ctx: Algebra.Context): Algebra.Result
      def evaluateTimer(word: String, count: Long): Option[Either[String, String]]
    }

26.Stateless Business Logic
    class StatelessAlgebra extends Algebra {
      // Modifiers must precede `val`
      private[this] final val thirty_seconds: Long = 30 * 1000L
      override def evaluateElem(word: String, ctx: Algebra.Context): Algebra.Result = {
        val output = if (ctx.count == 3) {
          Some(word)
        } else {
          None
        }
        val timer = ctx.watermark + thirty_seconds
        Algebra.Result(output, timer)
      }

27.Stateless Business Logic
      override def evaluateTimer(word: String, count: Long): Option[Either[String, String]] = {
        if (count < 3) {
          Some(Left(word))
        } else if (count > 3) {
          Some(Right(word))
        } else {
          None
        }
      }
    }
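Taken together, the definitions from the last three slides compile without Flink on the classpath, so scenarios can be checked directly. The sketch below repeats them in one self-contained block, with the constant renamed to `thirtySeconds`. The `AlgebraScenarios` object and its values are hypothetical additions, and reading `Left` as the infrequent output and `Right` as the heavy hitter output is an inference from the requirements slide.

```scala
object Algebra {
  final case class Context(count: Long, watermark: Long)
  final case class Result(word: Option[String], timer: Long)
}

trait Algebra {
  def evaluateElem(word: String, ctx: Algebra.Context): Algebra.Result
  def evaluateTimer(word: String, count: Long): Option[Either[String, String]]
}

class StatelessAlgebra extends Algebra {
  private final val thirtySeconds: Long = 30 * 1000L

  // Emit the word on its third occurrence; always push the timer out by 30 s.
  override def evaluateElem(word: String, ctx: Algebra.Context): Algebra.Result = {
    val output = if (ctx.count == 3) Some(word) else None
    Algebra.Result(output, ctx.watermark + thirtySeconds)
  }

  // Left: fewer than 3 occurrences; Right: more than 3; exactly 3: nothing.
  override def evaluateTimer(word: String, count: Long): Option[Either[String, String]] =
    if (count < 3) Some(Left(word))
    else if (count > 3) Some(Right(word))
    else None
}

// Hypothetical scenarios, evaluated without any streaming runtime.
object AlgebraScenarios {
  private val algebra = new StatelessAlgebra
  val thirdElement: Algebra.Result =
    algebra.evaluateElem("hello", Algebra.Context(count = 3, watermark = 1000L))
  val secondElement: Algebra.Result =
    algebra.evaluateElem("hello", Algebra.Context(count = 2, watermark = 1000L))
  val timerAfterTwo: Option[Either[String, String]] = algebra.evaluateTimer("hello", 2)
  val timerAfterFive: Option[Either[String, String]] = algebra.evaluateTimer("hello", 5)
}
```

Each scenario is one function call with explicit arguments, so boundary cases such as exactly three occurrences can be listed exhaustively.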

28.Program Runner
    // initialize context
    getRuntimeContext.getState(wordDesc).update(value)
    getRuntimeContext.getReducingState(countDesc).add(1L)
    val count = getRuntimeContext.getReducingState(countDesc).get()
    val context = Algebra.Context(count, ctx.timerService().currentWatermark())
    val Algebra.Result(output, time) = program.evaluateElem(value, context)
    // react to results
    output.foreach(out.collect)
    getRuntimeContext.getState(timerDesc).update(time)
    ctx.timerService().registerEventTimeTimer(time)

29.Testing our Implementation
• Business Logic
  • Business logic can be tested in isolation from Flink
  • Simple to simulate various scenarios
• Runner
  • Does not require knowledge of the business domain to validate
  • Isolates the more complex parts of writing streaming applications
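The runner half of this split can be illustrated with a stub program that has no business meaning at all. The sketch below is hypothetical throughout: `RunnerSketch`, `Program`, `InMemoryRunner`, and `AlwaysEmit` are illustration names, and plain mutable maps stand in for Flink's keyed state and timer service.

```scala
import scala.collection.mutable

object RunnerSketch {
  final case class Context(count: Long, watermark: Long)
  final case class Result(word: Option[String], timer: Long)

  // Same shape as evaluateElem on the Algebra trait from the slides.
  trait Program {
    def evaluateElem(word: String, ctx: Context): Result
  }

  // In-memory stand-in for the Flink runner.
  class InMemoryRunner(program: Program) {
    val counts = mutable.Map.empty[String, Long]
    val timers = mutable.Map.empty[String, Long]
    val output = mutable.ListBuffer.empty[String]
    var watermark: Long = 0L

    def processElement(word: String): Unit = {
      // initialize context
      val count = counts.getOrElse(word, 0L) + 1L
      counts(word) = count
      val Result(out, time) = program.evaluateElem(word, Context(count, watermark))
      // react to results
      out.foreach(output += _)
      timers(word) = time
    }
  }

  // Stub program: echoes the context it was given and requests a fixed timer.
  object AlwaysEmit extends Program {
    def evaluateElem(word: String, ctx: Context): Result =
      Result(Some(s"$word:${ctx.count}"), 42L)
  }
}
```

A test against `AlwaysEmit` only checks plumbing: that the count is incremented per key, that the context reaches the program, and that outputs and timers are recorded. None of that depends on the thresholds in the business logic.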