汕头拿家做网站,厚街做网站公司,化妆品网站建设经济可行性分析,访问紧急升级中通知问升级文章目录 1、kafka确保消息不丢失#xff1f;1.1、生产者端确保消息不丢失1.2、kafka服务端确保消息不丢失1.3、消费者确保正确无误的消费 2、生产者发送消息 KafkaService3、UserInfoServiceImpl - login()4、service-account - AccountListener.java 1、kafka确保消… 文章目录 1、kafka确保消息不丢失1.1、生产者端确保消息不丢失1.2、kafka服务端确保消息不丢失1.3、消费者确保正确无误的消费 2、生产者发送消息 KafkaService3、UserInfoServiceImpl - login()4、service-account - AccountListener.java 1、kafka确保消息不丢失
1.1、生产者端确保消息不丢失
发送模式发后即忘、同步阻塞确认、异步非阻塞确认生产者acks模式props.put(“acks”, “all”)、acks: all-1配置重试props.put(“retries”, 3)、retries: 3
1.2、kafka服务端确保消息不丢失
kafka是文件型的消息中间件不会单纯的因为服务器宕机导致消息丢失消息的log日志文件损坏搭建kafka集群副本
1.3、消费者确保正确无误的消费
偏移量提交 自动提交enable-auto-commit: true 手动提交ack-mode: manual_immediate同步提交 异步提交推荐偏移量重置 auto-offset-reset: earliest - 如果有偏移量则继续消费如果偏移量没了从头重新进行消费可能会存在幂等性问题 auto-offset-reset: latest - 如果有偏移量则继续消费如果偏移量不存在只消费新消息旧消息没消费完就丢掉了 auto-offset-reset: none - 如果有偏移量则继续消费如果偏移量不存在抛出异常消费者重试重试主题和死信主题 RetryableTopic()
2、生产者发送消息 KafkaService
package com.atguigu.tingshu.common.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;Service
public class KafkaService {private static final Logger logger LoggerFactory.getLogger(KafkaService.class);Autowiredprivate KafkaTemplate kafkaTemplate;/*** 向指定主题发送消息* 此方法通过调用重载的sendMsg方法向指定主题发送消息使用默认的消息标签和消息键** param topic 发送消息的主题* param msg 需要发送的消息内容*/public void sendMsg(String topic, String msg){// 调用重载的sendMsg方法传入默认值以简化调用this.sendMsg(topic, null, null, msg);}/*** 发送消息到指定的Kafka主题** param topic 消息主题* param partition 分区编号* param key 消息键值* param msg 消息内容*/public void sendMsg(String topic, Integer partition, String key, String msg){// 发生消息并返回异步结果CompletableFutureSendResult future this.kafkaTemplate.send(topic, partition, key, msg);// 异步处理发送结果future.whenCompleteAsync((result, ex) - {if (ex ! null){// 如果发送过程中出现异常logger.error(生产者发送消息失败原因{}, ex.getMessage());}});}}
whenCompleteAsync异步完成时的处理、当异步操作完成时
3、UserInfoServiceImpl - login()
此时 service-user 是生产者 发送消息 Slf4j
Service
SuppressWarnings({unchecked, rawtypes})
public class UserInfoServiceImpl extends ServiceImplUserInfoMapper, UserInfo implements UserInfoService {Autowiredprivate WxMaService wxMaService;Autowiredprivate RedisTemplate redisTemplate;Autowiredprivate UserAccountFeignClient userAccountFeignClient;Autowiredprivate KafkaService kafkaService;/*** 根据微信返回的code进行用户登录* param code 微信登录凭证* return 返回包含登录令牌的Map对象*///GlobalTransactional//TransactionalOverridepublic MapString, Object login(String code) {// 创建一个HashMap对象用于存放返回的数据HashMapString, Object map new HashMap();try {// 通过微信服务获取用户的会话信息WxMaJscode2SessionResult sessionInfo this.wxMaService.getUserService().getSessionInfo(code);// 获取用户的openidString openid sessionInfo.getOpenid();// 查询数据库中是否存在该openid对应的用户信息UserInfo userInfo this.getOne(new LambdaQueryWrapperUserInfo().eq(UserInfo::getWxOpenId, openid));if (userInfo null) {// 如果用户不存在则创建一个新的UserInfo对象userInfo new UserInfo();// 设置用户的openiduserInfo.setWxOpenId(openid);// 设置用户的昵称其中包含一个随机生成的IDuserInfo.setNickname(这家伙太懒 IdWorker.getIdStr());// 设置用户的头像URLuserInfo.setAvatarUrl(https://img0.baidu.com/it/u1633409170,3159960019fm253fmtautoapp138fJPEG?w500h500);// 保存用户信息到数据库this.save(userInfo);// 初始化用户账号信息//userAccountFeignClient.initAccount(userInfo.getId());this.kafkaService.sendMsg(KafkaConstant.QUEUE_USER_REGISTER,userInfo.getId().toString());//int i 1 / 0;}// 生成一个随机的登录令牌String token UUID.randomUUID().toString();// 创建一个UserInfoVo对象用于存放用户信息UserInfoVo userInfoVo new UserInfoVo();// 将UserInfo对象的属性复制到UserInfoVo对象中BeanUtils.copyProperties(userInfo, userInfoVo);// 将用户信息存储到Redis中设置过期时间为30分钟this.redisTemplate.opsForValue().set(RedisConstant.USER_LOGIN_KEY_PREFIX token, userInfoVo,RedisConstant.USER_LOGIN_KEY_TIMEOUT, TimeUnit.SECONDS);// 将生成的登录令牌放入Map对象中map.put(token, token);// 返回包含登录令牌的Map对象return map;} catch (WxErrorException e) {// 如果发生微信错误异常抛出自定义的异常throw new GuiguException(ResultCodeEnum.LOGIN_AUTH);}}} 4、service-account - AccountListener.java
此时 service-account 是消费者 接收消息 Slf4j
Component
public class AccountListener {Autowiredprivate UserAccountService userAccountService;RetryableTopic(backoff Backoff(2000))KafkaListener(topics KafkaConstant.QUEUE_USER_REGISTER)public void listen(String userId, Acknowledgment ack){// 如果是空消息直接确认掉后续不用再执行if (StringUtils.isBlank(userId)) {ack.acknowledge();return;}// 初始化账户this.userAccountService.saveAccount(Long.valueOf(userId));ack.acknowledge();// 手动确认}
}