说明：重复消费的原因大致是：生产者将消息A发送到队列中，消费者监听到消息A后开始处理业务；业务处理完成后，消费者在告知RabbitMQ“消息A已经被消费完成”的途中中断。也就是说，业务已经处理完，而队列中还存在当前消息A，导致消费者服务恢复后又消费到消息A，出现重复操作的业务。
解决思路：只要有一个地方记录了消息A已经被消费过【这个消息必须设置一个唯一标记】，即使消息A再次被消费，也可以先比对一下：如果有记录，说明消息A已经被消费；如果没有记录，说明还没有被消费。
我使用redis及设置redis过期时间来解决重复消费问题。
工程图 1.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>spring-boot-starter-parent</artifactId> <!-- 被继承的父项目的构件标识符 -->
    <groupId>org.springframework.boot</groupId> <!-- 被继承的父项目的全球唯一标识符 -->
    <version>2.2.2.RELEASE</version> <!-- 被继承的父项目的版本 -->
  </parent>

  <groupId>RabbitmqDemoOne</groupId>
  <artifactId>RabbitmqDemoOne</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>RabbitmqDemoOne Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!--spring boot核心-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--spring boot 测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!--springmvc web-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--开发环境调试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
    </dependency>
    <!--amqp 支持-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!--redis-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.78</version>
    </dependency>
    <!-- commons-lang -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>
    <!--lombok-->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.10</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>RabbitmqDemoOne</finalName>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2.application.yml
server:
  port: 8080
spring:
  redis:
    host: 127.0.0.1
    port: 6379
  rabbitmq:
    port: 5672
    host: 192.168.199.139
    username: admin
    password: admin
    virtual-host: /
3.RabbitMqConfig
package com.dev.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** author 李庆伟* title: RabbitMqConfig* date 2024/3/3 14:12*/
Configuration
public class RabbitMqConfig {/*** 队列* return repeatQueue队列名称 true 持久化*/Beanpublic Queue makeQueue(){return new Queue(repeatQueue,true);}}4.RedisTemplateConfig
package com.dev.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** author 李庆伟* title: RedisTemplateConfig* date 2024/3/3 14:24*/
Configuration
public class RedisTemplateConfig {Beanpublic RedisTemplateString, Object redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplateString, Object redisTemplate new RedisTemplate();redisTemplate.setConnectionFactory(redisConnectionFactory);// 设置键key的序列化采用StringRedisSerializer。redisTemplate.setKeySerializer(new StringRedisSerializer());//redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());//设置值value的序列化采用jdk// 设置值value的序列化采用FastJsonRedisSerializer。redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.afterPropertiesSet();return redisTemplate;}}5.RabbitRepeatController
package com.dev.controller;import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** author 李庆伟* title: RabbitRepeatContoller* date 2024/3/3 14:05*/
RestController
RequestMapping(repeatQueue)
public class RabbitRepeatContoller {AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法/*** 测试* return*/GetMapping(/sendMessage)public String sendMessage() {for (int i 0; i 1000; i) {String id UUID.randomUUID().toString().replace(-,);MapString,Object map new HashMap();map.put(id,id);map.put(name,张龙);map.put(phone,123..11);map.put(num,i);String str JSONObject.toJSONString(map);Message msg MessageBuilder.withBody(str.getBytes()).setMessageId(id).build();rabbitTemplate.convertAndSend(, repeatQueue, msg);}return ok;}}6.RabbitMqListener
package com.dev.listener;import com.alibaba.fastjson.JSON;
import com.dev.utils.RedisUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.Map;/*** author 李庆伟* title: RabbitMqListener* date 2024/3/3 14:13*/
Component
public class RabbitMqListener {Autowiredprivate RedisUtil redisUtil;RabbitListener(queues repeatQueue)RabbitHandlerpublic void process(Message msg) throws UnsupportedEncodingException {//获取在发送消息时设置的唯一idString id msg.getMessageProperties().getMessageId();//去redis中查看是否有记录如果有证明已经消费过了String val redisUtil.get(id);if(StringUtils.isNotEmpty(val)){return;}String str new String(msg.getBody(),utf-8);if(StringUtils.isNotEmpty(str)){MapString,Object map JSON.parseObject(str,Map.class);System.out.println(map.get(num)----map.get(id)----map.get(name)----map.get(phone));//将消费过的消息记录到redis中失效时间为1个小时redisUtil.set(id,id,3600L);System.out.println(----------);}}}7.RedisUtil
package com.dev.utils;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** author 李庆伟* title: RedisUtil* date 2024/3/3 14:27*/Component
public class RedisUtil {Autowiredprivate RedisTemplate redisTemplate;/*** 批量删除对应的value** param keys*/public void remove(final String... keys) {for (String key : keys) {remove(key);}}/*** 批量删除key** param pattern*/public void removePattern(final String pattern) {SetSerializable keys redisTemplate.keys(pattern);if (keys.size() 0)redisTemplate.delete(keys);}/*** 删除对应的value** param key*/public void remove(final String key) {if (exists(key)) {redisTemplate.delete(key);}}/*** 判断缓存中是否有对应的value** param key* return*/public boolean exists(final String key) {return redisTemplate.hasKey(key);}/*** 读取缓存** param key* return*/public String get(final String key) {Object result null;redisTemplate.setValueSerializer(new StringRedisSerializer());ValueOperationsSerializable, Object operations redisTemplate.opsForValue();result operations.get(key);if(resultnull){return null;}return result.toString();}/*** 写入缓存** param key* param value* return*/public boolean set(final String key, Object value) {boolean result false;try {ValueOperationsSerializable, Object operations redisTemplate.opsForValue();operations.set(key, value);result true;} catch (Exception e) {e.printStackTrace();}return result;}/*** 写入缓存** param key* param value* return*/public boolean set(final String key, Object value, Long expireTime) {boolean result false;try {ValueOperationsSerializable, Object operations redisTemplate.opsForValue();operations.set(key, value);redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);result true;} catch (Exception e) {e.printStackTrace();}return result;}public boolean hmset(String key, MapString, String value) {boolean result false;try {redisTemplate.opsForHash().putAll(key, value);result true;} catch (Exception e) {e.printStackTrace();}return result;}public MapString,String hmget(String key) {MapString,String result null;try {result redisTemplate.opsForHash().entries(key);} catch (Exception e) {e.printStackTrace();}return result;}/*** 递增** param key 键* paramby 要增加几(大于0)* return*/public 
long incr(String key, long delta) {if (delta 0) {throw new RuntimeException(递增因子必须大于0);}return redisTemplate.opsForValue().increment(key, delta);}/*** 递减** param key 键* paramby 要减少几(小于0)* return*/public long decr(String key, long delta) {if (delta 0) {throw new RuntimeException(递减因子必须大于0);}return redisTemplate.opsForValue().increment(key, -delta);}/*** redis zset可已设置排序案例热搜** param key 键* paramby* return*/public void zadd(String key ,String name) {BoundZSetOperationsObject, Object boundZSetOperations redisTemplate.boundZSetOps(key);//自增长后的数据boundZSetOperations.incrementScore(name,1);}}8.App
package com.dev;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** author 李庆伟* title: App* date 2024/3/3 14:01*/
SpringBootApplication
public class App {public static void main(String[] args) {SpringApplication.run(App.class);}
}