当前位置: 首页 > news >正文

互联网App网站建设方案建筑人才网评的助工

互联网App网站建设方案,建筑人才网评的助工,网站模板 asp pc wap,电子商务网站建设技能论文kafka-顺序消息实现 场景 在购物付款的时候#xff0c;订单会有不同的订单状态#xff0c;对应不同的状态事件#xff0c;比如#xff1a;待支付#xff0c;支付成功#xff0c;支付失败等等#xff0c;我们会将这些消息推送给消息队列 #xff0c;后续的服务会根据订…kafka-顺序消息实现 场景 在购物付款的时候订单会有不同的订单状态对应不同的状态事件比如待支付支付成功支付失败等等我们会将这些消息推送给消息队列 后续的服务会根据订单状态进行不同的业务处理这就要求订单状态推送就要有状态的保证 解决方案 生产者将相同的key的订单状态事件推送到kafka的同一分区kafka 消费者接收消息消费者将消息提交给线程池线程池根据接收到的消息将订单状态事件使用路由策略选择其中一个线程将具有相同路由key的事件发送到同一个线程的阻塞队列中单个线程不停的从阻塞队列获取订单状态消息消费 代码实现 引入依赖 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.2.2/versionrelativePath/ !-- lookup parent from repository -- /parent groupIdcom.example/groupId artifactIdboot-kafka/artifactId version0.0.1-SNAPSHOT/version nameboot-kafka/name descriptionboot-kafka/description propertiesjava.version17/java.version /properties dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion2.0.39/version/dependency /dependencies 使用到的DTO Data public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;Overridepublic String getUniqueNo() {return getOrderNo();} }Data public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;Overridepublic String getUniqueNo() {return getOrderNo();} }public interface OrderMessage {/*** 线程池路由key* return*/String getUniqueNo();}定义topic 这里是 3个分区2个副本 Configuration public class KafkaConfiguration {Beanpublic NewTopic topic(){return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);} }public interface Constants {String TOPIC_ORDER order; }消费者 消费者OrderListener Component Slf4j public class OrderListener {Autowiredprivate OrderThreadPoolOrderWorker, InterOrderDto orderThreadPool;KafkaListener(topics Constants.TOPIC_ORDER, groupId orderGroup, concurrency 3)public void logListener(ConsumerRecordString, String record) {log.debug( receive log event: {}-{}, record.partition(), record.value());try {OrderDto orderDto JSON.parseObject(record.value(), OrderDto.class);InterOrderDto interOrderDto new InterOrderDto();BeanUtils.copyProperties(orderDto, interOrderDto);interOrderDto.setPartition(record.partition() );orderThreadPool.dispatch(interOrderDto);} catch (Exception e) {log.error(# kafka log listener error: {}, record.value(), e);}}}线程池 OrderThreadPool /*** Date: 2024/1/24 10:23* 线程池实现** param W: worker* param D: message*/ Slf4j public class OrderThreadPoolW extends SingleThreadWorkerD, D extends OrderMessage {private ListW workers;private int size;public OrderThreadPool(int size, SupplierW provider) {this.size size;workers new ArrayList(size);for (int i 0; i size; i) {workers.add(provider.get());}if (CollectionUtils.isEmpty(workers)) {throw new RuntimeException(worker size is 0);}start();}/*** route message to single thread** param data*/public void dispatch(D data) {W w getUniqueQueue(data.getUniqueNo());w.offer(data);}private W getUniqueQueue(String uniqueNo) {int queueNo uniqueNo.hashCode() % size;for (W worker : workers) {if (queueNo worker.getQueueNo()) {return worker;}}throw new RuntimeException(worker 路由失败);}/*** start worker, only start once*/private void start() {for (W worker : workers) {new Thread(worker, OWorder- worker.getQueueNo()).start();}}/*** 关闭所有 workder, 等待所有任务执行完*/public void shutdown() {for (W worker : workers) {worker.shutdown();}}} 工作线程SingleThreadWorker, 内部使用阻塞队列使其串行化 /*** Date: 2024/1/24 10:58* single thread with a blocking-queue*/ Slf4j public abstract class SingleThreadWorkerT implements Runnable {private static AtomicInteger cnt new AtomicInteger(0);private BlockingQueueT queue;private boolean started true;/*** worker 唯一id*/Getterprivate int queueNo;public SingleThreadWorker(int size) {this.queue new LinkedBlockingQueue(size);this.queueNo cnt.getAndIncrement();log.info(init worker {}, this.queueNo);}/*** 提交消息** param data*/public void offer(T data) {try {queue.put(data);} catch (InterruptedException e) {log.info({} offer error: {}, Thread.currentThread().getName(), JSON.toJSONString(data), e);}}Overridepublic void run() {log.info({} worker start take , Thread.currentThread().getName());while (started) {try {T data queue.take();doConsumer(data);} catch (InterruptedException e) {log.error(queue take error, e);}}}/*** do real consume message** param data*/protected abstract void doConsumer(T data);/*** consume rest of message in the queue when thread-pool shutdown*/public void shutdown() {this.started false;ArrayListT rest new ArrayList();int i queue.drainTo(rest);if (i 0) {log.info({} has rest in queue {}, Thread.currentThread().getName(), i);for (T t : rest) {doConsumer(t);}}}} 工作线程实现OrderWorker, 这里就单独处理订单事件 /*** Date: 2024/1/24 13:42* 具体消费者*/ Slf4j public class OrderWorker extends SingleThreadWorkerInterOrderDto{public OrderWorker(int size) {super(size);}Overrideprotected void doConsumer(InterOrderDto data) {log.info({} consume msg: {}, Thread.currentThread().getName(), JSON.toJSONString(data));} }生产者 生产者OrderController, 模拟发送不同的事件类型的订单 RestController public class OrderController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;GetMapping(/send)public String send() throws InterruptedException {int size 1000;for (int i 0; i size; i) {OrderDto orderDto new InterOrderDto();orderDto.setOrderNo(i );orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return success;}private String getStatus(int status){return status 0 ? 待支付 : status 1 ? 已支付 : 支付失败;} }application.properties 配置 # kafka地址 spring.kafka.bootstrap-servers192.168.x.x:9092 spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer启动类 Slf4j SpringBootApplication public class BootKafkaApplication {public static void main(String[] args) {SpringApplication.run(BootKafkaApplication.class, args);}/*** 配置线程池* return*/Beanpublic OrderThreadPoolOrderWorker, InterOrderDto orderThreadPool(){OrderThreadPoolOrderWorker, InterOrderDto threadPool new OrderThreadPool(3, () - new OrderWorker(100));Runtime.getRuntime().addShutdownHook(new Thread(() - {log.info(shutdown orderThreadPool);//容器关闭时让工作线程中的任务都被消费完threadPool.shutdown();}));return threadPool;}}测试 访问: http://localhost:8080/send 结果: OWorder-0 worker start take OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:待支付,timestamp:1706084482134,uniqueNo:0} OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:已支付,timestamp:1706084482271,uniqueNo:0} OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:支付失败,timestamp:1706084482282,uniqueNo:0} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:待支付,timestamp:1706084482326,uniqueNo:3} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:已支付,timestamp:1706084482336,uniqueNo:3} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:支付失败,timestamp:1706084482347,uniqueNo:3} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:待支付,timestamp:1706084482391,uniqueNo:6} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:已支付,timestamp:1706084482401,uniqueNo:6} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:支付失败,timestamp:1706084482412,uniqueNo:6}可以发现在我们工作线程中事件消费是有序的 good luck!
http://www.zqtcl.cn/news/613387/

