自学内容网 自学内容网

Flink窗口分配器WindowAssigner

前言

Flink 数据流经过 keyBy 分组后,下一步就是 WindowAssigner。

WindowAssigner 定义了 stream 中的元素如何被分发到各个窗口,元素可以被分发到一个或多个窗口中,Flink 内置了常用的窗口分配器,包括:tumbling windows、 sliding windows、 session windows 和 global windows。除了 global windows ,其它分配器都是基于时间来分发数据的。

当然,你也可以继承 WindowAssigner 抽象类实现自定义的窗口分配逻辑。

WindowAssigner

先看一下 WindowAssigner 抽象类的定义:

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    public WindowAssigner() {
    }

    public abstract Collection<W> assignWindows(T var1, long var2, WindowAssignerContext var4);

    public Trigger<T, W> getDefaultTrigger() {
        return this.getDefaultTrigger(new StreamExecutionEnvironment());
    }

    /** @deprecated */
    @Deprecated
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment var1);

    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig var1);

    public abstract boolean isEventTime();

    @PublicEvolving
    public abstract static class WindowAssignerContext {
        public WindowAssignerContext() {
        }

        public abstract long getCurrentProcessingTime();
    }
}

四个方法,作用如下:

  • assignWindows 将元素 element 分发到一个或多个窗口,返回值是窗口集合
  • getDefaultTrigger 返回默认的窗口触发器 Trigger
  • getWindowSerializer 返回窗口序列化器(窗口也要在算子间传输)
  • isEventTime 是否基于事件时间语义

Flink 内置的 WindowAssigner 实现类关系图如下:

首先,可以按照基于何种时间语义划分出三大类:

  • 基于事件时间语义
  • 基于处理时间语义
  • 不基于时间语义 --> GlobalWindows

在基于时间语义的大类下面,又可以按照时间窗口算法划分为三个具体实现:

  • 滚动窗口分配算法 tumbling windows
  • 滑动窗口分配算法 sliding windows
  • 会话窗口分配算法 session windows

定义窗口Window

窗口对象被 Flink 统一封装为抽象类org.apache.flink.streaming.api.windowing.windows.Window,Flink 内置了两种实现,分别是:

  • TimeWindow 基于时间范围的窗口,包含开始时间戳和结束时间戳
  • GlobalWindow 全局窗口,与时间无关的窗口

如果内置的这两种窗口无法满足你的需求,你也可以自定义窗口。需要注意的是,窗口本身是要在算子间传输的,所以你在自定义窗口的同时,还必须提供一个窗口序列化器,以便于 Flink 可以将你的窗口对象序列化传输。

如下示例,我们定义了一个基于数字范围的 NumberWindow,可以将一个数字划分到对应的数字范围窗口内。

public class NumberWindow extends Window {
    private final int min;
    private final int max;

    public NumberWindow(int min, int max) {
        this.min = min;
        this.max = max;
    }

    public int getMin() {
        return min;
    }

    public int getMax() {
        return max;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        NumberWindow that = (NumberWindow) o;
        return min == that.min && max == that.max;
    }

    @Override
    public int hashCode() {
        return Objects.hash(min, max);
    }

    @Override
    public long maxTimestamp() {
        return Long.MAX_VALUE;
    }
}

Window 实现还必须配套一个序列化器,主要是实现 两个int变量到窗口对象的转换。

public static class Serializer extends TypeSerializerSingleton<NumberWindow> {

    @Override
    public boolean isImmutableType() {
        return true;
    }

    @Override
    public NumberWindow createInstance() {
        return new NumberWindow(0, 0);
    }

    @Override
    public NumberWindow copy(NumberWindow numberWindow) {
        return numberWindow;
    }

    @Override
    public NumberWindow copy(NumberWindow numberWindow, NumberWindow t1) {
        return numberWindow;
    }

    @Override
    public int getLength() {
        return 8;
    }

    @Override
    public void serialize(NumberWindow numberWindow, DataOutputView dataOutputView) throws IOException {
        dataOutputView.writeInt(numberWindow.getMin());
        dataOutputView.writeInt(numberWindow.getMax());
    }

    @Override
    public NumberWindow deserialize(DataInputView dataInputView) throws IOException {
        return new NumberWindow(dataInputView.readInt(), dataInputView.readInt());
    }

    @Override
    public NumberWindow deserialize(NumberWindow numberWindow, DataInputView dataInputView) throws IOException {
        return this.deserialize(dataInputView);
    }

    @Override
    public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
        dataOutputView.writeInt(dataInputView.readInt());
        dataOutputView.writeInt(dataInputView.readInt());
    }

    @Override
    public TypeSerializerSnapshot<NumberWindow> snapshotConfiguration() {
        return new TimeWindowSerializerSnapshot();
    }

    public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<NumberWindow> {

        public TimeWindowSerializerSnapshot() {
            super(Serializer::new);
        }
    }
}

