DataStream API: Advanced Windowing

Slice-based partitioning of sliding windows performs better than the original bucketing-window partitioning approach.

1. DataStream API: Advanced Windowing
   Apache Flink® Training
   Flink v1.3 – 9.9.2017

2. Windows: The lower-level details

3. Specifying Windowing

    stream
        .keyBy()                  // keyed vs non-keyed windows
        .window()                 // "Assigner"
        .trigger()                // each Assigner has a default Trigger
        .evictor()                // default: no Evictor
        .allowedLateness()        // default: zero
        .reduce|apply|process()

4. Triggers
   • A Trigger determines when a window is ready to have its process or window function called, e.g.,
       - by counting
       - by comparing timestamps to the current watermark
   • Each WindowAssigner comes with a default Trigger

5. Trigger interface
   • onElement()
   • onEventTime()
   • onProcessingTime()
   • onMerge()
       - merges the states of two triggers when their windows merge
   • clear()
       - use this to clear any managed state, etc.

6. Trigger interface
   • onElement(), onEventTime(), and onProcessingTime() each return a TriggerResult
   • onMerge()
       - merges the states of two triggers when their windows merge
   • clear()
       - use this to clear any managed state

7. TriggerResult
   • CONTINUE
   • FIRE
   • PURGE
   • FIRE_AND_PURGE
   • Purging only removes the window's contents
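To make the four results concrete, here is a minimal plain-Java model of what each one does to a window buffer. The class `TriggerResultSketch`, its `Result` enum, and the `apply` helper are hypothetical stand-ins written for illustration; they are not Flink classes, and the real window operator does considerably more.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model only; these types are hypothetical stand-ins, not Flink classes.
class TriggerResultSketch {

    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    /**
     * Applies a trigger result to a window buffer.
     * Returns the elements handed to the window function (empty if nothing fired).
     */
    static List<Integer> apply(Result result, List<Integer> contents) {
        List<Integer> emitted = new ArrayList<>();
        if (result.fire) {
            emitted.addAll(contents); // the window function sees the current contents
        }
        if (result.purge) {
            contents.clear(); // only the contents are removed
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> contents = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(apply(Result.FIRE, contents) + " kept=" + contents);
        System.out.println(apply(Result.FIRE_AND_PURGE, contents) + " kept=" + contents);
    }
}
```

In this model FIRE leaves the buffer intact, so a later firing sees the same elements again, while FIRE_AND_PURGE empties it after emitting.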

8.

    public class CountTrigger<W extends Window> extends Trigger<Object, W> {

        private final long maxCount;

        private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

        private CountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        ...
    }

9.

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

10.

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }
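The counting logic of the three CountTrigger slides can be followed without a Flink runtime. The sketch below is a plain-Java model: `CountTriggerSketch` is a hypothetical class, a plain `long` field stands in for the `ReducingState<Long>`, `true`/`false` stand in for FIRE/CONTINUE, and `onMerge` assumes the elided `Sum` function simply adds the two counts.

```java
// Plain-Java model of the counting logic; not the Flink CountTrigger itself.
class CountTriggerSketch {

    private final long maxCount;
    private long count; // stands in for the "count" ReducingState<Long>

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Returns true (FIRE) on every maxCount-th element, false (CONTINUE) otherwise. */
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // mirrors count.clear() before returning FIRE
            return true;
        }
        return false;
    }

    /** Assumption: merging sums the counts, as a Sum reduce function would. */
    void onMerge(CountTriggerSketch other) {
        count += other.count;
        other.count = 0;
    }

    void clear() {
        count = 0;
    }

    long currentCount() {
        return count;
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        for (int i = 1; i <= 7; i++) {
            System.out.println("element " + i + " fire=" + trigger.onElement());
        }
    }
}
```

Note that the counter resets on every firing, so the trigger fires repeatedly (every `maxCount` elements) rather than once per window.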

11. Evictors
   • Control which elements are passed to the window function (by removing elements from a window)
   • Always called after the trigger fires, but before and/or after the window function
       - evictBefore()
       - evictAfter()
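As an illustration of the evictBefore() step, the following plain-Java sketch keeps only the last N buffered elements before handing them to a window function (here a simple sum). The names `EvictorSketch`, `evictBefore`, and `sum` are hypothetical helpers for this example, not the Flink Evictor interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of "evict, then call the window function"; not Flink code.
class EvictorSketch {

    /** Keeps at most the last maxCount elements of the window buffer. */
    static <T> List<T> evictBefore(List<T> window, int maxCount) {
        int from = Math.max(0, window.size() - maxCount);
        return new ArrayList<>(window.subList(from, window.size()));
    }

    /** Stand-in for a window function that needs the full list of elements. */
    static int sum(List<Integer> elements) {
        int total = 0;
        for (int e : elements) {
            total += e;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> buffer = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> kept = evictBefore(buffer, 3);
        System.out.println(kept + " sum=" + sum(kept));
    }
}
```

Because the evictor has to inspect individual elements, the whole buffer must be retained until the trigger fires.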

12. Late elements
   • When allowedLateness > 0:
       - late elements can trigger late firings
       - with merging windows, late firings can lead to late merging
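The effect of allowedLateness can be sketched as a comparison between the watermark and the window's end. This is a simplified plain-Java model under the assumption that a window [start, end) is kept until the watermark reaches `end - 1 + allowedLateness`; the class and method names are hypothetical.

```java
// Plain-Java model of the lateness check; names are hypothetical, not Flink API.
class LatenessSketch {

    /** Largest timestamp that still belongs to a window ending (exclusively) at windowEnd. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** True once the window's state is dropped; elements for it are then discarded. */
    static boolean isWindowExpired(long windowEnd, long allowedLateness, long watermark) {
        return maxTimestamp(windowEnd) + allowedLateness <= watermark;
    }

    /** True if an element arriving now would cause a late firing of this window. */
    static boolean causesLateFiring(long windowEnd, long allowedLateness, long watermark) {
        boolean watermarkPassedWindow = maxTimestamp(windowEnd) <= watermark;
        return watermarkPassedWindow && !isWindowExpired(windowEnd, allowedLateness, watermark);
    }

    public static void main(String[] args) {
        long windowEnd = 1000;
        System.out.println(causesLateFiring(windowEnd, 500, 1200)); // inside the lateness interval
        System.out.println(isWindowExpired(windowEnd, 500, 1499));  // lateness interval is over
        System.out.println(causesLateFiring(windowEnd, 0, 1200));   // no lateness allowed
    }
}
```

With the default allowedLateness of zero, the window expires as soon as the watermark passes its end, so no late firing is possible.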

13. ProcessWindowFunction
   • New and improved version of WindowFunction that has more information about the window-firing context
   • Has API for accessing both per-window state and global state (both per key)

14. ProcessWindowFunction (API)

    abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
            extends AbstractRichFunction {

        /** Called for each window firing. */
        public abstract void process(
            KEY key, Context context, Iterable<IN> elements, Collector<OUT> out);

        /** Called when a window is being purged. */
        public void clear(Context context) throws Exception {}
    }

15. Context (API)

    public abstract class Context implements java.io.Serializable {

        public abstract W window();

        public abstract long currentProcessingTime();

        public abstract long currentWatermark();

        public abstract KeyedStateStore windowState();

        public abstract KeyedStateStore globalState();
    }

16. Per-Window vs Global state
   • Both per-window and global state are still tied to a key
   • Per-window state is additionally scoped to the window being processed
   • Global state is the same for all windows of that key
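One way to picture the two scopes is as two maps: one indexed by key only, the other by key and window. The sketch below is a plain-Java model with hypothetical names (`StateScopeSketch`, `onFiring`, `clearWindow`); it does not use Flink's KeyedStateStore.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the two state scopes; not Flink's KeyedStateStore.
class StateScopeSketch {

    private final Map<String, Long> globalState = new HashMap<>(); // scoped to key
    private final Map<String, Long> windowState = new HashMap<>(); // scoped to key and window

    /** Counts one firing; returns {firings of this window, firings for this key overall}. */
    long[] onFiring(String key, long windowStart) {
        long perWindow = windowState.merge(key + "@" + windowStart, 1L, Long::sum);
        long global = globalState.merge(key, 1L, Long::sum);
        return new long[] {perWindow, global};
    }

    /** Mirrors clear(): per-window state is dropped when the window is purged. */
    void clearWindow(String key, long windowStart) {
        windowState.remove(key + "@" + windowStart);
    }

    public static void main(String[] args) {
        StateScopeSketch state = new StateScopeSketch();
        state.onFiring("a", 0);
        state.onFiring("a", 0);
        long[] counts = state.onFiring("a", 60);
        System.out.println("perWindow=" + counts[0] + " global=" + counts[1]);
    }
}
```

A new window for the same key starts with fresh per-window state but sees the accumulated global state, while a different key sees neither.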

17. Example – Differential Window

    private static class DifferentialWindowFunction
            extends ProcessWindowFunction<Long, Tuple2<Long, Long>, String, TimeWindow> {

        private final static ValueStateDescriptor<Long> previousFiringState =
            new ValueStateDescriptor<>("previous-firing", LongSerializer.INSTANCE);

        private final static ReducingStateDescriptor<Long> firingCounterState =
            new ReducingStateDescriptor<>(
                "firing-counter", new Sum(), LongSerializer.INSTANCE);

        @Override
        public void process(…) { … }
    }

18. Example – Differential Window

    @Override
    public void process(
            String key,
            Context context,
            Iterable<Long> values,
            Collector<Tuple2<Long, Long>> out) {

        ValueState<Long> previousFiring =
            context.windowState().getState(previousFiringState);
        ReducingState<Long> firingCounter =
            context.windowState().getState(firingCounterState);

        Long output = Iterables.getOnlyElement(values);
        if (firingCounter.get() == null) {
            // first firing
            out.collect(Tuple2.of(0L, output));
        } else {
            // subsequent firing
            out.collect(Tuple2.of(firingCounter.get(), output - previousFiring.value()));
        }
        firingCounter.add(1L);
        previousFiring.update(output);
    }

19. Example – Differential Window

    @Override
    public void clear(Context context) {
        ValueState<Long> previousFiring =
            context.windowState().getState(previousFiringState);
        ReducingState<Long> firingCounter =
            context.windowState().getState(firingCounterState);

        previousFiring.clear();
        firingCounter.clear();
    }
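To see what the differential window emits across several firings of the same window, the logic can be traced in plain Java. In this sketch two nullable fields stand in for the per-window `ValueState` and `ReducingState`, and `DifferentialSketch` is a hypothetical class that returns the output pair instead of using a Collector.

```java
// Plain-Java trace of the differential logic; nullable fields model empty state.
class DifferentialSketch {

    private Long previousFiring; // null until the window has fired once
    private Long firingCounter;  // null until the window has fired once

    /** Returns {firing index, value or difference to the previous firing}. */
    long[] process(long value) {
        long[] out;
        if (firingCounter == null) {
            out = new long[] {0L, value}; // first firing: emit the value itself
        } else {
            out = new long[] {firingCounter, value - previousFiring}; // later: emit the delta
        }
        firingCounter = (firingCounter == null) ? 1L : firingCounter + 1L;
        previousFiring = value;
        return out;
    }

    /** Mirrors clear(): forget everything when the window is purged. */
    void clear() {
        previousFiring = null;
        firingCounter = null;
    }

    public static void main(String[] args) {
        DifferentialSketch window = new DifferentialSketch();
        for (long aggregate : new long[] {10, 25, 30}) {
            long[] out = window.process(aggregate);
            System.out.println("(" + out[0] + ", " + out[1] + ")");
        }
    }
}
```

For successive aggregates 10, 25, and 30, the model emits (0, 10), (1, 15), and (2, 5): the full value on the first firing and only the change on each late or repeated firing.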

20. Window anti-patterns
   • Using an Evictor prevents pre-aggregation
   • A sliding window 1 day long with a slide of 1 minute will make 24 × 60 = 1,440 copies of every element, so don't do that!
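The copy count follows directly from how sliding windows overlap: each element belongs to size / slide windows. The plain-Java sketch below computes that number and enumerates the window starts for a timestamp; it is a simplified model with hypothetical names that assumes non-negative timestamps, no window offset, and a size that is a multiple of the slide.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sliding-window assignment; assumes timestamp >= 0 and no offset.
class SlidingWindowCopies {

    /** Number of sliding windows each element belongs to (size is a multiple of slide). */
    static long copiesPerElement(long sizeMs, long slideMs) {
        return sizeMs / slideMs;
    }

    /** Start timestamps of every window [start, start + size) containing the timestamp. */
    static List<Long> windowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slideMs); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long minute = 60L * 1000;
        System.out.println(copiesPerElement(day, minute));
        System.out.println(windowStarts(3 * day + 12345, day, minute).size());
    }
}
```

Both calls in `main` yield 1440 for a one-day window sliding by one minute, which is the per-element cost the slide warns about.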