dedecms做网站怎么查看,中山专业网站建设公司,品牌策划岗位职责,湖北十堰背景
消息服务通信机制为异步#xff0c;且网络连接不是100%可靠#xff0c;会因为网络闪断问题丢失消息#xff0c;作为企业应用#xff0c;需要保证业务消息传输的可靠性#xff0c;需实现以下机制#xff1a; a)发送方重发机制#xff1a;消息发送方对未收到响应的消…背景
消息服务通信机制为异步且网络连接不是100%可靠会因为网络闪断问题丢失消息作为企业应用需要保证业务消息传输的可靠性需实现以下机制 a)发送方重发机制消息发送方对未收到响应的消息进行重发重发时保证消息唯一性标识、消息内容不变 b)接收方消息去重机制消息接收方依据消息的唯一性标识对收到的消息进行验证判断是否已处理过如已处理过则不再进行处理
前面我们依托消息日志实现了消息服务端消息重发的设计与实现即通过消息日志表来缓存待发送或发送失败的消息然后通过定时器来执行一段逻辑从消息日志重建消息找到要接收消息的客户端连接然后推送消息。
问题
该方案虽然能实现消息重发但存在以下几个问题 1.依托消息日志来实现重发功能消息日志的职责不再单一 2.消息日志数量大的情况下查询待发送消息耗时长性能低 3.消息日志清理时需注意保留待发送的消息或已经发生尚未收到响应的消息
其本质问题还是在于消息日志的职责不单一带来肩负着额外的消息重发功能。
解决方案
重构优化新增活跃消息实体承接待发送或已发送尚未收到响应的消息当消息已发送且收到响应后再将其转移到消息日志中。
同时考虑到对接的相关方系统可能会因为系统异常如宕机导致消息服务中心的消息推送次数达到预设次数上限停止自动推送。
当相关方系统恢复正常时需要消息服务中心重新推送发送失败的消息因此新增手工重发功能在活跃消息列表页面可根据条件组合查询消息勾选记录后执行重发操作。
系统实现
实体配置
使用平台的实体配置功能拷贝现有的消息日志实体MessageLog更名为ActiveMessage。 执行生成库表、生成代码、拷贝代码、编译运行、配置菜单、设置权限等基础操作。
消息收发相关调整
消息服务端收到系统类请求消息如登录请求这类请求不需要消息转发因此继续使用原消息日志服务保存请求和响应。
收到业务请求消息如委托单创建需要回复一条消息确认继续使用原消息日志服务保存请求和响应 同时判断是否需要消息转发如需要则使用新建的活跃消息来处理发送请求。 /*** 发送消息** param appCode 应用标识* param content 消息内容* param id 消息标识*/public void sendMessage(String appCode, String content, String id) {// 生成请求消息RequestMessage message new RequestMessage();// 使用已有ID重置默认生成的ID用于关联消息if (StringUtils.isNotBlank(id)) {message.setId(id);}// 设置相关属性message.setTopic(super.getTopic());// 参数中消息内容优先如为空取对象属性的值if (StringUtils.isNotBlank(content)) {message.setContent(content);} else {message.setContent(this.getContent());}message.setPublishAppCode(appConfig.getMessage().getMessageServerAppCode());App app appService.getByCode(appCode);if (app.getIntegrationModel().equals(IntegrationModelEnum.CLIENT.name())) {// 客户端对接模式// 获取发送通道Channel channel MessageServerHolder.getChannel(appCode);if (channel ! null channel.isActive()) {ChannelFuture channelFuture channel.writeAndFlush(message);channelFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (StringUtils.isBlank(id)) {// 创建活跃消息ActiveMessage activeMessage activeMessageService.createRequestPart(message, appCode);// 设置状态为已请求activeMessageService.updateStatus(MessageStatusEnum.REQUESTED.name(), activeMessage.getRequestId());// 发送次数加1activeMessageService.incrementSendCount(activeMessage.getRequestId());} else {// 更新发送次数activeMessageService.incrementSendCount(id);}}});} else {// 创建日志activeMessageService.createRequestPart(message, appCode);}} else {// api接口对接模式if (StringUtils.isBlank(id)) {// 创建活跃消息ActiveMessage activeMessage activeMessageService.createRequestPart(message, appCode);// 设置状态为待处理activeMessageService.updateStatus(ApiMessageStatusEnum.WAIT_HANDLE.name(), activeMessage.getRequestId());}}}收到响应消息如消息确认使用新建的活跃消息来处理响应查找相应的发送记录补全信息然后从活跃消息移动到消息日志中。 /*** 消息处理** param message 消息* param channel 通道*/Transactional(rollbackFor Exception.class)public void handleMessage(ResponseMessage responseMessage, Channel channel) {// 验证框架String appCode MessageServerHolder.getAppCode(channel);validateFramework(responseMessage, appCode);// 更新活跃消息activeMessageService.updateResponsePart(responseMessage);// 拷贝至消息日志ActiveMessage activeMessage activeMessageService.getByRequestMessageId(responseMessage.getRequestMessageId());MessageLog messageLog new MessageLog();BeanUtils.copyProperties(activeMessage, messageLog);messageLogService.add(messageLog);// 删除活跃消息activeMessageService.remove(activeMessage.getId());// 特殊处理messageOperation(responseMessage, channel);}
消息重发相关调整
消息重发原由消息日志服务来负责现变更为由活跃消息服务来处理。 public void resend() {// 需要进行异常处理否则某次异常会导致定时器停止运行try {// 先获取需要重发的应用列表ListString resendAppList activeMessageService.getResendAppList(appConfig.getMessage().getMaxSendCount());if (CollectionUtils.isNotEmpty(resendAppList)) {log.info(读取到需要重发的应用数量:{}, resendAppList.size());// 遍历应用列表获取要重发的消息for (String appCode : resendAppList) {// 查找待重发的消息ListActiveMessage list activeMessageService.getResendMessage(appConfig.getMessage().getSendMessageCount(),appConfig.getMessage().getMaxSendCount(), appCode);log.info(读取到需要重发至应用{}的消息数量:{}, appCode, list.size());for (int i 0; i list.size(); i) {ActiveMessage activeMessage list.get(i);// 根据消息主题构建发送器RequestMessageSender sender (RequestMessageSender) MessageSenderFactory.createSender(activeMessage.getRequestTopicCode());// 传入原请求的消息标识和消息内容sender.sendMessage(activeMessage.getResponseAppCode(), activeMessage.getRequestData(), activeMessage.getRequestId());}}}} catch (Exception e) {log.error(消息重发处理异常, e);}消息查询和消息确认API调整
由活跃消息服务替换掉原消息日志服务消息确认时补全信息从活跃消息移动到消息日志。
/*** 消息查询处理器** author wqliu* date 2021-8-20**/
Slf4j
public class MessageQueryHandler extends BaseServiceHandlerMessageQueryParameter {Overrideprotected String handleBusiness(MessageQueryParameter parameter, String appCode) {ActiveMessageService service SpringUtil.getBean(ActiveMessageService.class);ListActiveMessage list service.queryWaitHandleMessages(parameter.getCount(), appCode);String data JSON.toJSONString(list);log.info(查询到的待处理消息为{}, data);return data;}
} Overridepublic void confirm(String requestMessageId, String appCode) {// 获取消息日志对象ActiveMessage activeMessage getByRequestMessageId(requestMessageId);// 判断是否有权限对本消息确认if (!appCode.equals(activeMessage.getResponseAppCode())) {throw new CustomException(ActiveMessageExceptionEnum.MESSAGE_CONFIRM_PERMISSION_ERROR);}// 更新消息日志activeMessage.setStatus(ApiMessageStatusEnum.HANDLED.name());activeMessage.setResponseResult(MessageResponseResultEnum.SUCCESS.name());activeMessage.setResponseTime(LocalDateTime.now());// 更新日志modify(activeMessage);// 拷贝至消息日志MessageLog messageLog new MessageLog();BeanUtils.copyProperties(activeMessage, messageLog);messageLogService.add(messageLog);// 删除活跃消息remove(activeMessage.getId());}开源平台资料
平台名称一二三开发平台 简介 企业级通用低代码开发平台 设计资料CSDN专栏 开源地址Gitee 开源协议MIT 欢迎收藏、点赞、评论你的支持是我前行的动力。