建网站 西安,wordpress 导入docx,深圳人才市场现场招聘信息,网站开发验收方案Kafka是一种高吞吐量的分布式发布订阅消息系统#xff0c;本文介绍了如何使用kafka-go这个库实现Go语言与kafka的交互。
Go社区中目前有三个比较常用的kafka客户端库 , 它们各有特点。
首先是IBM/sarama#xff08;这个库已经由Shopify转给了IBM#xff09;#xff0c;之…Kafka是一种高吞吐量的分布式发布订阅消息系统本文介绍了如何使用kafka-go这个库实现Go语言与kafka的交互。
Go社区中目前有三个比较常用的kafka客户端库 , 它们各有特点。
首先是IBM/sarama这个库已经由Shopify转给了IBM之前我写过一篇使用sarama操作Kafka的教程相较于sarama kafka-go 更简单、更易用。
segmentio/kafka-go 是纯Go实现提供了与kafka交互的低级别和高级别两套API同时也支持Context。
此外社区中另一个比较常用的confluentinc/confluent-kafka-go它是一个基于cgo的librdkafka包装在项目中使用它会引入对C库的依赖。
准备Kafka环境
这里推荐使用Docker Compose快速搭建一套本地开发环境。
以下docker-compose.yml文件用来搭建一套单节点zookeeper和单节点kafka环境并且在8080端口提供kafka-ui管理界面。
version: 2.1services:zoo1:image: confluentinc/cp-zookeeper:7.3.2hostname: zoo1container_name: zoo1ports:- 2181:2181environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_SERVER_ID: 1ZOOKEEPER_SERVERS: zoo1:2888:3888kafka1:image: confluentinc/cp-kafka:7.3.2hostname: kafka1container_name: kafka1ports:- 9092:9092- 29092:29092- 9999:9999environment:KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: INTERNALKAFKA_ZOOKEEPER_CONNECT: zoo1:2181KAFKA_BROKER_ID: 1KAFKA_LOG4J_LOGGERS: kafka.controllerINFO,kafka.producer.async.DefaultEventHandlerINFO,state.change.loggerINFOKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1KAFKA_JMX_PORT: 9999KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizerKAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: truedepends_on:- zoo1kafka-ui:container_name: kafka-uiimage: provectuslabs/kafka-ui:latestports:- 8080:8080depends_on:- kafka1environment:DYNAMIC_CONFIG_ENABLED: TRUE
将上述docker-compose.yml文件在本地保存在同一目录下执行以下命令启动容器。
docker-compose up -d容器启动后使用浏览器打开127.0.0.1:8080 即可看到如下kafka-ui界面。 点击页面右侧的“Configure new cluster”按钮配置kafka服务连接信息。 填写完信息后点击页面下方的“Submit”按钮提交即可。 安装kafka-go
执行以下命令下载 kafka-go依赖。
go get github.com/segmentio/kafka-go注意kafka-go 需要 Go 1.15或更高版本。 kafka-go使用指南
kafka-go 提供了两套与Kafka交互的API。
低级别 low-level基于与 Kafka 服务器的原始网络连接实现。高级别high-level对于常用读写操作封装了一套更易用的API。
通常建议直接使用高级别的交互API。
Connection
Conn 类型是 kafka-go 包的核心。它代表与 Kafka broker之间的连接。基于它实现了一套与Kafka交互的低级别 API。
发送消息
下面是连接至Kafka之后使用Conn发送消息的代码示例。
// writeByConn 基于Conn发送消息
func writeByConn() {topic : my-topicpartition : 0// 连接至Kafka集群的Leader节点conn, err : kafka.DialLeader(context.Background(), tcp, localhost:9092, topic, partition)if err ! nil {log.Fatal(failed to dial leader:, err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))// 发送消息_, err conn.WriteMessages(kafka.Message{Value: []byte(one!)},kafka.Message{Value: []byte(two!)},kafka.Message{Value: []byte(three!)},)if err ! nil {log.Fatal(failed to write messages:, err)}// 关闭连接if err : conn.Close(); err ! nil {log.Fatal(failed to close writer:, err)}
}
消费消息
// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partitiontopic : my-topicpartition : 0// 连接至Kafka的leader节点conn, err : kafka.DialLeader(context.Background(), tcp, localhost:9092, topic, partition)if err ! nil {log.Fatal(failed to dial leader:, err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息得到的batch是一系列消息的迭代器batch : conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b : make([]byte, 10e3) // 10KB max per messagefor {n, err : batch.Read(b)if err ! nil {break}fmt.Println(string(b[:n]))}// 关闭batchif err : batch.Close(); err ! nil {log.Fatal(failed to close batch:, err)}// 关闭连接if err : conn.Close(); err ! nil {log.Fatal(failed to close connection:, err)}
}
使用batch.Read更高效一些但是需要根据消息长度选择合适的buffer上述代码中的b如果传入的buffer太小消息装不下就会返回io.ErrShortBuffer错误。
如果不考虑内存分配的效率问题也可以按以下代码使用batch.ReadMessage读取消息。
for {msg, err : batch.ReadMessage()if err ! nil {break}fmt.Println(string(msg.Value))
}
创建topic
当Kafka关闭自动创建topic的设置时可按如下方式创建topic。
// createTopicByConn 创建topic
func createTopicByConn() {// 指定要创建的topic名称topic : my-topic// 连接至任意kafka节点conn, err : kafka.Dial(tcp, localhost:9092)if err ! nil {panic(err.Error())}defer conn.Close()// 获取当前控制节点信息controller, err : conn.Controller()if err ! nil {panic(err.Error())}var controllerConn *kafka.Conn// 连接至leader节点controllerConn, err kafka.Dial(tcp, net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err ! nil {panic(err.Error())}defer controllerConn.Close()topicConfigs : []kafka.TopicConfig{{Topic: topic,NumPartitions: 1,ReplicationFactor: 1,},}// 创建topicerr controllerConn.CreateTopics(topicConfigs...)if err ! nil {panic(err.Error())}
}
通过非leader节点连接leader节点
下面的示例代码演示了如何通过已有的非leader节点的Conn连接至 leader节点。
conn, err : kafka.Dial(tcp, localhost:9092)
if err ! nil {panic(err.Error())
}
defer conn.Close()
// 获取当前控制节点信息
controller, err : conn.Controller()
if err ! nil {panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err kafka.Dial(tcp, net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err ! nil {panic(err.Error())
}
defer connLeader.Close()
获取topic列表
conn, err : kafka.Dial(tcp, localhost:9092)
if err ! nil {panic(err.Error())
}
defer conn.Close()partitions, err : conn.ReadPartitions()
if err ! nil {panic(err.Error())
}m : map[string]struct{}{}
// 遍历所有分区取topic
for _, p : range partitions {m[p.Topic] struct{}{}
}
for k : range m {fmt.Println(k)
}
Reader
Reader是由 kafka-go 包提供的另一个概念对于从单个主题-分区topic-partition消费消息这种典型场景使用它能够简化代码。Reader 还实现了自动重连和偏移量管理并支持使用 Context 支持异步取消和超时的 API。
注意 当进程退出时必须在 Reader 上调用 Close() 。Kafka服务器需要一个优雅的断开连接来阻止它继续尝试向已连接的客户端发送消息。如果进程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)终止那么下面给出的示例不会调用 Close()。当同一topic上有新Reader连接时可能导致延迟(例如新进程启动或新容器运行)。在这种场景下应使用signal.Notify处理程序在进程关闭时关闭Reader。
消费消息
下面的代码演示了如何使用Reader连接至Kafka消费消息。
// readByReader 通过Reader接收消息
func readByReader() {// 创建Readerr : kafka.NewReader(kafka.ReaderConfig{Brokers: []string{localhost:9092, localhost:9093, localhost:9094},Topic: topic-A,Partition: 0,MaxBytes: 10e6, // 10MB})r.SetOffset(42) // 设置Offset// 接收消息for {m, err : r.ReadMessage(context.Background())if err ! nil {break}fmt.Printf(message at offset %d: %s %s\n, m.Offset, string(m.Key), string(m.Value))}// 程序退出前关闭Readerif err : r.Close(); err ! nil {log.Fatal(failed to close reader:, err)}
}
消费者组
kafka-go支持消费者组包括broker管理的offset。要启用消费者组只需在 ReaderConfig 中指定 GroupID。
使用消费者组时ReadMessage 会自动提交偏移量。
// 创建一个reader指定GroupID从 topic-A 消费消息
r : kafka.NewReader(kafka.ReaderConfig{Brokers: []string{localhost:9092, localhost:9093, localhost:9094},GroupID: consumer-group-id, // 指定消费者组idTopic: topic-A,MaxBytes: 10e6, // 10MB
})// 接收消息
for {m, err : r.ReadMessage(context.Background())if err ! nil {break}fmt.Printf(message at topic/partition/offset %v/%v/%v: %s %s\n, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}// 程序退出前关闭Reader
if err : r.Close(); err ! nil {log.Fatal(failed to close reader:, err)
}
在使用消费者组时会有以下限制 (*Reader).SetOffset 当设置了GroupID时会返回错误 (*Reader).Offset 当设置了GroupID时会永远返回 -1 (*Reader).Lag 当设置了GroupID时会永远返回 -1 (*Reader).ReadLag 当设置了GroupID时会返回错误 (*Reader).Stats 当设置了GroupID时会返回一个-1的分区 显式提交
kafka-go 也支持显式提交。当需要显式提交时不要调用 ReadMessage而是调用 FetchMessage获取消息然后调用 CommitMessages 显式提交。
ctx : context.Background()
for {// 获取消息m, err : r.FetchMessage(ctx)if err ! nil {break}// 处理消息fmt.Printf(message at topic/partition/offset %v/%v/%v: %s %s\n, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))// 显式提交if err : r.CommitMessages(ctx, m); err ! nil {log.Fatal(failed to commit messages:, err)}
}
在消费者组中提交消息时具有给定主题/分区的最大偏移量的消息确定该分区的提交偏移量的值。例如如果通过调用 FetchMessage 获取了单个分区的偏移量为 1、2 和 3 的消息则使用偏移量为3的消息调用 CommitMessages 也将导致该分区的偏移量为 1 和 2 的消息被提交。
管理提交间隔
默认情况下调用CommitMessages将同步向Kafka提交偏移量。为了提高性能可以在ReaderConfig中设置CommitInterval来定期向Kafka提交偏移。
// 创建一个reader从 topic-A 消费消息
r : kafka.NewReader(kafka.ReaderConfig{Brokers: []string{localhost:9092, localhost:9093, localhost:9094},GroupID: consumer-group-id,Topic: topic-A,MaxBytes: 10e6, // 10MBCommitInterval: time.Second, // 每秒刷新一次提交给 Kafka
})
Writer
向Kafka发送消息除了使用基于Conn的低级APIkafka-go包还提供了更高级别的 Writer 类型。大多数情况下使用Writer即可满足条件它支持以下特性。
对错误进行自动重试和重新连接。在可用分区之间可配置的消息分布。向Kafka同步或异步写入消息。使用Context的异步取消。关闭时清除挂起的消息以支持正常关闭。在发布消息之前自动创建不存在的topic。
发送消息
// 创建一个writer 向topic-A发送消息
w : kafka.Writer{Addr: kafka.TCP(localhost:9092, localhost:9093, localhost:9094),Topic: topic-A,Balancer: kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布RequiredAcks: kafka.RequireAll, // ack模式Async: true, // 异步
}err : w.WriteMessages(context.Background(),kafka.Message{Key: []byte(Key-A),Value: []byte(Hello World!),},kafka.Message{Key: []byte(Key-B),Value: []byte(One!),},kafka.Message{Key: []byte(Key-C),Value: []byte(Two!),},
)
if err ! nil {log.Fatal(failed to write messages:, err)
}if err : w.Close(); err ! nil {log.Fatal(failed to close writer:, err)
}
创建不存在的topic
如果给Writer配置了AllowAutoTopicCreation:true那么当发送消息至某个不存在的topic时则会自动创建topic。 // 创建不存在的topic
// 如果给Writer配置了AllowAutoTopicCreation:true那么当发送消息至某个不存在的topic时则会自动创建topic。
func writeByWriter2() {writer : kafka.Writer{Addr: kafka.TCP(192.168.2.204:9092),Topic: kafka-test-topic,AllowAutoTopicCreation: true, //自动创建topic}messages : []kafka.Message{{Key: []byte(Key-A),Value: []byte(Hello World!),},{Key: []byte(Key-B),Value: []byte(One!),},{Key: []byte(Key-C),Value: []byte(Tow!),},}const retries 3//重试3次for i : 0; i retries; i {ctx, cancel : context.WithTimeout(context.Background(), 10*time.Second)defer cancel()err : writer.WriteMessages(ctx, messages...)if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {time.Sleep(time.Millisecond * 250)continue}if err ! nil {log.Fatal(unexpected error %v, err)}break}//关闭Writerif err : writer.Close(); err ! nil {log.Fatal(failed to close writer:, err)}
}写入多个topic
通常WriterConfig.Topic用于初始化单个topic的Writer。通过去掉WriterConfig中的Topic配置分别设置每条消息的message.topic可以实现将消息发送至多个topic。
w : kafka.Writer{Addr: kafka.TCP(localhost:9092, localhost:9093, localhost:9094),// 注意: 当此处不设置Topic时,后续的每条消息都需要指定TopicBalancer: kafka.LeastBytes{},
}err : w.WriteMessages(context.Background(),// 注意: 每条消息都需要指定一个 Topic, 否则就会报错kafka.Message{Topic: topic-A,Key: []byte(Key-A),Value: []byte(Hello World!),},kafka.Message{Topic: topic-B,Key: []byte(Key-B),Value: []byte(One!),},kafka.Message{Topic: topic-C,Key: []byte(Key-C),Value: []byte(Two!),},
)
if err ! nil {log.Fatal(failed to write messages:, err)
}if err : w.Close(); err ! nil {log.Fatal(failed to close writer:, err)
}
注意Writer中的Topic和Message中的Topic是互斥的同一时刻有且只能设置一处。
其他配置
TLS
对于基本的 Conn 类型或在 Reader/Writer 配置中可以在Dialer中设置TLS选项。如果 TLS 字段为空则它将不启用TLS 连接。
注意不在Conn/Reder/Writer上配置TLS连接到启用TLS的Kafka集群可能会出现io.ErrUnexpectedEOF错误。
Connection
dialer : kafka.Dialer{Timeout: 10 * time.Second,DualStack: true,TLS: tls.Config{...tls config...}, // 指定TLS配置
}conn, err : dialer.DialContext(ctx, tcp, localhost:9093)
Reader
dialer : kafka.Dialer{Timeout: 10 * time.Second,DualStack: true,TLS: tls.Config{...tls config...}, // 指定TLS配置
}r : kafka.NewReader(kafka.ReaderConfig{Brokers: []string{localhost:9092, localhost:9093, localhost:9094},GroupID: consumer-group-id,Topic: topic-A,Dialer: dialer,
})
Writer
创建Writer时可以按如下方式指定TLS配置。
w : kafka.Writer{Addr: kafka.TCP(localhost:9092, localhost:9093, localhost:9094), Topic: topic-A,Balancer: kafka.Hash{},Transport: kafka.Transport{TLS: tls.Config{}, // 指定TLS配置},}