[Flink] Side Outputs: Use Cases and Hands-On Code

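0. Introduction

In Flink, a side output is an additional output stream that an operator can emit alongside its main output. It lets a single operation split its data into several separate streams, so one pass over the input can feed multiple downstream paths.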
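
As a rough mental model only, the idea of one operation feeding a main output plus named side outputs can be sketched in plain Java. Nothing here is Flink API: the class `SideOutputModel`, the tag name "error-output", and the routing rule are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the side-output idea; this is NOT Flink API.
class SideOutputModel {
    final List<String> mainOutput = new ArrayList<>();
    // Hypothetical stand-in for output tags: each tag name owns its own output list.
    final Map<String, List<String>> sideOutputs = new HashMap<>();

    // Route one record in a single pass: records containing "error" go to the
    // "error-output" side list, everything else goes to the main output.
    void process(String record) {
        if (record.contains("error")) {
            sideOutputs.computeIfAbsent("error-output", k -> new ArrayList<>()).add(record);
        } else {
            mainOutput.add(record);
        }
    }

    public static void main(String[] args) {
        SideOutputModel model = new SideOutputModel();
        for (String record : Arrays.asList("data1", "errorData", "data2")) {
            model.process(record);
        }
        System.out.println("main = " + model.mainOutput);                      // main = [data1, data2]
        System.out.println("side = " + model.sideOutputs.get("error-output")); // side = [errorData]
    }
}
```

The point of the model is that the input is read once and each record is routed as it is processed, rather than being filtered several times.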

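1. Use Cases

Side outputs are useful in many situations in Flink. Some common ones:

  • Data splitting: some business scenarios need to route records to different outputs based on a condition, for example sending normal and abnormal records to different topics or storage systems.
  • Feature engineering: in machine learning or data analysis, different feature sets may need to be extracted from the same raw stream for different models or analyses.
  • Real-time monitoring: while processing a stream, metrics such as throughput or error rate may need to be tracked in real time and sent to a monitoring system.
  • Data backup: for safety, data may need to be copied to a separate storage system during processing so that it can be recovered later.
  • Late data handling: records that arrive later than expected are identified through watermarks and windows and handled separately, which preserves accuracy and timeliness while limiting the impact on the main processing path.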

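2. Code Implementation

2.1 Data Splitting

Description: route records to different outputs based on a condition, for example sending normal and abnormal records to different topics or storage systems.

Code example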


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DataDivertingExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tag for the side output that carries abnormal records.
        // The anonymous subclass ({}) lets Flink capture the generic type.
        final OutputTag<String> errorOutputTag = new OutputTag<String>("error-output"){};

        DataStream<String> input = env.fromElements("data1", "errorData", "data2", "data3");

        // Side outputs are emitted through the Context of a process function;
        // filter() and map() have no access to them.
        SingleOutputStreamOperator<String> normalDataStream = input
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                    if (value.contains("error")) {
                        // Abnormal records go to the side output
                        ctx.output(errorOutputTag, "Error: " + value);
                    } else {
                        // Normal records stay on the main output
                        out.collect("Normal: " + value);
                    }
                }
            });

        // Get the side output from the operator that emitted it
        DataStream<String> errorSideOutput = normalDataStream.getSideOutput(errorOutputTag);

        // Print the main stream and the side output stream
        normalDataStream.print("Normal Data Stream");
        errorSideOutput.print("Error Data Stream");

        env.execute("Flink Data Divert Example");
    }
}

2.2 Feature Engineering

Description: extract different feature sets from the raw stream for different models or analyses.

Code example

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class FeatureEngineeringExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final OutputTag<String> featureAOutputTag = new OutputTag<String>("feature-a-output"){};
        final OutputTag<String> featureBOutputTag = new OutputTag<String>("feature-b-output"){};

        DataStream<String> input = env.fromElements("feature1", "feature2", "feature3");

        // One pass over the input emits both feature sets as side outputs
        SingleOutputStreamOperator<String> mainStream = input
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                    // Placeholder for the feature A extraction logic
                    ctx.output(featureAOutputTag, "Feature A: " + value);
                    // Placeholder for the feature B extraction logic
                    ctx.output(featureBOutputTag, "Feature B: " + value);
                    // The raw record is forwarded unchanged on the main output
                    out.collect(value);
                }
            });

        // Get the side output streams
        DataStream<String> featureASideOutput = mainStream.getSideOutput(featureAOutputTag);
        DataStream<String> featureBSideOutput = mainStream.getSideOutput(featureBOutputTag);

        // Print the feature streams
        featureASideOutput.print("Feature A Stream");
        featureBSideOutput.print("Feature B Stream");

        env.execute("Flink Feature Engineering Example");
    }
}

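2.3 Real-Time Monitoring

Description: track metrics of the stream in real time, such as throughput or error rate, and send the results to a monitoring system.

Code example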

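import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class RealTimeMonitoringExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final OutputTag<String> monitoringOutputTag = new OutputTag<String>("monitoring-output"){};

        DataStream<String> input = env.fromElements("data1", "data2", "data3");

        // Count the records in each 10-second window. Arrival time is used as the
        // event timestamp so that the final watermark of this bounded demo source
        // still fires the last window.
        SingleOutputStreamOperator<String> mainStream = input
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forMonotonousTimestamps()
                    .withTimestampAssigner((value, ts) -> System.currentTimeMillis()))
            .keyBy(value -> "monitoring-key") // single key: all records are aggregated together
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // Flink 1.x; newer releases accept java.time.Duration
            .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void process(String key, Context ctx, Iterable<String> elements,
                                    Collector<String> out) throws Exception {
                    long count = 0;
                    for (String element : elements) {
                        out.collect(element); // business data stays on the main output
                        count++;
                    }
                    // Emit the metric to the side output
                    ctx.output(monitoringOutputTag, "Count: " + count);
                }
            });

        // Get the side output stream
        DataStream<String> monitoringSideOutput = mainStream.getSideOutput(monitoringOutputTag);

        // Print the monitoring stream (a real job would attach a monitoring sink here)
        monitoringSideOutput.print("Monitoring Stream");

        env.execute("Flink Real-time Monitoring Example");
    }
}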
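
The per-window count in the monitoring example rests on assigning each record to a fixed-size tumbling window. The plain-Java sketch below shows that bucketing on its own, with no Flink dependency. The class `WindowCountSketch`, its method names, and the sample timestamps are assumptions for illustration, and it only handles non-negative timestamps with no window offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of per-window counting; not Flink API.
class WindowCountSketch {
    // Start of the tumbling window that contains the timestamp
    // (assumes non-negative timestamps and no window offset).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Count how many records fall into each window, keyed by window start.
    static Map<Long, Long> countPerWindow(long[] timestampsMs, long windowSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, windowSizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_000L, 9_999L, 10_000L, 25_000L};
        // 10-second windows: [0, 10000), [10000, 20000), [20000, 30000)
        System.out.println(countPerWindow(timestamps, 10_000L)); // {0=3, 10000=1, 20000=1}
    }
}
```

A metric such as an error rate could be derived the same way by keeping a second counter per window for abnormal records and dividing the two.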

2.4 Data Backup

Description: while processing a stream, copy the data to a separate storage system so that it can be recovered later.

Code example


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DataBackupExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final OutputTag<String> backupOutputTag = new OutputTag<String>("backup-output"){};

        DataStream<String> input = env.fromElements("data1", "data2", "data3");

        SingleOutputStreamOperator<String> processedStream = input
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                    // Copy the raw record to the backup side output
                    ctx.output(backupOutputTag, "Backup: " + value);
                    // Placeholder for the actual processing logic
                    out.collect(value);
                }
            });

        // Get the side output stream
        DataStream<String> backupSideOutput = processedStream.getSideOutput(backupOutputTag);

        // Print the backup stream (a real job would attach the backup storage sink here)
        backupSideOutput.print("Backup Data Stream");

        env.execute("Flink Data Backup Example");
    }
}

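2.5 Late Data Handling

Description: in real-time stream processing, records can arrive late because of network delay or uneven data generation. Flink uses watermarks to track event-time progress and decide when a record counts as late, and a side output can divert those late records for special handling.

Code example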

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataHandlingExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final OutputTag<Tuple2<String, Long>> lateOutputTag =
            new OutputTag<Tuple2<String, Long>>("late-data-output"){};

        // Each record carries its own event timestamp in milliseconds (illustrative values)
        DataStream<Tuple2<String, Long>> input = env.fromElements(
            Tuple2.of("event1", 1_000L),
            Tuple2.of("event2", 20_000L),
            Tuple2.of("event3", 2_000L));

        // Tolerate up to 5 seconds of out-of-orderness; the timestamp is read from field f1
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
            .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.f1);

        // Late records are only defined for event-time windows
        SingleOutputStreamOperator<Tuple2<String, Long>> mainDataStream = input
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .keyBy(event -> "key") // single key for all records
            .window(TumblingEventTimeWindows.of(Time.seconds(10))) // Flink 1.x; newer releases accept java.time.Duration
            .sideOutputLateData(lateOutputTag)
            .reduce((a, b) -> Tuple2.of(a.f0 + "," + b.f0, Math.max(a.f1, b.f1)));

        // Get the side output stream
        DataStream<Tuple2<String, Long>> lateDataStream = mainDataStream.getSideOutput(lateOutputTag);

        // Print the main stream and the late stream. Watermarks are emitted periodically,
        // so with this tiny bounded source all records may be processed before the
        // watermark advances and nothing may be flagged as late; the late path matters
        // with a real unbounded source.
        mainDataStream.print("On-Time Data Stream");
        lateDataStream.print("Late Data Stream");

        env.execute("Flink Late Data Handling Example");
    }
}
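
To make the lateness rule concrete, here is a plain-Java sketch with no Flink dependency. It models the bounded out-of-orderness watermark as the largest timestamp seen so far minus the allowed delay minus 1 ms, and treats a record as late when the last timestamp of its window is at or below the current watermark (allowed lateness of zero). The class `LatenessSketch` and its fields are assumptions for illustration. Unlike Flink, which emits watermarks periodically, the sketch advances the watermark after every record, so treat it as a simplified model rather than a description of Flink's exact runtime behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of watermark-based lateness; not Flink API.
class LatenessSketch {
    final long windowSizeMs;
    final long outOfOrdernessMs;
    long maxTimestampMs = Long.MIN_VALUE;
    final List<Long> onTime = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    LatenessSketch(long windowSizeMs, long outOfOrdernessMs) {
        this.windowSizeMs = windowSizeMs;
        this.outOfOrdernessMs = outOfOrdernessMs;
    }

    // Watermark derived from the largest timestamp seen so far.
    long currentWatermark() {
        return maxTimestampMs == Long.MIN_VALUE
            ? Long.MIN_VALUE
            : maxTimestampMs - outOfOrdernessMs - 1;
    }

    // Classify one record against the watermark as it stood before the record arrived.
    void onEvent(long timestampMs) {
        long windowEnd = timestampMs - (timestampMs % windowSizeMs) + windowSizeMs;
        // windowEnd is exclusive, so the last timestamp inside the window is windowEnd - 1.
        if (windowEnd - 1 <= currentWatermark()) {
            late.add(timestampMs);   // would go to the late-data side output
        } else {
            onTime.add(timestampMs); // would be added to its window
        }
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
    }

    public static void main(String[] args) {
        LatenessSketch sketch = new LatenessSketch(10_000L, 5_000L);
        for (long ts : new long[]{1_000L, 12_000L, 2_000L, 16_000L, 3_000L}) {
            sketch.onEvent(ts);
        }
        System.out.println("onTime = " + sketch.onTime); // onTime = [1000, 12000, 2000, 16000]
        System.out.println("late = " + sketch.late);     // late = [3000]
    }
}
```

In this run the record at 2000 ms is still accepted because the watermark is only 6999 when it arrives, while the record at 3000 ms arrives after the watermark has reached 10999 and passed the end of the first window.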

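3. Caveats

  1. Performance impact: side outputs can add overhead, because every record emitted to a side output is an extra record that has to be written to that output in addition to the main stream.

  2. Resource management: the operators and sinks attached to a side output consume resources of their own, so account for them when sizing the job.

  3. Data consistency: make sure the main output and the side outputs stay consistent with each other, especially in stateful Flink applications. When they are written to different sinks, end-to-end consistency depends on the delivery guarantees of each sink.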


Original article: https://blog.csdn.net/u011487470/article/details/142714370
