外贸网站发外链,无锡网站推广,品牌建设论文怎么写,如何做网站网页流程Kafka Go客户端
在Go中里面有三个比较有名气的Go客户端。
Sarama:用户数量最多#xff0c;早期这个项目是在Shopify下面#xff0c;现在挪到了IBM下。segmentio/kafka-go:没啥大的缺点。confluent-kafka-go#xff1a;需要启用cgo,跨平台问题比较多#xff0c;交叉编译也…Kafka Go客户端
在Go中里面有三个比较有名气的Go客户端。
Sarama:用户数量最多早期这个项目是在Shopify下面现在挪到了IBM下。segmentio/kafka-go:没啥大的缺点。confluent-kafka-go需要启用cgo,跨平台问题比较多交叉编译也不支持。
Sarama 使用入门tools
IBM/sarama: Sarama is a Go library for Apache Kafka.
在 Sarama 里面提供了一些简单的命令行工具,可以看做是 Shell脚本提供的功能一个子集。
Consumer和 producer中的用得比较多 1.设置 Go 代理如果内网无法直连 proxy.golang.org
export GOPROXYhttps://goproxy.cn,direct
export GOSUMDBsum.golang.google.cn2.在虚拟机上执行安装命令
go install github.com/IBM/sar ama/tools/kafka-console-consumerlatest go install github.com/lBM/sarama/tools/kafka-console-producerlatest
3.把可执行文件所在目录加到 PATH如果还没加
export PATH$PATH:$(go env GOBIN)4.确认可执行文件在哪里
# 查看 GOBIN如果你没显式设置就会是空
go env GOBIN# 查看 GOPATH默认是 $HOME/go对于 root 用户就是 /root/go
go env GOPATH#我的是/home/cxz/go/lib:/home/cxz/go/work5.查看安装结果
ls /home/cxz/go/lib/bin
#应该能够看到kafka-console-consumer kafka-console-producer6.临时生效
export PATH$PATH:/home/cxz/go/lib/bin# 然后验证
which kafka-console-consumer
# 应该输出 /home/cxz/go/lib/bin/kafka-console-consumer7.永久生效
echo export PATH$PATH:/home/cxz/go/lib/bin ~/.bashrc
# 或者如果你用的是 zsh
# echo export PATH$PATH:/home/cxz/go/lib/bin ~/.zshrc# 然后重新加载配置
source ~/.bashrcSarama 使用入门发送消息
虚拟机上执行
kafka-console-consumer -topictest_topic -brokers192.168.24.101:9094Goland上执行
package mainimport (github.com/IBM/saramagithub.com/stretchr/testify/asserttesting
)var addrs []string{192.168.24.101:9094}func TestSyncProducer(t *testing.T) {//创建一个 Sarama 的配置对象。cfg : sarama.NewConfig()//表示生产者要等待 Kafka 确认消息成功写入后再返回同步模式。如果不设置这个SyncProducer.SendMessage 会一直失败。cfg.Producer.Return.Successes true //同步的Producer一定要设置//创建一个同步的生产者实例producer, err : sarama.NewSyncProducer(addrs, cfg)assert.NoError(t, err)//构建消息并发送_, _, err producer.SendMessage(sarama.ProducerMessage{Topic: test_topic,//消息数据本体Value: sarama.StringEncoder(hello world ,这是一条使用kafka的消息),//会在生产者和消费者之间传递消息头可传递自定义键值对比如 trace_id 用于链路追踪。Headers: []sarama.RecordHeader{{Key: []byte(trace_id),Value: []byte(123456),},},//只作用于发送过程。元信息在发送过程中使用可以用来传递额外信息发送完成后会原样返回不会传给消费者。Metadata: 这是metadata,})assert.NoError(t, err)
}10.执行结果
Partition: 0
Offset: 0
Key:
Value: hello world ,这是一条使用kafka的消息使用控制台工具连接Kafka
Sarama 使用入门指定分区
可以注意到,前面所有的消息都被发送到了 Partition 0 上面。
正常来说,在 Sarama 里面,可以通过指定 config 中的Partitioner来指定最终的目标分区。
常见的方法:
Random:随机挑一个。 RoundRobin:轮询。 Hash(默认):根据 key 的哈希值来筛选一个。 Manual: 根据 Message 中的 partition 字段来选择。 ConsistentCRC:一致性哈希用的是 CRC32 算法。 Custom:实际上不 Custom,而是自定义一部分Hash 的参数,本质上是一个 Hash 的实现。
//默认HashPartitioner 适合 按用户 ID、订单 ID 等字段分区场景
cfg.Producer.Partitioner sarama.NewHashPartitioner
//使用 CRC32 算法 计算 Key 的哈希。 适合 需要高一致性分布的业务例如日志收集系统
cfg.Producer.Partitioner sarama.NewConsistentCRCHashPartitioner
//忽略 Key每条消息随机分配 partition。 适合 普通消息队列、广播类场景。
cfg.Producer.Partitioner sarama.NewRandomPartitioner
//需要手动指定 partitionProducerMessage.Partition 字段。适合 明确知道要写哪个 partition例如做数据分流
cfg.Producer.Partitioner sarama.NewManualPartitioner
//用于实现你自己的 Partitioner 一般不推荐使用这个空参函数它会 panic应实现完整接口。
cfg.Producer.Partitioner sarama.NewCustomPartitioner()
//允许你使用自定义哈希函数来做 key 分区。 适合 有特定哈希策略需求时例如分布要尽可能均匀。
cfg.Producer.Partitioner sarama.NewCustomHashPartitioner(func() hash.Hash32 {})Topic: test_topic,
//分区依据
Key: sarama.StringEncoder(user_123), // 这里是分区依据
//消息数据本体
Value: sarama.StringEncoder(hello world ,这是一条使用kafka的消息),最典型的场景就是利用Partitioner来保证同一个业务的消息一定发送到同一个分区上从而保证业 有序。
Sarama 使用入门异步发送
Sarama有一个异步发送的producer它的用法稍微复杂一点。
把Return.Success和 Errors都设置为true这是为了后面能够拿到发送结果。 初始化异步producer。 从producer里面拿到Input的channel,并且发送 一条消息。 利用select case同时**监听Success和Error两个channel,**来获得发送成功与否的信息。
func TestAsyncProducer(t *testing.T) {cfg : sarama.NewConfig()//怎么知道发送是否成功cfg.Producer.Return.Errors truecfg.Producer.Return.Successes trueproducer, err : sarama.NewAsyncProducer(addrs, cfg)require.NoError(t, err)messages : producer.Input()go func() {for {messages - sarama.ProducerMessage{Topic: test_topic,//分区依据Key: sarama.StringEncoder(user_123), // 这里是分区依据//消息数据本体Value: sarama.StringEncoder(hello world ,这是一条使用kafka的消息),//会在生产者和消费者之间传递Headers: []sarama.RecordHeader{{Key: []byte(trace_id),Value: []byte(123456),},},//只作用于发送过程Metadata: 这是metadata,}}}()errCh : producer.Errors()succCh : producer.Successes()for {//两个都不满足就会阻塞select {case err : -errCh:t.Log(发送出了问题, err.Err)case -succCh:t.Log(发送成功)}}
}
Sarama 使用入门acks
在Kafka里面生产者在发送数据的时候有一个很关键的参数,就是 acks。 有三个取值
0客户端发一次不需要服务端的确认。 1:客户端发送并且需要服务端写入到主分区。 -1客户端发送并且需要服务端同步到所有的ISR 上。
从上到下性能变差但是数据可靠性上升。需要性能选 0需要消息不丢失选-1。
理解acks你就要抓住核心点谁ack才算数
0TCP协议返回了ack就可以。1主分区确认写入了就可以。-1所有的ISR都确认了就可以。 ISR In Sync Replicas用通俗易懂的话来说,就是跟上了节奏的从分区。
什么叫做跟上了节奏就是它和主分区保持了数据同步。
所以当消息被同步到从分区之后如果主分区崩溃了那么依旧可以保证在从分区上还有数据。 sarama 使用入门启动消费者
Sarama的消费者设计不是很直观稍微有点复杂。
首先要初始化一个ConsumerGroup。 调用ConsumerGroup上的Consume方法。 为 Consume 方法传入一个 ConsumerGroupHandler的辅助方法。
package mainimport (contextgithub.com/IBM/saramagithub.com/stretchr/testify/assertlogtesting
)func TestConsumer(t *testing.T) {cfg : sarama.NewConfig()//正常来说一个消费者都是归属一个消费者组的//消费者就是你的业务consumerGroup, err : sarama.NewConsumerGroup(addrs, test_group, cfg)assert.NoError(t, err)err consumerGroup.Consume(context.Background(), []string{test_topic}, testConsumerGroupHandler{})//你消费结束就会到这里t.Log(err)
}type testConsumerGroupHandler struct {
}func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {log.Println(Setup session:, session)return nil
}func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {log.Println(Cleanup session:, session)return nil
}func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话从建立连接到连接彻底断掉的那一段时间session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {msgs : claim.Messages()for msg : range msgs {//var bizMsg MyBizMsg//err : json.Unmarshal(msg.Value, bizMsg)//if err ! nil {// //这就是消费消息出错// //大多数时候就是重试// //记录日志// continue//}log.Println(string(msg.Value))session.MarkMessage(msg, )}//什么情况下会到这里//msg被人关了也就是要退出消费逻辑return nil
}type MyBizMsg struct {Name string
}
sarama 使用入门ConsumerGroupHandler
下面的代码就是对ConsumerGroupHandler的实现关键就是在消费了msg之后如果消费成功了要记得提交。
也就是调用MarkMessage方法。
至于 Setup 和 Cleanup 方法反而用得不多。
type testConsumerGroupHandler struct {
}func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {log.Println(Setup session:, session)return nil
}func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {log.Println(Cleanup session:, session)return nil
}func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话从建立连接到连接彻底断掉的那一段时间session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {msgs : claim.Messages()for msg : range msgs {//var bizMsg MyBizMsg//err : json.Unmarshal(msg.Value, bizMsg)//if err ! nil {// //这就是消费消息出错// //大多数时候就是重试// //记录日志// continue//}log.Println(string(msg.Value))session.MarkMessage(msg, )}//什么情况下会到这里//msg被人关了也就是要退出消费逻辑return nil
}sarama 使用入门利用context来控制消费者退出
可以利用初始化ConsumerGroup 时候传入的ctx来控制消费者组退出消息。
下图中我传入了一个超时的context,那么 start : time.Now()//这里是测试我们就控制消费10sctx, cancel : context.WithTimeout(context.Background(), 10*time.Second)defer cancel()//开始消费会在这里阻塞住err consumerGroup.Consume(ctx, []string{test_topic}, testConsumerGroupHandler{})//你消费结束就会到这里t.Log(err, time.Since(start).String()) 下图中我主动调用了cancel,那么 start : time.Now()//这里是测试我们就控制消费5sctx, cancel : context.WithCancel(context.Background())time.AfterFunc(time.Second*5, func() {cancel()})//开始消费会在这里阻塞住err consumerGroup.Consume(ctx, []string{test_topic}, testConsumerGroupHandler{})//你消费结束就会到这里t.Log(err, time.Since(start).String())如果超时了如果我主动调用了cancel
以上两种情况任何一种情况出现了都会让消费者退出消息。
sarama 使用入门指定偏移量消费
在部分场景下我们会希望消费历史消息或者从某个消息开始消费那么可以考虑在Setup里面设置偏移量。
关键调用是 ResetOffset。
不过一般建议走离线渠道操作Kafka集群去重置对应的偏移量。
核心在于你并不是每次重新部署重新启动都是要重置这个偏移量的。
只要你的消费者组在这个分区上有过“已提交的 offset”Kafka 就会优先使用这个提交的 offset而忽略你在 Setup() 中设置的 offset。
// 在每次 rebalance 或初次连接 Kafka 后调用用于初始化。
func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {//执行一些初始化的事情log.Println(Setup)//假设要重置到0var offset int64 0//遍历所有的分区partitions : session.Claims()[test_topic]for _, p : range partitions {session.ResetOffset(test_topic, p, offset, )//session.ResetOffset(test_topic, p, sarama.OffsetNewest, )//session.ResetOffset(test_topic, p, sarama.OffsetOldest, )}return nil
}sarama使用入门异步消费批量提交
正常来说为了在异步消费失败之后还能继续重试可以考虑异步消费一批提交一批。
下图中ctx.Done分支用来控制凑够一批的超时机制防止生产者的速率很低一直凑不够一批。
func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话从建立连接到连接彻底断掉的那一段时间//可以通过 session 控制 offset 提交获取消费者信息并感知退出时机。session sarama.ConsumerGroupSession,//claim 是你获取消息的入口claim sarama.ConsumerGroupClaim) error {msgs : claim.Messages()//设置批量处理的条数const batchSize 10for {ctx, cancel : context.WithTimeout(context.Background(), time.Second*1)var eg errgroup.Groupvar last *sarama.ConsumerMessagefor i : 0; i batchSize; i {done : falseselect {case -ctx.Done()://这边表示超时了done truecase msg, ok : -msgs:if !ok {cancel()return nil}last msgmsg1 : msgeg.Go(func() error {//我就在这里消费time.Sleep(time.Second)//你在这里重试log.Println(string(msg1.Value))return nil})}if done {break}}cancel()err : eg.Wait()if err ! nil {//这边能怎么办//记录日志continue}//就这样session.MarkMessage(last, )}return nil
}另外一个分支就是读取消息并且提交到errgroup里面执行。
Sleep是模拟长时间业务执行。