DataStream API: Advanced Windowing
1. DataStream API: Advanced Windowing
Apache Flink® Training
Flink v1.3 – 9.9.2017
2. Windows
The lower-level details
3. Specifying Windowing
    stream
      .keyBy()                  / keyed vs non-keyed windows
      .window()                 / “Assigner”
      .trigger()                / each Assigner has a default Trigger
      .evictor()                / default: no Evictor
      .allowedLateness()        / default: zero
      .reduce|apply|process()
4. Triggers
Determines when a window is ready to have its process or window function called, e.g.,
• by counting
• by comparing timestamps to the current watermark
Each WindowAssigner comes with a default Trigger
5. Trigger interface
onElement()
onEventTime()
onProcessingTime()
onMerge()
• merges the states of two triggers when their windows merge
clear()
• use this to clear any managed state, etc.
6. Trigger interface
onElement()
onEventTime()
onProcessingTime()
• these three methods return a TriggerResult
onMerge()
• merges the states of two triggers when their windows merge
clear()
• use this to clear any managed state
7. TriggerResult
CONTINUE
FIRE
PURGE
FIRE_AND_PURGE
Purging only removes the window’s contents (window metadata and trigger state are kept)
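The four results are the combinations of two independent decisions: whether to fire and whether to purge. A minimal plain-Java sketch of that idea follows; `SketchTriggerResult` is a hypothetical stand-in written for illustration, not Flink's own `TriggerResult` class.

```java
// Simplified stand-in for illustration only; not Flink's TriggerResult.
enum SketchTriggerResult {
    CONTINUE(false, false),        // do nothing
    FIRE(true, false),             // call the window function, keep the contents
    PURGE(false, true),            // drop the contents without calling the function
    FIRE_AND_PURGE(true, true);    // call the window function, then drop the contents

    private final boolean fire;
    private final boolean purge;

    SketchTriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() {
        return fire;
    }

    boolean isPurge() {
        return purge;
    }
}
```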
8.
    public class CountTrigger<W extends Window> extends Trigger<Object, W> {

        // Number of elements after which the window fires
        private final long maxCount;

        // Descriptor for the per-key, per-window element counter
        private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

        private CountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        ...
    }
9.
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        // Increment the counter kept in managed state
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            // Reset the counter and fire; FIRE does not purge the window contents
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
10.
    // Time plays no role for a count-based trigger
    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    // Release the counter when the window is cleaned up
    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    // Combine the counters of the windows being merged
    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }
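To see the counting logic in isolation, here is a small plain-Java model of it. It uses no Flink classes: `CountTriggerSketch` and `firingPositions` are hypothetical names, a plain field stands in for the `ReducingState<Long>`, and `true`/`false` stand in for `FIRE`/`CONTINUE`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the counting logic; not the Flink CountTrigger itself.
class CountTriggerSketch {
    private final long maxCount;
    private long count; // stands in for the managed ReducingState<Long>

    CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    /** Returns true (FIRE) on every maxCount-th element, false (CONTINUE) otherwise. */
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // same effect as count.clear() in the trigger above
            return true;
        }
        return false;
    }

    /** Feeds n elements and returns the 1-based positions at which the trigger fired. */
    static List<Integer> firingPositions(long maxCount, int n) {
        CountTriggerSketch trigger = new CountTriggerSketch(maxCount);
        List<Integer> positions = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            if (trigger.onElement()) {
                positions.add(i);
            }
        }
        return positions;
    }
}
```

With `maxCount = 3` and seven elements, the model fires on the third and sixth element; the seventh only advances the counter.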
11. Evictors
Control which elements are passed to the window function (by removing elements from a window)
Always called after the trigger fires, but before and/or after the window function
• evictBefore()
• evictAfter()
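As an illustration of what an `evictBefore()` step might do, the following plain-Java sketch keeps only the most recent elements of a window before the window function would see them. `KeepLastEvictorSketch` is a hypothetical helper operating on a `List`; it does not implement Flink's `Evictor` interface.

```java
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration of eviction; not an implementation of Flink's Evictor.
class KeepLastEvictorSketch {

    /** Removes elements from the front of the window until at most maxCount remain. */
    static <T> void evictBefore(List<T> windowContents, int maxCount) {
        int toEvict = windowContents.size() - maxCount;
        Iterator<T> it = windowContents.iterator();
        while (toEvict > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toEvict--;
        }
    }
}
```

Note that the sketch needs the individual elements of the window to decide what to remove, so the window has to buffer all of them.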
12. Late elements
When allowedLateness > 0:
• Late elements can trigger late firings
• With merging windows, late firings can lead to late merging
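The effect of allowedLateness can be sketched as a simple timestamp comparison. The model below assumes that a window covers [start, end), that its last valid timestamp is end - 1, and that its contents are kept until the watermark reaches that timestamp plus the allowed lateness. The exact boundary handling is an assumption of this sketch, and `LatenessSketch` is a hypothetical name.

```java
// Plain-Java model of the lateness rule; the boundary conditions are assumptions.
class LatenessSketch {

    /** Last timestamp that still belongs to a window ending (exclusively) at windowEnd. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Watermark value at which the window's contents are dropped in this model. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return maxTimestamp(windowEnd) + allowedLateness;
    }

    /** True if an element assigned to this window would be dropped as too late. */
    static boolean isTooLate(long windowEnd, long allowedLateness, long watermark) {
        return cleanupTime(windowEnd, allowedLateness) <= watermark;
    }
}
```

For a window ending at 1000 with allowedLateness = 500, an element arriving when the watermark is at 1200 is still accepted in this model (and may cause a late firing), while the same element arriving at watermark 1499 is dropped.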
13. ProcessWindowFunction
New and improved version of WindowFunction that has more information about the window-firing context
Has an API for accessing both per-window state and global state (both per key)
14. ProcessWindowFunction (API)
    abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
            extends AbstractRichFunction {

        /** Called for each window firing. */
        public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out);

        /** Called when a window is being purged. */
        public void clear(Context context) throws Exception {}
    }
15. Context (API)
    public abstract class Context implements java.io.Serializable {

        public abstract W window();

        public abstract long currentProcessingTime();

        public abstract long currentWatermark();

        public abstract KeyedStateStore windowState();

        public abstract KeyedStateStore globalState();
    }
16. Per-Window vs Global state
Both per-window and global state are still tied to a key
Per-window state is additionally scoped to the window being processed
Global state is shared by all windows of the same key
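One way to picture the two scopes is as two maps with different keys. The sketch below is a plain-Java model, not Flink's state API: `StateScopeSketch` is a hypothetical class, and a window is identified here only by its start timestamp.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of state scoping; not Flink's KeyedStateStore.
class StateScopeSketch {
    private final Map<String, Integer> globalState = new HashMap<>(); // scoped to key
    private final Map<String, Integer> windowState = new HashMap<>(); // scoped to key + window

    /** Records one firing and returns {firings of this window, firings of all windows of the key}. */
    int[] recordFiring(String key, long windowStart) {
        int perWindow = windowState.merge(key + "@" + windowStart, 1, Integer::sum);
        int perKey = globalState.merge(key, 1, Integer::sum);
        return new int[] {perWindow, perKey};
    }
}
```

Two firings of the same window for key "a" followed by one firing of a different window for "a" give a per-window count of 1 for the new window, while the global count for "a" is 3. A different key starts from scratch in both maps.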
17. Example – Differential Window
    private static class DifferentialWindowFunction
            extends ProcessWindowFunction<Long, Tuple2<Long, Long>, String, TimeWindow> {

        // Value emitted by the previous firing of this window
        private static final ValueStateDescriptor<Long> previousFiringState =
            new ValueStateDescriptor<>("previous-firing", LongSerializer.INSTANCE);

        // Number of times this window has fired so far
        private static final ReducingStateDescriptor<Long> firingCounterState =
            new ReducingStateDescriptor<>(
                "firing-counter", new Sum(), LongSerializer.INSTANCE);

        @Override
        public void process(…) { … }
    }
18. Example – Differential Window
    @Override
    public void process(
            String key,
            Context context,
            Iterable<Long> values,
            Collector<Tuple2<Long, Long>> out) {

        // Both states are per-window (and per-key)
        ValueState<Long> previousFiring =
            context.windowState().getState(previousFiringState);
        // A ReducingStateDescriptor must be looked up with getReducingState()
        ReducingState<Long> firingCounter =
            context.windowState().getReducingState(firingCounterState);

        // Assumes the window holds exactly one element, e.g. a pre-aggregated value
        Long output = Iterables.getOnlyElement(values);

        if (firingCounter.get() == null) {
            // first firing: emit the value itself
            out.collect(Tuple2.of(0L, output));
        } else {
            // subsequent firing: emit the difference to the previous firing
            out.collect(Tuple2.of(firingCounter.get(), output - previousFiring.value()));
        }

        firingCounter.add(1L);
        previousFiring.update(output);
    }
19. Example – Differential Window
    @Override
    public void clear(Context context) {
        ValueState<Long> previousFiring =
            context.windowState().getState(previousFiringState);
        // A ReducingStateDescriptor must be looked up with getReducingState()
        ReducingState<Long> firingCounter =
            context.windowState().getReducingState(firingCounterState);

        // Per-window state must be released explicitly when the window is purged
        previousFiring.clear();
        firingCounter.clear();
    }
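To make the output of the differential window concrete, the following plain-Java model replays the logic of `process()` for one window of one key. `DifferentialFiringSketch` is a hypothetical class; two nullable fields stand in for the per-window `ValueState` and `ReducingState`, and a `long[]` pair stands in for the emitted `Tuple2`.

```java
// Plain-Java model of the differential firing logic; not a Flink function.
class DifferentialFiringSketch {
    private Long previousFiring; // models the per-window ValueState<Long>
    private Long firingCounter;  // models the per-window ReducingState<Long>; null before first add

    /** Returns {0, value} for the first firing and {firing index, delta} afterwards. */
    long[] onFiring(long output) {
        long[] result = (firingCounter == null)
            ? new long[] {0L, output}
            : new long[] {firingCounter, output - previousFiring};
        firingCounter = (firingCounter == null) ? 1L : firingCounter + 1L;
        previousFiring = output;
        return result;
    }
}
```

If the same window fires three times with the values 5, 8 and 12, the model emits (0, 5), (1, 3) and (2, 4): the first firing reports the value itself, later firings report only the change since the previous firing.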
20. Window anti-patterns
Using an Evictor prevents pre-aggregation
A sliding window 1 day long with a slide of 1 minute will make 24 × 60 = 1,440 copies of every element, so don’t do that!
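The 24 × 60 figure follows from how many sliding windows overlap a single timestamp. The plain-Java sketch below enumerates them; it assumes non-negative timestamps, windows of the form [start, start + size) aligned to multiples of the slide, and no window offset. `SlidingWindowCopiesSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of sliding-window assignment; assumes timestamp >= 0 and no offset.
class SlidingWindowCopiesSketch {

    /** Start timestamps of all sliding windows [start, start + size) that contain timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With `size = 86_400_000` (1 day in milliseconds) and `slide = 60_000` (1 minute), the returned list has 1,440 entries for any timestamp, i.e. every element belongs to 1,440 windows.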