自定义WindowAssigner

窗口对象定义好了,接下来就是定义窗口分配对象。

简单原则,我们把数字划分为三个窗口,分别是:小数窗口、中位数窗口、大数窗口。
如下示例,继承 WindowAssigner 类,重写 assignWindows 方法,把数字划分到对应的窗口中。

public static class MyWindowAssigner extends WindowAssigner<Integer, NumberWindow> {

    private final int startingMedian;
    private final int startingLarge;

    public MyWindowAssigner(int startingMedian, int startingLarge) {
        this.startingMedian = startingMedian;
        this.startingLarge = startingLarge;
    }

    @Override
    public Collection<NumberWindow> assignWindows(Integer element, long timestamp, WindowAssignerContext windowAssignerContext) {
        // 将数字划分到 小数、中位数、大数 窗口
        NumberWindow window;
        if (element < startingMedian) {
            window = new NumberWindow(Integer.MIN_VALUE, startingMedian - 1);
        } else if (element < startingLarge) {
            window = new NumberWindow(startingMedian, startingLarge - 1);
        } else {
            window = new NumberWindow(startingLarge, Integer.MAX_VALUE);
        }
        return List.of(window);
    }

    @Override
    public Trigger<Integer, NumberWindow> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {
        return null;
    }

    @Override
    public TypeSerializer<NumberWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new NumberWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}

把流程串起来

窗口对象和窗口分配的逻辑都有了,接下来就是把整个流程给串起来。

如下示例程序,我们定义了一个一秒内生成10个一百以内随机数的数据源Source,然后将这些数字流分为一组,并为其指定我们自定义的 MyWindowAssigner 窗口分配策略,策略中划分了三个窗口,数字小于20的归为小数一档、20到80的归为中位数一档、大于80的归为大数一档,根本数字分配对应的窗口。然后我们自定义了 Trigger,当窗口内积攒的数字达到十个,就触发窗口计算并关闭窗口。最终 ProcessWindowFunction 打印窗口内的数字并求和。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.addSource(new SourceFunction<Integer>() {
                @Override
                public void run(SourceContext<Integer> sourceContext) throws Exception {
                    while (true) {
                        Threads.sleep(100);
                        sourceContext.collect(ThreadLocalRandom.current().nextInt(100));
                    }
                }

                @Override
                public void cancel() {

                }
            }).keyBy(i -> "all")
            .window(new MyWindowAssigner(20, 80))
            .trigger(new Trigger<Integer, NumberWindow>() {

                @Override
                public TriggerResult onElement(Integer element, long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
                    ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<>("count", Integer.class));
                    Integer count = Optional.ofNullable(countState.value()).orElse(0) + 1;
                    if (count < 10) {
                        countState.update(count);
                        return TriggerResult.CONTINUE;
                    }
                    countState.update(0);
                    return TriggerResult.FIRE_AND_PURGE;
                }

                @Override
                public TriggerResult onProcessingTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
                    return null;
                }

                @Override
                public TriggerResult onEventTime(long timestamp, NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {
                    return null;
                }

                @Override
                public void clear(NumberWindow numberWindow, TriggerContext triggerContext) throws Exception {

                }
            }).process(new ProcessWindowFunction<Integer, Object, String, NumberWindow>() {
                @Override
                public void process(String key, ProcessWindowFunction<Integer, Object, String, NumberWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {
                    StringBuilder builder = new StringBuilder("[" + context.window().getMin() + " - " + context.window().getMax() + "] [");
                    int sum = 0;
                    for (Integer value : iterable) {
                        builder.append(value + ",");
                        sum += value;
                    }
                    builder.append("] sum=" + sum);
                    System.err.println(builder.toString());
                }
            });
    environment.execute();
}

运行 Flink 作业,控制台输出:

[20 - 79] [30,32,24,66,63,37,] sum=252
[20 - 79] [71,48,41,55,75,79,] sum=369
[80 - 2147483647] [99,90,88,98,85,99,] sum=559
[20 - 79] [74,30,56,70,36,78,] sum=344

尾巴

Flink 的 WindowAssigner 在数据处理中发挥着关键作用。它决定了如何将源源不断的数据流切分成不同的窗口,以便进行有针对性的聚合、计算和分析。
通过合理配置 WindowAssigner,我们能够根据时间、数量或自定义的逻辑来划分数据,灵活地适应各种业务场景。这使得 Flink 能够对海量的实时数据进行高效且精准的处理,帮助我们从数据中提取有价值的信息和洞察。


原文地址:https://blog.csdn.net/qq_32099833/article/details/142958307

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!