自学内容网 自学内容网

Flink 窗口触发器Triggers

Triggers

定义:触发器决定了窗口何时被触发。在Flink中,窗口的触发是通过设置定时器来实现的。

作用:控制窗口数据的聚合时机,确保数据在适当的时间点被处理和输出。

Trigger关键方法

onElement: 当元素被添加到窗口时调用,用于注册定时器或更新窗口状态。

onElement(T element, long timestamp, W window, TriggerContext ctx);

onEventTime:当事件时间计时器触发时调用,用于处理事件时间相关的触发逻辑。

onEventTime(long time, W window, TriggerContext ctx);

onProcessingTime :当处理时间计时器触发时调用,这里时间指机器处理时间,而不考虑时间本身的时间。见后文ProcessingTimeTrigger实现

onProcessingTime(long time, W window, TriggerContext ctx);

clear 当窗口被删除时调用,用于清理窗口的状态和定时器。

clear(W window, TriggerContext ctx);

内置Trigger

Flink提供了多种内置的触发器,以下为几种常用类型:

  • EventTimeTrigger 工作原理:基于事件时间和水印(Watermark)机制来触发窗口计算。当窗口的最大时间戳小于等于当前的水印时,立即触发窗口计算。

  • ProcessingTimeTrigger 工作原理:基于处理时长(即机器的系统时间)来触发窗口计算。当处理时间达到窗口的结束时间时,触发窗口计算。

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }
  • CountTrigger 工作原理:根据窗口内元素的数量来触发计算。当窗口内的元素数量达到预设的阈值时,触发窗口计算。

  ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
  count.add(1L);
  if (count.get() >= maxCount) {
      count.clear();
      return TriggerResult.FIRE;
  }
  return TriggerResult.CONTINUE;
  • ContinuousEventTimeTrigger和ContinuousProcessingTimeTrigger 工作原理:根据间隔时间周期性触发窗口计算,或者当窗口的结束时间小于当前的时间(事件时间或处理时间)时触发计算。适用场景:适用于需要周期性处理数据的场景,如实时监控、周期性报表等。

       if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

  • DeltaTrigger 工作原理:根据接入数据计算出的Delta指标是否超过指定的阈值来触发窗口计算。适用场景:适用于需要基于数据变化量进行处理的场景,如异常检测、趋势分析等。

    if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
        lastElementState.update(element);
        return TriggerResult.FIRE;
    }
        

自定义一个Trigger

实现一个CountTrigger 窗口元素数量达到阈值时,触发计算

package com.codetonight.datastream.trigger;

import org.apache.flink.streaming.api.windowing.triggers.Trigger;

import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;  
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;  
  
public class CountTrigger<T> extends Trigger<T, GlobalWindow> {  
  
    private final long countThreshold;  
    private long count = 0L;  
  
    public CountTrigger(long countThreshold) {  
        this.countThreshold = countThreshold;  
    }  
  
    @Override  
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {  
        count++;  
        if (count >= countThreshold) {  
            // 触发窗口并清除计数器
            count = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }  
        return TriggerResult.CONTINUE;  
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    // 其他方法(onEventTime, onProcessingTime, onMerge, clear)可以留空或实现特定的逻辑  
  
    @Override  
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {

        count = 0L;  
    }  

}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;  
import org.apache.flink.streaming.api.windowing.windows.Window;  
  
public class FlinkGlobalWindowExample {  
  
    public static void main(String[] args) throws Exception {  
  
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  
        DataStream<Long> source = env.fromElements(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);  
  
        // 应用全局窗口和自定义触发器  
        DataStream<Long> result = source
            .keyBy(value -> 1)
          
            .windowAll(GlobalWindows.create())
            .trigger(new CountTrigger<>(5)) // 当接收到5个元素时触发
            .reduce(new ReduceFunction<Long>() {  
                @Override  
                public Long reduce(Long value1, Long value2) {  
                    return value1 + value2;  
                }  
            });  
  
        // 打印结果  
        result.print();  
  
        // 执行作业  
        env.execute("Flink Global Window Example");  
    }  
}

Evictor

Flink 的窗口模型允许在指定 WindowAssigner 和 Trigger 之外,还可以选择性地指定一个 Evictor。

Evictor 的功能是在触发器触发后,且在窗口函数应用之前和/或之后,从窗口中移除元素。为了实现这一功能,Evictor 接口定义了两个方法:

public interface Evictor<T, W extends Window> extends Serializable {


    void evictBefore(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);


    void evictAfter(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);


}

通过这两个方法,Evictor 提供了在窗口生命周期中灵活控制元素保留与移除的能力。

内置Evictor

这些 Evictor 可以单独使用,也可以与 Flink 的 WindowAssigner 和 Trigger 一起使用, 以创建复杂而强大的窗口处理逻辑。通过灵活组合这些组件, Flink 用户可以处理各种实时数据流场景,包括滑动窗口、滚动窗口、会话窗口等。

CountEvictor: 功能:保留窗口中用户指定的元素数量,并从窗口缓冲区的开头丢弃剩余的元素。应用场景:当你只需要保留窗口中最新的 N 个元素时,这个 Evictor 非常有用。


DeltaEvictor: 移除逻辑代码比较清晰:

  1. 取窗口最后一个元素lastElement

  2. 所有元素与lastElement 比较计算出差值( Delta )

  3. 差值( Delta ) 超过阈值则移除

DeltaFunction用于计算两个元素之间的差值

    private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
        TimestampedValue<T> lastElement = Iterables.getLast(elements);
        for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
            TimestampedValue<T> element = iterator.next();
            if (deltaFunction.getDelta(element.getValue(), lastElement.getValue())
                    >= this.threshold) {
                iterator.remove();
            }
        }
    }

TimeEvictor: 功能:基于时间戳来移除窗口中的元素。它接受一个时间间隔(以毫秒为单位),对于给定的窗口,它会找到元素中的最大时间戳 max_ts,并移除所有时间戳小于 max_ts 减去指定时间间隔的元素。应用场景:当你希望基于时间戳来过滤窗口中的旧元素时,这个 Evictor 非常有用。

TimeEvictor evcit 方法代码逻辑,方法命名很清晰。

  1. 取窗口元素最大的时间戳 currentTime,

  2. 保留的时间戳阈值evictCutoff = currentTime -windowSize

  3. 循环遍历移除不在evictCutoff 之前的元素

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }

        long currentTime = getMaxTimestamp(elements);
        long evictCutoff = currentTime - windowSize;

        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            if (record.getTimestamp() <= evictCutoff) {
                iterator.remove();
            }
        }
    }

原文地址:https://blog.csdn.net/happycao123/article/details/142990026

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!