当前位置: 首页 > news >正文

wordpress+示例深圳哪个做网站好优化

wordpress+示例,深圳哪个做网站好优化,帝国cms 网站地址设置,免费做相册视频网站实现原理 通过topic区分不同的延迟时长#xff0c;每个topic对于一个延迟#xff0c;比如 topic100 仅存储延迟 100ms 的消息#xff0c;topic1000 仅存储延迟 1s 的消息#xff0c;依次类推。生产消息时#xff0c;消息需按延迟时长投递到对应的topic。消费消息时#x…实现原理 通过topic区分不同的延迟时长每个topic对于一个延迟比如 topic100 仅存储延迟 100ms 的消息topic1000 仅存储延迟 1s 的消息依次类推。生产消息时消息需按延迟时长投递到对应的topic。消费消息时检查消息的时间如果未到达延迟时长则sleep剩余的时长后再处理。这样就简单的实现了基于kafka的延迟队列。死信队列可作为一种特殊的延迟队列比如延迟 3600000ms 的处理。 消费者实现 package mainimport (contexttimegithub.com/IBM/saramagithub.com/sirupsen/logrus )// 定义每个topic对应的延迟时间(ms) var topicDelayConfig map[string]time.Duration{delay-100ms: 100 * time.Millisecond,delay-200ms: 200 * time.Millisecond,delay-500ms: 500 * time.Millisecond,delay-1000ms: 1000 * time.Millisecond, }type delayConsumerHandler struct {// 可以添加必要的依赖如业务处理器等 }func (h *delayConsumerHandler) Setup(sess sarama.ConsumerGroupSession) error {logrus.Info(延迟队列消费者初始化完成)return nil }func (h *delayConsumerHandler) Cleanup(sess sarama.ConsumerGroupSession) error {logrus.Info(延迟队列消费者清理完成)return nil }// ConsumeClaim 处理分区消息实现延迟逻辑 func (h *delayConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {topic : claim.Topic()delay, exists : topicDelayConfig[topic]if !exists {logrus.Errorf(topic %s 未配置延迟时间跳过消费, topic)// 标记所有消息为已消费避免重复处理for range claim.Messages() {sess.MarkMessage(msg, )}return nil}// 按顺序处理消息假设消息时间有序for msg : range claim.Messages() {// 检查会话是否已关闭如重平衡发生select {case -sess.Context().Done():logrus.Info(会话已关闭停止消费)return nildefault:}// 计算需要延迟的时间// 消息应该被处理的时间 消息产生时间 主题延迟时间produceTime : msg.TimestampprocessTime : produceTime.Add(delay)now : time.Now()// 如果当前时间未到处理时间计算需要休眠的时间if now.Before(processTime) {sleepDuration : processTime.Sub(now)logrus.Debugf(消息需要延迟处理topic%s, offset%d, 需等待 %v (产生时间: %v, 预计处理时间: %v),topic, msg.Offset, sleepDuration, produceTime, processTime,)// 休眠期间监听会话关闭信号避免阻塞重平衡select {case -sess.Context().Done():logrus.Info(休眠期间会话关闭停止消费)return nilcase -time.After(sleepDuration):// 休眠完成继续处理}}// 延迟时间已到处理消息h.processMessage(msg)// 标记消息为已消费sess.MarkMessage(msg, )}return nil }// 实际业务处理逻辑 func (h *delayConsumerHandler) processMessage(msg *sarama.ConsumerMessage) {logrus.Infof(处理延迟消息topic%s, partition%d, offset%d, key%s, value%s, 产生时间%v,msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp,)// 这里添加实际的业务处理代码 }// 初始化消费者示例 func newDelayConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {config : sarama.NewConfig()config.Version sarama.V2_8_1_0 // 指定Kafka版本config.Consumer.Return.Errors trueconfig.Consumer.Group.Rebalance.Strategy sarama.BalanceStrategyRange// 确保消息的Timestamp是创建时间需要Kafka broker配置支持config.Consumer.Fetch.Min 1config.Consumer.Fetch.Default 1024 * 1024return sarama.NewConsumerGroup(brokers, groupID, config) }func main() {brokers : []string{localhost:9092}groupID : delay-queue-grouptopics : []string{delay-100ms, delay-200ms, delay-500ms, delay-1000ms}consumer, err : newDelayConsumer(brokers, groupID)if err ! nil {logrus.Fatalf(创建消费者失败: %v, err)}defer consumer.Close()handler : delayConsumerHandler{}ctx : context.Background()// 持续消费for {if err : consumer.Consume(ctx, topics, handler); err ! nil {logrus.Errorf(消费出错: %v, err)// 简单重试逻辑time.Sleep(5 * time.Second)}} }生产者实现 package mainimport (errorstimegithub.com/IBM/saramagithub.com/sirupsen/logrus )// 定义允许的延迟时长毫秒及其对应的Topic var allowedDelays map[time.Duration]string{100 * time.Millisecond: delay-100ms,200 * time.Millisecond: delay-200ms,500 * time.Millisecond: delay-500ms,1000 * time.Millisecond: delay-1000ms,// 可根据需要添加更多允许的延迟时长 }// DelayProducer 延迟消息生产者 type DelayProducer struct {producer sarama.SyncProducer }// NewDelayProducer 创建延迟消息生产者实例 func NewDelayProducer(brokers []string) (*DelayProducer, error) {config : sarama.NewConfig()config.Version sarama.V2_8_1_0 // 匹配Kafka版本config.Producer.RequiredAcks sarama.WaitForAllconfig.Producer.Retry.Max 3config.Producer.Return.Successes trueproducer, err : sarama.NewSyncProducer(brokers, config)if err ! nil {return nil, err}return DelayProducer{producer: producer,}, nil }// SendDelayMessage 发送延迟消息 // 参数: // - key: 消息键 // - value: 消息内容 // - delay: 延迟时长 // 返回: // - 消息的分区和偏移量 // - 错误信息若延迟不合法或发送失败 func (p *DelayProducer) SendDelayMessage(key, value []byte, delay time.Duration) (partition int32, offset int64, err error) {// 1. 校验延迟时长是否合法topic, ok : allowedDelays[delay]if !ok {return 0, 0, errors.New(invalid delay duration, allowed values are: 100ms, 200ms, 500ms, 1000ms)}// 2. 创建消息设置当前时间为消息时间戳供消费者计算延迟msg : sarama.ProducerMessage{Topic: topic,Key: sarama.ByteEncoder(key),Value: sarama.ByteEncoder(value),Timestamp: time.Now(), // 记录消息发送时间用于消费者计算处理时间}// 3. 发送消息partition, offset, err p.producer.SendMessage(msg)if err ! nil {logrus.Errorf(发送延迟消息失败: %v, 延迟时长: %v, err, delay)return 0, 0, err}logrus.Infof(发送延迟消息成功, topic: %s, 分区: %d, 偏移量: %d, 延迟时长: %v,topic, partition, offset, delay)return partition, offset, nil }// Close 关闭生产者 func (p *DelayProducer) Close() error {return p.producer.Close() }// 使用示例 func main() {// 初始化生产者producer, err : NewDelayProducer([]string{localhost:9092})if err ! nil {logrus.Fatalf(初始化生产者失败: %v, err)}defer producer.Close()// 发送合法延迟消息_, _, err producer.SendDelayMessage([]byte(test-key),[]byte(这是一条延迟消息),100*time.Millisecond, // 合法延迟)if err ! nil {logrus.Error(发送消息失败:, err)}// 尝试发送非法延迟消息会被拒绝_, _, err producer.SendDelayMessage([]byte(test-key),[]byte(这是一条非法延迟消息),300*time.Millisecond, // 不允许的延迟)if err ! nil {logrus.Error(发送消息失败:, err) // 会输出非法延迟的错误} }
http://www.zqtcl.cn/news/674724/

