营销型网站建设要懂代码吗,室内装修设计软件推荐,2020新闻大事件摘抄,设计师培训招生视频# Airflow 1.10安装本次安装Airflow版本为1.10#xff0c;其需要依赖Python和DB#xff0c;本次选择的DB为Mysql。本次安装组件及版本如下#xff1a;Airflow 1.10.0Python 3.6.5Mysql 5.7# 整体流程1. 建表2. 安装3. 配置4. 运行5. 配置任务启动scheduleairflow schedul…# Airflow 1.10安装本次安装Airflow版本为1.10其需要依赖Python和DB本次选择的DB为Mysql。本次安装组件及版本如下Airflow 1.10.0Python 3.6.5Mysql 5.7# 整体流程1. 建表2. 安装3. 配置4. 运行5. 配置任务启动scheduleairflow scheduler -D启动webserverairflow webserver -Dps -ef|grep -Ei (airflow-webserver)| grep master | awk {print $2}|xargs -i kill {}ps -ef | grep -Ei airflow | grep -v grep | awk {print $2} | xargs -i kill {}## 建库、建用户库名为airflowcreate database airflow;建用户用户名为airflow并且设置所有ip均可以访问。create user airflow% identified by airflow;create user airflowlocalhost identified by airflow;用户授权这里为新建的airflow用户授予airflow库的所有权限grant all on airflow.* to airflow%;flush privileges## Airflow安装这里通过 virtualenv 进行安装。----- 通过virtualenv安装$ mkdir /usr/local/virtual_env cd /usr/local/virtual_env # 创建目录$ virtualenv --no-site-packages airflow --pythonpython # 创建虚拟环境$ source /usr/local/virtual_env/airflow/bin/activate # 激活虚拟环境----- 安装指定版本或者默认$ pip install apache-airflow -i https://pypi.douban.com/simple在安装完一堆的依赖后就需要配置 AIRFLOW_HOME 环境变量后续的 DAG 和 Plugin 都将以该目录作为根目录查找如上可以直接设置为 /tmp/project 。报错ERROR: flask 1.1.1 has requirement Jinja22.10.1, but youll have jinja2 2.10 which is incompatible.ERROR: flask 1.1.1 has requirement Werkzeug0.15, but youll have werkzeug 0.14.1 which is incompatible.执行pip3 install -U Flask1.0.4执行pip3 install -U pika0.13.1重新执行 pip install apache-airflow -i https://pypi.douban.com/simple----- 设置环境变量(airflow) $ export AIRFLOW_HOME/tmp/airflow----- 查看其版本信息(airflow) $ airflow version ____________ _____________ ____ |__( )_________ __/__ /________ ______ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.8.0执行了上述的命令后会生成 airflow.cfg 和 unittests.cfg 两个文件其中前者是一个配置文件 。## airflow 配置----- 修改Airflow DB配置### 1. 安装Mysql模块pip install apache-airflow[mysql]这里可以简单说下airflow依赖的其他组件均可以此方式安装。在之后安装password组件同样是通过此方式。修改Airflow DB配置修改${AIRFLOW_HOME}/airflow.cfgsql_alchemy_conn mysqlmysqldb://airflow:airflowlocalhost:3306/airflow参数的格式为mysql://帐号:密码ip:port/db初始化db新建airflow依赖的表。airflow initdb如报错 Cant connect to local MySQL server through socket /var/lib/mysql/mysql.sock (2)需改sql_alchemy_conn mysqlmysqldb://airflow:airflow127.0.0.1:3306/airflow### 2. 用户认证本文采用的用户认证方式为password方式其他方式如LDAP同样支持但是本文不会介绍。笔者在安装时实验过LDAP方式但是未成功过。安装passsword组件pip install apache-airflow[password]2. 修改 airflow.cfg[webserver]authenticate Trueauth_backend airflow.contrib.auth.backends.password_auth3. 在python环境中执行如下代码以添加账户import airflow from airflow import models, settings from airflow.contrib.auth.backends.password_auth import PasswordUser user PasswordUser(models.User()) user.username admin # 用户名user.email emailExample163.com # 用户邮箱 user.password password # 用户密码session settings.Session() session.add(user) session.commit() session.close() exit() ### 3. 配置邮件服务此配置设置的是dag的task失败或者重试时发送邮件的发送者。配置如下[smtp]# If you want airflow to send emails on retries, failure, and you want to use# the airflow.utils.email.send_email_smtp function, you have to configure ansmtp_host smtp.163.comsmtp_starttls Truesmtp_ssl False# Uncomment and set the user/pass settings if you want to use SMTP AUTHsmtp_user mailExample163.comsmtp_password passwordsmtp_port 25smtp_mail_from mailExample163.com接下来简单把dag的Python代码列出来以供参考default_args { owner: ownerExample, start_date: datetime(2018, 9, 18), email: [mailReceiver163.com], # 出问题时发送报警Email的地址可以填多个用逗号隔开。 email_on_failure: [mailReceiver163.com], # 任务失败且重试次数用完时发送Email。 email_on_retry: True, # 任务重试时是否发送Email depends_on_past: False, # 是否依赖于过去。如果为True那么必须要昨天的DAG执行成功了今天的DAG才能执行。 retries: 3, retry_delay: timedelta(minutes3),}### 4、配置Executor设置Executor修改airflow.cfgexecutor LocalExecutor本文中由于只有单节点所以使用的是LocalExecutor模式。### 5. 修改log地址[core]base_log_folder /servers/logs/airflow[scheduler]child_process_log_directory servers/logs/airflow/scheduler### 6. 修改webserver地址修改webserver地址[webserver]base_url http://host:port可以通过上面配置的地址访问webserver。### 7. 可选配置可选修改Scheduler线程数如果调度任务不多的话可以把线程数调小默认为32。参数为parallelism可选不加载example dag如果不想加载示例dag可以把load_examples配置改为False默认为True。这个配置只有在第一次启动airflow之前设置才有效。如果此方法不生效可以删除${PYTHON_HOME}/site-packages/airflow/example_dags目录也是同样的效果。可选修改检测新dag间隔修改min_file_process_interval参数为10每10s识别一次新的dag。默认为0没有时间间隔。## 运行airflow启动scheduleairflow scheduler 启动webserverairflow webserver ## 安装问题汇总1. Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql修改Mysql配置文件my.cnf具体步骤如下查找my.cnf文件位置mysql --help | grep my.cnf下图红框处为my.cnf文件所在位置修改文件explicit_defaults_for_timestamptrue注意必须写在【mysqld】下重启Mysqlsudo systemctl restart mysqld.service查看修改是否生效。执行如下SQL如果值为1则为生效。2. pip install apache-airflow[mysql]报错mysql_config not found安装mysql-devel:首先查看是否有mysql_config文件。find / -name mysql_config如果没有安装mysql-develyum install mysql-devel安装之后再次查找结果如图3. 其他问题找我## 配置任务在 AirFlow 中每个节点都是一个任务可以是一条命令行 (BashOperator)可以是一段 Python 脚本 (PythonOperator) 等等然后这些节点根据依赖关系构成了一条流程一个图称为一个 DAG 。默认会到 ${AIRFLOW_HOME}/dags 目录下查找可以直接在该目录下创建相应的文件。如下是一个简单的示例。import airflowfrom airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.operators.python_operator import PythonOperatorfrom datetime import timedelta, datetimeimport pytz# -------------------------------------------------------------------------------# these args will get passed on to each operator# you can override them on a per-task basis during operator initializationdefault_args { owner: qxy, depends_on_past: False, email_on_failure: False, email_on_retry: False, retries: 1, retry_delay: timedelta(minutes5),}tz pytz.timezone(Asia/Shanghai)# naive datetime.strptime(2018-06-13 17:40:00, %Y-%m-%d %H:%M:%S)# local_dt tz.localize(naive, is_dstNone)# utc_dt local_dt.astimezone(pytz.utc).replace(tzinfoNone)dt datetime(2019, 7, 16, 16, 30, tzinfotz)utc_dt dt.astimezone(pytz.utc).replace(tzinfoNone)dag DAG( airflow_interval_test, default_argsdefault_args, descriptionairflow_interval_test, schedule_interval35 17 * * *, start_dateutc_dt)t1 BashOperator( task_idsleep, bash_commandsleep 5, dagdag)t2 BashOperator( task_idprint_date, bash_commanddate, dagdag)t1 t2该文件创建一个简单的 DAG只有三个运算符两个 BaseOperator 也就是执行 Bash 命令分别打印日期以及休眠 5 秒另一个为 PythonOperator 在执行任务时调用 print_hello() 函数。文件创建好后放置到 ${AIRFLOW_HOME}/dagsairflow 自动读取该DAG。----- 测试是否正常如果无报错那么就说明正常$ python /tmp/project/dags/hello_world.py转载于:https://www.cnblogs.com/duanhaoxin/p/11211815.html