当前位置: 首页 > news >正文

邯郸网站设计培训网站入口首页

邯郸网站设计培训,网站入口首页,巨量算数,网页设计公司概念kafka-顺序消息实现 场景 在购物付款的时候#xff0c;订单会有不同的订单状态#xff0c;对应不同的状态事件#xff0c;比如#xff1a;待支付#xff0c;支付成功#xff0c;支付失败等等#xff0c;我们会将这些消息推送给消息队列 #xff0c;后续的服务会根据订…kafka-顺序消息实现 场景 在购物付款的时候订单会有不同的订单状态对应不同的状态事件比如待支付支付成功支付失败等等我们会将这些消息推送给消息队列 后续的服务会根据订单状态进行不同的业务处理这就要求订单状态推送就要有状态的保证 解决方案 生产者将相同的key的订单状态事件推送到kafka的同一分区kafka 消费者接收消息消费者将消息提交给线程池线程池根据接收到的消息将订单状态事件使用路由策略选择其中一个线程将具有相同路由key的事件发送到同一个线程的阻塞队列中单个线程不停的从阻塞队列获取订单状态消息消费 代码实现 引入依赖 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.2.2/versionrelativePath/ !-- lookup parent from repository -- /parent groupIdcom.example/groupId artifactIdboot-kafka/artifactId version0.0.1-SNAPSHOT/version nameboot-kafka/name descriptionboot-kafka/description propertiesjava.version17/java.version /properties dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion2.0.39/version/dependency /dependencies 使用到的DTO Data public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;Overridepublic String getUniqueNo() {return getOrderNo();} }Data public class InterOrderDto extends OrderDto implements OrderMessage{/*** 属于哪个分区*/private String partition;Overridepublic String getUniqueNo() {return getOrderNo();} }public interface OrderMessage {/*** 线程池路由key* return*/String getUniqueNo();}定义topic 这里是 3个分区2个副本 Configuration public class KafkaConfiguration {Beanpublic NewTopic topic(){return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);} }public interface Constants {String TOPIC_ORDER order; }消费者 消费者OrderListener Component Slf4j public class OrderListener {Autowiredprivate OrderThreadPoolOrderWorker, InterOrderDto orderThreadPool;KafkaListener(topics Constants.TOPIC_ORDER, groupId orderGroup, concurrency 3)public void logListener(ConsumerRecordString, String record) {log.debug( receive log event: {}-{}, record.partition(), record.value());try {OrderDto orderDto JSON.parseObject(record.value(), OrderDto.class);InterOrderDto interOrderDto new InterOrderDto();BeanUtils.copyProperties(orderDto, interOrderDto);interOrderDto.setPartition(record.partition() );orderThreadPool.dispatch(interOrderDto);} catch (Exception e) {log.error(# kafka log listener error: {}, record.value(), e);}}}线程池 OrderThreadPool /*** Date: 2024/1/24 10:23* 线程池实现** param W: worker* param D: message*/ Slf4j public class OrderThreadPoolW extends SingleThreadWorkerD, D extends OrderMessage {private ListW workers;private int size;public OrderThreadPool(int size, SupplierW provider) {this.size size;workers new ArrayList(size);for (int i 0; i size; i) {workers.add(provider.get());}if (CollectionUtils.isEmpty(workers)) {throw new RuntimeException(worker size is 0);}start();}/*** route message to single thread** param data*/public void dispatch(D data) {W w getUniqueQueue(data.getUniqueNo());w.offer(data);}private W getUniqueQueue(String uniqueNo) {int queueNo uniqueNo.hashCode() % size;for (W worker : workers) {if (queueNo worker.getQueueNo()) {return worker;}}throw new RuntimeException(worker 路由失败);}/*** start worker, only start once*/private void start() {for (W worker : workers) {new Thread(worker, OWorder- worker.getQueueNo()).start();}}/*** 关闭所有 workder, 等待所有任务执行完*/public void shutdown() {for (W worker : workers) {worker.shutdown();}}} 工作线程SingleThreadWorker, 内部使用阻塞队列使其串行化 /*** Date: 2024/1/24 10:58* single thread with a blocking-queue*/ Slf4j public abstract class SingleThreadWorkerT implements Runnable {private static AtomicInteger cnt new AtomicInteger(0);private BlockingQueueT queue;private boolean started true;/*** worker 唯一id*/Getterprivate int queueNo;public SingleThreadWorker(int size) {this.queue new LinkedBlockingQueue(size);this.queueNo cnt.getAndIncrement();log.info(init worker {}, this.queueNo);}/*** 提交消息** param data*/public void offer(T data) {try {queue.put(data);} catch (InterruptedException e) {log.info({} offer error: {}, Thread.currentThread().getName(), JSON.toJSONString(data), e);}}Overridepublic void run() {log.info({} worker start take , Thread.currentThread().getName());while (started) {try {T data queue.take();doConsumer(data);} catch (InterruptedException e) {log.error(queue take error, e);}}}/*** do real consume message** param data*/protected abstract void doConsumer(T data);/*** consume rest of message in the queue when thread-pool shutdown*/public void shutdown() {this.started false;ArrayListT rest new ArrayList();int i queue.drainTo(rest);if (i 0) {log.info({} has rest in queue {}, Thread.currentThread().getName(), i);for (T t : rest) {doConsumer(t);}}}} 工作线程实现OrderWorker, 这里就单独处理订单事件 /*** Date: 2024/1/24 13:42* 具体消费者*/ Slf4j public class OrderWorker extends SingleThreadWorkerInterOrderDto{public OrderWorker(int size) {super(size);}Overrideprotected void doConsumer(InterOrderDto data) {log.info({} consume msg: {}, Thread.currentThread().getName(), JSON.toJSONString(data));} }生产者 生产者OrderController, 模拟发送不同的事件类型的订单 RestController public class OrderController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;GetMapping(/send)public String send() throws InterruptedException {int size 1000;for (int i 0; i size; i) {OrderDto orderDto new InterOrderDto();orderDto.setOrderNo(i );orderDto.setPayStatus(getStatus(0));orderDto.setTimestamp(System.currentTimeMillis());//相同的key发送到相同的分区kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(1));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));TimeUnit.MILLISECONDS.sleep(10);orderDto.setPayStatus(getStatus(2));orderDto.setTimestamp(System.currentTimeMillis());kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));}return success;}private String getStatus(int status){return status 0 ? 待支付 : status 1 ? 已支付 : 支付失败;} }application.properties 配置 # kafka地址 spring.kafka.bootstrap-servers192.168.x.x:9092 spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer启动类 Slf4j SpringBootApplication public class BootKafkaApplication {public static void main(String[] args) {SpringApplication.run(BootKafkaApplication.class, args);}/*** 配置线程池* return*/Beanpublic OrderThreadPoolOrderWorker, InterOrderDto orderThreadPool(){OrderThreadPoolOrderWorker, InterOrderDto threadPool new OrderThreadPool(3, () - new OrderWorker(100));Runtime.getRuntime().addShutdownHook(new Thread(() - {log.info(shutdown orderThreadPool);//容器关闭时让工作线程中的任务都被消费完threadPool.shutdown();}));return threadPool;}}测试 访问: http://localhost:8080/send 结果: OWorder-0 worker start take OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:待支付,timestamp:1706084482134,uniqueNo:0} OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:已支付,timestamp:1706084482271,uniqueNo:0} OWorder-0 consume msg: {orderNo:0,partition:2,payStatus:支付失败,timestamp:1706084482282,uniqueNo:0} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:待支付,timestamp:1706084482326,uniqueNo:3} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:已支付,timestamp:1706084482336,uniqueNo:3} OWorder-0 consume msg: {orderNo:3,partition:2,payStatus:支付失败,timestamp:1706084482347,uniqueNo:3} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:待支付,timestamp:1706084482391,uniqueNo:6} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:已支付,timestamp:1706084482401,uniqueNo:6} OWorder-0 consume msg: {orderNo:6,partition:1,payStatus:支付失败,timestamp:1706084482412,uniqueNo:6}可以发现在我们工作线程中事件消费是有序的 good luck!
http://www.zqtcl.cn/news/183971/

