网站关键词提升,wordpress添加用户,帝国cms灵动标签做网站地图,中铁建设集团门户网登录官网查询#x1f680; 优质资源分享 #x1f680;
学习路线指引#xff08;点击解锁#xff09;知识定位人群定位#x1f9e1; Python实战微信订餐小程序 #x1f9e1;进阶级本课程是python flask微信小程序的完美结合#xff0c;从项目搭建到腾讯云部署上线#xff0c;打造一… 优质资源分享
学习路线指引点击解锁知识定位人群定位 Python实战微信订餐小程序 进阶级本课程是python flask微信小程序的完美结合从项目搭建到腾讯云部署上线打造一个全栈订餐系统。Python量化交易实战入门级手把手带你打造一个易扩展、更安全、效率更高的量化交易系统
dolphinscheduler简单任务定义及跨节点传参
转载请注明出处 https://blog.csdn.net/funnyzpc/p/16395094.html
写在前面
dolphinscheduler是一款非常不错的调度工具本文我就简称ds啦可单机可集群可容器可调度sql、存储过程、http、大数据也可使用shell、python、java、flink等语言及工具功能强大类型丰富适合各类调度型任务社区及项目也十分活跃现在github中已有8.2k的star 所以本篇博文开始会逐步讲一些ds相关的东西也期待各位同行能接触到此并能实际解决一些生产上的问题
一.准备工作
| | 阅读本博文前建议您先阅读下官方的文档[https://dolphinscheduler.apache.org/zh-cn/docs/latest/user\_doc/guide/parameter/context.html)(https://dolphinscheduler.apache.org/zh-cn/docs/latest/user\_doc/guide/parameter/context.html)虽然也会碰到一些坑 |
| | 这里先准备下sql表资源以下为postgresql的sql脚本: |
表结构
| | |
| | CREATE TABLE dolphinscheduler.tmp ( |
| | id int4 NOT NULL, |
| | name varchar(50) NULL, |
| | label varchar(50) NULL, |
| | update\_time timestamp NULL, |
| | score int4 NULL, |
| | CONSTRAINT tmp\_pkey PRIMARY KEY (id) |
| | ); |
表数据
| | INSERT INTO tmp (id,name,label,update\_time,score) VALUES |
| | (3,二狗子,,2022-07-06 21:49:26.872,NULL), |
| | (2,马云云,,NULL,NULL), |
| | (1,李思,,2022-07-05 19:54:31.880,85); |
因为个人使用的postgresql的数据库如果您是mysql或者其他数据的用户请自行更改以上表和数据并添加到库中即可 表及数据入库请将tmp所属的库配置到 ds后台-数据源中心-创建数据源 以下是我的配置记住这里面的所有数据库配置均遵守所属数据库类型的jdbc的driver的配置参数配置完成也会在ds的数据库生成一条jdbc的连接地址这点要明白
二.简单的项目创建及说明
| | 因为ds的任务是配置在项目下面所以第一步得新建一个项目这样ds后台-项目管理-创建项目这是我创建的请看 | 准备完项目之后鼠标点进去并进入到 工作流定义菜单 页面如下图
先简单到解释下ds的一点儿基本结构首先ds一般部署在linux服务器下创建任务的用户需要在admin账户下创建重要的是创建的每个工作账户需要与操作系统用户一一对应比如你创建了一个 test 的ds账户那ds所在的服务器也必须有一个test的账户才可行这是ds的规则我没法解释为什么。 每个用户下(除了admin外所能创建的调度任务均在各自创建的项目下每个项目又分为多个任务(工作流定义),一个任务下又可分为多个任务节点下图为任务定义 ok,如果已经准备好以上步骤下面开始定义一个简单的调度任务继续哈
三.简单的参数传递
先看表 我们先做个简单的比如图中如果二狗子的本名叫李思需要我们取id1的name放到id3的label中并且更新update_time
1.这里第一步 在工作流定义列表点击 创建工作流 就进入一个具体的任务(工作流)的定义,同时我们使用的是sql任务所以就需要从左侧拖动一个sql任务到画布中(右侧空白处) 因为拖动sql任务到画布会自动弹出节点定义上图为当前节点的一个定义重点是数据源、sql类型、sql语句如官方所说如果将name传递到下游则需要在自定义参数重定义这个name为out方向 类型为varchar。2.因为传递到参数需要写入到表这里我们再定义一个节点这个节点负责接收上游传递到name执行update时使用这个name以下是我的定义 看到没这里不仅仅要注意sql类型(sql类型与sql语句是一一对应的类型不能错) 还有就是前置任务一定要选中(上面定义的)node1节点。 另外需要注意的是当前任务是上下游传参所以在node2中是直接使用node1中定义的name这个参数哈3.定义完成当前任务就需要保存点右上角保存填写并保存后点关闭以退出定义 4.因为定义的任务需要上线了才可执行所以在工作流定义列表先点该任务的黄色按钮(任务上线)然后才是点绿色按钮(执行任务) 5.任务执行成功与否具体得看任务实例这是执行node2节点的日志
顺带再看看数据库表是否真实成功
完美
四.复杂的跨节点传参
首先看表 思考一个问题可以看到李思的score是85根据score应该被评为 B(90的为A)并写入到label字段该怎么办呢如果这个分数是90分又该怎么办呢如果根本没有score(分值) 这个任务是不是就不需要更新李思的label(评分)呢 对于上面问题可以有一些偏门的解决方法比如在sql中塞一个异常值这样看似不错不过作为调度工具建议还是在condition节点或者switch节点处理是最好的不过就目前我用的2.0.5版本的ds对于这两类任务节点是没法接收参数的这是一个遗憾遂~个人觉得较好的方式是在写入节点之前增加一个判断节点将错误抛出(没有score的最好对于此我使用了一个shell的中间节点。 下面是我定义的三个节点
node1节点定义 node2节点定义
(脚本内容)
| | #!/bin/bash |
| | echo input param start |
| | echo id${id} |
| | echo score${score} |
| | echo input param end |
| | |
| | id${id} |
| | echo ${setValue(id2$id)} |
| | |
| | if [ ${score} -ge 90 ];then |
| | echo ${setValue(label2level A)} |
| | echo level A |
| | elif [ ${score} -ge 80 ];then |
| | echo ${setValue(label2level B)} |
| | echo level B |
| | elif [ ${score} -ge 60 ];then |
| | echo ${setValue(label2level C)} |
| | echo level C |
| | elif [ ${score} -ge 0 ];then |
| | echo ${setValue(label2F!)} |
| | echo F! |
| | else |
| | echo NO score ,please check! |
| | exit 1 |
| | fi |
node3节点定义 看一眼结果
五.中间的坑
对于复杂节点传参数也碰到一些坑这些坑大概有这些
1.对于shell脚本不熟悉的判断节点其实还是有一些难度的这是很重要的一点2.node2(判断节点)不能有重复的参数不管局部的还是node1(上一级)传递过来的均不能重复3.因为在node2(判断节点)需要将id以及label继续往下传(to node3),这时候就需要给id以及label定义一个映射的out变量(id2、label2)4.node2中重新设置参数麻烦需要在shell中重新定义变量(id2、label2),同时需要在shell任务内使用拼接的方式赋值(如echo ${setValue(id2$id)})5.sql类型以及不同节点下不同参数时常搞错不是任何节点都可以接收上级节点参数以及局部变量与传递变量以及全局变量优先级区别及可能造成冲突6.ds列表传参(2.0是不可以的)很鸡肋,对于列表传参又不能在下一级节点做循环赋值这点对于ds是有改进的空间的7.等等… 对于ds还有很多可扩展的地方(因为实际需要)所以我就做了一些二次开发后面会聊…大家期待哟