有没有给做淘宝网站的,做会计需要了解的网站及软件,策划案需要给做网站吗,现在网站还用asp做概述把 mysql 的数据迁移到 es 有很多方式#xff0c;比如直接用 es 官方推荐的 logstash 工具#xff0c;或者监听 mysql 的 binlog 进行同步#xff0c;可以结合一些开源的工具比如阿里的 canal。这里打算详细介绍另一个也是不错的同步方案#xff0c;这个方案基于 kafka…概述把 mysql 的数据迁移到 es 有很多方式比如直接用 es 官方推荐的 logstash 工具或者监听 mysql 的 binlog 进行同步可以结合一些开源的工具比如阿里的 canal。这里打算详细介绍另一个也是不错的同步方案这个方案基于 kafka 的连接器。流程可以概括为mysql连接器监听数据变更把变更数据发送到 kafka topic。ES 监听器监听kafka topic 消费写入 ES。Kafka Connect有两个核心概念Source和Sink。 Source负责导入数据到KafkaSink负责从Kafka导出数据它们都被称为Connector也就是连接器。在本例中mysql的连接器是sourcees的连接器是sink。这些连接器本身已经开源我们之间拿来用即可。不需要再造轮子。过程详解准备连接器工具我下面所有的操作都是在自己的mac上进行的。首先我们准备两个连接器分别是 kafka-connect-elasticsearch 和 kafka-connect-elasticsearch 你可以通过源码编译他们生成jar包源码地址我个人不是很推荐这种源码的编译方式因为真的好麻烦。除非想研究源码。我是直接下载 confluent 平台的工具包里面有编译号的jar包可以直接拿来用下载地址我下载的是 confluent-5.3.1 版本, 相关的jar包在 confluent-5.3.1/share/java 目录下我们把编译好的或者下载的jar包拷贝到kafka的libs目录下。拷贝的时候要注意除了 kafka-connect-elasticsearch-5.3.1.jar 和 kafka-connect-jdbc-5.3.1.jar相关的依赖包也要一起拷贝过来比如es这个jar包目录下的http相关的jersey相关的等否则会报各种 java.lang.NoClassDefFoundError 的错误。另外mysql-connector-java-5.1.22.jar也要放进去。数据库和ES环境准备数据库和es我都是在本地启动的这个过程具体就不说了网上有很多参考的。我创建了一个名为test的数据库里面有一个名为login的表。配置连接器这部分是最关键的我实际操作的时候这里也是最耗时的。首先配置jdbc的连接器。我们从confluent工具包里拷贝一个配置文件的模板(confluent-5.3.1/share目录下)自带的只有sqllite的配置文件拷贝一份到kafka的config目录下改名为sink-quickstart-mysql.properties文件内容如下# tasks to create:namemysql-login-connectorconnector.classio.confluent.connect.jdbc.JdbcSourceConnectortasks.max1connection.urljdbc:mysql://localhost:3306/test?userrootpassword11111111modetimestampincrementingtimestamp.column.namelogin_timeincrementing.column.nameidtopic.prefixmysql.table.whitelistloginconnection.url指定要连接的数据库这个根据自己的情况修改。mode指示我们想要如何查询数据。在本例中我选择incrementing递增模式和timestamp 时间戳模式混合的模式 并设置incrementing.column.name递增列的列名和时间戳所在的列名。混合模式还是比较推荐的它能尽量的保证数据同步不丢失数据。具体的原因大家可以查阅相关资料这里就不详述了。topic.prefix是众多表名之前的topic的前缀table.whitelist是白名单表示要监听的表可以使组合多个表。两个组合在一起就是该表的变更topic比如在这个示例中最终的topic就是mysql.login。connector.class是具体的连接器处理类这个不用改。其它的配置基本不用改。接下来就是ES的配置了。同样也是拷贝 quickstart-elasticsearch.properties 文件到kafka的config目录下然后修改我自己的环境内容如下nameelasticsearch-sinkconnector.classio.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max1topicsmysql.loginkey.ignoretrueconnection.urlhttp://localhost:9200type.namemysqldatatopics的名字和上面mysql设定的要保持一致同时这个也是ES数据导入的索引。从里也可以看出ES的连接器一个实例只能监听一张表。type.name需要关注下我使用的ES版本是7.1我们知道在7.x的版本中已经只有一个固定的type(_doc)了使用低版本的连接器在同步的时候会报错误我这里使用的5.3.1版本已经兼容了。继续看下面的章节就知道了。关于es连接器和es的兼容性问题有兴趣的可以看看下面这个issue启动测试当然首先启动zk和kafka。然后我们启动mysql的连接器./bin/connect-standalone.sh config/connect-standalone.properties config/source-quickstart-mysql.properties 接着手动往login表插入几条记录正常情况下这些变更已经发到kafka对应的topic上去了。为了验证我们在控制台启动一个消费者从mysql.login主题读取数据./bin/kafka-console-consumer.sh --bootstrap-serverlocalhost:9092 --topic mysql.login --from-beginning可以看到刚才插入的数据。把数据从 MySQL 移动到 Kafka 里就算完成了接下来把数据从 Kafka 写到 ElasticSearch 里。首先启动ES和kibana当然后者不是必须的只是方便我们在IDE环境里测试ES。你也可以通过控制台给ES发送HTTP的指令。先把之前启动的mysql连接器进程结束(因为会占用端口)再启动 ES 连接器./bin/connect-standalone.sh config/connect-standalone.properties config/quickstart-elasticsearch.properties 如果正常的话ES这边应该已经有数据了。打开kibana的开发工具在console里执行GET _cat/indices这是获取节点上所有的索引你应该能看到green open mysql.login 1WqRjkbfTlmXj8eKBPvAtw 1 1 4 0 12kb 7.8kb说明索引已经正常创建了。然后我们查询下GET mysql.login/_search?prettytrue结果如下{took : 1,timed_out : false,_shards : {total : 1,successful : 1,skipped : 0,failed : 0},hits : {total : {value : 4,relation : eq},max_score : 1.0,hits : [{_index : mysql.login,_type : mysqldata,_id : mysql.login00,_score : 1.0,_source : {id : 1,username : lucas1,login_time : 1575870785000}},{_index : mysql.login,_type : mysqldata,_id : mysql.login01,_score : 1.0,_source : {id : 2,username : lucas2,login_time : 1575870813000}},{_index : mysql.login,_type : mysqldata,_id : mysql.login02,_score : 1.0,_source : {id : 3,username : lucas3,login_time : 1575874031000}},{_index : mysql.login,_type : mysqldata,_id : mysql.login03,_score : 1.0,_source : {id : 4,username : lucas4,login_time : 1575874757000}}]}}参考1.《kafka权威指南》关注公众号犀牛饲养员的技术笔记