做神马网站优化快速,php网站是什么数据库文件,做网站jsp和php,制作网页完整步骤1、什么是Airflow
Airflow 是一个 Airbnb 的 Workflow 开源项目#xff0c;使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统#xff0c;可以简单理解为是高级版的crontab#xff0c;但是它解决了crontab无法解决的任务…1、什么是Airflow
Airflow 是一个 Airbnb 的 Workflow 开源项目使用Python编写实现的任务管理、调度、监控工作流平台。Airflow 是基于DAG(有向无环图)的任务管理系统可以简单理解为是高级版的crontab但是它解决了crontab无法解决的任务依赖问题。与crontab相比Airflow可以方便查看任务的执行状况执行是否成功、执行时间、执行依 赖等可追踪任务历史执行情况任务执行失败时可以收到邮件通知查看错误日志。
2、Airflow与同类产品的对比 系统名称 介绍 Apache Oozie 使用XML配置, Oozie任务的资源文件都必须存放在HDFS上. 配置不方便同时也只能用于Hadoop. Linkedin Azkaban web界面尤其很赞, 使用java properties文件维护任务依赖关系, 任务资源文件需要打包成zip, 部署不是很方便. Airflow 具有自己的web任务管理界面dag任务创建通过python代码可以保证其灵活性和适应性
3、Airflow基础概念 1DAG有向无环图Directed Acyclic Graph描述数据流的计算过程。
2OperatorsDAG中一个Task要执行的任务如①BashOperator为执行一条bash命令②EmailOperator用于发送邮件③HTTPOperator用于发送HTTP请求④PythonOperator用于调用任意的Python函数。
3Task是DAG中的一个节点是Operator的一个实例。
4Task Instance记录Task的一次运行Task Instance有自己的状态包括running、success、failed、 skipped、up for retry等。
5Trigger Rulestask的触发条件。
4 、Airflow安装
依赖yum -y install python-devel libevent-devel mysql-devel mysqlclient
1安装airflowpip install apache-airflow
2修改airflow对应的环境变量export AIRFLOW_HOME/usr/local/airflow
3执行airflow version在/usr/local/airflow目录下生成配置文件
4修改默认数据库修改/usr/local/airflow/airflow.cfg [core] executor LocalExecutor sql_alchemy_conn mysql://airflow:123456192.168.48.102:3306/airflow
5创建airflow用户,创建airflow数据库并给出所有权限给次用户
create database airflow; create user tairflow% identified by 123123; GRANT all privileges on airflow.* TO testairflow% IDENTIFIED BY 123456; FLUSH PRIVILEGES;
6初始化数据库airflow initdb
7启动web服务器airflow webserver –p 8080
在安装过程中如遇到如下错误 在my.cnf中加explicit_defaults_for_timestamp1然后重启数据库 5、Airflow主要功能模块
下面通过Airflow调度任务管理的主界面了解一下各个模块功能这个界面可以查看当前的DAG任务列表有多少任务运行成功失败以及正在当前运行中等 在Graph View中查看DAG的状态。DAG是一个有向无环图它是一个单向流动的ETL流程图。只有前置task执行成功后后续task才会被Trigger如果后续task有并行分支会被同时Trigger执行。
对于已经执行完的task鼠标停留在task上面会自动浮现出一个黑色的提醒框显示该task的基本情况。①Airflow当前UTC时间②默认显示一个与①一样的时间自动跟随①的时间变动而变动③DAG当前批次触发的时间也就是Dag Run时间没有什么实际意义④数字4该task开始执行的时间⑤该task开始执行和结束执行的UTC时间⑥该task开始执行和结束执行的CST时间也就是香港本地时间。
Airflow中每一个task可能有8种状态使用8种不同的颜色标注分别是success、running、failed、skipped、up_for_reschedule、up_for_retry、queued、no_status。每一个task被调度执行前都是no_status状态当被调度器传入作业队列之后状态被更新为queued被调度器调度执行后状态被更新为running如果该task执行失败如果没有设置retry参数状态立马被更新为failed如果有设置retry参数第一次执行失败后会被更新为up_for_retry状态等待重新被调度执行执行完retry次数仍然失败则状态会被更新为failedskipped状态是指该task被跳过不执行up_for_reschedule状态是指等待重新调度 每点击一个button可以跳转到对应页面查看这个task对应的Task Instance Details、Rendered、Task Instance、Log
Run可以单次执行该task右边3个button是执行task时可以选择的条件鼠标停留在每一个条件上会显示该条件表示的含义。选择Ignore All Deps表示忽略该task的前后依赖条件及之前批次的执行状态直接执行该task。
Clear表示可以清除当前task的执行状态清除执行状态后该task会被自动重置为no_status等待Airflow调度器自动调度执行Downstream和Recursive是默认选中的是当你点击Clear后当前task及所有后置task的状态都会被清除即当前task及所有后置task都会重新等待调度执行如果同时选中Upstream和Recursive点击Clear后则表示从Dag第一个task到当前task这条路径上的所有task会被重新调度执行
点击Clear按钮后会将当前task及所有后续task作业的task id打印出来。点击OK后Airflow会将这些task的最近一次执行记录清除然后将当前task及后续所有task生成新的task instance将它们放入队列由调度器调度重新执行 以树状的形式查看各个Task任务的调度如下图 显示DAG调度持续的时间 甘特图显示每个任务的起止、持续时间
】
配置DAG运行的默认参数 查看DAG的调度脚本 6、DAG脚本示例
以官网的脚本为例进行说明
from datetime import timedelta
# The DAG object; well need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args {owner: airflow,depends_on_past: False,start_date: days_ago(2),email: [airflowexample.com],email_on_failure: False,email_on_retry: False,retries: 1,retry_delay: timedelta(minutes5),# queue: bash_queue,# pool: backfill,# priority_weight: 10,# end_date: datetime(2016, 1, 1),# wait_for_downstream: False,# dag: dag,# sla: timedelta(hours2),# execution_timeout: timedelta(seconds300),# on_failure_callback: some_function,# on_success_callback: some_other_function,# on_retry_callback: another_function,# sla_miss_callback: yet_another_function,# trigger_rule: all_success
}
dag DAG(tutorial,default_argsdefault_args,descriptionA simple tutorial DAG,schedule_intervaltimedelta(days1),
)# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 BashOperator(task_idprint_date,bash_commanddate,dagdag,
)t2 BashOperator(task_idsleep,depends_on_pastFalse,bash_commandsleep 5,retries3,dagdag,
)
dag.doc_md __doc__t1.doc_md \
#### Task Documentation
You can document your task using the attributes doc_md (markdown),
doc (plain text), doc_rst, doc_json, doc_yaml which gets
rendered in the UIs Task Instance Details page.
templated_command
{% for i in range(5) %}echo {{ ds }}echo {{ macros.ds_add(ds, 7)}}echo {{ params.my_param }}
{% endfor %}
t3 BashOperator(task_idtemplated,depends_on_pastFalse,bash_commandtemplated_command,params{my_param: Parameter I passed in},dagdag,
)t1 [t2, t3]
1需要引入的包 2DAG默认参数配置
①depends_on_past是否依赖上游任务即上一个调度任务执行失 败时该任务是否执行。可选项包括True和FalseFalse表示当前执 行脚本不依赖上游执行任务是否成功
②start_date表示首次任务的执行日期
③email设定当任务出现失败时用于接受失败报警邮件的邮箱地址
④email_on_failure当任务执行失败时是否发送邮件。可选项包括 True和FalseTrue表示失败时将发送邮件
⑤retries表示执行失败时是否重新调起任务执行1表示会重新调起
⑥retry_delay表示重新调起执行任务的时间间隔 3实例化DAG
设定该DAG脚本的id为tutorial 设定每天的定时任务执行时间为一天调度一次。
调度时间还可以以“* * * * *”的形式表示执行时间分别是“分时天月年”
注意① Airflow使用的时间默认是UTC的当然也可以改成服务器本地的时区。
②*/30 * * * * 指的是每个小时的30分的时候调度而不是半小时一次比如说1:30 , 2:30 ...
半小时调度一次的写法应该是0/30 * * * 4Operator即Task要执行的任务
段脚本中引入了需要执行的task_id并对dag 进行了实例化。
里面的bash_command参数是对于具体执行这个task任务的脚本或命令。
还有Trigger_rule参数为该task任务执行的触发条件官 方文档里面该触发条件有5种状态一般常用的包括 “ ALL_DONE ” 和 ”ALL_SUCCESS” 两 种 。 其中 “ALL_DONE”为当上一个task执行完成该task即 可执行而”ALL_SUCCESS”为只当上一个task执行成功时该task才能调起执行执行失败时本 task不执行任务。 5Task脚本的调度顺序
t1 [t2, t3]命令为task脚本的调度顺序在该命令中先执行“t1” 任务后执行“t2, t3”任务。
一旦Operator被实例化它被称为“任务”。实例化为在调用抽象Operator时定义一些特定值参数化任务使之成为DAG中的一个节点。 调度顺序的其他表示方式①t1 t2 等价于t1.set_downstream(t2) 表示t1任务先执行②t1 t2 等价于t1.set_upstream(t2) 表示t2任务先执行
7 Airflow常用命令行
Airflow通过可视化界面的方式实现了调度管理的界面操作但在测试脚本或界面操作失败的时候可通过命令行的方式调起任务。下面介绍几个常用的命令
命令描述airflow list_tasks userprofile用于查看当前DAG任务下的所有task列表其中userprofile是DAG名称airflow test userprofile age_task 20200101 用于测试DAG下面某个task是否能正常执行其中userprofile是DAG名称age_task是其中一个task名称airflow backfill -s 2020-01-01 -e 2020-01-02 userprofile用于调起整个DAG脚本执行任务其中userprofile是DAG名称2020-01-01是脚本执行的开始日期