Flink 任务启动脚本-V2(包括ck启动)
#!/bin/bash
#crontab时设置,如果依赖其他环境变量配置,可以在脚本执行一下环境变量脚本
source /etc/profile
# 进入脚本目录
curdir=`dirname "$0"`
curdir=`cd "$curdir"; pwd`
echo "进入启动脚本目录 $curdir"
# 定义应用程序名称
APP_NAME="orderTest"
# 定义checkpoint路径
CHECKPOINT_BASE_PATH="hdfs:///jobs/flink/checkpoints/$APP_NAME/"
MAIN_CLASS="com.test.mainTest"
#绝对路径
JAR_PATH="/$curdir/flink-test-1.0.0.jar"
# 默认从checkpoint启动
USE_CHECKPOINT=true
# 显示使用说明
usage() {
echo "用法: $0 [-n] [-h]"
echo "选项:"
echo " -n 不从checkpoint启动任务(默认从最新的checkpoint启动)"
echo " -h 显示此帮助信息"
echo
echo "示例:"
echo " $0 # 从最新的checkpoint启动任务"
echo " $0 -n # 不使用checkpoint启动任务"
exit 1
}
# 解析命令行参数
while getopts ":nh" opt; do
case $opt in
n)
USE_CHECKPOINT=false
echo "已设置:不从checkpoint启动任务"
;;
h)
usage
;;
\?)
echo "错误:无效的选项 -$OPTARG"
echo "使用 -h 查看帮助信息"
exit 1
;;
esac
done
# 如果设置了无效参数,显示使用说明
if [ $OPTIND -gt 1 ]; then
shift $((OPTIND-1))
if [ "$#" -gt 0 ]; then
echo "错误:存在额外的参数 $@"
echo "使用 -h 查看帮助信息"
exit 1
fi
fi
# 检查是否存在指定应用程序在运行中
is_running=$(yarn application -list | grep -w "$APP_NAME" | grep -c "RUNNING")
if [ $is_running -gt 0 ]; then
echo "应用程序 $APP_NAME 在运行中,退出脚本"
exit 1
else
echo "应用程序 $APP_NAME 不在运行中,准备拉起任务"
fi
# 函数:获取最新成功的checkpoint地址
get_latest_checkpoint() {
latest_checkpoint=$(hdfs dfs -ls -t -R $CHECKPOINT_BASE_PATH | grep '_metadata' | sort -k6,7r | head -n 1 | awk '{print $8}' )
if [ -z "$latest_checkpoint" ]; then
return 1
fi
# 去掉文件名,只保留目录路径
checkpoint_dir=$(dirname "$latest_checkpoint")
echo $checkpoint_dir
return 0
}
# 构建基础命令
CMD="flink run \
-t yarn-per-job \
-d \
-p 1 \
-Dyarn.application.queue=realtime \
-Dyarn.application.name=$APP_NAME \
-Dtaskmanager.memory.process.size=4096mb \
-Dtaskmanager.memory.network.max=64mb \
-Dtaskmanager.memory.managed.size=1024mb \
-Dtaskmanager.numberOfTaskSlots=1 \
-c $MAIN_CLASS"
if [ "$USE_CHECKPOINT" = true ]; then
# 获取最新的checkpoint地址
LATEST_CHECKPOINT=$(get_latest_checkpoint)
if [ $? -ne 0 ]; then
echo "没有找到适合的ck,退出执行"
exit 1
fi
echo "'$APP_NAME' 任务将从 '$LATEST_CHECKPOINT' 启动"
CMD="$CMD \
-s $LATEST_CHECKPOINT"
else
echo "'$APP_NAME' 任务将不从checkpoint启动"
fi
# 添加最终的jar包和配置文件参数
CMD="$CMD \
$JAR_PATH "
# 执行命令
echo " 任务启动命令: '$CMD' "
eval $CMD
本次新增一个不从 ck 启动的选择,默认从 最新 ck 启动,用法 sh start.sh -n 。
备注:由于Flink checkpoint 个别情况下,不一定能保证落地的checkpoint文件一定有效,所以需要人工介入支持。
原文地址:https://blog.csdn.net/zfqzpp/article/details/144082653
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!