Kafka 基本概念和 go 客户端使用
1. 基本概念
1.1 主题和分区
Kafka 里的消息用主题进行分类(主题好比数据库中的表),主题下有可以被分为若干个 分区(分表技术) 。
分区本质上是个提交日志文件,有新消息,这个消息就会以追加的方式写入分区(写文件的形式),然后用先入先出的顺序读取。
分区的意义在于提高消息的消费速度,通常情况下消费速度是要远小于生产速度的,所以几个分区也就意味着可以同时支持几个消费者并发消费消息。
Kafka通过分区来实现数据冗余和伸缩性,因为分区可以分布在不同的服务器上,那就是说一个主题可以跨越多个服务器(这是Kafka高性能的一个原因,多台服务器的磁盘读写性能比单台更高)。
1.2 消息和批次
消息
Kafka 里的数据单元,也就是我们一般消息中间件里的消息的概念(可以比作数据库中一条记录)。消息由字节数组组成。消息还可以包含键(可选元数据,也是字节数组),主要用于对消息选取分区。
批次
为了提高效率,消息会被分批写入Kafka。批次就是一组消息,这些消息属于同一个主题和分区。如果只传递单个消息,会导致大量的网络开销,把消息分成批次传输可以减少这开销。
在使用 API 发送消息的过程中,会有一个缓冲,不会一条一条发送消息,
首先会根据序列号和分区器将相关的消息组成一个批次
等到这批消息满足两个条件之一之后就会发送,消息累计的大小(例如 10kb)和批次经过的时间(例如 100ms)
1.3 生产者,消费者,偏移量,消费者群组
消费者消费数据:
启动一个消费者群组消费数据,Kafka 给消费者 1 分配消费分区 0 的消息
消费者消费完成后,发送一个 ack 确认消息
对应的分区就会移动偏移量
如果有 4 个分区,但是消费者只有 3 个。那么 Kakfa 会对分区的消费做负载均衡,有一个消费者会消费两个分区
Kafka 中,负载均衡是对分区做的,而不是对消息做的。一个消费者可以消费多个分区,一个分区只能被一个消费者消费
1.4 Broker 和集群
Broker
一个独立的 Kafka 服务器叫 Broker。broker 的主要工作是,接收生产者的消息,设置偏移量,提交消息到磁盘保存;为消费者提供服务,响应请求,返回消息。
在合适的硬件上,单个 broker 可以处理上千个分区和每秒百万级的消息量。( 需要调优才能达到 )
集群
多个 broker 可以组成一个集群。每个集群中 broker 会选举出一个集群控制器。控制器会进行管理,包括将分区分配给 broker 和监控 broker。
集群里,一个分区从属于一个 broker,这个 broker 被称为首领。但是分区可以被分配给多个 broker,这个时候会发生分区复制。
集群中Kafka内部一般使用管道技术进行高效的复制。
1.5 保留消息
在一定期限内保留消息是 Kafka 的一个重要特性,Kafka broker 默认的保留策略是:
要么保留一段时间(7天)
要么保留一定大小(比如1个G)
到了限制,旧消息过期并删除。但是每个主题可以根据业务需求配置自己的保留策略(开发时要注意,Kafka不像 Mysql 之类的永久存储)。
1.6 分区一般怎么划分
完全的顺序消息: 1 个
消息的消费没有顺序要求: 根据业务生产消费能力进行评估,一般配 8 个
2. 组协调器
消费者组:多个消费者可以构成一个消费者群组。怎么构成?共同读取一个主题的消费者们,就形成了一个群组。群组可以保证每个分区只被一个消费者使用。另外,消费者组中加入的第一个消费者为群主
群主负责给所在的消费者组分配分区,分配完成之后,向组协调器发送确认
组协调器通知对应的消费者
群主知道所有的消费者的分区情况,消费者只知道自己的
选举 Leader 消费者客户端
处理申请加入组的客户端
再平衡后同步新的分配方案
维护与客户端的心跳检测
管理消费者已消费偏移量 , 并存储至
__consumer_offset
中
kafka 上的组协调器( GroupCoordinator )协调器有很多,有多少个 __consumer_offset
分区, 那么就有多少个组协调器( GroupCoordinator )
默认情况下, __consumer_offset
有50个分区, 每个消费组都会对应其中的一个分区,对应的逻辑为 hash(group.id
)%分区数。
__consumer_offset
的目的就是保存 consumer 提交的位移
3. 消费者协调器
每个客户端(消费者的客户端)都会有一个消费者协调器, 他的主要作用就是向组协调器发起请求做交互, 以及处理回调逻辑
向组协调器发起入组请求
向组协调器发起同步组请求 ( 如果是Leader客户端 , 则还会计算分配策略数据放到入参传入 )
发起离组请求
保持跟组协调器的心跳线程
向组协调器发送提交已消费偏移量的请求
消费者加入分组的流程
客户端启动的时候, 或者重连的时候会发起JoinGroup的请求来申请加入的组中。
当前客户端都已经完成JoinGroup之后, 客户端会收到JoinGroup的回调, 然后客户端会再次向组协调器发起SyncGroup的请求来获取新的分配方案
当消费者客户端关机/异常 时, 会触发离组LeaveGroup请求。
当然有主动的消费者协调器发起离组请求,也有组协调器一直会有针对每个客户端的心跳检测, 如果监测失败,则就会将这个客户端踢出Group。
客户端加入组内后, 会一直保持一个心跳线程,来保持跟组协调器的一个感知。
并且组协调器会针对每个加入组的客户端做一个心跳监测,如果监测到过期, 则会将其踢出组内并再平衡。
4. 分区再均衡
当消费者群组里的消费者发生变化,或者主题里的分区发生了变化,都会导致再均衡现象的发生
再均衡
增加消费者的时候,新消费者会读取原本其他其他消费者的分区
减少消费者的时候,该消费者的分区就会给别的消费者
增加分区,指定一个消费者消费这个分区
发生这些变化之后导致分区所有权的改变,就称为再均衡
为了使得发生了再均衡之后,消费者能够继续工作,消费者需要读取每个分区最后一次提交的偏移量 (
consumeroffset
),然后从指定的地方,继续做处理。
5. Kafka 集群
5.1 Docker-Compose 配置Kafka集群
KAFKA_CFG_ADVERTISED_LISTENERS
填入自己服务器的 IP 地址
version: "3"
services:
kafka1:
container_name: kafka1
image: 'bitnami/kafka:latest'
ports:
- '19092:9092'
- '19093:9093'
environment:
### 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=1
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:19092
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
- KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
# broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
- KAFKA_BROKER_ID=1
volumes:
- ./kafka/broker1:/bitnami/kafka:rw
kafka2:
container_name: kafka2
image: 'bitnami/kafka:latest'
ports:
- '29092:9092'
- '29093:9093'
environment:
### 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=2
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://xxx:29092
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
- KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
# broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
- KAFKA_BROKER_ID=2
volumes:
- ./kafka/broker2:/bitnami/kafka:rw
kafka3:
container_name: kafka3
image: 'bitnami/kafka:latest'
ports:
- '39092:9092'
- '39093:9093'
environment:
### 通用配置
# 允许使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=3
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=controller,broker
# 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://xxx:39092
# 定义安全协议
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 指定供外部使用的控制类请求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 设置broker最大内存,和初始内存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
- KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 不允许自动创建主题
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
# broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
- KAFKA_BROKER_ID=3
volumes:
- ./kafka/broker3:/bitnami/kafka:rw
# mkdir: cannot create directory '/bitnami/kafka/config': Permission denied
# 报错执行这行命令
sudo chown -R 1001:1001 kafka
5.2 控制器
Kafka 集群的 控制器就是一台 Broker,只不过它除了具有一般 broker的功能之外, 还负责分区首领的选举。
Kafka 使用主题来组织数据, 每个主题被分为若干个分区,每个分区有多个副本。
副本被保存在 broker 上, 每个 broker 可以保存成百上千个属于不同主题和分区的副本。
比如创建一个分区,分区数量3,复制因子 2
// 定义TopicDetail
topicDetail := &sarama.TopicDetail{
// 分区数量
NumPartitions: 3,
// 副本因子数量
ReplicationFactor: 2,
}
topicName := "Test_Topic2"
err := clusterAdmin.CreateTopic(topicName, topicDetail, false)
每个分区可以有多个副本,副本位于集群中不同的 broker 上,也就是说副本的数量不能超过 broker 的数量。
首领副本
每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本 。
跟随者副本
首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一一的任务就是从首领那里复制消息, 保持与首领一致的状态 。 如果首领发生崩溃, 其中的一个跟随者会被提升为新首领 。
5.3 ISR
Kafka的数据复制是以 Partition 为单位的。而多个备份间的数据复制,通过 Follower 向 Leader 拉取数据完成。从一这点来讲,有点像 Master-Slave 方案。不同的是,Kafka 既不是完全的同步复制,也不是完全的异步复制,而是基于 ISR 的动态复制方案。
ISR,也即 In-Sync Replica。每个 Partition 的 Leader 都会维护这样一个列表,该列表中,包含了所有与之同步的 Replica(包含Leader自己)。每次数据写入时,只有ISR 中的所有 Replica 都复制完,Leader才会将其置为 Commit,它才能被Consumer 所消费。
人话: ISR: 动态维护的副本链表
包含 Replica ( leader 和 Follower ) , 每当向 Leader 副本写入新的消息并提交时,ISR 中的 Follower 副本也必须及时地接收到这些消息并且持久化到本地日志中
如果 Follower 副本落后 Leader 副本的消息数量超过了预设的阈值(可通过 replica.lag.time.max.ms
( 这个时间内未能及时向 Leader 发送 fetch 请求或更新其日志,则认为该 Follower 已落后于 Leader ) 则该 Follower 将会被暂时移出 ISR 集合,不再被认为是在同步状态。只有当这个副本追赶上 Leader 并且滞后的时间或消息数重新回到阈值之内时,才会被重新加入到 ISR 集合中
重点:只有 ISR 中的副本才有资格被选为新的 Leader。如果 Leader 或任何 ISR 中的副本出现故障或同步延迟超出限制,则该副本会被暂时移出 ISR,直到其重新追上 Leader 并满足同步要求为止。
// Broker的 min.insync.replicas 参数指定了Broker所要求的ISR最小长度,默认值为1。也即极限情况下ISR可以只包含Leader。
min.insync.replicas
更推荐的做法是,将 acks 设置为 all 或者 -1,此时只有 ISR 中的所有 Replica 都收到该数据(也即该消息被 Commit ),Leader 才会告诉 Producer 该消息发送成功,从而保证不会有未知的数据丢失。
4. go 客户端使用
封装 Kafka 连接
func GetKafkaConn() sarama.Client {
// 设置Kafka的配置
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V3_6_0_0
kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll //生产者在发送消息时需要等待的确认数
kafkaConfig.Producer.Return.Successes = true // 设置生产者是否返回成功发送的消息。
kafkaConfig.Producer.Flush.Messages = 10 // 设置缓存消息数量
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // 设置缓存时间间隔
// Kafka集群地址
brokers := []string{"xxx.xxx.xxx.xxx:19092", "xxx.xxx.xxx.xxx:29092", "xxx.xxx.xxx.xxx:39092"}
client, err := sarama.NewClient(brokers, kafkaConfig)
if err != nil {
log.Fatalln(err)
}
return client
}
Admin相关配置:
Admin.Retry.Max
: 当执行管理操作(如创建或删除主题)时的最大重试次数。Admin.Retry.Backoff
: 管理操作重试之间的退避时间间隔。Admin.Timeout
: 执行管理操作的总超时时间。
网络连接相关配置:
Net.MaxOpenRequests
: 客户端同时打开的最大请求数。Net.DialTimeout
,Net.ReadTimeout
,Net.WriteTimeout
: 连接、读取和写入Kafka broker时的超时时间。Net.SASL.Handshake
: 是否启用SASL握手。Net.SASL.Version
: SASL握手协议版本。
元数据管理相关配置:
Metadata.Retry.Max
: 获取元数据时的最大重试次数。Metadata.Retry.Backoff
: 元数据请求重试之间的退避时间间隔。Metadata.RefreshFrequency
: 自动刷新元数据的频率。Metadata.Full
: 是否获取完整的元数据信息。Metadata.AllowAutoTopicCreation
: 是否允许自动创建主题。
生产者相关配置:
Producer.MaxMessageBytes
: 单个消息的最大大小。Producer.RequiredAcks
: 指定生产者需要多少个broker确认才能认为消息已成功提交。Producer.Timeout
: 生产者等待所有所需确认的时间。Producer.Partitioner
: 分区器策略,默认为哈希分区。Producer.Retry.Max
和Producer.Retry.Backoff
: 生产者重试次数和重试间隔。Producer.Return.Errors
: 是否将错误返回给生产者。Producer.CompressionLevel
: 消息压缩级别。
事务型生产者相关配置:
Producer.Transaction.Timeout
: 事务超时时间。Producer.Transaction.Retry.Max
和Producer.Transaction.Retry.Backoff
: 事务操作重试次数和间隔。
消费者相关配置:
Consumer.Fetch.Min
和Consumer.Fetch.Default
: 消费者每次fetch请求的最小和默认消息数量。Consumer.Retry.Backoff
: fetch失败后的重试间隔。Consumer.MaxWaitTime
: fetch请求最大等待时间。Consumer.MaxProcessingTime
: 处理FetchRequest的最大时间。Consumer.Return.Errors
: 是否将消费错误返回给应用程序。Consumer.Offsets.AutoCommit.Enable
和Consumer.Offsets.AutoCommit.Interval
: 是否自动提交offset以及提交间隔。Consumer.Offsets.Initial
: 初始偏移量位置(从最新的开始或者最旧的开始)。Consumer.Offsets.Retry.Max
: 提交offset时的最大重试次数。
消费者组协调相关配置:
Consumer.Group.Session.Timeout
: 消费者组会话超时时间。Consumer.Group.Heartbeat.Interval
: 心跳间隔时间。Consumer.Group.Rebalance.GroupStrategies
: 分区再均衡策略列表。Consumer.Group.Rebalance.Timeout
: 再均衡操作超时时间。Consumer.Group.Rebalance.Retry.Max
和Consumer.Group.Rebalance.Retry.Backoff
: 再均衡重试次数和间隔。Consumer.Group.ResetInvalidOffsets
: 是否在加入消费者组时重置无效的offset。
通用配置:
ClientID
: 客户端ID,用于标识不同的客户端实例。ChannelBufferSize
: 用于内部通信的通道缓冲区大小。ApiVersionsRequest
: 是否发送API版本请求以协商支持的Kafka API版本。Version
: Sarama使用的Kafka协议版本。MetricRegistry
: 度量指标注册表,用于收集客户端运行时的各种性能指标。
2.1 发送消息
同步发送消息
func SyncSend() {
kafkaConn := GetKafkaConn()
// 1.创建一个同步生产者
producer, err := sarama.NewSyncProducerFromClient(kafkaConn)
if err != nil {
log.Fatalln(err)
}
// 保证关闭生产者
defer func() {
if err = producer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 2.设置消息类型
message := &sarama.ProducerMessage{
Topic: "sakuras_topic",
//Key: nil,
Value: sarama.StringEncoder("My First Message " + uuid.NewString()),
//Headers: nil,
//Metadata: nil,
//Offset: 0,
//Partition: 0,
//Timestamp: time.Time{},
}
// 3.发送消息
for i := 0; i < 5; i++ {
partition, offset, err := producer.SendMessage(message)
if err != nil {
fmt.Println("Message Send Fail:", err)
} else {
fmt.Println("Send Success, Partition: ", partition, " Offset: ", offset)
}
}
}
如果在创建消息的时候指定 Partition 分区,那么就会发送消息dao Kafka指定分区
如果不指定 Partition 分区,会根据 key 或者 value 自动配置到各个分区
面试题: 为什么要在客户端发送消息的时候做一个缓冲 ( 消息批次 )
减少网络和 IO 的开销
减少 GC ( 当发送大量短暂的消息时,垃圾回收器可能需要更频繁地运行以回收这些消息。通过将多个消息组合成一个批次,可以减少单个消息的内存分配,从而降低垃圾回收的压力。这样,垃圾回收器可以更有效地管理内存,从而提高整体性能。 )
缓存发送的配置
func GetKafkaConn() sarama.Client {
// 设置Kafka的配置
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V3_6_0_0
kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll //生产者在发送消息时需要等待的确认数
kafkaConfig.Producer.Return.Successes = true // 设置生产者是否返回成功发送的消息。
kafkaConfig.Producer.Flush.Messages = 10 // 设置缓存消息数量
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // 设置缓存时间间隔
// Kafka集群地址
brokers := []string{"xxx.xxx.xxx.xxx:19092", "xxx.xxx.xxx.xxx:29092", "xxx.xxx.xxx.xxx:39092"}
client, err := sarama.NewClient(brokers, kafkaConfig)
if err != nil {
log.Fatalln(err)
}
return client
}
异步发送消息
select 发送异步消息
func ASyncSend() {
conn := GetKafkaConn()
producer, err := sarama.NewAsyncProducerFromClient(conn)
if err != nil {
log.Fatalln(err)
}
message := &sarama.ProducerMessage{
Topic: "async_send",
//Key: nil,
Value: sarama.StringEncoder("async message"),
//Headers: nil,
//Metadata: nil,
//Offset: 0,
//Partition: 0,
//Timestamp: time.Time{},
}
// 异步发送消息
producer.Input() <- message
fmt.Println("massage already send")
// 持续监听发送成功还是失败
for {
select {
case err := <-producer.Errors():
fmt.Println("send to Failed:,", err)
case ok := <-producer.Successes():
fmt.Println("massage send success: topic:", ok.Topic, " partition:", ok.Partition, " offset:", ok.Offset)
return
}
}
}
goroutine 发送异步消息
func ASyncSend() {
conn := GetKafkaConn()
defer conn.Close()
producer, err := sarama.NewAsyncProducerFromClient(conn)
if err != nil {
log.Fatalln(err)
}
message := &sarama.ProducerMessage{
Topic: "async_send",
//Key: nil,
Value: sarama.StringEncoder("async message"),
//Headers: nil,
//Metadata: nil,
//Offset: 0,
//Partition: 0,
//Timestamp: time.Time{},
}
wg := sync.WaitGroup{}
// 开启协程异步发送消息
wg.Add(1)
go func() {
defer wg.Done()
producer.Input() <- message
fmt.Println("massage already send")
}()
// 开启协程持续监听发送成功和失败
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case err := <-producer.Errors():
fmt.Println("send to Failed:,", err)
producer.AsyncClose()
case ok := <-producer.Successes():
fmt.Println("massage send success: topic:", ok.Topic, " partition:", ok.Partition, " offset:", ok.Offset)
return
}
}
}()
wg.Wait()
}
2.2 消费消息
一个分区消费
func ConsumerMessage() {
conn := GetKafkaConn()
defer conn.Close()
Consumer, err := sarama.NewConsumerFromClient(conn)
if err != nil {
log.Fatalln(err)
}
// 给消费者指定对应的topic和分区
partitionConsumer, err := Consumer.ConsumePartition("async_send", 0, sarama.OffsetOldest)
// OFfsetNewest 从最新的消息开始
// offsetOldest 从第一条消息开始
if err != nil {
log.Fatalf("Error consuming from partition %d: %v", 0, err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Println("Error closing partition consumer:", err)
}
}()
// 处理接收到的消息
go func(pc sarama.PartitionConsumer) {
for message := range pc.Messages() {
fmt.Printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
}
}(partitionConsumer)
// 等待关闭信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
return
}
}
多个分区消费
func ConsumerMessage() {
conn := GetKafkaConn()
defer conn.Close()
Consumer, err := sarama.NewConsumerFromClient(conn)
if err != nil {
log.Fatalln(err)
}
// 二,获取某个topic下的全部分区
topic := "async_send"
partitions, err := Consumer.Partitions(topic)
if err != nil {
log.Fatalln(err)
}
// 三,遍历全部分区id,每个分区,完成分区消费
for _, partition := range partitions {
partitionConsumer, err := Consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
if err != nil {
log.Fatalln(err)
}
// 每个分区消费者,在独立的goroutine中完成消费
go func(pc sarama.PartitionConsumer) {
defer func() {
if err := pc.Close(); err != nil {
log.Fatalln(err)
}
}()
// 消费
for msg := range pc.Messages() {
log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value)
}
}(partitionConsumer)
}
// 等待关闭信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
select {
case <-signals:
return
}
}
2.3 自定义消息编码器
// 只需要实现Encoder接口就可以自定义编码器
//type Encoder interface {
// Encode() ([]byte, error)
// Length() int
//}
type Person struct {
Name string
Age int
Addr string
}
type PersonEncoder Person
func (p PersonEncoder) Encode() ([]byte, error) {
bytes, err := json.Marshal(p)
if err != nil {
return nil, err
}
return bytes, nil
}
func (p PersonEncoder) Length() int {
bytes, _ := p.Encode()
return len(bytes)
}
定义消息的时候传入
person := Person{
Name: "Sakura",
Age: 22,
Addr: "Global",
}
message := &sarama.ProducerMessage{
Topic: "sakuras_topic",
//Key: nil,
Value: PersonEncoder(person),
//Headers: nil,
//Metadata: nil,
//Offset: 0,
//Partition: 0,
//Timestamp: time.Time{},
}
2.4 发送消息的 ACK 配置
通过配置指定producer在发送消息时的ack策略:
Request.required.acks=-1,全量同步确认,强可靠性保证
Request.required.acks = 1,leader 确认收到,
Request.required.acks = 0 ,不确认,可以提高吞吐量
const (
// NoResponse doesn't send any response, the TCP ACK is all you get.
NoResponse RequiredAcks = 0
// WaitForLocal waits for only the local commit to succeed before responding.
WaitForLocal RequiredAcks = 1
// WaitForAll waits for all in-sync replicas to commit before responding.
// The minimum number of in-sync replicas is configured on the broker via
// the `min.insync.replicas` configuration key.
WaitForAll RequiredAcks = -1
)
// 设置Kafka配置的时候指定
func GetKafkaConn() sarama.Client {
// 设置Kafka的配置
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = sarama.V3_6_0_0
kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll //生产者在发送消息时需要等待的确认数
kafkaConfig.Producer.Return.Successes = true // 设置生产者是否返回成功发送的消息。
kafkaConfig.Producer.Flush.Messages = 10 // 设置缓存消息数量
kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // 设置缓存时间间隔
// Kafka集群地址
//brokers := []string{"xxx.xxx.xxx.xxx:19092", "xxx.xxx.xxx.xxx:29092", "xxx.xxx.xxx.xxx:39092"}
client, err := sarama.NewClient(brokers, kafkaConfig)
if err != nil {
log.Fatalln(err)
}
return client
}
2.5 Topic 的分区
创建多个分区的 Topic
func Create_Topic() {
// 首先获取集群连接
_, clusterAdmin := GetKafkaConn()
defer func(clusterAdmin sarama.ClusterAdmin) {
err := clusterAdmin.Close()
if err != nil {
log.Fatalln(err)
}
}(clusterAdmin)
// 定义TopicDetail
topicDetail := &sarama.TopicDetail{
// 分区数量
NumPartitions: 5,
// 副本因子数量
ReplicationFactor: 1,
}
topicName := "Test_Topic2"
err := clusterAdmin.CreateTopic(topicName, topicDetail, false)
if err != nil {
log.Println(err)
}
fmt.Println("分区创建成功")
// 创建之后可以遍历查看topic和所分区信息
topicNames := []string{"Test_Topic"}
metadata, err := clusterAdmin.DescribeTopics(topicNames)
for _, topicMeta := range metadata {
fmt.Printf("Topic: %s\n", topicMeta.Name)
// 遍历每个Topic的分区信息
for _, partition := range topicMeta.Partitions {
fmt.Printf(" Partition ID: %d\n", partition.ID)
fmt.Printf(" Leader: %d\n", partition.Leader)
fmt.Printf(" Replicas: %v\n", partition.Replicas)
fmt.Printf(" ISR (In-Sync Replicas): %v\n", partition.Isr)
}
}
return
}
2.6 分区选择策略
生产者的分区选择策略
通过修改配置来决定分区的选择策略
// 随机选择
conf.Producer.Partitioner = sarama.NewRandomPartitioner
// 轮循
conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner
// Hash
conf.Producer.Partitioner = sarama.NewHashPartitioner
// 自定义Hash算法
conf.Producer.Partitioner = sarama.NewCustomHashPartitioner
// 手动指定
conf.Producer.Partitioner = sarama.NewManualPartitioner
// 自定义
conf.Producer.Partitioner = sarama.NewCustomPartitioner()
轮询
指定分区
指定分区需要再定义消息的时候指定发送的分区
value := "Message" + strconv.Itoa(i+10)
message := &sarama.ProducerMessage{
Topic: "Test",
Value: sarama.StringEncoder(value),
Partition: 1,
}
随机
Hash
会根据对应的 key 计算出Hash 值然后取余
message := &sarama.ProducerMessage{
Topic: "Test",
Value: sarama.StringEncoder(value),
Key: sarama.StringEncoder("Key"),
}
消费者的分区选择策略
针对于同一个主题的多个分区,每个分区构建一个分区消费者并发执行,即可完成主题下全部分区的消息消费。
func ConsumerMessage() {
conn, _ := Kafka.GetKafkaConn()
defer func(conn sarama.Client) {
err := conn.Close()
if err != nil {
log.Fatalln(err)
}
}(conn)
Consumer, err := sarama.NewConsumerFromClient(conn)
if err != nil {
log.Fatalln(err)
}
// 二,获取某个topic下的全部分区
topic := "async_send"
partitions, err := Consumer.Partitions(topic)
if err != nil {
log.Fatalln(err)
}
// 三,遍历全部分区id,每个分区,完成分区消费
for _, partition := range partitions {
partitionConsumer, err := Consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
if err != nil {
log.Fatalln(err)
}
// 每个分区消费者,在独立的goroutine中完成消费
go func(pc sarama.PartitionConsumer) {
defer pc.AsyncClose()
// 消费
for msg := range pc.Messages() {
log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value)
}
}(partitionConsumer)
}
time.Sleep(time.Second * 5)
}
2.7 消费组
消费组完成消费
func ConsumerGroup() {
conn, _ := Kafka.GetKafkaConn()
// 1.创建消费者组
groupsId := "Sakuras_Group"
group, err := sarama.NewConsumerGroupFromClient(groupsId, conn)
if err != nil {
log.Fatalln(err)
}
defer func() {
group.Close()
}()
// 2.处理错误
go func() {
for err := range group.Errors() {
log.Println(err)
}
}()
// 3.完成组消费
ctx, cancelFunc := context.WithCancel(context.Background())
topic := []string{"Test"}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// 具体的消费操作要在Handler中定义
consumer := GroupConsumer{}
err := group.Consume(ctx, topic, consumer)
if err != nil {
log.Println(err)
}
if ctx.Err() != nil {
log.Println("goroutine canel")
return
}
}
}()
// 终止
ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt)
select {
case <-ch:
cancelFunc()
}
wg.Wait()
}
// GroupConsumer 定义组消费处理器 group, consume,handler
type GroupConsumer struct {
}
// Setup 重新消费时执行
func (g GroupConsumer) Setup(session sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup 在消费者组停止处理消息之后被调用(当组内消费者退出时执行)
func (g GroupConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
fmt.Println("cleanup 方法调用")
return nil
}
// 组消费的核心方法
func (g GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
fmt.Printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
}
return nil
}
消费者组 handler 接口方法的执行
// GroupConsumer 定义组消费处理器 group, consume,handler
type GroupConsumer struct {
}
// Setup 重新消费时执行(新加入消费者时执行)
func (g GroupConsumer) Setup(session sarama.ConsumerGroupSession) error {
log.Println("setup")
log.Println(session.Claims()) // 打印该消费者的获得的分区
// 可以重置(修改)起始分区
session.ResetOffset("Test", 0, 2048, "")
"")
return nil
}
// Cleanup 在消费者组停止处理消息之后被调用(当组内消费者退出时执行)
func (g GroupConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
fmt.Println("cleanup 方法调用")
return nil
}
// ConsumeClaim 组消费的核心方法
func (g GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
fmt.Printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
}
return nil
}
消费者组的分配策略
Sticky:
一种更加优化的分配方案,目的:
分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个
分区的分配尽可能的与上次分配的保持相同,也就是当消费者变化时,Reblance操作,尽量保证原来分配的分区还在
RoundRobin:
将全部订阅的 Topic 和 partition 作为一个整体,轮流分配:
1Topic,10Partition,3Consumer
C1, T-0, T-3, T-6, T-9
C2, T-1, T-4, T-7
C3, T-2, T-5, T-8
Range:
Range,根据分区数量,确定每个消费者分配几个分区,有多余的分区,前面的分区先分配。
1Topic,10Partition,3Consumer
C1, T-0, T-1, T-2, T-3
C2, T-4, T-5, T-6
C3, T-7, T-8, T-9
kafkaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
sarama.NewBalanceStrategySticky(), // 粘性分区,新的优化方案
sarama.NewBalanceStrategyRoundRobin(), // 轮询
sarama.NewBalanceStrategyRange(), // 默认方案
}
有多个分区策略是因为有些消费者 ( 其他语言的客户端 ) 可能不支持其他的分区策略
评论区