wordpress+示例,深圳哪个做网站好优化,帝国cms 网站地址设置,免费做相册视频网站实现原理
通过topic区分不同的延迟时长#xff0c;每个topic对于一个延迟#xff0c;比如 topic100 仅存储延迟 100ms 的消息#xff0c;topic1000 仅存储延迟 1s 的消息#xff0c;依次类推。生产消息时#xff0c;消息需按延迟时长投递到对应的topic。消费消息时#x…实现原理
通过topic区分不同的延迟时长每个topic对于一个延迟比如 topic100 仅存储延迟 100ms 的消息topic1000 仅存储延迟 1s 的消息依次类推。生产消息时消息需按延迟时长投递到对应的topic。消费消息时检查消息的时间如果未到达延迟时长则sleep剩余的时长后再处理。这样就简单的实现了基于kafka的延迟队列。死信队列可作为一种特殊的延迟队列比如延迟 3600000ms 的处理。
消费者实现
package mainimport (contexttimegithub.com/IBM/saramagithub.com/sirupsen/logrus
)// 定义每个topic对应的延迟时间(ms)
var topicDelayConfig map[string]time.Duration{delay-100ms: 100 * time.Millisecond,delay-200ms: 200 * time.Millisecond,delay-500ms: 500 * time.Millisecond,delay-1000ms: 1000 * time.Millisecond,
}type delayConsumerHandler struct {// 可以添加必要的依赖如业务处理器等
}func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {logrus.Info(延迟队列消费者初始化完成)return nil
}func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {logrus.Info(延迟队列消费者清理完成)return nil
}// ConsumeClaim 处理分区消息实现延迟逻辑
func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {topic : claim.Topic()delay, exists : topicDelayConfig[topic]if !exists {logrus.Errorf(topic %s 未配置延迟时间跳过消费, topic)// 标记所有消息为已消费避免重复处理for range claim.Messages() {sess.MarkMessage(msg, )}return nil}// 按顺序处理消息假设消息时间有序for msg : range claim.Messages() {// 检查会话是否已关闭如重平衡发生select {case -sess.Context().Done():logrus.Info(会话已关闭停止消费)return nildefault:}// 计算需要延迟的时间// 消息应该被处理的时间 消息产生时间 主题延迟时间produceTime : msg.TimestampprocessTime : produceTime.Add(delay)now : time.Now()// 如果当前时间未到处理时间计算需要休眠的时间if now.Before(processTime) {sleepDuration : processTime.Sub(now)logrus.Debugf(消息需要延迟处理topic%s, offset%d, 需等待 %v (产生时间: %v, 预计处理时间: %v),topic, msg.Offset, sleepDuration, produceTime, processTime,)// 休眠期间监听会话关闭信号避免阻塞重平衡select {case -sess.Context().Done():logrus.Info(休眠期间会话关闭停止消费)return nilcase -time.After(sleepDuration):// 休眠完成继续处理}}// 延迟时间已到处理消息h.processMessage(msg)// 标记消息为已消费sess.MarkMessage(msg, )}return nil
}// 实际业务处理逻辑
func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {logrus.Infof(处理延迟消息topic%s, partition%d, offset%d, key%s, value%s, 产生时间%v,msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,)// 这里添加实际的业务处理代码
}// 初始化消费者示例
func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {config : sarama.NewConfig()config.Version sarama.V2_8_1_0 // 指定Kafka版本config.Consumer.Return.Errors trueconfig.Consumer.Group.Rebalance.Strategy sarama.BalanceStrategyRange// 确保消息的Timestamp是创建时间需要Kafka broker配置支持config.Consumer.Fetch.Min 1config.Consumer.Fetch.Default 1024 * 1024return sarama.NewConsumerGroup(brokers, groupID, config)
}func main() {brokers : []string{localhost:9092}groupID : delay-queue-grouptopics : []string{delay-100ms, delay-200ms, delay-500ms, delay-1000ms}consumer, err : newDelayConsumer(brokers, groupID)if err ! nil {logrus.Fatalf(创建消费者失败: %v, err)}defer consumer.Close()handler : delayConsumerHandler{}ctx : context.Background()// 持续消费for {if err : consumer.Consume(ctx, topics, handler); err ! nil {logrus.Errorf(消费出错: %v, err)// 简单重试逻辑time.Sleep(5 * time.Second)}}
}生产者实现
package mainimport (errorstimegithub.com/IBM/saramagithub.com/sirupsen/logrus
)// 定义允许的延迟时长毫秒及其对应的Topic
var allowedDelays map[time.Duration]string{100 * time.Millisecond: delay-100ms,200 * time.Millisecond: delay-200ms,500 * time.Millisecond: delay-500ms,1000 * time.Millisecond: delay-1000ms,// 可根据需要添加更多允许的延迟时长
}// DelayProducer 延迟消息生产者
type DelayProducer struct {producer sarama.SyncProducer
}// NewDelayProducer 创建延迟消息生产者实例
func NewDelayProducer(brokers []string) (*DelayProducer, error) {config : sarama.NewConfig()config.Version sarama.V2_8_1_0 // 匹配Kafka版本config.Producer.RequiredAcks sarama.WaitForAllconfig.Producer.Retry.Max 3config.Producer.Return.Successes trueproducer, err : sarama.NewSyncProducer(brokers, config)if err ! nil {return nil, err}return DelayProducer{producer: producer,}, nil
}// SendDelayMessage 发送延迟消息
// 参数:
// - key: 消息键
// - value: 消息内容
// - delay: 延迟时长
// 返回:
// - 消息的分区和偏移量
// - 错误信息若延迟不合法或发送失败
func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {// 1. 校验延迟时长是否合法topic, ok : allowedDelays[delay]if !ok {return 0, 0, errors.New(invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms)}// 2. 创建消息设置当前时间为消息时间戳供消费者计算延迟msg : sarama.ProducerMessage{Topic: topic,Key: sarama.ByteEncoder(key),Value: sarama.ByteEncoder(value),Timestamp: time.Now(), // 记录消息发送时间用于消费者计算处理时间}// 3. 发送消息partition, offset, err p.producer.SendMessage(msg)if err ! nil {logrus.Errorf(发送延迟消息失败: %v, 延迟时长: %v, err, delay)return 0, 0, err}logrus.Infof(发送延迟消息成功, topic: %s, 分区: %d, 偏移量: %d, 延迟时长: %v,topic, partition, offset, delay)return partition, offset, nil
}// Close 关闭生产者
func (p *DelayProducer) Close() error {return p.producer.Close()
}// 使用示例
func main() {// 初始化生产者producer, err : NewDelayProducer([]string{localhost:9092})if err ! nil {logrus.Fatalf(初始化生产者失败: %v, err)}defer producer.Close()// 发送合法延迟消息_, _, err producer.SendDelayMessage([]byte(test-key),[]byte(这是一条延迟消息),100*time.Millisecond, // 合法延迟)if err ! nil {logrus.Error(发送消息失败:, err)}// 尝试发送非法延迟消息会被拒绝_, _, err producer.SendDelayMessage([]byte(test-key),[]byte(这是一条非法延迟消息),300*time.Millisecond, // 不允许的延迟)if err ! nil {logrus.Error(发送消息失败:, err) // 会输出非法延迟的错误}
}