网站建设太难了,2021个人网站盈利模式,无锡网站建设 推荐无锡立威云商,高校二级网站建设方案说明
nifi版本#xff1a;1.23.2#xff08;docker镜像#xff09;
需求背景
将数据库中的数据同步到另一个数据库中#xff0c;要求对于新增的数据和历史有修改的数据进行增量同步
模拟数据
建表语句
源数据库和目标数据库结构要保持一致#xff0c;这样可以避免后…说明
nifi版本1.23.2docker镜像
需求背景
将数据库中的数据同步到另一个数据库中要求对于新增的数据和历史有修改的数据进行增量同步
模拟数据
建表语句
源数据库和目标数据库结构要保持一致这样可以避免后面单独转换
-- 创建测试表
CREATE TABLE sys_user (id bigint NOT NULL AUTO_INCREMENT COMMENT 用户ID,name varchar(50) NOT NULL DEFAULT COMMENT 姓名,age int NOT NULL DEFAULT 0 COMMENT 年龄,gender tinyint NOT NULL COMMENT 性别,1:男,0:女,create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 创建时间,modify_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 修改时间,is_deleted tinyint NOT NULL DEFAULT 0 COMMENT 是否已删除,PRIMARY KEY (id) USING BTREE
) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_0900_ai_ci ROW_FORMATDYNAMIC COMMENT用户表;
测试数据
-- 模拟数据
INSERT INTO sys_user (name, age, gender) VALUES (测试数据1, 20, 1);
INSERT INTO sys_user (name, age, gender) VALUES (测试数据2, 21, 1);
INSERT INTO sys_user (name, age, gender) VALUES (测试数据3, 21, 0);
INSERT INTO sys_user (name, age, gender) VALUES (测试数据4, 18, 0);
INSERT INTO sys_user (name, age, gender) VALUES (测试数据5, 22, 1);
完整测试数据 配置数据库连接池
在画布空白位置鼠标右键选择Configure 新增配置
在弹出的界面点击号添加新的数据库连接池配置如果已经有了配置该步骤可以跳过 在弹出的界面筛选对应类型的连接池我这里选择DBCPConnectionPool然后点击ADD 点击刚才新添加的那一条数据右侧的小齿轮进行连接池相关的配置 配置连接池相关属性
主要配置以下几个内容其他的根据情况决定是否需要修改,密码输入后是不会显示的 校验属性
校验配置是否正确点击右上角的对钩然后在弹出的界面点击VERIFY进行验证 验证通过会全部显示绿色如果某一条不通过会有提示最后点击APPLY 可选操作给配置起个名字
为了方便后续使用给连接池起个名字要不然以后配置多了会分不清 激活连接池的配置
点击右侧的闪电标志激活配置在新的页面中点击ENABLE激活最后点击CLOSE关闭 已激活的配置 同理增加目标数据库的连接池配置步骤和上面是一样的这里不再重复了最终配置好后会有两个连接池的配置。如下 获取数据库表数据
添加处理器QueryDatabaseTable
点击工具栏的Processor拖拽到画布中筛选QueryDatabaseTable处理器然后点击ADD添加到画布中 配置处理器QueryDatabaseTable
双击处理器切换到PROPERTIES选项卡,配置以下内容 Maximum-value Columns最大值列官方文档是这么解释的以逗号分隔的列名列表。处理器将跟踪自处理器开始运行以来返回的每一列的最大值。使用多个列意味着列列表的顺序并且每列的值预计比前几列的值增加得更慢。因此使用多个列意味着列的分层结构通常用于对表进行分区。此处理器可用于仅检索自上次检索以来添加/更新的那些行。请注意某些 JDBC 类型如 bit/boolean不利于保持最大值因此这些类型的列不应列在此属性中并且将导致处理过程中的错误。如果未提供列则将考虑表中的所有行这可能会对性能产生影响。注意为给定表使用一致的最大值列名非常重要这样增量提取才能正常工作。支持表达式语言true 校验属性 给处理器起个名字表示当前整个工作流的作用 拆分数据
添加处理器SplitAvro 配置处理器SplitAvro
双击处理器切换到PROPERTIES选项卡,所有内容默认即可 数据入库
添加处理器PutDatabaseRecord 配置处理器PutDatabaseRecord
双击处理器切换到PROPERTIES选项卡
新增Record Reader 配置AvroReader
点击右侧的箭头在弹出的界面选择刚才配置的Reader,然后点击右侧的小齿轮 在弹出的界面根据自己的需要自行配置这里按照默认的配置即可 激活Reader
点击右侧的闪电标志进行激活 激活后的状态变为Enabled 其他配置 校验属性 连接所有处理器
连接处理器
连接QueryDatabaseTable和SplitAvro两个处理器勾选For Relationships下的success 连接SplitAvro和PutDatabaseRecord两个处理器勾选For Relationships下的split 处理SplitAvro处理器的告警
双击SplitAvro处理器切换到RELATIONSHIPS勾选下面的两个选项然后点击APPLY 处理PutDatabaseRecord处理器的告警
双击PutDatabaseRecord处理器切换到RELATIONSHIPS勾选下面的选项然后点击APPLY 完整配置 启动所有处理器
QueryDatabaseTable处理器默认是一分钟执行一次的可以在SCHEDULING选项卡下面进行配置这里按照默认的时间来执行 在画布空白位置鼠标右键选择Start启动所有的处理器 查看目标数据库数据
等待一分钟后查看目标数据库数据发现源数据库的5条数据被同步到了目标数据库 修改源数据库的数据
UPDATE sys_user SET is_deleted 1 WHERE id 1;
UPDATE sys_user SET is_deleted 1 WHERE id 4;
INSERT INTO sys_user (name, age, gender) VALUES (测试数据6, 22, 1);
再次查看目标数据库数据
等待处理器执行后查看目标数据库数据发现新的数据已经被同步过去 可以看到最后一个处理器最终由8条记录流入 结束语
以上便是使用NIFI增量同步数据库数据的全过程如果有什么疑问欢迎评论区进行评论。