相关文章:

  • 网站建设跟网站结构如何提高网站排名的方法
  • 网站模板 缓存商标网上开店创业计划书
  • 沧州网站建设微艾薇怎样给企业做网站
  • 如何做淘宝客的网站个人网站设计与制作代码
  • 信用门户网站建设观摩惠州专业做网站
  • wordpress打开网站前广告佛山百度推广seo服务
  • 松北建设局网站vps 用ip可以访问网站么
  • 网站图片内容免费开源crm
  • wordpress调用分类栏目wordpress文章优化
  • 建站公司上海企业官网模板下载
  • 网站建设推广话术wordpress 不显示缩略图
  • 企业电子商务网站建设和一般百拓公司做网站怎么样
  • 吉林网站建设司上海什么做网站的公司比较好
  • 吉安市建设规划局网站jsp wordpress
  • 建设银行贵金属网站微信小程序注册后怎么使用
  • 如何做律师网站河南建网站 优帮云
  • 云阳如何做网站网站建设旅游
  • 推荐一个简单的网站制作单位网站服务的建设及维护
  • tp5网站文档归档怎么做网站 信用卡支付接口
  • phpcms 企业网站网站建设中单页代码
  • 坑梓网站建设方案网络编程技术及应用
  • 电子商务网站建设 价格新媒体运营需要具备哪些能力
  • 做生存分析的网站电商网站运营建设的目标
  • 佛山 做网站邮箱官方网站注册
  • 生成flash的网站源码表白二维码制作网站
  • 定做专业营销型网站网站开发应用
  • 万盛建设局官方网站如何用群晖nas做网站
  • 建设装饰网站郑州惠济区建设局网站
  • 网站做标题有用吗网站优化多少钱
  • 婚庆设备租赁网站源码如何进行网站的建设和维护