当前位置: 首页 > news >正文

网络营销之网站建设广州好玩的地方和景点

网络营销之网站建设,广州好玩的地方和景点,安卓4.3网站开发兼容,可以自己做直播网站吗文章目录 前言一、批量注册bean定义#xff1a;1.1 定义Canal注解#xff1a;1.2 canal bean定义注册#xff1a;1.3 canal bean 生成#xff1a; 二、canal客户端获取mysql数据变动2.1 canal客户端2.2 消息处理 总结参考 前言 在项目中如果想要多个Canal 客户端通过tcp直… 文章目录 前言一、批量注册bean定义1.1 定义Canal注解1.2 canal bean定义注册1.3 canal bean 生成 二、canal客户端获取mysql数据变动2.1 canal客户端2.2 消息处理 总结参考 前言 在项目中如果想要多个Canal 客户端通过tcp直连接入Canal 服务端显然需要定义多个连接不同实例的客户端而每个客户端除了连接到的实例不同其它配置几乎都相同如果定义多个客户端显然会造成很多重复代码那么spring 中有什么办法可以批量定义canal客户端 一、批量注册bean定义 我们知道spring中bean 的生成是依靠bean 定义所以如果我们可以批量定义canal客户端 BeanDefinition 然后将其注册到spring 这样spring 就可以来生成我们需要的bean。而在spring 中我们可以 使用ImportBeanDefinitionRegistrar来自定义bean 1.1 定义Canal注解 CanalConfig.java import org.springframework.context.annotation.Import;import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;Retention(RetentionPolicy.RUNTIME) Target(ElementType.TYPE) Import(CanalConnectorRegistry.class) public interface CanalConfig {// 定义需要连接的canal 实例数组String[] destinations() default ; }然后在 spring 启动类 就可以增加改注解 import com.example.spring_canal.config.CanalConfig; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication CanalConfig(destinations {test,aabbcc}) public class SpringCanalApplication {public static void main(String[] args) {SpringApplication.run(SpringCanalApplication.class, args);}}1.2 canal bean定义注册 CanalConnectorRegistry.java import org.springframework.beans.MutablePropertyValues; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.BeanNameGenerator; import org.springframework.beans.factory.support.GenericBeanDefinition; import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; import org.springframework.core.type.AnnotationMetadata;import java.util.Map;public class CanalConnectorRegistry implements ImportBeanDefinitionRegistrar {Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) {// 获取CanalConfig 的注解内容MapString, Object annotationAttributes importingClassMetadata.getAnnotationAttributes(CanalConfig.class.getName());// 获取destinations 要连接的实例String[] destinations (String[]) annotationAttributes.get(destinations);for (int i 0; i destinations.length; i) {GenericBeanDefinition beanDefinition new GenericBeanDefinition();// 为CanalConnectorFactory 设置 destinationRegistry属性参数MutablePropertyValues properties new MutablePropertyValues();properties.add(destinationRegistry, destinations[i]);beanDefinition.setPropertyValues(properties);// 定义 使用CanalConnectorFactory 来生成bean 对象beanDefinition.setBeanClass(CanalConnectorFactory.class);// 因为要生成的canalConnector bean对象都是CanalConnector 类型所以bean 的名称不能重复// 本文生成bean 的名称为canalConnector0canalConnector1registry.registerBeanDefinition(canalConnector i, beanDefinition);}}Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {ImportBeanDefinitionRegistrar.super.registerBeanDefinitions(importingClassMetadata, registry);} } 1.3 canal bean 生成 CanalConnectorFactory.java import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component;import java.net.InetSocketAddress;Component public class CanalConnectorFactory implements FactoryBean {private String destinationRegistry;// 定义canal 服务段的地址和端口Value(${canal.server.host})private String canalServerHost;Value(${canal.server.port})private int canalServerPort;public void setDestinationRegistry(String destinationRegistry) {this.destinationRegistry destinationRegistry;}public CanalConnector createConnector(String destination, String username, String password) {return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerHost, canalServerPort),destination, username, password);}public CanalConnector createConnector(String destination) {return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerHost, canalServerPort),destination, , );}Overridepublic Object getObject() throws Exception {// 生成 canal 客户端的beanreturn createConnector(destinationRegistry);}Overridepublic Class? getObjectType() {return CanalConnector.class;} } canal 服务端ip 端口定义 canal.server.hostlocalhost canal.server.port11111这样当在项目中 去获取 canalConnector0canalConnector1这样的bean 时就会通过 CanalConnectorFactory 的getObject() 去生成bean 二、canal客户端获取mysql数据变动 2.1 canal客户端 CanalService2.java import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationContext; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils;import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List;Slf4j Component // 根据canal.enable 属性值取加载 这个bean 如果 canal.enable 为false 则不加载bean ConditionalOnProperty(name canal.enable, havingValue true) public class CanalService2 implements DisposableBean {Autowiredprivate ApplicationContext context;Autowiredprivate RedisTemplate redisTemplate;// 定义要连接的实例数组// 注意顺序和 CanalConfig(destinations {test,aabbcc}) 保持一致Value(#{${canal.destination.values}.split(,)})private ListString destinations;// 定义每个实例中要监听的表注意顺序和canal.destination.values 保持一致Value(#{${canal.client.subscribe.filters}.split(,)})private ListString canalFilters;// 定义每个实例中每次要获取表动条数注意顺序和canal.destination.values 保持一致Value(#{${canal.client.batch.sizes}.split(,)})private ListInteger batchSizes;Autowiredprivate CanalListener canalListener;private ListCanalConnector connectors new ArrayList(1 3);PostConstructpublic void run() {// 开启线程进行数据消费for (int i 0; i destinations.size(); i) {int finalI i;new Thread(() - toConsumeMessage(finalI, destinations.get(finalI))).start();}}private void toConsumeMessage(int i, String destination) {// 获取spring 容器中的 CanalConnector beanCanalConnector canalConnector (CanalConnector) context.getBean(canalConnector i);// 收集项目中使用到的CanalConnector bean 便于后续项目终止进行端口连接使用connectors.add(canalConnector);// 定义最后消费的位点long lastOffset fetchFromPosition(canalConnector, i, destination);while (true) {// 获取消息并且不进行ack 确认Message message canalConnector.getWithoutAck(batchSizes.get(i));long batchId message.getId();ListCanalEntry.Entry entryList message.getEntries();int size message.getEntries().size();// 如果没有获取到消息则2s 后在次进行获取if (batchId -1 || entryList.isEmpty()) {try {// 线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}continue;}// 比对消费位点如果项目中已经消费过该条数据则继续进行下一次数据拉取long nowOffset entryList.get(0).getHeader().getLogfileOffset();if (nowOffset lastOffset) {continue;}try {// 消费数据canalListener.onMessage(message);// 向服务端提交ack 确认canalConnector.ack(batchId);// 保存最后消费的位点防止项目重启后 重复消费消息lastOffset message.getEntries().get(size - 1).getHeader().getLogfileOffset();savePositionState(lastOffset, destination);} catch (Exception ex) {log.error(consume error:{}, ex.getMessage());// 回滚到未进行 ack 的地方指定回滚具体的batchIdcanalConnector.rollback(batchId);}}}// 获取并设置消费的起始位点private long fetchFromPosition(CanalConnector canalConnector, int i, String key) {// Canal 连接器连接canalConnector.connect();// 订阅数据变更canalConnector.subscribe(canalFilters.get(i));// 回滚到未进行 ack 的地方下次fetch的时候可以从最后一个没有 ack 的地方开始拿canalConnector.rollback();// 从存储中获取上次消费的位点long position getPositionState(key);return position;}// 获取位点状态private long getPositionState(String key) {// TODO: 从存储中获取上次消费的位点Object slot redisTemplate.opsForValue().get(canal: key);if (null ! slot) {if (slot instanceof Long) {return (long) slot;} else {return ((Integer) slot).longValue();}}return -1;}// 保存位点状态private void savePositionState(long position, String key) {// TODO: 将 position 保存到存储中redisTemplate.opsForValue().set(canal: key, position);}Overridepublic void destroy() throws Exception {// 项目关闭断开连接if (null ! connectors !CollectionUtils.isEmpty(connectors)) {connectors.stream().forEach(oneConnect - {if (null ! oneConnect) {oneConnect.disconnect();}});}} } 参数配置 canal.enabletrue canal.destination.valuestest,aabbcc canal.client.subscribe.filterstest.test_user|test.user,biglog.about_us canal.client.batch.sizes10,102.2 消息处理 CanalListener.java public interface CanalListener {void onMessage(Message msg); }MyCanalListener.java import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;import java.util.HashMap; import java.util.List; import java.util.Map;Slf4j Component public class MyCanalListener implements CanalListener {Overridepublic void onMessage(Message msg) {ListCanalEntry.Entry entries msg.getEntries();for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() CanalEntry.EntryType.ROWDATA) {CanalEntry.RowChange rowChange null;try {rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {throw new RuntimeException(parse error, e);}String tableName entry.getHeader().getTableName();CanalEntry.EventType eventType rowChange.getEventType();ListCanalEntry.RowData rowDataList rowChange.getRowDatasList();String schemaName entry.getHeader().getSchemaName();// 处理数据变更事件for (CanalEntry.RowData rowData : rowDataList) {switch (eventType) {case INSERT:// 处理插入事件dealInsert(schemaName, tableName, rowData.getAfterColumnsList());break;case UPDATE:// 处理更新事件dealUpdate(schemaName, tableName, rowData.getAfterColumnsList());break;case DELETE:// 处理删除事件dealDelate(schemaName, tableName, rowData.getBeforeColumnsList());break;default:break;}}}}}private void dealDelate(String schemaName, String tableName, ListCanalEntry.Column afterColumnsList) {MapString, Object dataMap new HashMap();for (CanalEntry.Column column : afterColumnsList) {dataMap.put(column.getName(), column.getValue());} // log.debug(delate data:{}, afterColumnsList);log.debug(delate map data:{}, dataMap);}private void dealUpdate(String schemaName, String tableName, ListCanalEntry.Column columns) {MapString, Object dataMap new HashMap();for (CanalEntry.Column column : columns) {dataMap.put(column.getName(), column.getValue());} // log.debug(update data:{}, columns);log.debug(update map data:{}, dataMap);}private void dealInsert(String schemaName, String tableName, ListCanalEntry.Column columns) {MapString, Object dataMap new HashMap();for (CanalEntry.Column column : columns) {dataMap.put(column.getName(), column.getValue());} // log.debug(insert data:{}, columns);log.debug(insert map data:{}, dataMap);} } 总结 本文通过ImportBeanDefinitionRegistrar 进行canal客户端bean 定义的注册通过FactoryBean 注意canal 客户端的默认的id 为1001目前canal server上的一个instance只能有一个client消费。 参考 Canal ClientAPI 参考
http://www.zqtcl.cn/news/674219/

