最近在弄画像标签每天etl的调度事情,这篇文章分享一下一个开源的etl工具airflow。
一、基础概念
airflow是airbnb内部发起并开源的一个etl管理平台,使用python编写实现的任务管理、调度、监控工作流平台。这是其官方文档地址:apache airflow (incubating) documentation,关于airflow产品的使用,里面有详细的介绍。
airflow的调度依赖于crontab命令,与crontab相比airflow可以直观的看到任务执行情况、任务之间的逻辑依赖关系、可以设定任务出错时邮件提醒、可以查看任务执行日志。
而crontab命令管理的方式存在以下几方面的弊端:
1、在多任务调度执行的情况下,难以理清任务之间的依赖关系;
2、不便于查看当前执行到哪一个任务;
3、任务执行失败时不便于查看执行日志,也即不方便定位报错的任务和错误原因;
4、不便于查看调度流下每个任务执行的起止消耗时间,这对于优化task作业是非常重要的;
5、不便于记录历史调度任务的执行情况,而这对于优化作业和错误排查是很重要的;
airflow中有两个最基本的概念:dag和task,下面主要介绍一下。
dag是什么:
dag是directed acyclic graph的缩写,即有向无环图。是所有要执行任务脚本(即task)的集合,在这个dag中定义了各个task的依赖关系、调度时间、失败重启机制等。通过dagid来标识每个dag任务
每个dag是由1到多个task组成
task是什么:
task是具体执行的任务脚本,可以是一个命令行(bashoperator),也可以是python脚本等。
二、主要功能键介绍
1、dag管理
在airflow的主页,可以看到当前所有的dag列表(通俗点说就是所有的调度任务列表),中间“task by state”那一列显示任务的执行状态。深绿色的表示已执行成功的task,浅绿色的表示当前正在执行的task。
右侧“links”那一列可以链接查看当前dag任务的依赖关系、执行时间、执行脚本等情况。
当点击具体某一个dag任务时,就可以进去查看该dag的调度依赖、执行时长、调度脚本等具体执行情况
2、调度依赖查看
通过“graph view”选项可以查看当前调度任务的依赖关系,当调度作业较为复杂时,这种图形化方式展示的依赖关系可以帮助用户迅速理清。
在用户画像的调度管理中,每天需要执行cookieid和userid两个维度的画像脚本,因此可以设定并行执行任务,让cookieid和userid的脚本同时执行调度作业
3、执行状态
通过“tree view”选项可以查看当前任务的执行状态,包括当前执行到哪一个task,还有哪些task未执行。哪些task执行成功,哪些task执行失败。
也可以查看历史上该dag下面各task的执行情况。
4、各task执行时间
通过“gantt”选项可以查看各task任务的执行起止时间的甘特图。
了解各task执行的时间可以有针对性地优化执行时间长的task对应脚本。
5、dag调度脚本
通过“code”选项,可以查看当前dag调度的脚本。脚本里面定义了需要执行的task、执行顺序及依赖、调度时间、失败发送邮件或重调机制等方法
三、脚本实例
在开发过程中,task脚本是需要被调度的脚本,在airflow中主要需要开发的是dag脚本,即管理task任务的脚本。通过一个dag脚本,将各个调度作业脚本串起来,按照业务逻辑去执行。
1、dag脚本
下面通过一个具体dag脚本实例来了解一下:
from airflow.operators.bash_operator import bashoperator import airflow from airflow.models import dag from airflow import operators from airflow.contrib.hooks import sshhook from airflow.models import baseoperator from airflow.contrib.operators import sshexecuteoperator from airflow.operators.latest_only_operator import latestonlyoperator import os import sys from datetime import timedelta,date,datetime import pendulum from airflow.utils.trigger_rule import triggerrule default_args = { 'owner': 'superuserprofile', 'depends_on_past': false, 'start_date': datetime(2018, 06, 01), 'email': ['administer@testemail.com'], 'email_on_failure': true , 'email_on_retry': true, 'retries': 1, 'retry_delay': timedelta(minutes=1), } os.environ['spark_home'] = '/usr/local/spark-2.1.1-bin-hadoop2.6' sys.path.append(os.path.join(os.environ['spark_home'], 'bin'))
该段脚本定义了需要引入的包,以及默认的dag参数配置,包括task是否依赖上游任务,首次调度时间、任务失败接收邮箱、任务失败是否重新调起等
dag = dag( 'superuserprofile', default_args=default_args, description='a userprofile test', schedule_interval='00 08 * * *' )
该段脚本实例化了dag,设置了dagid,调度执行时间
gender_task = bashoperator( task_id='gender', bash_command=' sudo -e -h -u userprofile spark-submit --master yarn --deploy-mode client --driver-memory 1g --executor-memory 8g --executor-cores 2 --num-executors 200 /airflow/userprofile_gender.py {{ ds_nodash }} ', dag=dag, trigger_rule=triggerrule.all_done ) country_task = bashoperator( task_id='country', bash_command=' sudo -e -h -u userprofile spark-submit --master yarn --deploy-mode client --driver-memory 1g --executor-memory 4g --executor-cores 2 --num-executors 200 /airflow/userprofile_country.py {{ ds_nodash }} ', dag=dag, trigger_rule=triggerrule.all_done )
该段脚本设置了两个需要执行的task任务(userprofile_gender.py和userprofile_country.py)的实例化。
task直接的调度依赖关系可以通过set_upstream、set_downstream命令或符号>> 、<> country_task 命令指country_task 任务将依赖gender_task 任务先执行完,反之同理
2、命令行执行
airflow通过可视化界面的方式实现了调度管理的界面操作,但在测试脚本或界面操作失败的时候,可通过命令行的方式调起任务。下面介绍几个常用命令
命令1:airflow list_tasksuserprofile
该命令用于查看当前dag任务下的所有task的列表
其中userprofile是dagid,加粗的airflow list_tasks是关键字命令
-----------------------------------------------------------------------
命令2:airflow testuserprofile gender_task 20180601
该命令用于单独执行dag下面的某个task
其中userprofile是dagid,gender_task是要具体某个taskid,20180601是执行日期。加粗部分是关键字命令
-----------------------------------------------------------------------
命令3:airflow backfill -s2018-06-01-e2018-06-02 userprofile
该命令用于调起整个dag脚本执行
其中2018-06-01是执行脚本的开始日期, 2018-06-02是结束日期,userprofile是dagid,加粗部分是关键字命令。
骨传导耳机哪个好?骨传导耳机排行榜
基于JFET 2N5457的吉他前置放大器电路
如何焊接RGBWLED灯条
我国家电行业已进入成熟期,家电市场零售额逐月上扬
RNN基础知识介绍 为什么需要RNN
一个开源的ETL工具Airflow
微分放大电路的设计及计算例题
低压配电柜选型_低压配电柜组成_低压配电柜安装规范
脉冲信号采集系统|多通道高速脉冲采集软件NSAT-4000
多方面分析超级电容
2G、3G退网已成定局,Cat 1前途不可限量
工业大数据发展面临哪些挑战
鸿蒙系统概念股谁最有潜力?最核心概念股名单
从户外电源到储能柜,储能行业应用彻底爆发!
2亿只节能灯的后遗症
ThreadLocal的短板,我TTL来补!
电脑显卡与主板接口类型匹配解析
选购电动工具注意事项
疫情之下红外体温计需求猛增,科普红外体温计的应用
为迎接智能汽车发展新趋势,南京发布“四新”行动计划