相关文章:

  • 最新创建的网站搭建网站的平台有哪些
  • 全国房地产网站企管宝app下载
  • 无线网络网站dns解析失败南通模板建站多少钱
  • h5手机网站建设哪家好北京海淀房管局网站
  • 制作一个简单的网站冬奥会网页设计代码
  • 如何做网站 百度西充建设部门投诉网站
  • 怎么创建自己的博客网站网站优化主要内容
  • 太原网站建设推广建设网站观澜
  • 网站开发员名称是什么网站制作教程及流程
  • 建设财经资讯网站的目的移动端网站模板怎么做的
  • 受欢迎的赣州网站建设青岛建站
  • 青海网站制作哪家好烟台龙口网站建设
  • 婚恋网站排名前十名网站建设的论坛
  • 进行网站建设有哪些重要意义上海浦东建筑建设网站污水处理工程
  • 自己做qq代刷网站要钱吗瑞安网站建设优化推广
  • 建设网站招标定制高端网站建设报价
  • 商城网站建设code521广州安全教育平台登录入囗
  • 如何做网站系统安庆网站建设公司简
  • 北京做网站电话的公司网站怎么做外链
  • 手工艺品外贸公司网站建设方案复古风格网站
  • 企业网站后端模板如何编写手机程序
  • 泰州网站建设服务好wordpress 子分类
  • 做个企业网站要多少钱php mysql怎么编写视频网站
  • 精仿手表网站做网站为什么要做备案接入
  • 哈什么网一个网站做ppt清新区城乡建设局网站
  • 重庆专业网站建设首页排名网站模板广告去除
  • 河南省建设行业证书查询网站怎么用ps做网站首页背景图片
  • 如何取一个大气的名字的做网站青岛北方现货交易平台
  • 关于做书的网站购物网站建设资讯
  • 运营网站开发工作招聘做装修有什么好网站可以做