自学内容网 自学内容网

彻底理解Flink的多种部署方式

一.部署模式概述

在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求,Flink为各种场景提供了不同的部署模式,主要有以下三种:

1.会话模式(Session Mode)

会话模式其实最符合常规思维。我们需要先启动一个Flink集群,保持一个会话,在这个会话中通过客户端提交作业,集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源

会话模式比较适合于单个规模小、执行时间短的大量作业。

2.单作业模式(Per-Job Mode)

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,它不会提前启动Flink集群,当作业提交了,现启动一个集群供其使用,这就是所谓的单作业(Per-Job)模式。

作业完成后,集群就会关闭,所有资源也会释放;

这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式

在Flink1.17版本上还在使用,但已经被官方标记为已过时,后续会弃用。

缺点:该模式下需要的资源更多。

注意:Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架
来启动集群,比如YARN、Kubernetes(K8S)。

3.应用模式(Application Mode)

在Flink1.11版本以后才有的该模式。

问题:前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的,但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager,加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。

解决:不要客户端了,直接把应用提交到JobManger上运行,而这也就代表着,需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群,这个JobManager只为执行这一个应用而存在,执行结束之后JobManager也就关闭了,这就是所谓的应用模式。

应用模式与单作业模式,都是提交作业之后才创建集群,单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群,而应用模式下,是直接由JobManager执行应用程序的。 

它们的三者区别主要在于集群的生命周期以及资源的分配方式(一人独享还是多人共享),以及应用的main方法到底在哪里执行,客户端(Client) 还是 JobManager。

二.具体运行模式

上面所说的部署模式,相对是比较抽象的概念,实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用,接下来,就针对不同的资源提供者的场景,具体介绍Flink的部署方式。

1.Standalone 运行模式(了解)

独立模式是独立运行的,自己来管理资源,不依赖任何外部的资源管理平台,当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理,所以独立模式一般只用在开发测试或作业非常少的场景。

 在《快速入门Flink》中"下载与安装"中,有对Standalone 运行模式部署步骤的说明。

(1)会话模式部署

提前启动集群,并通过 Web 页面客户端提交任务(可以多个任务,但是集群资源固定)。

(2)单作业模式部署

Flink 的 Standalone 集群并不支持单作业模式部署,因为单作业模式需要借助一些资源管
理平台。

(3)应用模式部署

应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本,此时可以使用同样在bin目录下的 standalone-job.sh来创建一个JobManager。

注意:该模式下应用程序的jar 包只能放在lib目录下,其他地方是不行的。

可以使用《快速入门Flink》中"下载与安装"部署好的环境,将应用的jar包移动到lib目录下。

1)启动JobManager:

./standalone-job.sh start --job-classname com.ls.WordCountstreamUnboundedDemo

2)启动TaskManager(每个节点都需要手动启动):

./taskmanager.sh start

3)停止集群:

./taskmanager.sh stop
./standalone-job.sh stop

2.Yarn 运行模式(重点)

(1)前提

该模式依赖于Hadoop,Flink 可以运行在 Hadoop Yarn集群上,Yarn 作为资源管理器来调度和分配计算资源,这个模式适用于需要在已有 Hadoop 集群上运行的场景。

(2)Yarn上的部署过程

1)客户端把 Flink 应用提交给 Yarn 的 ResourceManager,Yarn 的 ResourceManager 会向Yam的NodeManager 申请容器;

2)在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群;

3)Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源(这一点是它的优势):

a.启动Yarn没有任务时:

b.启动Yarm上传一个任务后:

c.当任务执行完后,资源会被回收:

(3)相关准备和配置

在将 Flink 任务部署至 Yarn 集群之前,需要确认集群是否安装有Hadoop,建议Hadoop版本3.1.x以上版本,并且集群中安装有HDFS服务,此处不讲解Hadoop的部署,可自行百度。

按照好Hadoop集群后:

1)环境变量配置

$ sudo vim /etc/profile.d/my env.sh

HADOOP_HOME=/opt/module/hadoop-3.3.4
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF DIR${HADOOP HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

2)启动 Hadoop集群,包括HDFS和Yarn

./start-dfs.sh
./start-yarn.sh

(4)会话模式部署

Yarn 的会话模式与独立集群略有不同,需要首先申请一个Yarn 会话(YARN Session)来启动 Flink集群,具体步骤如下:

1)启动 Hadoop 集群(HDFS、Yarn);

2)执行脚本命令向 Yarn 集群申请资源,开启一个 Yarn 会话,启动 Flink 集群:

./yarn-session.sh nm test

常用参数说明:

-d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,Yarn session 也可以后台运行;

-jmm(--jobManagerMemory):配置JobManager 所需内存,默认单位 MB;

-nm(--name):配置在 Yarm UI界面上显示的任务名;
-qu(--queue):指定Yarm队列名;
-tm(--taskManager):配置每个TaskManager 所使用内存。