相关文章:

  • 网站vi设计公司网站建设app
  • 书店网站建设策划书总结每天看七个广告赚40元的app
  • 做网站的属于什么专业成都广告制作安装公司
  • 天津市网站建设公司网站制作费用
  • 网站制作公司 郑州wordpress图片中文不显示解决
  • 网站建设模式有哪些方面jquery做的装修网站
  • 佛山手机建网站企业网站公司单位有哪些
  • 给企业做网站的平台有没有专门做衣服搭配的网站
  • 青岛本地网站最近军事新闻大事
  • 潍坊哪里有做360网站的成都官微最新发布
  • 还有哪些网站可以做淘宝活动企业建设网站的方式
  • 上海技术公司做网站2022引流人脉推广软件
  • 网站排名优化技术安徽省城乡和建设厅网站
  • 平阴县建设工程网站英文网站模板制作
  • 网站制作超链接怎么做厦门公司建站
  • 阿里云做的网站怎么备份建筑设计网站issuu
  • 网上做设计网站广西做网站找谁
  • 网站优化成本建设项目网站备案申请表
  • 做公众号首图网站上海短期网站建设培训
  • 网站开发最好佛山优化网站排名
  • 服务器搭建网站打不开网站建设信息平台
  • 宽屏蓝色企业网站源码如何编辑网站标题
  • 免费搭建手机网站广告公司怎么取名
  • 网站抓取超时错误c 高性能网站开发
  • 营销导向企业网站策划wordpress 不显示菜单
  • 特效视频网站用.net做视频网站的案例
  • 网站建设实用的网站视屏网站的审核是怎么做的
  • 网站模板之家免费下载福州网红餐厅
  • 西安网站设计与建设第三方检测机构
  • 手机网站推广法建设网站明细报价表