当前位置: 首页 > news >正文

苏州 网站建设页面永久升级

苏州 网站建设,页面永久升级,优普南通网站建设,优秀企业宣传ppt文章目录 今日内容1 Kafka1.1 消息中间件对比1.2 kafka介绍1.3 kafka安装及配置1.4 kafka案例1.4.1 导入kafka客户端1.4.2 编写生产者消费者1.4.3 启动测试1.4.4 多消费者启动 1.5 kafka分区机制1.5.1 topic剖析 1.6 kafka高可用设计1.7 kafka生产者详解1.7.1 同步发送1.7.2 异… 文章目录 今日内容1 Kafka1.1 消息中间件对比1.2 kafka介绍1.3 kafka安装及配置1.4 kafka案例1.4.1 导入kafka客户端1.4.2 编写生产者消费者1.4.3 启动测试1.4.4 多消费者启动 1.5 kafka分区机制1.5.1 topic剖析 1.6 kafka高可用设计1.7 kafka生产者详解1.7.1 同步发送1.7.2 异步发送1.7.3 参数详解1.7.3.1 ack1.7.3.2 retries1.7.3.3 消息压缩 1.8 kafka消费者详解1.8.1 消费者组1.8.2 消息有序性1.8.3 提交和偏移量1.8.3.1 同步提交1.8.3.2 异步提交1.8.3.3 同步异步混合提交 1.9 Spring集成kafka1.9.1 导入依赖1.9.2 创建配置文件1.9.3 创建生产者1.9.4 创建消费者1.9.5 启动类1.9.6 测试 1.10 kafka传递对象1.10.1 创建User1.10.2 添加User的发送和接收 2 自媒体文章上下架2.1 接口定义2.2 Controller2.3 Service2.4 通知Article修改文章配置2.4.1 导入kafka依赖2.4.2 在Nacos中配置kafka的生产者2.4.3 自媒体通知Article2.4.4 在Nacos中配置kafka的消费者2.4.5 配置ap_article_config表2.4.6 article端监听2.4.7 测试 今日内容 1 Kafka 1.1 消息中间件对比 1.2 kafka介绍 1.3 kafka安装及配置 Docker安装zookeeper 拉取镜像 docker pull zookeeper:3.4.14创建容器 docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14Docker安装kafka 下载镜像 docker pull wurstmeister/kafka:2.12-2.3.1创建容器 docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME192.168.204.129 \ --env KAFKA_ZOOKEEPER_CONNECT192.168.204.129:2181 \ --env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.204.129:9092 \ --env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \ --nethost wurstmeister/kafka:2.12-2.3.1docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME192.168.204.129 \ --env KAFKA_ZOOKEEPER_CONNECT192.168.204.129:2181 \ --env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.204.129:9092 \ --env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \ -p 9092:9092 wurstmeister/kafka:2.12-2.3.1-p 9092:9092做端口映射 1.4 kafka案例 1.4.1 导入kafka客户端 在heima-leadnews-test模块中创建kafka-demo的模块 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency1.4.2 编写生产者消费者 创建com.heima.kafka.sample包 下面两个类ConsumerQuickStart和ProducerQuickStart类 生产者 package com.heima.kafka.sample; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/ public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.204.129:9092);//发送失败失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//2.生产者对象KafkaProducerString,String producer new KafkaProducerString, String(properties);/*** 第一个参数topic 第二个参数key 第三个参数value*///封装发送的消息ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,hello kafka);//3.发送消息producer.send(record);//4.关闭消息通道必须关闭否则消息发送不成功producer.close();}}消费者 public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.204.129:9092);//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//2.消费者对象KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);//3.订阅主题consumer.subscribe(Collections.singletonList(topic-first));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}1.4.3 启动测试 消费者成功收到消息 1.4.4 多消费者启动 同一个组下只能有一个消费者的收到消息 如果想一对多则需要将消费者放在不同组中 1.5 kafka分区机制 1.5.1 topic剖析 ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,0,hello kafka);在发送消息时可以指定分区partition 1.6 kafka高可用设计 1.7 kafka生产者详解 1.7.1 同步发送 /*** 第一个参数topic 第二个参数key 第三个参数value*/ //封装发送的消息 ProducerRecordString,String record new ProducerRecordString, String(topic-first,key-001,hello kafka);//3.发送消息 //producer.send(record);//3.1 同步发送消息 RecordMetadata recordMetadata producer.send(record).get(); System.out.println(同步发送消息结果topicrecordMetadata.topic(),partitionrecordMetadata.partition(),offsetrecordMetadata.offset());发送结果 同步发送消息结果topictopic-first,partition0,offset11.7.2 异步发送 //3.2 异步发送消息 producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e!null){e.printStackTrace();}else{System.out.println(异步发送消息结果topicrecordMetadata.topic(),partitionrecordMetadata.partition(),offsetrecordMetadata.offset());}} });发送结果 异步发送消息结果topictopic-first,partition0,offset21.7.3 参数详解 1.7.3.1 ack 1.7.3.2 retries 1.7.3.3 消息压缩 1.8 kafka消费者详解 1.8.1 消费者组 1.8.2 消息有序性 1.8.3 提交和偏移量 手动提交 //手动提交偏移量 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);1.8.3.1 同步提交 把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量commitSync()将会提交poll返回的最新的偏移量所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。 只要没有发生不可恢复的错误commitSync()方法会一直尝试直至提交成功如果提交失败也可以记录到错误日志里。 while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println(记录提交失败的异常e);}} }1.8.3.2 异步提交 手动提交有一个缺点那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率但这个会增加消息重复的概率和自动提交一样。另外一个解决办法是使用异步提交的API commitAsync()。 while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata map, Exception e) {if(e!null){System.out.println(记录错误的提交偏移量 map,异常信息e);}}}); }1.8.3.3 同步异步混合提交 异步提交也有个缺点那就是如果服务器返回提交失败异步提交不会进行重试。 相比较起来同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为如果同时存在多个异步提交进行重试可能会导致位移覆盖。 举个例子假如我们发起了一个异步提交commitA此时的提交位移为2000随后又发起了一个异步提交commitB且位移为3000commitA提交失败但commitB提交成功此时commitA进行重试并成功的话会将实际上将已经提交的位移从3000回滚到2000导致消息重复消费。 try {while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();} }catch (Exception e){e.printStackTrace();System.out.println(记录错误信息e); }finally {try {consumer.commitSync();}finally {consumer.close();} }1.9 Spring集成kafka 1.9.1 导入依赖 在kafka-demo中导入依赖 dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- kafkfa --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdexclusionsexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactId/dependency /dependencies1.9.2 创建配置文件 在resources下创建文件application.yml server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.204.129:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer1.9.3 创建生产者 创建com.heima.kafka.controller.HelloController类负责发送消息 RestController public class HelloController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;GetMapping(/hello)public String hello() {kafkaTemplate.send(itcast-topic, hello kafka);return success;} }1.9.4 创建消费者 建com.heima.kafka.listener.HelloListener类负责监听消息 Component public class HelloListener {KafkaListener(topics itcast-topic)public void listen(String message) {if(!StringUtils.isEmpty(message)) {System.out.println(message message);}} }1.9.5 启动类 SpringBootApplication public class KafkaAppication {public static void main(String[] args) {SpringApplication.run(KafkaAppication.class, args);} }1.9.6 测试 打开localhost:9991/hello 已经接收到消息 1.10 kafka传递对象 1.10.1 创建User 创建com.heima.kafka.pojo.User Data public class User {private String username;private Integer age; }1.10.2 添加User的发送和接收 使用fastjson进行转换 Controller GetMapping(/user) public String user() {User user new User();user.setUsername(zhangsan);user.setAge(20);kafkaTemplate.send(user-topic, JSON.toJSONString(user));return success; }Listener KafkaListener(topics user-topic) public void listenUser(String message) {if(!StringUtils.isEmpty(message)) {User user JSON.parseObject(message, User.class);System.out.println(user);} }2 自媒体文章上下架 2.1 接口定义 2.2 Controller PostMapping(/down_or_up) public ResponseResult downOrUp(RequestBody WmNewsDto wmNewsDto){return wmNewsService.downOrUp(wmNewsDto); }2.3 Service 接口 ResponseResult downOrUp(WmNewsDto wmNewsDto);实现 Override public ResponseResult downOrUp(WmNewsDto wmNewsDto) {// 1.参数检查if(wmNewsDto.getId()null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,文章id不能为空);}// 2.查询文章WmNews wmNews getById(wmNewsDto.getId());if(wmNews null){return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,文章不存在);}// 3.修改文章状态if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,只有已发布的文章才能上下架);}if(wmNewsDto.getEnable()null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,enable不能为空);}wmNews.setEnable(wmNewsDto.getEnable());updateById(wmNews);// 4.返回结果return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }2.4 通知Article修改文章配置 2.4.1 导入kafka依赖 在heima-leadnews-common模块下导入kafka依赖 !-- kafkfa -- dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId /dependency dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency2.4.2 在Nacos中配置kafka的生产者 在自媒体端的nacos配置中心配置kafka的生产者在heima-leadnews-wemedia下的配置文件中配置kafka spring:kafka:bootstrap-servers: 192.168.204.129:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer2.4.3 自媒体通知Article 创建com.heima.common.constants.mNewsMessageConstants常量类保存kafka的topic. public class WmNewsMessageConstants {public static final String WM_NEWS_UP_OR_DOWN_TOPICwm.news.up.or.down.topic; }注入kafka Autowired private KafkaTemplateString,String kafkaTemplate;发送消息通知article端修改文章配置 //发送消息通知article端修改文章配置 if(wmNews.getArticleId() ! null){MapString,Object map new HashMap();map.put(articleId,wmNews.getArticleId());map.put(enable,dto.getEnable());kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); }2.4.4 在Nacos中配置kafka的消费者 在article端的nacos配置中心配置kafka的消费者 spring:kafka:bootstrap-servers: 192.168.204.129:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer2.4.5 配置ap_article_config表 因为需要修改ap_article_config所以需要创建对应service和mapper 创捷Servicecom.heima.article.service.ApArticleConfigService接口 public interface ApArticleConfigService extends IServiceApArticleConfig {/*** 修改文章配置* param map*/public void updateByMap(Map map); }实现 Service Slf4j Transactional public class ApArticleConfigServiceImpl extends ServiceImplApArticleConfigMapper, ApArticleConfig implements ApArticleConfigService {/*** 修改文章配置* param map*/Overridepublic void updateByMap(Map map) {//0 下架 1 上架Object enable map.get(enable);boolean isDown true;if(enable.equals(1)){isDown false;}//修改文章配置update(Wrappers.ApArticleConfiglambdaUpdate().eq(ApArticleConfig::getArticleId,map.get(articleId)).set(ApArticleConfig::getIsDown,isDown));} }2.4.6 article端监听 在article端编写监听接收数据 Component Slf4j public class ArtilceIsDownListener {Autowiredprivate ApArticleConfigService apArticleConfigService;KafkaListener(topics WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){Map map JSON.parseObject(message, Map.class);apArticleConfigService.updateByMap(map);log.info(article端文章配置修改articleId{},map.get(articleId));}} }2.4.7 测试 启动相应启动类 打开自媒体管理界面准备下架这个新闻 下架该文件发现两张表都已经修改完美进行下架 说明我们kafka的消息传递已经成功。
http://www.zqtcl.cn/news/380162/

