当前位置: 首页 > news >正文

wordpress 显示子分类扬州抖音seo

wordpress 显示子分类,扬州抖音seo,溧水建设局网站,模板网站建设咨询文章目录 一、简介二、生产者三、消费者 代码地址#xff1a;https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go 一、简介 之前已经介绍过一个操作kafka的go库了#xff0c;28.windows安装kafka#xff0c;Go操作kafka示例#xff08;sarama库#xf… 文章目录 一、简介二、生产者三、消费者 代码地址https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go 一、简介 之前已经介绍过一个操作kafka的go库了28.windows安装kafkaGo操作kafka示例sarama库 但是这个库比较老了当前比较流行的库是github.com/segmentio/kafka-go所以本次我们就使用一下它。 我们在GitHub直接输入kafka并带上language标签为Go时可以可以看到当前get github.com/segmentio/kafka-go库是最流行的。 首先启动kafka的服务器然后在项目中go get github.com/segmentio/kafka-go 接着我们就可以创建生产者和消费者了注意在实际工作中一般是一个服务为生产者另一个服务作为消费者但是本案例中不涉及微服务就是演示一下生成和消费的示例代码因此写到了一个服务当中。代码文件组织如下 user.go 用于测试发送和消费结构体字符串消息 package modeltype User struct {Id int64 json:idUserName string json:user_nameAge int64 json:age } 二、生产者 启动zookeeper和kafka并创建名为test的topic步骤可以参考28.windows安装kafkaGo操作kafka示例sarama库 producer.go package producerimport (contextencoding/jsonfmtgolang-trick/31-kafka-go/modeltimegithub.com/segmentio/kafka-go )var (topic userProducer *kafka.Writer )func init() {Producer kafka.Writer{Addr: kafka.TCP(localhost:9092), //TCP函数参数为不定长参数可以传多个地址组成集群Topic: topic,Balancer: kafka.Hash{}, // 用于对key进行hash决定消息发送到哪个分区MaxAttempts: 0,WriteBackoffMin: 0,WriteBackoffMax: 0,BatchSize: 0,BatchBytes: 0,BatchTimeout: 0,ReadTimeout: 0,WriteTimeout: time.Second, // kafka有时候可能负载很高写不进去那么超时后可以放弃写入用于可以丢消息的场景RequiredAcks: kafka.RequireNone, // 不需要任何节点确认就返回Async: false,Completion: nil,Compression: 0,Logger: nil,ErrorLogger: nil,Transport: nil,AllowAutoTopicCreation: false, // 第一次发消息的时候如果topic不存在就自动创建topic工作中禁止使用} }// 生产消息,发送user信息 func SendMessage(ctx context.Context, user *model.User) {msgContent, err : json.Marshal(user)if err ! nil {fmt.Println(fmt.Sprintf(json marshal user erruser:%v,err:%v, user, err))}msg : kafka.Message{Topic: ,Partition: 0,Offset: 0,HighWaterMark: 0,Key: []byte(fmt.Sprintf(%d, user.Id)),Value: msgContent,Headers: nil,WriterData: nil,Time: time.Time{},}err Producer.WriteMessages(ctx, msg)if err ! nil {fmt.Println(fmt.Sprintf(写入kafka失败user:%v,err:%v, user, err))} } main.go: 测试消息发送 package mainimport (contextfmtgolang-trick/31-kafka-go/modelgolang-trick/31-kafka-go/producer )func main() {ctx : context.Background()for i : 0; i 5; i {user : model.User{Id: int64(i 1),UserName: fmt.Sprintf(lym:%d, i),Age: 18,}producer.SendMessage(ctx, user)}producer.Producer.Close() // 消息发送完毕后关闭生产者 } 可以看到五条消息都发送成功 三、消费者 consumer.go package consumerimport (contextencoding/jsonfmtgolang-trick/24-gin-learning/class08/modeltimegithub.com/segmentio/kafka-go )var (topic userConsumer *kafka.Reader )func init() {Consumer kafka.NewReader(kafka.ReaderConfig{Brokers: []string{localhost:9092}, // broker地址 数组GroupID: test, // 消费者组id每个消费者组可以消费kafka的完整数据但是同一个消费者组中的消费者根据设置的分区消费策略共同消费kafka中的数据GroupTopics: nil,Topic: topic, // 消费哪个topicPartition: 0,Dialer: nil,QueueCapacity: 0,MinBytes: 0,MaxBytes: 0,MaxWait: 0,ReadBatchTimeout: 0,ReadLagInterval: 0,GroupBalancers: nil,HeartbeatInterval: 0,CommitInterval: time.Second, // offset 上报间隔PartitionWatchInterval: 0,WatchPartitionChanges: false,SessionTimeout: 0,RebalanceTimeout: 0,JoinGroupBackoff: 0,RetentionTime: 0,StartOffset: kafka.FirstOffset, // 仅对新创建的消费者组生效从头开始消费工作中可能更常用从最新的开始消费kafka.LastOffsetReadBackoffMin: 0,ReadBackoffMax: 0,Logger: nil,ErrorLogger: nil,IsolationLevel: 0,MaxAttempts: 0,OffsetOutOfRangeError: false,}) }// 消费消息 func ReadMessage(ctx context.Context) {// 消费者应该通过协程一直开着一直消费for {if msg, err : Consumer.ReadMessage(ctx); err ! nil {fmt.Println(fmt.Sprintf(读kafka失败err:%v, err))break // 当前消息读取失败时并不退出for终止所有后续消费而是跳过该消息即可} else {user : model.User{}err : json.Unmarshal(msg.Value, user)if err ! nil {fmt.Println(fmt.Sprintf(json unmarshal msg value errmsg:%v,err:%v, user, err))break // 当前消息处理失败时并不退出for终止所有后续消费而是跳过该消息即可}fmt.Println(fmt.Sprintf(topic%s,partition%d,offset%d,key%s,user%v, msg.Topic, msg.Partition, msg.Offset, msg.Key, user))}} } main.go: 测试接收消息 package mainimport (contextfmtgolang-trick/31-kafka-go/consumerosos/signalsyscall )// 需要监听信息2和15在程序退出时关闭Consumer func listenSignal() {c : make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)sig : -cfmt.Printf(收到信号 %s , sig.String())if consumer.Consumer ! nil {consumer.Consumer.Close()}os.Exit(0) }func main() {ctx : context.Background()//for i : 0; i 5; i {// user : model.User{// Id: int64(i 1),// UserName: fmt.Sprintf(lym:%d, i),// Age: 18,// }// producer.SendMessage(ctx, user)//}//producer.Producer.Close()go consumer.ReadMessage(ctx)listenSignal() } 启动后因为我们设置的从头开始消费所以原有的五条消息消费成功然后在等待着队列中有消息时继续消费 我们可以通过kafka客户端发两条消息看看我们的消费者程序是否能消费到 最后关闭服务停止消费
http://www.zqtcl.cn/news/152058/