相关文章:

  • 网站建设推广话术wordpress 不显示缩略图
  • 企业电子商务网站建设和一般百拓公司做网站怎么样
  • 吉林网站建设司上海什么做网站的公司比较好
  • 吉安市建设规划局网站jsp wordpress
  • 建设银行贵金属网站微信小程序注册后怎么使用
  • 如何做律师网站河南建网站 优帮云
  • 云阳如何做网站网站建设旅游
  • 推荐一个简单的网站制作单位网站服务的建设及维护
  • tp5网站文档归档怎么做网站 信用卡支付接口
  • phpcms 企业网站网站建设中单页代码
  • 坑梓网站建设方案网络编程技术及应用
  • 电子商务网站建设 价格新媒体运营需要具备哪些能力
  • 做生存分析的网站电商网站运营建设的目标
  • 佛山 做网站邮箱官方网站注册
  • 生成flash的网站源码表白二维码制作网站
  • 定做专业营销型网站网站开发应用
  • 万盛建设局官方网站如何用群晖nas做网站
  • 建设装饰网站郑州惠济区建设局网站
  • 网站做标题有用吗网站优化多少钱
  • 婚庆设备租赁网站源码如何进行网站的建设和维护
  • 青岛做网站公wordpress文章付费阅读
  • 小灯具网站建设方案360优化大师
  • 开发公司与物业公司前期合同网站优化的推广
  • 汉堡云虚拟主机aso安卓优化公司
  • 医院 网站建设 新闻营销外包
  • 优秀网站网址郑州无痛人流哪家医院好
  • 备案网站能打开吗大良营销网站建设流程
  • 哪些网站可以做淘宝店招石油网站编辑怎么做
  • 网站出现建设中集团网站建设特点
  • asp网站开发 pdf企业展厅设计公司盛世笔特