自助下单网站咋做,上海网站推广营销设计,学做ppt的网站,wordpress官方程序下载1 概述
Airflow是一个以编程方式编写#xff0c;用于管理和调度工作流的平台。可以帮助你定义复杂的工作流程,然后在集群上执行和监控这些工作流。
Airflow计划程序在遵循指定的依赖项#xff0c;同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度…1 概述
Airflow是一个以编程方式编写用于管理和调度工作流的平台。可以帮助你定义复杂的工作流程,然后在集群上执行和监控这些工作流。
Airflow计划程序在遵循指定的依赖项同时在一组工作线程上执行任务。丰富的命令实用程序使在DAG上执行复杂的调度变的轻而易举。Airflow的可扩展Python框架可以让你构建连接几乎任何技术的工作流程。丰富的用户界面可以随时查看生产中正在运行的管道帮助你管理工作流程的状态监视进度以及需要时对问题进行故障排除。
Airflow的主要组件有
DAG(有向无环图)使用Airflow将工作流编写任务的有向无环图DAG。一个DAG定义了一个工作流,它包含所有任务、任务的依赖关系和时间表。
任务(Task)一个任务定义了一个单独的单元工作,有一个确定的开始和结束。一个任务可以依赖于其他任务。
运算符(Operator)一个运算符封装了一个任务,并定义了它的执行逻辑。Airflow内置了许多运算符,如BashOperator、PythonOperator、EmailOperator等。你也可以自定义运算符。
时间轴(Timeline)时间轴让你以图形方式查看 DAG 的运行情况和状态。
调度器(Scheduler)调度器监视时间轴并触发需要运行的任务。
执行器(Executor)executor负责实际运行任务。Airflow支持多种executor如LocalExecutor, CeleryExecutor, KubernetesExecutor 等。
2 名词
1DynamicAirflow管道是用Python代码配置的,允许动态生成管道。Airflow配置需要使用Python这允许编写可动态实例化管道的代码。
2ExtensibleAirflow框架包含许多运算符来连接各种技术。Airflow的所有组件都是可扩展的。轻松定义自己的运算符执行程序并扩展库使其适合于您的环境。
3ElegantAirlfow是精简灵活的使用功能强大的Jinja模板引擎将脚本参数化内置于Airflow的核心中。
4ScalableAirflow具有模板块架构并使用消息队列来安排任意数量的工作任务。 3 airflow优缺点
优点
Python脚本实现DAG非常容易扩展
可实现复杂的依赖规则
外部依赖较少搭建容易仅依赖DB和rabbitmq
工作流依赖可视化。有一套完整的UI可视化展现所有任务的状态及历史信息本人刚开始主要看重这点
完全支持crontab定时任务格式可以通过crontab格式指定任务何时进行
业务代码和调度系统解耦每个业务的流程代码以独立的Python脚本描述里面定义了流程化的节点来执行业务逻辑支持任务的热加载.
缺点
Airflow是为有限的批处理工作流构建的。虽然CLI和REST API确实允许触发工作流但Airflow不是为无限运行的基于事件的工作流构建的。Airflow不是流解决方案。然而像Apache Kafka这样的流系统通常与Apache Airflow一起使用。Kafka可以用于实时接收和处理事件数据事件数据被写入存储位置Airflow定期启动处理一批数据的工作流。
如果你更喜欢点击而不是编码Airflow可能不是正确的解决方案。Web界面旨在最大限度地简化工作流的管理Airflow框架不断改进以最大限度地简化开发人员体验。然而Airflow的理念是将工作流定义为代码所以代码始终是必需的。
4 Airflow安装
airflow官网地址https://airflow.apache.org。
1先安装并配置好python环境可以参考Anaconda安装即可如果项目不需要依赖太多工具包可选择更简洁的MiniConda并激活。
2安装airflow
pip install apache-airflow
3初始化airflow
airflow db init
4查看版本
airflow version
5启动airflow web服务,启动后浏览器访问http://ip_address:12025如果不知道ip地址的就用ifconfig命令去linux下获取
airflow webserver -p 12025 -D
6启动airflow调度
airflow scheduler -D
7创建账号(斜杠别忘记了)
airflow users create \ --username admin \ --firstname trisyp \ --lastname trisyp \ --role Admin \ --email trisypemail.com
回车之后会让你输入两次password我们就用123456
8启动停止脚本
vim af.sh #!/bin/bash case $1 in
start){ echo --------启动 airflow------- ssh ip_address conda activate airflow;airflow webserver -p 12025 -D;airflow scheduler -D; conda deactivate
};;
stop){ echo --------关闭 airflow------- ps -ef|egrep scheduler|airflow-webserver|grep -v grep|awk {print $2}|xargs kill -15
};;
esac
添加权限即可使用。
trisypip_address bin]$ chmod x af.sh
5 修改数据库为MySQL
1先在MySQL中建库
mysql CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
2如果报错Linux error:1425F102:SSL routines:ssl_choose_client_version:unsupported protocol可以关闭MySQL的SSL证书
查看SSL是否开启 YES为开启
mysql SHOW VARIABLES LIKE %ssl%;
--------------------------------
| Variable_name | Value |
--------------------------------
| have_openssl | YES |
| have_ssl | YES |
| ssl_ca | ca.pem |
| ssl_capath | |
| ssl_cert | server-cert.pem |
| ssl_cipher | |
| ssl_crl | |
| ssl_crlpath | |
| ssl_key | server-key.pem |
--------------------------------
3修改配置文件my.cnf注意直接数据库修改值不起作用加入以下内容
# disable_ssl
skip_ssl
4添加python连接的依赖官网介绍的方法有两种 这里我们选择mysqlmysqlconnector。
pip install mysql-connector-python
5修改airflow的配置文件(vim ~/airflow/airflow.cfg)
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
# More information here:
# http://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
#sql_alchemy_conn sqlite:home/trisyp/airflow/airflow.db
sql_alchemy_conn mysqlmysqlconnector://root:123456ip_address:3306/airflow_db
6关闭airflow初始化后重启
af.sh stop
airflow db init
af.sh start
7若初始化报错1067 - Invalid default value for ‘update_at’
原因字段 update_at 为 timestamp类型取值范围是1970-01-01 00:00:00 到 2037-12-31 23:59:59UTC 8 北京时间从1970-01-01 08:00:00 开始而这里默认给了空值所以导致失败。
推荐修改mysql存储时间戳格式
mysql set GLOBAL sql_mode STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
重启MySQL会造成参数失效注意这样就需要重新创建账号推荐将参数写入到配置文件my.cnf中。
sql_mode STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION 6 修改执行器
官网不推荐在开发中使用顺序执行器会造成任务调度阻塞。 1修改airflow的配置文件(vim ~/airflow/airflow.cfg)
[core]
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor,
# KubernetesExecutor, CeleryKubernetesExecutor or the
# full import path to the class when using a custom executor.
executor LocalExecutor
可以使用官方推荐的几种执行器也可以自定义。这里我们选择本地执行器即可。
7 部署使用
1测试环境启动
本次测试使用的是spark的官方案例所有需要启动hadoop和spark的历史服务器。
myhadoop.sh start
cd /opt/module/spark-yarn/sbin/start-history-server.sh
2查看Airflow配置文件
vim ~/airflow/airflow.cfg 3编写.py脚本创建work-py目录用于存放python调度脚本
mkdir ~/airflow/dags
cd dags/
然后把脚本文件放到dags文件夹代码如下
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta default_args { # 设置默认参数。 # 用户 owner: test_owner, # 是否开启任务依赖 depends_on_past: True, # 邮箱 email: [trisypemail.com], # 启动时间 start_date:datetime(2022,11,28), # 出错是否发邮件报警 email_on_failure: False, # 重试是否发邮件报警 email_on_retry: False, # 重试次数 retries: 3, # 重试时间间隔 retry_delay: timedelta(minutes5),
}
# 声明任务图schedule_interval调度频率。
dag DAG(test, default_argsdefault_args, schedule_intervaltimedelta(days1)) # 创建单个任务
t1 BashOperator( # BashOperator具体执行任务如果为true前置任务必须成功完成才会走下一个依赖任务如果为false则忽略是否成功完成。 # 任务id任务唯一标识必填。 task_iddwd, # 具体任务执行命令。 bash_commandssh ip_address /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 , # 重试次数 retries3, # 把任务添加进图中 dagdag) t2 BashOperator( task_iddws, bash_commandssh ip_address /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 , retries3, dagdag) t3 BashOperator( task_idads, bash_commandssh ip_address /opt/module/spark-yarn/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark-yarn/examples/jars/spark-examples_2.12-3.1.3.jar 10 , retries3, dagdag) # 设置任务依赖ads任务依赖dws任务依赖dwd任务。
t2.set_upstream(t1)
t3.set_upstream(t2)
4等待一段时间刷新任务列表
airflow dags list 5已出现myairflow_execute_bash任务刷新页面 6点击运行 7查看dag图、甘特图点击成功任务查看日志 8查看脚本代码 9Dag任务操作
9.1 删除Dag任务
主要删除DAG任务不会删除底层文件过一会还会自动加载回来。 9.2 查看当前所有dag任务
# 查看所有任务
airflow dags list
# 查看单个任务
airflow tasks list test --tree 8 配置邮件服务器
1保证邮箱已开SMTP服务 2修改airflow配置文件用stmps服务对应587端口
vim ~/airflow/airflow.cfg
smtp_host smtp.qq.com
smtp_starttls True
smtp_ssl False
smtp_user trisypemail.com
# smtp_user
smtp_password qluxdbuhgrhgbigi
# smtp_password
smtp_port 587
smtp_mail_from trisypemail.com
3重启airflow
af.sh stop
af.sh start
4编辑test.py脚本加入emailOperator
from airflow.operators.email_operator import EmailOperator emailEmailOperator( task_idemail, toyaohm163163.com , subjecttest-subject, html_contenth1test-content/h1, cctrisypemail.com , dagdag) t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)
5查看页面是否生效 6运行测试 9 避坑指南
1Exception rendering Jinja template for task 2Intel MKL FATAL ERROR: Cannot load ../numexpr/../../../libmkl_rt.so.1.
强制更新airflow到最新版 3error: subprocess-exited-with-error 解决方案
错误有明确的提示缺少pkg-config所以就先安装这个包然后在安装mysqlclient。
sudo apt-get install pkg-config 4Cant connect to local MySQL server through socket /tmp/mysql.sock (2) 解决方案
先用命令“find / -name ‘mysql.sock”来查看下这个文件所在目录如果有就建立软连接不要想着拷贝复制无效的命令是“ln -s /tmp/mysql.sock”。如果没有就找my.cnf文件一般文件地址为/etc/mysql/my.cnf然后通过vim加上socket路径信息一定要加mysqld这个分组不然会报找不到分组这个错Found option without preceding group 5Segmentation fault (core dumped) 解决方案
在配置mysql存储的时候要加上mysqlconnector就解决了。这个坑非常恶心你参照某些教程直接只配mysql忽视了connector碰到了还找不到解决方案因为核心存储转移你不知道怎么搞。 cd /etc
vim profile
加入
export AIRFLOW_HOME/root/airflow sudo mysql
create database airflow_db;
create user airflow% identified by 123456;
grant all on airflow_db .* to airflow%; sql_alchemy_conn mysql://airflow:12345610.0.0.22:3306/airflow_db 10 参考链接 https://yuchaoshui.com/1bd10cc/