当前位置: 首页 > news >正文

网站建设的难点兰州网络推广的平台

网站建设的难点,兰州网络推广的平台,烟台高新区规划国土建设局网站,苏州网站建设哪家快引言#xff1a;《异构数据源的CDC实时同步系统》 系列第一篇 (已完成)《零编码打造异构数据实时同步系统——异构数据源CDC之2》 系列第二篇(已完成)《零编码打造异构数据实时同步系统——异构数据源CDC之3》 系列第三篇(已完成)《异构数据源的CDC实时同步系统——最终选型实…引言《异构数据源的CDC实时同步系统》 系列第一篇 (已完成)《零编码打造异构数据实时同步系统——异构数据源CDC之2》 系列第二篇(已完成)《零编码打造异构数据实时同步系统——异构数据源CDC之3》 系列第三篇(已完成)《异构数据源的CDC实时同步系统——最终选型实战》 系列第四篇(已完成)7.debeziumdebezium是由redhat支持的开源分布式CDC系统支持多端数据源如mysql、mongodb、postgresql、oracle、sql server和Cassandra社区非常活跃很多的新功能和新数据源都在快速发展中源码地址https://github.com/debezium/debezium我们使用debezium主要是看中它支持多数据源同时与kafka的整合在CDC领域不能忽略的一个商用产品是kafka conflent,在它的产品中连接源端的组件就是debezium我们一度就想使用这个商用组件但是试用版本仅支持一个broker无法在真正的的生产环境使用它的优势在于配置的可视化后来我们使用kafka eagle来进行kafka的管理后才彻底下定决心自己使用开源版本搞一套。我们最终采用的整体方案是debeziumkafkakafka-connect-jdbc,管理端使用的kafka eagle.关于confluent的资料网上很多我们在实际配置的过程中也参考了很多它的建议。注意事项1)debezium需要设置的mysql权限GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO user IDENTIFIED BY password;2)采用阿里云的rds的mysql数据源非常坑默认是不开启SHOW DATABASES权限的需要在debezium中单独配置属性database.history.store.only.monitored.tables.ddl:true3)debezium配合kafaka启动使用properties方式也就是说第一个源需要配置为文本模式后续可采用动态增加源的方式动态增加但是文件模式需要为json8.kafka-connect-jdbc开源地址https://github.com/confluentinc/kafka-connect-jdbc它是confluent开源的兼容jdbc的数据库同步数据的kafka connect连接器。这个组件支持的目的端的源非常多理论上有java客户端的基本都支持所以基本上可以涵盖你能用到的绝大多数数据源它的延迟非常好比之前的bireme好太多了毕竟是国外大厂支持的组件是国内小公司开源组件所不能比拟的。9.最终选型方案上图为我们最终确定的方案在实际生产中除了直接DB层级的数据实时同步外我们还有一套pulsar的比较灵活的数据接口方案不在此次讨论范围之内也就是说我们最终实现了基于DB和业务层级的实时数据同步方案。业界其他公司的CDC方案.实际生产配置过程1.kafka安装配置以standalone为例需要单独说明的是因为gpdb6目前还不支持upsert模式debezium的新增和更新均会导致一条新增加的完整数据到kafka默认kafka按批提交的模式会造成gpdb6的主键冲突需要修改模式为逐条应用同时配合自己单独写的check程序进行offset错误的自动修正#1)安装kafka注意2.30有个bugtar -zxvf kafka_2.12-2.4.0.tgzcd kafka_2.12-2.4.0Vim config/server.properties #单机版只需要消息存放路径即可log.dirs/opt/kafka_2.12-2.4.0/kafka-logs#增加可以删除topicdelete.topic.enabletrue#保留日志大小1GB不设置的话会日志撑爆log.retention.bytes1073741824mkdir -p /opt/kafka_2.12-2.4.0/kafka-logs#修改kafka的connect-standalone.properties设置为逐条应用consumer.max.poll.records1#2)修改内置zk的配置vim config/zookeeper.properties#制定zk元数据存放路径dataDir/opt/kafka_2.12-2.4.0/zdatamkdir -p /opt/kafka_2.12-2.4.0/zdata#3)启动服务先启动zk再启动kafkacd /opt/kafka_2.12-2.4.0/nohup bin/zookeeper-server-start.sh config/zookeeper.properties nohup bin/kafka-server-start.sh config/server.properties —加守护进程启动bin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties2.kafka基本命令#4)查看服务是够启动 jps#5)创建一个测试用的topicbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test#查询topic列表bin/kafka-topics.sh --list --zookeeper localhost:2181#查看topic信息bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test #删除topic(只会删除元数据)配置上面的delete.topic.enabletrue后可生效bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test#手动删除文件bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test./kafka-topics.sh --zookeeper 192.168.6.42:2181 --describe --topic itslawnode1./kafka-consumer-groups.sh --describe --group test-consumer-group --zookeeper localhost:2181 #查看offset信息bin/kafka-consumer-groups.sh --bootstrap-server 192.168.6.42:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 0#查看和删除群组bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --listbin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group connect-sink-judge-up#从开始的消费信息 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test#6)创建控制台生产者生产数据bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test#7)新开一个进程创建消费者数据bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3.debezium安装配置#下载debezium-connector-mysql,将文件中的jar包copy到kafka的libs目录cd /opt/kafka_2.12-2.4.0/tableconfig #tableconfig是新建目录存放配置文件#######第一个启动的properties文件格式############nameauthorization-mysql-connector-new-01connector.classio.debezium.connector.mysql.MySqlConnectordatabase.hostnamemysql源IPdatabase.port3306database.user账号database.password密码database.server.id1database.server.namedebeziumdatabase.whitelistplatform_authorizationdatabase.serverTimezoneUTCtable.whitelistplatform_authorization.lawyer_authorization,platform_authorization.lawyer_authorization_recorddatabase.history.kafka.bootstrap.serverslocalhost:9092database.history.kafka.topicauth.platform_authorizationinclude.schema.changesfalse#使用table名作为topic名字因为machine.db.table默认topictransformsroutetransforms.route.typeorg.apache.kafka.connect.transforms.RegexRoutertransforms.route.regex([^.]).([^.]).([^.])transforms.route.replacement$3#不进行初始化只获取当前的schema初始化采用rds_dbsync比较方便实际测试比init方式快几十倍因为此处是逐条应用的snapshot.modeschema_only##########json格式的文件#########{name:hanukkah-mysql-connector,config: {connector.class:io.debezium.connector.mysql.MySqlConnector,database.hostname:mysql主机名,database.port:3306,database.user:用户名,database.password:密码,database.server.id:1,database.server.name:debezium,database.whitelist:hanukkah,database.serverTimezone:UTC,table.whitelist:hanukkah.cooperation_lawyer,database.history.kafka.bootstrap.servers:localhost:9092,database.history.kafka.topic:mysql1.hanukkah,include.schema.changes:false,transforms:route,transforms.route.type:org.apache.kafka.connect.transforms.RegexRouter,transforms.route.regex:([^.]).([^.]).([^.]),transforms.route.replacement:$3,snapshot.mode:schema_only}} 4.sink配置#首先下载kafka-connect-jdbc-5.3.1.jar并防止到kafka的libs目录即可{ name: sink-cooperation_lawyer-ins, config: { connector.class: io.confluent.connect.jdbc.JdbcSinkConnector, tasks.max: 1, topics: cooperation_lawyer, connection.url: jdbc:postgresql://目的IP:5432/目的DB?user用户password密码stringtypeunspecified¤tSchema当前schema名, transforms: unwrap, transforms.unwrap.type: io.debezium.transforms.UnwrapFromEnvelope, transforms.unwrap.drop.tombstones: false, auto.create: true, insert.mode: insert, delete.enabled: true,table.name.format: platform.cooperation_lawyer, pk.fields: id, pk.mode: record_key }}需要额外说明的是在目的是greenplum数仓环境下1)如果mysql源端字段类型是timestamp则需要在gpdb端配置字段类型为timestamptz后无需额外配置sink项2)如果mysql源端字段类型是datetime则目的端字段类型需要配置为timestamp同时需要sink文件中增补TimestampConverter配置项有几个datetime字段配置几个配置项3)如果mysql源端datetime配置了精度需要debezium配置增加time.precision.modeconnect4) auto.evolve: true 则源端表结构变更后会自动在目的端创建对应数据结构 auto.create: true 则源端新增表后会自动同步到目的端{ name: sink-pa_course-ins, config: { connector.class: io.confluent.connect.jdbc.JdbcSinkConnector, tasks.max: 1, topics: pa_course, connection.url: jdbc:postgresql://目的IP:5432/目的DB?user用户password密码stringtypeunspecified¤tSchema当前schema名, transforms: unwrap,TimestampConverter, transforms.unwrap.type: io.debezium.transforms.UnwrapFromEnvelope, transforms.unwrap.drop.tombstones: false, transforms.TimestampConverter.type: org.apache.kafka.connect.transforms.TimestampConverter$Value, transforms.TimestampConverter.format: yyyy-MM-dd HH:mm:ss, transforms.TimestampConverter.field: create_time, transforms.TimestampConverter.target.type: string, auto.create: true, auto.evolve: true, insert.mode: insert, delete.enabled: true, pk.fields: id, pk.mode: record_key }}5.启动服务#启动kafka进程多了Kafka和QuorumPeerMainbin/zookeeper-server-start.sh -daemon config/zookeeper.propertiesbin/kafka-server-start.sh -daemon config/server.properties#启动第一个sourcebin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1connector-logs/connector.log 21 增补其他sourcecurl -X POST -H Content-Type:application/json -d tableconfig/authorization-source.json http://localhost:8083/connectors/#启动sinkcurl -i -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d tableconfig/paod-base-ins.properties ...#查看所有的connectors curl -X GET http://127.0.0.1:8083/connectors 6.使用eagle进行topic状态查看和管理关于kafka eagle的下载安装特别简单就不单独说明了这里仅贴出效果图从上图非常清楚的能看到哪些topic是有问题的(红色)绝大多数问题在于offset的错误导致的在实际使用中我们通过一个简单python守护进程的代码进行了管理import requestsimport loggingimport psycopg2import jsonimport reimport time# 获取数仓连接def get_gp_conn(): conn psycopg2.connect(host192.168.2.175, port5432, username, passwordpassword,dbnamedatawarehouse) return conn删除目的端的主键IDdef del_dup_id(tablefullname,dup_id): db get_gp_conn() cursor db.cursor() sql delete from tablefullname where id dup_id cursor.execute(sql) db.commit() cursor.close() db.close()重启sinkdef restart_sink(sinkname,configname): delurl http://127.0.0.1:8083/connectors/ sinkname del_res requests.delete(delurl) print(del resp:,del_res) url http://127.0.0.1:8083/connectors/ headers Content-Type:application/json,Accept:application/json datas tableconfig/ configname start_res requests.post(url,datadatas,headersheaders) print(start resp:,start_res) #checkurl http://127.0.0.1:8083/connectors/ sinkname /tasks/0/status url http://127.0.0.1:8083/connectors/ sinkname /tasks/0/restart requests.post(url)检测任务状态def check_sink_status(sinkname,tablefullname,configname): sinkurl http://127.0.0.1:8083/connectors/ sinkname /tasks/0/status print(sinkurl) resp requests.get(sinkurl) status json.loads(resp.text) state status[state] if state FAILED: trace status[trace] pattern re.compile(rKey (id)((.)) already exists) search re.search(pattern, trace) #print(search) if search: del_id search.group(1) print(duplicate key is {}, now to del this record of target database.format(del_id)) del_dup_id(tablefullname,del_id) restart_sink(sinkname,configname)获取任务列表def get_sink_list(): conn get_gp_conn() cur conn.cursor() cur.execute(select taskname,tableschema,tablename,configname from platform.tasklist where tablename is not null) print(current time is:,time.strftime(%Y-%m-%d %H:%M:%S,time.localtime(time.time()))) rows cur.fetchall() for row in rows: taskname row[0] schema row[1] tablename row[2] configname row[3] tablefullname schema .tablename check_sink_status(taskname,tablefullname,configname) cur.close() conn.close()if __name__ __main__: get_sink_list()同时为了避免standalone进程的异常终止我们用shell的守护进行进行了监控#! /bin/bashfunction check(){ countps -ef |grep $1 |grep -v grep |wc -l #echo $count if [ 0 $count ];then time$(date %Y-%m-%d %H:%M:%S) echo standalone restart at:${time} cd /opt/kafka_2.12-2.4.0/ bin/connect-standalone.sh config/connect-standalone.properties tableconfig/paod-source.properties 1connector-logs/connector.log 21 sleep 60scurl -X POST -H Content-Type:application/json -d tableconfig/platform-source.json http://localhost:8083/connectors/ ...... sleep 10s curl -i -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d tableconfig/paod-base-ins.properties  ..... fi } check ConnectStandalone另外在实际运行过程中会出现offset错误的情况极其特殊情况下使用上面的方法无法快速解决问题建议使用kafkacat查看详细信息人为跳过offset具体细节不再赘述。如喜欢此专题请关注并提问技术人驱动自身的是永不停歇的渴望。
http://www.zqtcl.cn/news/44781/