相关文章:

  • 深圳网站设计廊坊公司深圳ui设计培训班
  • 为什么网站需要维护帮人推广注册app的平台
  • 网站开发岗位要求服务好的做培训网站
  • 宁波制作网站企业有哪些学网页设计需要什么学历
  • 网站建设公司墨子网络百度域名续费
  • 琪觅公司网站开发中文网页开发工具
  • 教育网站制作设计成都网络营销公司
  • 怎么查看一个网站页面的seo优化情况网站建站建设首选上海黔文信息科技有限公司2
  • 威海网站建设价格深圳优美网络科技有限公司
  • 做网站用什么系统建设网站投资多少
  • 凡科建站官网 网络服务抚顺 网站建设
  • 学校网站的建设方案西安企业seo外包服务公司
  • 建设租车网站深圳ww
  • 推广网络网站潜江资讯网一手机版
  • 凡科网站自己如何做毕设 做网站
  • 一起做网站逛市场百度权重查询网站
  • 专业网站优化推广网站核查怎么抽查
  • 牡丹江站salong wordpress
  • 网站建设公司做网站要多少费用有哪些外国网站国内可以登录的
  • 天津建站平台网页制作免费的素材网站
  • 建设网站需要专业哪个企业提供电子商务网站建设外包
  • 公司网站建设及维护网站建设思维
  • 那个网站可以学做西餐17做网站广州沙河
  • 品牌网站建设哪里好京东网站建设案例
  • 亚马逊海外版网站深圳市工商注册信息查询网站
  • 新乐做网站优化网站上漂亮的甘特图是怎么做的
  • 新网站应该怎么做seo品牌推广方案思维导图
  • 想要网站导航推广页浅谈中兴电子商务网站建设
  • 免费引流在线推广成都网站优化费用
  • 老河口市网站佛山市点精网络科技有限公司