相关文章:

  • 服务器怎么设置ip做网站凌云seo博客
  • 莱芜四大金刚是谁啊镇江网站优化推广
  • 上海门户网站开发企业号码查询系统
  • 西安做网站设计的公司golang 网站开发 教程
  • 做网站哪些公司专业做app软件开发公司
  • 蒙特网站建设湖北省建设厅网站上岗证查询
  • 宁波网站建设 联系哪家电子商务网站建设过程范文
  • 南宁商城网站建设网站建设的需求文档
  • dedeampz 部署wordpress 网站访问慢如何评价网站是否做的好处
  • 怎样建设个人影视网站设计学专业
  • 没有公司 接单做网站网站建设加盟合作
  • 如何将域名和网站绑定做网站找投资人
  • 网站开发 平台WordPress首页可见
  • 沧州做网站费用打开上海发布
  • 重庆潼南网站建设公司电话网站能调用一些字体
  • 摄影网站设计素材做彩票网站电话多少
  • 开网站公司企业管理网课
  • 相城高端网站建设施工建设集团网站
  • .电子商务网站的开发原则包括网络服务示范区创建情况
  • 网站如何做权重php做网站登陆验证
  • 昆山制造网站的地方网站建设 有聊天工具的吗
  • 自己做网站制作需要多少钱如何免费注册网站域名
  • 如何做网站美化怎样写网站文案
  • 做网站排名的wordpress 调整 行距
  • 三亚文明城市建设服务中心报名网站房地产活动策划网站
  • 休闲食品网站建设规划书常德做网站专业公司
  • 做美工好的网站网页设计排版布局
  • 网站建设公司合同模板下载wordpress微信公众平台开发教程
  • 快速wordpress 建网站免费代理游戏
  • 网站模板 寻模板大气宽屏网站模板企业源码带后台