自学内容网 自学内容网

数据开发|如何使用Apache Airflow进行任务调度?干货分享!

作者:李代伟| 后端开发工程师

一、ApacheAirflow

概述

随着数据复杂性的不断增加,管理和调度数据处理任务变得越来越具有挑战性。Apache Airflow 是一个开源平台,专为开发、调度和监控批处理工作流而设计。作为一个功能强大的工作流编排工具,Airflow 提供了一种简单而灵活的方法来定义、安排和监控数据工作流。

Apache Airflow 特点


直观的工作流定义:Airflow 使用 DAGs(有向无环图)来表示工作流,使得用户可以清晰地看到任务之间的依赖关系。


强大的调度功能:Airflow 支持多种调度方式,包括定时调度、事件触发等,满足不同的调度需求。


易于扩展:Airflow 的模块化设计使得用户能够轻松地扩展其功能,满足特定的业务需求。


实时监控和日志记录:Airflow 提供实时的任务监控和日志记录功能,使用户能够方便地跟踪任务的执行情况并解决问题。


丰富的插件系统:Airflow 提供了丰富的插件系统,支持多种数据源、计算引擎和通知方式,方便用户集成各种工具和系统。

Apache Airflow 的架构核心组件

Scheduler(调度器):负责根据定义的 DAG(Directed Acyclic Graph,有向无环图)图,计划和触发任务的执行。调度器将任务按照依赖关系组织成可执行的工作流程,并将其分发给可用的执行器。


Executor(执行器):执行器负责执行调度器分发的任务。Airflow 支持多种执行器,包括本地执行器(SequentialExecutor)、Celery 执行器和 Dask 执行器等。执行器将任务实际执行在相应的工作节点上,并将执行结果返回。


Web Server(Web 服务器):提供 Web 用户界面,用于监控和管理工作的状态、任务的执行情况、查看日志以及触发任务的手动运行等。通过 Web 界面,用户可以直观地了解工作流的整体情况。

Database(元数据库):元数据库存储了 Airflow 的元数据,包括 DAG 的定义、任务实例的状态、任务执行日志等。这允许用户在不同的任务和工作流之间共享信息,并支持任务的重试、回溯和监控。


Worker(工作节点):执行器通过工作节点在集群或计算资源上执行任务。工作节点可以是单个服务器或集群,具体取决于所选的执行器类型。


Message Queue(消息队列):消息队列用于在调度器和执行器之间传递任务信息。调度器将任务信息放入队列,执行器从队列中获取任务并执行。常见的消息队列包括 RabbitMQ、Apache Kafka 和 Celery 等。

二、安装与部署

前置条件

  • Conda创建的python3.8环境
  • MySQL版本8+

创建环境

#创建python虚拟环境
conda create -n airflow python=3.8
#激活环境
conda activate airflow

安装依赖

# 下载airflow相关依赖
pipinstall "apache-airflow==2.9.2"--constraint https://raw.githubusercont
ent.com/apache/airflow/constraints-2.9.2/constraints-3.8.txt
# 安装mysql连接依赖
pip install pymysql

修改配置文件

找到⽂件中~/airflow/airflow.cfg⽂件,搜索sql_alchemy_conn将其修改成你⾃⼰的MySQL数据库

  • 将其修改成连接MySQL
#需要将username、password、dbname替换成⾃⼰的
sql_alchemy_conn = mysql+pymysql://<username>:<password>@127.0.0.1:3306/<db
name>?charset=utf8&use_unicode=true
#例如
sql_alchemy_conn = mysql+pymysql://root:rootroot@127.0.0.1:3306/airflow?cha
rset=utf8&use_unicode=true

  • 修改默认的执行器

默认为顺序的执行器executor=SequentialExecutor

详细请参考:https://airflow.apache.org/docs/apache-airflow/2.9.2/core-concepts/executor/index.html

# 若需要调度的任务不是很多,可直接使⽤本地的执⾏器
executor= LocalExecutor

  • 修改dags_folder的文件目录
# 存放⽤户定义的DAG⽂件
dags_folder= /Users/david/airflow/dags

启动Airflow

#初始化数据库
airflow db init
#创建⽤户
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org
#后台启动Web服务端
airflow webserver --port 8080 -D
#启动airflow调度
airflow scheduler -D

启动成功并登录你就可以访问Web⻚⾯了

编写启动和关闭Airflow脚本

#!/bin/bash
case$1in
"start"){
echo"-------------启动 airflow-----------"
conda activate airflow
airflow webserver -p 8080 -D
airflow scheduler -D
conda deactivate
};;
"stop"){
echo"-------------关闭 airflow-----------"
ps -ef | egrep "scheduler|airflow-webserver"|grep -v grep|awk "{print $2
}"|xargs kill-15
};;
esac

三、使⽤Airflow调用远程的Datax服务

安装SSHOperator依赖,⽤于远程调用datax服务

pip installapache-airflow-providers-ssh

配置SSHConnection连接

编写DAGpython配置⽂件

将编写完成的DAGpython配置⽂件上传到dags_folder的文件目录。

fromdatetime importdatetime, timedelta
fromairflow.models.dag importDAG
fromairflow.providers.ssh.operators.ssh importSSHOperator
default_args = {
"depends_on_past": False, # 是否依赖上⼀次执⾏结果
"email": ["airflow@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1, # 重试次数
"retry_delay": timedelta(minutes=5), # 重试间隔
}
withDAG(
"datax",
default_args=default_args,
description="执⾏datax任务",
schedule_interval=timedelta(days=1), # schedule_interval 应该是 timedelta
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["datax任务"],
) asdag:
t1 = SSHOperator(
task_id="run_datax_task",
ssh_conn_id="ssh-datax", # 配置在Airflow webui Connection中配置的SSH Conn id
command="""
sh /employee-portrait/process-job/process-job-master/datax/run.sh
employee_skill
""",
)

查看任务调度情况

重启Airflowwebsever与scheduler,登录webui,开启调度。

点击你所配置的DAG

查看运行日志

参考资料:

Airflow⽂档:https://airflow.apache.org/docs/apache-airflow/stable/index.html

版权声明:本文由神州数码云基地团队整理撰写,若转载请注明出处。

公众号搜索神州数码云基地,了解更多技术干货!


原文地址:https://blog.csdn.net/CBGCampus/article/details/142450551

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!