相关文章:

  • 比较好的建立站点贵阳网站优化排名
  • 怎么做网站门户电子商务网站建设与原理
  • 中国做外贸的网站有哪些WordPress小说漫画主题国外
  • 客户提出网站建设申请益阳网站建设益阳
  • wordpress插件视频去广告 ck-video0.65.zipwordpress 中文seo
  • 广州网站制作方法百度互联网公司邯郸分公司
  • 网站后台管理密码破解深圳找工作58同城最新招聘
  • 阿里网站建设教程网站做友链盈利
  • 防钓鱼网站宣传世界各国足球联赛排名
  • 怎么建立本地网站visual studio怎么创建网页
  • php帝国建站系统短视频宣传片制作
  • 网站建设的内容要怎么写泉州网站关键词推广
  • 蛋糕网站建设规划书临汾哪做网站
  • 网站seo方案建议明年做那些网站致富
  • 做任务的正规网站重庆专业seo
  • 网站备案号找回密码网站后台登陆口
  • 网站建设打造营销型网站网站建设营销攻略
  • 百度搜索网站的图片黄江二手车东莞网站建设
  • 深圳微商城网站制作联系电话自己做网站 怎样下载模板
  • 肥西网站推广公司做网站后端要什么技术
  • 基于MVC网站建设课程设计报告wordpress是可视化编辑
  • a站是什么做网页兼职网站
  • 延吉制作网站针织东莞网站建设技术支持
  • 网站建设管理员网站建设合同书下载
  • 做站群的网站要备案吗控制面板网站
  • 五分钟自己创建网站的方法皮肤科医生免费问诊
  • seo整站优化方案案例台州seo排名扣费
  • 汉中商城网站建设外贸网站建站多少钱
  • 个人网站做导航网站网站怎么做json数据库
  • 设计网站赤壁市药监局网站建设方案