注意:Flink1.11.0 版本不再使用 -n 参数和 -s 参数分别指定 TaskManager 数量和 slot 数量,Yarn 会按照需求动态分配 TaskManager 和 slot,所以从这个意义上讲,Yarn 的会话模式也不会把集群资源固定,同样是动态分配的。

3)停止会话:

Web页面可以直接停止,下面说的是使用命令的方式:

a.产于启动日志,有如下命令

$ echo "stop" | ./bin/yarn-session.sh -id application_1680702304497_0003

b.直接使用该命令就可以停止会话。

(5)单作业模式部署

在 Yarn 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 Yarn 提交一个单独的作业,从而启动一个 Flink 集群。

1)执行命令提交作业

./flink run -d -t yarn-per-job -c com.ls.WordCountstreamUnboundedDemo flinkdemo-1.0-SNAPSHOT.jar

2)可以使用命令行查看或取消作业

./flink list -tyarn-per-job -Dyarn.application.id=application_XXXX_YY
./flink canal -tyarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

application_XXXX_YY为真实的应用id。

注意:如果启动过程中报如下异常

Exception in thread "Thread-5" java.lang.IllegalStateException:Trying to access closed classloader.Please check if you store classloaders directly or indirectly in static fields.If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration classloader`check-leaked-classloader` at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassloaders

修改Flink的config目录下的flink-conf.yaml配置文件添加下面命令即可:

classloader.check-leaked-classloader: false

(6)应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行inkrun-application命令即可。

1)命令行提交

a.执行命令提交作业

./flink run-application -t yarn-application -c com.ls.WordCountstreamUnboundedDemo flinkdemo-1.0-SNAPSHOT.jar(实际地址)

b.在命令行中查看或取消作业

./flink list -tyarn-per-job -Dyarn.application.id=application_XXXX_YY
./flink canal -tyarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

application_XXXX_YY为真实的应用id。

2)上传 HDFS 提交

可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 flink的依赖上传到远程。

a.上传 flink 的 lib 和 plugins 到 HDFS 上

hadoop fs -mkdir /flink-dist
hadoop fs -put lib/ /flink-dist
hadoop fs -put plugins/ /flink-dist

b.上传自己的 jar 包到 HDFS

hadoop fs -mkdir /flink-jars
hadoop fs -put flinkdemo-1.0-SNAPSHOT.jar

c.提交作业

./flink run-application -t yarn-application -c com.ls.WordCountstreamUnboundedDemo -Dyarn.provided.lib.dirs="hdfs://ip:8020/flink-dist" -c com.ls.WordCountstreamUnboundedDemo hdfs://ip:8020/flink-jars/flinkdemo-1.0-SNAPSHOT.jar

 该方式推荐在生成环境去使用。

3.K8S 运行模式(了解)

容器化部署是如今业界流行的一项技术,基于Docker 镜像运行能够让用户更加方便地对应用进行管理和运维,容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本(1.20.0)中支持了 k8s 部署模式,基本原理与 Yarn 是类似的,具体配置可以参见官网说明,这里就不做过多解释了

三.历史服务器

1.运行 Flinkjob 的集群一旦停止,只能去 yam 或本地磁盘上查看日志,不再可以查看作业挂掉之前的运行的 Web U,很难清楚知道作业在挂的那一刻到底发生了什么,如果我们还没有 Metrics 监控的话,那么完全就只能通过日志去分析和定位问题了,所以如果能还原之前的Web UI,我们可以通过 UI发现和定位一些问题;

2.Flink 提供了历史服务器,用来在相应的Flink 集群关闭后查询已完成作业的统计信息,我们都知道只有当作业处于运行中的状态,才能够查看到相关的WebUI统计信息。通过History Server 我们才能查询这些已完成作业的统计信息,无论是正常退出还是异常退出;

3.此外,它对外提供了RESTAPI,它接受HTTP请求并使用JSON数据进行响应,Flink任务停止后,JobManager 会将已经完成任务的统计信息进行存档,HistoryServer 进程则在任务停止后可以对任务统计信息进行查询(比如最后一次的Checkpoint、任务运行时的相关配置)。

步骤:

1)创建存储目录

hadoop fs -mkdir -p /logs/flink-job

 2)在 ink-confg.yaml 中添加如下配置

jobmanager.archive.fs.dir: hdfs://ip:8020/logs/flink-job
historyserver.web.address:ip
historyserver.web.port:8082
historyserver.archive.fs.dir: hdfs://ip:8020/logs/flink-job
historyserver.archive.fs.refresh-interval:5000

3)启动历史服务器

./historyserver.sh start

4)停止历史服务器

./historyserver.sh stop

5)在浏览器地址栏输入:http://ip:8082,查看已经停止的job的统计信息。


    原文地址:https://blog.csdn.net/BestandW1shEs_lsy/article/details/145316001

    免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!