
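Flink from Scratch: Learning the Basics

Flink Basic Concepts

To begin with, Flink is an open-source distributed computing framework for both stream processing and batch processing.

So what kinds of data do stream processing and batch processing each handle? That comes down to two concepts: unbounded streams and bounded streams.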

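Unbounded Streams vs. Bounded Streams

Any kind of data can form a stream: user interaction records, sensor data, event logs, and so on.

Apache Flink excels at processing both unbounded and bounded data sets. Precise control of time and state lets Flink run any kind of application on unbounded streams.

Stream data is divided into unbounded streams and bounded streams.

  • 1) Unbounded stream: has a defined start but no defined end. It keeps producing data, so it is handled with stream processing, where each event is processed as it arrives.
  • 2) Bounded stream: has a defined start and a defined end. The computation can run after all the data has been read, so it is handled with batch processing.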
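The difference between the two processing styles can be illustrated without Flink at all. The following is a minimal plain-Java sketch, not Flink code: the class name `BoundedVsUnbounded`, its methods, and the sample values are made up for illustration. The batch method sees the whole input and returns one result, while the streaming method emits an updated result after every element because it cannot wait for an end that may never come.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class BoundedVsUnbounded {

    // Bounded input: the whole data set is available, so a single final result is produced.
    static int batchSum(List<Integer> all) {
        int sum = 0;
        for (int v : all) {
            sum += v;
        }
        return sum;
    }

    // Unbounded input: elements arrive one by one with no known end, so an updated
    // result is emitted after each element. maxElements exists only so that this
    // sketch terminates; a real unbounded source has no such limit.
    static List<Integer> runningSums(Iterator<Integer> source, int maxElements) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        for (int i = 0; i < maxElements && source.hasNext(); i++) {
            sum += source.next();
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> readings = Arrays.asList(3, 5, 2); // hypothetical sensor readings
        System.out.println(batchSum(readings));                  // prints 10
        System.out.println(runningSums(readings.iterator(), 3)); // prints [3, 8, 10]
    }
}
```

Note that the last value emitted by the streaming version equals the batch result, which is why a bounded stream can be seen as a special case of an unbounded one.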

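Component Structure

The DataSet API is generally used to process bounded streams.
The DataStream API is generally used to process unbounded streams.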

Flink Basic Demo

1. Setting up the environment

Core pom.xml configuration

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
        <spring.boot.version>2.1.6.RELEASE</spring.boot.version>
        <mysql.jdbc.version>5.1.47</mysql.jdbc.version>
    </properties>

    <dependencies>
        <!-- Flink walkthrough examples and common utility classes -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink core library for stream processing -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink client library, used for submitting jobs and so on -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
    </dependencies>

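2. Batch processing demo

This demo uses batch processing to count the entries in a log file by log level, which shows how many WARN and ERROR entries the file contains.

Prepare the file order_info.log with the following content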

2019-08-25 16:32:55,626 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$e1b53a9c] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:32:56,918 [main] INFO  [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
2019-08-25 16:32:57,829 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:32:57,834 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:32:57,847 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 5ms. Found 0 repository interfaces.
2019-08-25 16:32:57,858 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:32:57,859 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:32:57,870 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
2019-08-25 16:32:57,908 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
2019-08-25 16:32:57,928 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2019-08-25 16:32:58,144 [main] INFO  [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
2019-08-25 16:32:58,155 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-25 16:32:58,162 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-25 16:32:58,176 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-25 16:32:58,212 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:32:58,267 [configOperate_1_2] WARN  [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
2019-08-25 16:32:58,269 [main] WARN  [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
2019-08-25 16:32:58,280 [main] INFO  [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
... 19 common frames omitted
Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.CGLIB$globalTransactionScanner$0(<generated>)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba$$FastClassBySpringCGLIB$$ec5dcab5.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.globalTransactionScanner(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: illegal type:null
at io.seata.config.ConfigType.getType(ConfigType.java:62)
at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
... 35 common frames omitted
2019-08-25 16:36:05,248 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$ed668c12] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:36:06,559 [main] INFO  [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
2019-08-25 16:36:07,554 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:36:07,555 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:36:07,568 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 6ms. Found 0 repository interfaces.
2019-08-25 16:36:07,581 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:36:07,583 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:36:07,595 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
2019-08-25 16:36:07,639 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
2019-08-25 16:36:07,661 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2019-08-25 16:36:07,919 [main] INFO  [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
2019-08-25 16:36:07,930 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-25 16:36:07,937 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-25 16:36:07,944 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-25 16:36:07,987 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:36:10,247 [configOperate_1_2] WARN  [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
2019-08-25 16:36:14,137 [main] WARN  [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
2019-08-25 16:36:14,150 [main] INFO  [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-08-25 16:36:14,161 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
... 19 common frames omitted
Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.CGLIB$globalTransactionScanner$0(<generated>)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830$$FastClassBySpringCGLIB$$691b278a.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.globalTransactionScanner(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: illegal type:null
at io.seata.config.ConfigType.getType(ConfigType.java:62)
at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
... 35 common frames omitted
2019-08-25 16:36:21,421 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean

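Code

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchProcessorApplication {
    // Captures the text between "[main]" and the next "[", which is the log level
    private static final Pattern LEVEL_PATTERN = Pattern.compile("\\[main\\](.*?)\\[");

    public static void main(String[] args) throws Exception{
        // 1. Create the Flink batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the data source (the log file)
        DataSource<String> logData = env.readTextFile("./data/order_info.log");

        // 3. Clean and transform the data
        logData.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 1) Extract the log level of the line with the regular expression
                Matcher matcher = LEVEL_PATTERN.matcher(value);
                if (matcher.find()){
                    // 2) On a match, emit a (level, 1) tuple; lines without "[main]" are skipped
                    collector.collect(new Tuple2<>(matcher.group(1).trim(),1));
                }
            }
            // groupBy(0) groups the tuples by field 0 (the log level); sum(1) adds up field 1 (the Integer count).
            // In a DataSet program, print() also triggers execution of the job.
        }).groupBy(0).sum(1).print();
    }
}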
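To see what the regular expression and the group-then-sum step do without starting Flink, the same logic can be tried in plain Java. This is only a sketch: the class `LogLevelCount` and its methods are illustrative names rather than part of the demo, and a `TreeMap` stands in for `groupBy(0).sum(1)`. The sample lines are taken from order_info.log above, except the last one, which is a shortened stack-trace line.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class LogLevelCount {

    // Same expression as the Flink job: the text between "[main]" and the next "[".
    private static final Pattern LEVEL = Pattern.compile("\\[main\\](.*?)\\[");

    // Returns the trimmed log level, or null when the line does not match.
    static String extractLevel(String line) {
        Matcher m = LEVEL.matcher(line);
        return m.find() ? m.group(1).trim() : null;
    }

    // Plain-Java stand-in for flatMap -> groupBy(0) -> sum(1).
    static Map<String, Integer> countLevels(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String level = extractLevel(line);
            if (level != null) {
                counts.merge(level, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "2019-08-25 16:32:56,918 [main] INFO  [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev",
            "2019-08-25 16:32:58,267 [configOperate_1_2] WARN  [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.",
            "2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed",
            "at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)");
        System.out.println(countLevels(lines)); // prints {ERROR=1, INFO=1}
    }
}
```

The WARN line logged by the `configOperate_1_2` thread and the stack-trace line are not counted, because the pattern only matches lines that contain `[main]`. The Flink job above behaves the same way, so its totals cover the main thread only.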

3. Stream processing demo

Simulate a socket data source locally

Code
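`socketTextStream` connects to the given host and port as a client, so something has to be listening on port 9999 before the job starts. A common choice is a tool such as netcat (`nc -lk 9999`) with lines typed by hand. As an alternative, here is a minimal Java sketch of a feeder. The class `SocketFeeder`, the sample keys, and the one-second interval are assumptions made for illustration and are not part of the original demo.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class SocketFeeder {

    // Builds one record in the shape the streaming job splits on: key, a tab, then a payload.
    static String toRecord(String key, String payload) {
        return key + "\t" + payload;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        String[] keys = {"apple", "banana", "apple"}; // hypothetical sample keys
        try (ServerSocket server = new ServerSocket(9999);
             Socket client = server.accept(); // blocks until the Flink job connects
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (int i = 0; ; i++) {
                // One newline-terminated record per second
                out.print(toRecord(keys[i % keys.length], "event-" + i) + "\n");
                if (out.checkError()) { // checkError() flushes; true means the client has gone away
                    break;
                }
                Thread.sleep(1000);
            }
        }
    }
}
```

Start the feeder first, then run the streaming job. The feeder stops once the job disconnects.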

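import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamProcessorApplication {

    public static void main(String[] args) throws Exception{
        // 1. Create the Flink streaming execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read real-time data from a socket stream
        DataStreamSource<String> dataStreamSource = streamEnv.socketTextStream("127.0.0.1", 9999);

        // 3. Turn each line into a (key, 1) tuple, using the first tab-separated field as the key
        dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = line.split("\t");
                collector.collect(new Tuple2<>(split[0],1));
            }
            // keyBy(0) partitions by tuple field 0, timeWindow(Time.seconds(5)) groups each key into
            // 5-second windows, and sum(1) adds up the counts in each window.
            // setParallelism(1) sets the parallelism of the print sink to 1.
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

        // 4. Nothing runs until execute() is called
        streamEnv.execute();
    }

}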
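The windowing step is easier to follow with concrete numbers. The sketch below is plain Java, not Flink's implementation: it assumes fixed-size (tumbling) windows aligned to timestamp 0 with non-negative millisecond timestamps, and the class `TumblingWindowSketch` and the sample events are invented for illustration. Each event is assigned to the window that contains its timestamp, and the counts are then summed per window and key, which mirrors `keyBy(0).timeWindow(Time.seconds(5)).sum(1)`.

```java
import java.util.Map;
import java.util.TreeMap;

class TumblingWindowSketch {

    // Start of the fixed-size window containing the timestamp (both in milliseconds, non-negative).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts events per "windowStart/key" bucket, a stand-in for keyBy -> window -> sum(1).
    static Map<String, Integer> countPerWindow(long[] timestampsMs, String[] keys, long sizeMs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String bucket = windowStart(timestampsMs[i], sizeMs) + "/" + keys[i];
            counts.merge(bucket, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 4_999L, 5_000L, 7_500L};          // hypothetical arrival times
        String[] keys = {"apple", "apple", "apple", "banana"}; // hypothetical keys
        // prints {0/apple=2, 5000/apple=1, 5000/banana=1}
        System.out.println(countPerWindow(ts, keys, 5_000L));
    }
}
```

The event at 4,999 ms still belongs to the first window while the one at 5,000 ms opens the next, so the same key can produce one result per window. In the Flink job each of those results is printed when its window closes.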

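Flink Deployment, Installation, and Configuration

First, make sure a JDK is already installed on your Linux system

Unpack the Flink distribution

tar -zxvf flink-1.11.2-bin-scala_2.11.tgz

Go to the Flink configuration directory (conf)

The masters file specifies the address of the master node (JobManager) of the Flink cluster. It tells the other nodes in the cluster where the master is, so that they can connect to and communicate with it.

The flink-conf.yaml file is Flink's main configuration file and holds the cluster's parameters. Its options range from basic cluster settings to advanced performance tuning.

Common settings in this file: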

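  • jobmanager.rpc.address:

    • Description: the hostname or IP address of the JobManager.
    • Example: jobmanager.rpc.address: localhost
  • jobmanager.rpc.port:

    • Description: the RPC port of the JobManager.
    • Example: jobmanager.rpc.port: 6123
  • jobmanager.memory.process.size:

    • Description: the total memory size of the JobManager process.
    • Example: jobmanager.memory.process.size: 1600m
  • taskmanager.memory.process.size:

    • Description: the total memory size of each TaskManager process.
    • Example: taskmanager.memory.process.size: 1600m
  • taskmanager.numberOfTaskSlots:

    • Description: the number of task slots offered by each TaskManager.
    • Example: taskmanager.numberOfTaskSlots: 4
  • parallelism.default:

    • Description: the default parallelism, used when a job does not set its own.
    • Example: parallelism.default: 4
  • state.backend:

    • Description: the state backend. The choices are MemoryStateBackend, FsStateBackend, and RocksDBStateBackend (configured as jobmanager, filesystem, or rocksdb).
    • Example: state.backend: rocksdb
  • rest.address:

    • Description: the hostname or IP address of the REST API.
    • Example: rest.address: localhost
  • rest.port:

    • Description: the port of the REST API.
    • Example: rest.port: 8081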
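Two of these settings interact: the number of TaskManagers multiplied by taskmanager.numberOfTaskSlots gives the total number of slots, and with default settings a job generally needs at least as many slots as its parallelism. The plain-Java sketch below checks that relationship for the example values above. It is a rough illustration only: the class `FlinkConfSketch` is hypothetical, the parser handles only flat `key: value` lines and is not Flink's configuration loader, and the TaskManager count of 1 is an assumption.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FlinkConfSketch {

    // Parses flat "key: value" lines; blank lines and lines starting with '#' are skipped.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(": ");
            if (sep > 0) {
                conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
            }
        }
        return conf;
    }

    // Total slots offered by a cluster with the given number of TaskManagers.
    static int totalSlots(Map<String, String> conf, int taskManagers) {
        return taskManagers * Integer.parseInt(conf.get("taskmanager.numberOfTaskSlots"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(Arrays.asList(
            "# example values from the list above",
            "jobmanager.rpc.address: localhost",
            "taskmanager.numberOfTaskSlots: 4",
            "parallelism.default: 4",
            "rest.port: 8081"));
        int slots = totalSlots(conf, 1); // assumption: a single TaskManager
        int parallelism = Integer.parseInt(conf.get("parallelism.default"));
        System.out.println("slots=" + slots + ", parallelism=" + parallelism
            + ", enough=" + (slots >= parallelism)); // prints slots=4, parallelism=4, enough=true
    }
}
```

If the default parallelism were raised above the available slots, the usual remedy is to add TaskManagers or to increase the slots per TaskManager.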

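Go to the bin directory and start Flink (for a standalone cluster this is typically done with the start-cluster.sh script)

Open port 8081 (the rest.port setting above) in a browser to reach the web UI

Flink is now installed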

Submitting Flink Jobs

Configure the packaging plugins in pom.xml

    <build>
        <plugins> <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding} </encoding>-->
                </configuration>
            </plugin>
            <!-- JAR packaging plugin (bundles all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Entry class of the JAR (optional) -->
                                    <mainClass>com.demo.flink.usage.stream.StreamProcessorApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

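Run Maven's package phase (for example, by clicking package in the IDE) to build the JAR

Submitting through the web UI

Upload the JAR, check that the details are correct, then click Submit

The output can then be viewed here

Submitting from the command line

Upload the JAR to the Linux machine

Go to Flink's bin directory

Run the command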

./flink run -c com.demo.flink.usage.stream.StreamProcessorApplication /usr/local/flink-usage-1.0-SNAPSHOT.jar

 


Original article: https://blog.csdn.net/qq_46248151/article/details/143885601
