门户网站源码,国内建筑设计公司排名,音乐网站界面,工商信息查询官网问题描述
业务验收阶段#xff0c;遇到了一个由于成员变量导致的线程问题
有一个kafka切面#xff0c;用来处理某些功能在调用前后的发送消息#xff0c;资产类型type是成员变量定义#xff1b;
资产1类型推送消息是以zichan1为节点#xff1b;资产2类型推送消息是以zi…问题描述
业务验收阶段遇到了一个由于成员变量导致的线程问题
有一个kafka切面用来处理某些功能在调用前后的发送消息资产类型type是成员变量定义
资产1类型推送消息是以zichan1为节点资产2类型推送消息是以zichan2为节点
当多个线程调用切面类时由于切面类中使用成员变量且为动态数据时此时会出现根据资产类型推送消息错误例如资产1在调用功能时切面类的type字段为zichan1同时有资产2调用功能时此时的切面类的type字段为zichan2;导致资产1在调用功能前推送的是zichan1,在调用功能后推送的是zichan2的消息标识。
原因
当多个线程同时调用时成员变量则会只采用最后一次调用的值。
下面简单描述下情景
kafkaAspect类
import com.alibaba.fastjson.JSON;
import com.example.demo.constant.StageCodeConstant;
import com.example.demo.entity.KafkaSendMessageConstant;
import com.example.demo.util.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ArrayUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;Component
Aspect
Slf4j
public class KafkaProcessAspect {private static final Logger log LoggerFactory.getLogger(KafkaProcessAspect.class);private String assetType ;private String startTime ;Around(annotation(kafkaProcess))public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {Object object null;assetType (String) getParamValue(joinPoint, assetType);startTime DateUtils.getTime();object joinPoint.proceed();//推送消息String stageCode StageCodeConstant.STAGE_CODE.get(assetType).get(kafkaProcess.functionName());KafkaSendMessageConstant messageConstant new KafkaSendMessageConstant();messageConstant.setStageCode(stageCode);messageConstant.setStartTime(startTime);messageConstant.setEndTime(DateUtils.getTime());log.info(资产类型{}推送消息{}, assetType, JSON.toJSONString(messageConstant));return object;}private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {Object[] params joinPoint.getArgs();String[] parameterNames ((MethodSignature) joinPoint.getSignature()).getParameterNames();if (parameterNames null || parameterNames.length 0) {return null;}for (String param : parameterNames) {if (param.equals(paramName)) {int index ArrayUtils.indexOf(parameterNames, param);return params[index];}}return null;}}由于controller中的请求地址是采用占位符定义后使用PathVariable可以获取的类型
Controller类
import com.example.demo.aop.KafkaProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;RestController
RequestMapping(/test)
public class TestController {private static final Logger logger LoggerFactory.getLogger(TestController.class);PostMapping(/{assetType}/cmpt)KafkaProcess(functionName JS)public void cmpt(PathVariable(assetType) String assetType) {logger.info({}接口开始, assetType);long startTime System.currentTimeMillis();for (int i 0; i 20000; i) {for (int j 0; j 20000; j) {int m i * j;logger.debug(i*j{},m);}}long endTime System.currentTimeMillis();logger.info({}接口结束,耗时{}, assetType, endTime - startTime);}}资产类型枚举 AssetTypeEnum
public enum AssetTypeEnum {ZICHAN_1(zichan1,资产1),ZICHAN_2(zichan2,资产2);// 成员变量private String code;private String desc;public String getCode() {return code;}public void setCode(String code) {this.code code;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc desc;}// 构造方法AssetTypeEnum(String code,String desc) {this.code code;this.desc desc;}}推送消息 KafkaSendMessageConstant
public class KafkaSendMessageConstant {private String stageCode;private String startTime;private String endTime;public String getStageCode() {return stageCode;}public void setStageCode(String stageCode) {this.stageCode stageCode;}public String getStartTime() {return startTime;}public void setStartTime(String startTime) {this.startTime startTime;}public String getEndTime() {return endTime;}public void setEndTime(String endTime) {this.endTime endTime;}
}节点常量类 StageCodeConstant 根据不同资产类型赋值不同功能的推送标识
import java.util.HashMap;
import java.util.Map;public class StageCodeConstant {public static final MapString, MapString, String STAGE_CODE new HashMap();static {//资产1STAGE_CODE.put(AssetTypeEnum.ZICHAN_1.getCode(),new HashMapString, String() {{put(JS, ZICHAN1-JS);}});//资产2STAGE_CODE.put(AssetTypeEnum.ZICHAN_2.getCode(),new HashMapString, String() {{put(JS, ZICHAN2-JS);}});}
}用postman调用资产2接口 后立即调用资产1接口 此时出现这种结果 会发现调用资产2的时候发送消息还是资产1的信息然后资产1发送的消息也是资产1的信息
解决
此时有两个解决办法一个是将doAround()和其他方法合并为一个方法将成员变量调整为局部变量另一个则为将该成员变量设置为一个对象对这个对象进行线程设置保证doAround()和doBefore()获取的是同一个对象的数据
解决方法一
import com.alibaba.fastjson.JSON;
import com.example.demo.constant.StageCodeConstant;
import com.example.demo.entity.KafkaSendMessageConstant;
import com.example.demo.util.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ArrayUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;Component
Aspect
Slf4j
public class KafkaProcessAspect {private static final Logger log LoggerFactory.getLogger(KafkaProcessAspect.class);Around(annotation(kafkaProcess))public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {Object object null;String assetType (String) getParamValue(joinPoint, assetType);String startTime DateUtils.getTime();object joinPoint.proceed();//推送消息String stageCode StageCodeConstant.STAGE_CODE.get(assetType).get(kafkaProcess.functionName());KafkaSendMessageConstant messageConstant new KafkaSendMessageConstant();messageConstant.setStageCode(stageCode);messageConstant.setStartTime(startTime);messageConstant.setEndTime(DateUtils.getTime());log.info(资产类型{}推送消息{}, assetType, JSON.toJSONString(messageConstant));return object;}private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {Object[] params joinPoint.getArgs();String[] parameterNames ((MethodSignature) joinPoint.getSignature()).getParameterNames();if (parameterNames null || parameterNames.length 0) {return null;}for (String param : parameterNames) {if (param.equals(paramName)) {int index ArrayUtils.indexOf(parameterNames, param);return params[index];}}return null;}}解决方法二
成员变量KafkaSingleDTO
public class KafkaSingleDTO {private String assetType;private String date;public String getAssetType() {return assetType;}public void setAssetType(String assetType) {this.assetType assetType;}public String getDate() {return date;}public void setDate(String date) {this.date date;}
}对象单实例获取 KafkaSingleUtil
import com.example.demo.entity.KafkaSingleDTO;public class KafkaSingleUtil {private static ThreadLocalKafkaSingleDTO START new ThreadLocal();public static KafkaSingleDTO getObject() {KafkaSingleDTO singleDTO START.get();if (singleDTO null) {singleDTO new KafkaSingleDTO();}return singleDTO;}}kafkaAsspect拦截器
import com.alibaba.fastjson.JSON;
import com.example.demo.constant.StageCodeConstant;
import com.example.demo.entity.KafkaSendMessageConstant;
import com.example.demo.entity.KafkaSingleDTO;
import com.example.demo.util.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ArrayUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;Component
Aspect
Slf4j
public class KafkaProcessAspect {private static final Logger log LoggerFactory.getLogger(KafkaProcessAspect.class);private static String assetType;private static String startTime;Around(annotation(kafkaProcess))public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {Object object null;assetType (String) getParamValue(joinPoint, assetType);startTime DateUtils.getTime();KafkaSingleDTO singleDTO KafkaSingleUtil.getObject();singleDTO.setAssetType(assetType);singleDTO.setDate(startTime);object joinPoint.proceed();//推送消息--方法调用后String stageCode StageCodeConstant.STAGE_CODE.get(singleDTO.getAssetType()).get(kafkaProcess.functionName());KafkaSendMessageConstant messageConstant new KafkaSendMessageConstant();messageConstant.setStageCode(stageCode);messageConstant.setStartTime(singleDTO.getDate());messageConstant.setEndTime(DateUtils.getTime());log.info(资产类型{}方法后推送消息{}, singleDTO.getAssetType(), JSON.toJSONString(messageConstant));return object;}Before(annotation(kafkaProcess))public void doBefore(KafkaProcess kafkaProcess){//推送消息--方法调用前KafkaSingleDTO singleDTO KafkaSingleUtil.getObject();singleDTO.setAssetType(assetType);String stageCode StageCodeConstant.STAGE_CODE.get(singleDTO.getAssetType()).get(kafkaProcess.functionName());KafkaSendMessageConstant messageConstant new KafkaSendMessageConstant();messageConstant.setStageCode(stageCode);//...消息实体类 可自补充log.info(资产类型{}方法前推送消息{}, singleDTO.getAssetType(), JSON.toJSONString(messageConstant));}private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {Object[] params joinPoint.getArgs();String[] parameterNames ((MethodSignature) joinPoint.getSignature()).getParameterNames();if (parameterNames null || parameterNames.length 0) {return null;}for (String param : parameterNames) {if (param.equals(paramName)) {int index ArrayUtils.indexOf(parameterNames, param);return params[index];}}return null;}
}最终结果 到此结束