Kafka: Implementing Ordered Message Consumption
Scenario
When a customer pays for a purchase, the order moves through different statuses, each of which produces a status event, for example: pending payment, payment succeeded, payment failed. These events are pushed to a message queue, and downstream services run different business logic depending on the order status. This requires that the order status events be delivered and processed in order.
Solution
The producer pushes order status events with the same key to the same Kafka partition. The Kafka consumer receives the messages and hands them to a thread pool. The thread pool applies a routing strategy to the incoming events, so that all events with the same routing key end up in the blocking queue of the same worker thread. Each worker thread then continuously takes order status messages from its own blocking queue and consumes them, one after another. The key-to-partition step is sketched below.
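For context (this snippet is illustrative and not part of the project code): Kafka's default partitioner hashes the serialized key and takes it modulo the partition count, so records with the same non-null key always land on the same partition. Conceptually:

// Conceptual sketch only -- Kafka really uses a murmur2 hash of the serialized key,
// but the effect is the same: one key, one partition, hence per-key ordering.
static int partitionFor(String key, int numPartitions) {
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}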
Code Implementation

Add the dependencies:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.39</version>
    </dependency>
</dependencies>
The DTOs used:
@Data
public class InterOrderDto extends OrderDto implements OrderMessage {

    /**
     * which partition the record came from
     */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}
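OrderDto itself is not listed in the post; judging from the setters used in the controller further below (setOrderNo, setPayStatus, setTimestamp), a minimal sketch could look like this (the field types are assumptions):

@Data
public class OrderDto {

    /** order number, also used as the Kafka message key */
    private String orderNo;

    /** payment status: 待支付 / 已支付 / 支付失败 */
    private String payStatus;

    /** event time in epoch milliseconds */
    private Long timestamp;
}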
And the routing interface used by the thread pool:

public interface OrderMessage {

    /**
     * routing key for the thread pool
     * @return unique routing key
     */
    String getUniqueNo();
}

Define the topic
Here the topic is created with 3 partitions and 2 replicas.
@Configuration
public class KafkaConfiguration {

    @Bean
    public NewTopic topic() {
        return new NewTopic(Constants.TOPIC_ORDER, 3, (short) 2);
    }
}
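As a side note (not part of the original code), the same topic can also be declared with spring-kafka's org.springframework.kafka.config.TopicBuilder, which reads a little more clearly:

@Bean
public NewTopic orderTopic() {
    // equivalent to new NewTopic(Constants.TOPIC_ORDER, 3, (short) 2)
    return TopicBuilder.name(Constants.TOPIC_ORDER)
            .partitions(3)
            .replicas(2)
            .build();
}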
The topic name constant:

public interface Constants {
    String TOPIC_ORDER = "order";
}
Consumer
The consumer, OrderListener:
@Component
@Slf4j
public class OrderListener {

    @Autowired
    private OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;

    @KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")
    public void logListener(ConsumerRecord<String, String> record) {
        log.debug("receive log event: {}-{}", record.partition(), record.value());
        try {
            OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);
            InterOrderDto interOrderDto = new InterOrderDto();
            BeanUtils.copyProperties(orderDto, interOrderDto);
            interOrderDto.setPartition(record.partition() + "");
            orderThreadPool.dispatch(interOrderDto);
        } catch (Exception e) {
            log.error("# kafka log listener error: {}", record.value(), e);
        }
    }
}

The thread pool, OrderThreadPool:
/**
 * Date: 2024/1/24 10:23
 * Thread pool implementation.
 *
 * @param <W> worker
 * @param <D> message
 */
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {

    private List<W> workers;
    private int size;

    public OrderThreadPool(int size, Supplier<W> provider) {
        this.size = size;
        workers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            workers.add(provider.get());
        }
        if (CollectionUtils.isEmpty(workers)) {
            throw new RuntimeException("worker size is 0");
        }
        start();
    }

    /**
     * route a message to a single thread
     *
     * @param data message
     */
    public void dispatch(D data) {
        W w = getUniqueQueue(data.getUniqueNo());
        w.offer(data);
    }

    private W getUniqueQueue(String uniqueNo) {
        // mask the sign bit so a negative hashCode cannot break the routing
        int queueNo = (uniqueNo.hashCode() & Integer.MAX_VALUE) % size;
        for (W worker : workers) {
            if (queueNo == worker.getQueueNo()) {
                return worker;
            }
        }
        throw new RuntimeException("worker routing failed");
    }

    /**
     * start the workers, only started once
     */
    private void start() {
        for (W worker : workers) {
            new Thread(worker, "OWorder-" + worker.getQueueNo()).start();
        }
    }

    /**
     * shut down all workers and wait for the remaining tasks to be consumed
     */
    public void shutdown() {
        for (W worker : workers) {
            worker.shutdown();
        }
    }
}
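A quick standalone usage sketch (hypothetical; the Spring wiring in the startup class further below does the same thing):

// three workers, each with a bounded queue of 100; events sharing a uniqueNo
// are always dispatched to the same worker, so they are consumed in order
OrderThreadPool<OrderWorker, InterOrderDto> pool =
        new OrderThreadPool<>(3, () -> new OrderWorker(100));

InterOrderDto event = new InterOrderDto();
event.setOrderNo("42");        // getUniqueNo() returns the order number
pool.dispatch(event);

pool.shutdown();               // drain and consume whatever is still queued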
The worker thread, SingleThreadWorker, which uses an internal blocking queue to serialize processing:
/**
 * Date: 2024/1/24 10:58
 * single thread with a blocking queue
 */
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {

    private static AtomicInteger cnt = new AtomicInteger(0);

    private BlockingQueue<T> queue;

    /** volatile so the stop signal set by the shutdown hook is visible to the worker thread */
    private volatile boolean started = true;

    /**
     * unique worker id
     */
    @Getter
    private int queueNo;

    public SingleThreadWorker(int size) {
        this.queue = new LinkedBlockingQueue<>(size);
        this.queueNo = cnt.getAndIncrement();
        log.info("init worker {}", this.queueNo);
    }

    /**
     * submit a message
     *
     * @param data message
     */
    public void offer(T data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);
        }
    }

    @Override
    public void run() {
        log.info("{} worker start take", Thread.currentThread().getName());
        while (started) {
            try {
                T data = queue.take();
                doConsumer(data);
            } catch (InterruptedException e) {
                log.error("queue take error", e);
            }
        }
    }

    /**
     * do the real message consumption
     *
     * @param data message
     */
    protected abstract void doConsumer(T data);

    /**
     * consume the rest of the messages in the queue when the thread pool shuts down
     */
    public void shutdown() {
        this.started = false;
        ArrayList<T> rest = new ArrayList<>();
        int i = queue.drainTo(rest);
        if (i > 0) {
            log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);
            for (T t : rest) {
                doConsumer(t);
            }
        }
    }
}
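One caveat (an observation, not from the original post): queue.take() blocks indefinitely, so a worker idling on an empty queue will not notice started flipping to false until one more message arrives. A variant of run() that re-checks the flag periodically could look like this:

@Override
public void run() {
    log.info("{} worker start take", Thread.currentThread().getName());
    while (started) {
        try {
            // poll with a timeout instead of take(), so the stop flag is
            // re-checked even when no messages arrive
            T data = queue.poll(1, TimeUnit.SECONDS);
            if (data != null) {
                doConsumer(data);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}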
The worker implementation, OrderWorker, which handles the order events here:
/**
 * Date: 2024/1/24 13:42
 * concrete consumer
 */
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto> {

    public OrderWorker(int size) {
        super(size);
    }

    @Override
    protected void doConsumer(InterOrderDto data) {
        log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));
    }
}
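The demo worker only logs. In a real service, doConsumer would branch on the order status; a purely hypothetical sketch (handlePending/handlePaid/handleFailed are not part of the original code):

@Override
protected void doConsumer(InterOrderDto data) {
    // hypothetical status-based dispatch to downstream business logic
    switch (data.getPayStatus()) {
        case "待支付" -> handlePending(data);   // pending payment
        case "已支付" -> handlePaid(data);      // paid
        case "支付失败" -> handleFailed(data);  // payment failed
        default -> log.warn("unknown status: {}", JSON.toJSONString(data));
    }
}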
Producer
The producer, OrderController, which simulates sending orders with different event types:
@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public String send() throws InterruptedException {
        int size = 1000;
        for (int i = 0; i < size; i++) {
            OrderDto orderDto = new InterOrderDto();
            orderDto.setOrderNo(i + "");
            orderDto.setPayStatus(getStatus(0));
            orderDto.setTimestamp(System.currentTimeMillis());
            // the same key is always sent to the same partition
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);

            orderDto.setPayStatus(getStatus(1));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);

            orderDto.setPayStatus(getStatus(2));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
        }
        return "success";
    }

    private String getStatus(int status) {
        return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
    }
}
application.properties configuration
# Kafka broker address
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Startup class
@Slf4j
@SpringBootApplication
public class BootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootKafkaApplication.class, args);
    }

    /**
     * configure the worker thread pool
     *
     * @return the pool
     */
    @Bean
    public OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool() {
        OrderThreadPool<OrderWorker, InterOrderDto> threadPool =
                new OrderThreadPool<>(3, () -> new OrderWorker(100));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("shutdown orderThreadPool");
            // when the container shuts down, let the workers finish the queued tasks
            threadPool.shutdown();
        }));
        return threadPool;
    }
}

Test
Visit http://localhost:8080/send. Output:
OWorder-0 worker start take
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}

As the output shows, within each worker thread the events for a given order are consumed in order.
good luck!