目 录CONTENT

文章目录

Kafka 基本概念和 go 客户端使用

Sakura
2024-02-20 / 0 评论 / 0 点赞 / 42 阅读 / 38579 字 / 正在检测是否收录...

Kafka 基本概念和 go 客户端使用

1. 基本概念

1.1 主题和分区

Kafka 里的消息用主题进行分类(主题好比数据库中的表),主题下有可以被分为若干个 分区(分表技术)

分区本质上是个提交日志文件,有新消息,这个消息就会以追加的方式写入分区(写文件的形式),然后用先入先出的顺序读取。

分区的意义在于提高消息的消费速度,通常情况下消费速度是要远小于生产速度的所以几个分区也就意味着可以同时支持几个消费者并发消费消息。

Kafka通过分区来实现数据冗余和伸缩性,因为分区可以分布在不同的服务器上,那就是说一个主题可以跨越多个服务器(这是Kafka高性能的一个原因,多台服务器的磁盘读写性能比单台更高)。

1.2 消息和批次

  • 消息

Kafka 里的数据单元,也就是我们一般消息中间件里的消息的概念(可以比作数据库中一条记录)。消息由字节数组组成。消息还可以包含键(可选元数据,也是字节数组),主要用于对消息选取分区。

  • 批次

为了提高效率,消息会被分批写入Kafka。批次就是一组消息,这些消息属于同一个主题和分区。如果只传递单个消息,会导致大量的网络开销,把消息分成批次传输可以减少这开销。

在使用 API 发送消息的过程中,会有一个缓冲,不会一条一条发送消息,

  1. 首先会根据序列号和分区器将相关的消息组成一个批次

  2. 等到这批消息满足两个条件之一之后就会发送,消息累计的大小(例如 10kb)和批次经过的时间(例如 100ms)

1.3 生产者,消费者,偏移量,消费者群组

消费者消费数据:

  1. 启动一个消费者群组消费数据,Kafka 给消费者 1 分配消费分区 0 的消息

  2. 消费者消费完成后,发送一个 ack 确认消息

  3. 对应的分区就会移动偏移量

如果有 4 个分区,但是消费者只有 3 个。那么 Kakfa 会对分区的消费做负载均衡,有一个消费者会消费两个分区

Kafka 中,负载均衡是对分区做的,而不是对消息做的。一个消费者可以消费多个分区,一个分区只能被一个消费者消费

1.4 Broker 和集群

  • Broker

一个独立的 Kafka 服务器叫 Broker。broker 的主要工作是,接收生产者的消息,设置偏移量,提交消息到磁盘保存;为消费者提供服务,响应请求,返回消息。

在合适的硬件上,单个 broker 可以处理上千个分区和每秒百万级的消息量。( 需要调优才能达到 )

  • 集群

多个 broker 可以组成一个集群。每个集群中 broker 会选举出一个集群控制器。控制器会进行管理,包括将分区分配给 broker 和监控 broker。

集群里,一个分区从属于一个 broker,这个 broker 被称为首领。但是分区可以被分配给多个 broker,这个时候会发生分区复制。

集群中Kafka内部一般使用管道技术进行高效的复制。

1.5 保留消息

在一定期限内保留消息是 Kafka 的一个重要特性,Kafka broker 默认的保留策略是:

  1. 要么保留一段时间(7天)

  2. 要么保留一定大小(比如1个G)

到了限制,旧消息过期并删除。但是每个主题可以根据业务需求配置自己的保留策略(开发时要注意,Kafka不像 Mysql 之类的永久存储)。

1.6 分区一般怎么划分

完全的顺序消息: 1 个

消息的消费没有顺序要求: 根据业务生产消费能力进行评估,一般配 8 个

2. 组协调器

  • 消费者组:多个消费者可以构成一个消费者群组。怎么构成?共同读取一个主题的消费者们,就形成了一个群组。群组可以保证每个分区只被一个消费者使用。另外,消费者组中加入的第一个消费者为群主

  1. 群主负责给所在的消费者组分配分区,分配完成之后,向组协调器发送确认

  2. 组协调器通知对应的消费者

群主知道所有的消费者的分区情况,消费者只知道自己的

  1. 选举 Leader 消费者客户端

  2. 处理申请加入组的客户端

  3. 再平衡后同步新的分配方案

  4. 维护与客户端的心跳检测

  5. 管理消费者已消费偏移量 , 并存储至 __consumer_offset

kafka 上的组协调器( GroupCoordinator )协调器有很多,有多少个 __consumer_offset分区, 那么就有多少个组协调器( GroupCoordinator )

默认情况下, __consumer_offset50个分区, 每个消费组都会对应其中的一个分区,对应的逻辑为 hash(group.id)%分区数。

__consumer_offset的目的就是保存 consumer 提交的位移

3. 消费者协调器

每个客户端(消费者的客户端)都会有一个消费者协调器, 他的主要作用就是向组协调器发起请求做交互, 以及处理回调逻辑

  1. 向组协调器发起入组请求

  2. 向组协调器发起同步组请求 ( 如果是Leader客户端 , 则还会计算分配策略数据放到入参传入 )

  3. 发起离组请求

  4. 保持跟组协调器的心跳线程

  5. 向组协调器发送提交已消费偏移量的请求

  • 消费者加入分组的流程

  1. 客户端启动的时候, 或者重连的时候会发起JoinGroup的请求来申请加入的组中。

  2. 当前客户端都已经完成JoinGroup之后, 客户端会收到JoinGroup的回调, 然后客户端会再次向组协调器发起SyncGroup的请求来获取新的分配方案

  3. 当消费者客户端关机/异常 时, 会触发离组LeaveGroup请求。

当然有主动的消费者协调器发起离组请求,也有组协调器一直会有针对每个客户端的心跳检测, 如果监测失败,则就会将这个客户端踢出Group。

  1. 客户端加入组内后, 会一直保持一个心跳线程,来保持跟组协调器的一个感知。

并且组协调器会针对每个加入组的客户端做一个心跳监测,如果监测到过期, 则会将其踢出组内并再平衡。

4. 分区再均衡

消费者群组里的消费者发生变化或者主题里的分区发生了变化,都会导致再均衡现象的发生

  • 再均衡

增加消费者的时候,新消费者会读取原本其他其他消费者的分区

减少消费者的时候,该消费者的分区就会给别的消费者

增加分区,指定一个消费者消费这个分区

发生这些变化之后导致分区所有权的改变,就称为再均衡

为了使得发生了再均衡之后,消费者能够继续工作,消费者需要读取每个分区最后一次提交的偏移量 ( consumeroffset ),然后从指定的地方,继续做处理。

5. Kafka 集群

5.1 Docker-Compose 配置Kafka集群

KAFKA_CFG_ADVERTISED_LISTENERS 填入自己服务器的 IP 地址

version: "3"

services:
  kafka1:
    container_name: kafka1
    image: 'bitnami/kafka:latest'
    ports:
      - '19092:9092'
      - '19093:9093'
    environment:
      ### 通用配置
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=1
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://xxxx:19092
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
      - KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 不允许自动创建主题
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
      - KAFKA_BROKER_ID=1
    volumes:
      - ./kafka/broker1:/bitnami/kafka:rw

  kafka2:
    container_name: kafka2
    image: 'bitnami/kafka:latest'
    ports:
      - '29092:9092'
      - '29093:9093'
    environment:
      ### 通用配置
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=2
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://xxx:29092
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
      - KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 不允许自动创建主题
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
      - KAFKA_BROKER_ID=2
    volumes:
      - ./kafka/broker2:/bitnami/kafka:rw

  kafka3:
    container_name: kafka3
    image: 'bitnami/kafka:latest'
    ports:
      - '39092:9092'
      - '39093:9093'
    environment:
      ### 通用配置
      # 允许使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_NODE_ID=3
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # 定义kafka服务端socket监听端口(Docker内部的ip地址和端口)
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定义外网访问地址(宿主机ip地址和端口)ip不能是0.0.0.0
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://xxx:39092
      # 定义安全协议
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 指定供外部使用的控制类请求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 设置broker最大内存,和初始内存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
      # 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可(22byte)
      - KAFKA_KRAFT_CLUSTER_ID=xYcCyHmJlIaLzLoBzVwIcP
      # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 不允许自动创建主题
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # broker.id,必须唯一,且与KAFKA_CFG_NODE_ID一致
      - KAFKA_BROKER_ID=3
    volumes:
      - ./kafka/broker3:/bitnami/kafka:rw

# mkdir: cannot create directory '/bitnami/kafka/config': Permission denied
# 报错执行这行命令
sudo chown -R 1001:1001 kafka

5.2 控制器

Kafka 集群的 控制器就是一台 Broker,只不过它除了具有一般 broker的功能之外, 还负责分区首领的选举。

Kafka 使用主题来组织数据, 每个主题被分为若干个分区,每个分区有多个副本。

副本被保存在 broker 上, 每个 broker 可以保存成百上千个属于不同主题和分区的副本。

比如创建一个分区,分区数量3,复制因子 2

// 定义TopicDetail
topicDetail := &sarama.TopicDetail{
	// 分区数量
	NumPartitions: 3,
	// 副本因子数量
	ReplicationFactor: 2,
}
topicName := "Test_Topic2"
err := clusterAdmin.CreateTopic(topicName, topicDetail, false)

每个分区可以有多个副本,副本位于集群中不同的 broker 上,也就是说副本的数量不能超过 broker 的数量。

  • 首领副本

每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本 。

  • 跟随者副本

首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一一的任务就是从首领那里复制消息, 保持与首领一致的状态 。 如果首领发生崩溃, 其中的一个跟随者会被提升为新首领 。

5.3 ISR

Kafka的数据复制是以 Partition 为单位的。而多个备份间的数据复制,通过 Follower 向 Leader 拉取数据完成。从一这点来讲,有点像 Master-Slave 方案。不同的是,Kafka 既不是完全的同步复制,也不是完全的异步复制,而是基于 ISR 的动态复制方案。

ISR,也即 In-Sync Replica。每个 Partition 的 Leader 都会维护这样一个列表,该列表中,包含了所有与之同步的 Replica(包含Leader自己)。每次数据写入时,只有ISR 中的所有 Replica 都复制完,Leader才会将其置为 Commit,它才能被Consumer 所消费。

人话: ISR: 动态维护的副本链表

包含 Replica ( leader 和 Follower ) , 每当向 Leader 副本写入新的消息并提交时,ISR 中的 Follower 副本也必须及时地接收到这些消息并且持久化到本地日志中

如果 Follower 副本落后 Leader 副本的消息数量超过了预设的阈值(可通过 replica.lag.time.max.ms( 这个时间内未能及时向 Leader 发送 fetch 请求或更新其日志,则认为该 Follower 已落后于 Leader ) 则该 Follower 将会被暂时移出 ISR 集合,不再被认为是在同步状态。只有当这个副本追赶上 Leader 并且滞后的时间或消息数重新回到阈值之内时,才会被重新加入到 ISR 集合中

重点:只有 ISR 中的副本才有资格被选为新的 Leader。如果 Leader 或任何 ISR 中的副本出现故障或同步延迟超出限制,则该副本会被暂时移出 ISR,直到其重新追上 Leader 并满足同步要求为止。

// Broker的 min.insync.replicas 参数指定了Broker所要求的ISR最小长度,默认值为1。也即极限情况下ISR可以只包含Leader。
min.insync.replicas

更推荐的做法是,将 acks 设置为 all 或者 -1,此时只有 ISR 中的所有 Replica 都收到该数据(也即该消息被 Commit ),Leader 才会告诉 Producer 该消息发送成功,从而保证不会有未知的数据丢失。

4. go 客户端使用

封装 Kafka 连接

func GetKafkaConn() sarama.Client {
	// 设置Kafka的配置
	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Version = sarama.V3_6_0_0
	kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll         //生产者在发送消息时需要等待的确认数
	kafkaConfig.Producer.Return.Successes = true                  // 设置生产者是否返回成功发送的消息。
	kafkaConfig.Producer.Flush.Messages = 10                      // 设置缓存消息数量
	kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // 设置缓存时间间隔
	// Kafka集群地址
	brokers := []string{"xxx.xxx.xxx.xxx:19092", "xxx.xxx.xxx.xxx:29092", "xxx.xxx.xxx.xxx:39092"}
	client, err := sarama.NewClient(brokers, kafkaConfig)
	if err != nil {
		log.Fatalln(err)
	}
	return client
}
  1. Admin相关配置:

    • Admin.Retry.Max: 当执行管理操作(如创建或删除主题)时的最大重试次数。

    • Admin.Retry.Backoff: 管理操作重试之间的退避时间间隔。

    • Admin.Timeout: 执行管理操作的总超时时间。

  2. 网络连接相关配置:

    • Net.MaxOpenRequests: 客户端同时打开的最大请求数。

    • Net.DialTimeout, Net.ReadTimeout, Net.WriteTimeout: 连接、读取和写入Kafka broker时的超时时间。

    • Net.SASL.Handshake: 是否启用SASL握手。

    • Net.SASL.Version: SASL握手协议版本。

  3. 元数据管理相关配置:

    • Metadata.Retry.Max: 获取元数据时的最大重试次数。

    • Metadata.Retry.Backoff: 元数据请求重试之间的退避时间间隔。

    • Metadata.RefreshFrequency: 自动刷新元数据的频率。

    • Metadata.Full: 是否获取完整的元数据信息。

    • Metadata.AllowAutoTopicCreation: 是否允许自动创建主题。

  4. 生产者相关配置:

    • Producer.MaxMessageBytes: 单个消息的最大大小。

    • Producer.RequiredAcks: 指定生产者需要多少个broker确认才能认为消息已成功提交。

    • Producer.Timeout: 生产者等待所有所需确认的时间。

    • Producer.Partitioner: 分区器策略,默认为哈希分区。

    • Producer.Retry.MaxProducer.Retry.Backoff: 生产者重试次数和重试间隔。

    • Producer.Return.Errors: 是否将错误返回给生产者。

    • Producer.CompressionLevel: 消息压缩级别。

  5. 事务型生产者相关配置:

    • Producer.Transaction.Timeout: 事务超时时间。

    • Producer.Transaction.Retry.MaxProducer.Transaction.Retry.Backoff: 事务操作重试次数和间隔。

  6. 消费者相关配置:

    • Consumer.Fetch.MinConsumer.Fetch.Default: 消费者每次fetch请求的最小和默认消息数量。

    • Consumer.Retry.Backoff: fetch失败后的重试间隔。

    • Consumer.MaxWaitTime: fetch请求最大等待时间。

    • Consumer.MaxProcessingTime: 处理FetchRequest的最大时间。

    • Consumer.Return.Errors: 是否将消费错误返回给应用程序。

    • Consumer.Offsets.AutoCommit.EnableConsumer.Offsets.AutoCommit.Interval: 是否自动提交offset以及提交间隔。

    • Consumer.Offsets.Initial: 初始偏移量位置(从最新的开始或者最旧的开始)。

    • Consumer.Offsets.Retry.Max: 提交offset时的最大重试次数。

  7. 消费者组协调相关配置:

    • Consumer.Group.Session.Timeout: 消费者组会话超时时间。

    • Consumer.Group.Heartbeat.Interval: 心跳间隔时间。

    • Consumer.Group.Rebalance.GroupStrategies: 分区再均衡策略列表。

    • Consumer.Group.Rebalance.Timeout: 再均衡操作超时时间。

    • Consumer.Group.Rebalance.Retry.MaxConsumer.Group.Rebalance.Retry.Backoff: 再均衡重试次数和间隔。

    • Consumer.Group.ResetInvalidOffsets: 是否在加入消费者组时重置无效的offset。

  8. 通用配置:

    • ClientID: 客户端ID,用于标识不同的客户端实例。

    • ChannelBufferSize: 用于内部通信的通道缓冲区大小。

    • ApiVersionsRequest: 是否发送API版本请求以协商支持的Kafka API版本。

    • Version: Sarama使用的Kafka协议版本。

    • MetricRegistry: 度量指标注册表,用于收集客户端运行时的各种性能指标。

2.1 发送消息

同步发送消息

func SyncSend() {
	kafkaConn := GetKafkaConn()

	// 1.创建一个同步生产者
	producer, err := sarama.NewSyncProducerFromClient(kafkaConn)
	if err != nil {
		log.Fatalln(err)
	}
	// 保证关闭生产者
	defer func() {
		if err = producer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	// 2.设置消息类型
	message := &sarama.ProducerMessage{
		Topic: "sakuras_topic",
		//Key:       nil,
		Value: sarama.StringEncoder("My First Message " + uuid.NewString()),
		//Headers:   nil,
		//Metadata:  nil,
		//Offset:    0,
		//Partition: 0,
		//Timestamp: time.Time{},
	}

	// 3.发送消息
	for i := 0; i < 5; i++ {
		partition, offset, err := producer.SendMessage(message)
		if err != nil {
			fmt.Println("Message Send Fail:", err)
		} else {
			fmt.Println("Send Success, Partition: ", partition, " Offset: ", offset)
		}
	}
}

如果在创建消息的时候指定 Partition 分区,那么就会发送消息dao Kafka指定分区

如果不指定 Partition 分区,会根据 key 或者 value 自动配置到各个分区

面试题: 为什么要在客户端发送消息的时候做一个缓冲 ( 消息批次 )

  1. 减少网络和 IO 的开销

  2. 减少 GC ( 当发送大量短暂的消息时,垃圾回收器可能需要更频繁地运行以回收这些消息。通过将多个消息组合成一个批次,可以减少单个消息的内存分配,从而降低垃圾回收的压力。这样,垃圾回收器可以更有效地管理内存,从而提高整体性能。 )

  • 缓存发送的配置

func GetKafkaConn() sarama.Client {
	// 设置Kafka的配置
	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Version = sarama.V3_6_0_0
	kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll         //生产者在发送消息时需要等待的确认数
	kafkaConfig.Producer.Return.Successes = true                  // 设置生产者是否返回成功发送的消息。
	kafkaConfig.Producer.Flush.Messages = 10                      // 设置缓存消息数量
	kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // 设置缓存时间间隔
	// Kafka集群地址
	brokers := []string{"xxx.xxx.xxx.xxx:19092", "xxx.xxx.xxx.xxx:29092", "xxx.xxx.xxx.xxx:39092"}
	client, err := sarama.NewClient(brokers, kafkaConfig)
	if err != nil {
		log.Fatalln(err)
	}
	return client
}

异步发送消息

  • select 发送异步消息

func ASyncSend() {
	conn := GetKafkaConn()
	producer, err := sarama.NewAsyncProducerFromClient(conn)
	if err != nil {
		log.Fatalln(err)
	}
	message := &sarama.ProducerMessage{
		Topic: "async_send",
		//Key:       nil,
		Value: sarama.StringEncoder("async message"),
		//Headers:   nil,
		//Metadata:  nil,
		//Offset:    0,
		//Partition: 0,
		//Timestamp: time.Time{},
	}

	// 异步发送消息
	producer.Input() <- message
	fmt.Println("massage already send")

	// 持续监听发送成功还是失败
	for {
		select {
		case err := <-producer.Errors():
			fmt.Println("send to Failed:,", err)
		case ok := <-producer.Successes():
			fmt.Println("massage send success: topic:", ok.Topic, " partition:", ok.Partition, " offset:", ok.Offset)
			return
		}
	}
}

  • goroutine 发送异步消息

func ASyncSend() {
	conn := GetKafkaConn()
	defer conn.Close()
	producer, err := sarama.NewAsyncProducerFromClient(conn)
	if err != nil {
		log.Fatalln(err)
	}
	message := &sarama.ProducerMessage{
		Topic: "async_send",
		//Key:       nil,
		Value: sarama.StringEncoder("async message"),
		//Headers:   nil,
		//Metadata:  nil,
		//Offset:    0,
		//Partition: 0,
		//Timestamp: time.Time{},
	}

	wg := sync.WaitGroup{}
	// 开启协程异步发送消息
	wg.Add(1)
	go func() {
		defer wg.Done()
		producer.Input() <- message
		fmt.Println("massage already send")
	}()
	// 开启协程持续监听发送成功和失败
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case err := <-producer.Errors():
				fmt.Println("send to Failed:,", err)
				producer.AsyncClose()
			case ok := <-producer.Successes():
				fmt.Println("massage send success: topic:", ok.Topic, " partition:", ok.Partition, " offset:", ok.Offset)
				return
			}
		}
	}()
	wg.Wait()
}

2.2 消费消息

  • 一个分区消费

func ConsumerMessage() {
	conn := GetKafkaConn()
	defer conn.Close()

	Consumer, err := sarama.NewConsumerFromClient(conn)
	if err != nil {
		log.Fatalln(err)
	}
	// 给消费者指定对应的topic和分区
	partitionConsumer, err := Consumer.ConsumePartition("async_send", 0, sarama.OffsetOldest)
	// OFfsetNewest 从最新的消息开始
	// offsetOldest 从第一条消息开始
	if err != nil {
		log.Fatalf("Error consuming from partition %d: %v", 0, err)
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Println("Error closing partition consumer:", err)
		}
	}()

	// 处理接收到的消息
	go func(pc sarama.PartitionConsumer) {
		for message := range pc.Messages() {
			fmt.Printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
		}
	}(partitionConsumer)

	// 等待关闭信号
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	select {
	case <-signals:
		return
	}
}
  • 多个分区消费

 func ConsumerMessage() {
	conn := GetKafkaConn()
	defer conn.Close()

	Consumer, err := sarama.NewConsumerFromClient(conn)
	if err != nil {
		log.Fatalln(err)
	}

	// 二,获取某个topic下的全部分区
	topic := "async_send"
	partitions, err := Consumer.Partitions(topic)
	if err != nil {
		log.Fatalln(err)
	}

	// 三,遍历全部分区id,每个分区,完成分区消费
	for _, partition := range partitions {
		partitionConsumer, err := Consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
		if err != nil {
			log.Fatalln(err)
		}

		// 每个分区消费者,在独立的goroutine中完成消费
		go func(pc sarama.PartitionConsumer) {
			defer func() {
				if err := pc.Close(); err != nil {
					log.Fatalln(err)
				}
			}()

			// 消费
			for msg := range pc.Messages() {
				log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value)
			}
		}(partitionConsumer)
	}

	// 等待关闭信号
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	select {
	case <-signals:
		return
	}
}

2.3 自定义消息编码器

// 只需要实现Encoder接口就可以自定义编码器
//type Encoder interface {
//	Encode() ([]byte, error)
//	Length() int
//}

type Person struct {
	Name string
	Age  int
	Addr string
}

type PersonEncoder Person

func (p PersonEncoder) Encode() ([]byte, error) {
	bytes, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	return bytes, nil
}

func (p PersonEncoder) Length() int {
	bytes, _ := p.Encode()
	return len(bytes)
}

定义消息的时候传入

person := Person{
	Name: "Sakura",
	Age:  22,
	Addr: "Global",
}
message := &sarama.ProducerMessage{
	Topic: "sakuras_topic",
	//Key:       nil,
	Value: PersonEncoder(person),
	//Headers:   nil,
	//Metadata:  nil,
	//Offset:    0,
	//Partition: 0,
	//Timestamp: time.Time{},
}

2.4 发送消息的 ACK 配置

通过配置指定producer在发送消息时的ack策略:

  • Request.required.acks=-1,全量同步确认,强可靠性保证

  • Request.required.acks = 1,leader 确认收到,

  • Request.required.acks = 0 ,不确认,可以提高吞吐量

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)

// 设置Kafka配置的时候指定
func GetKafkaConn() sarama.Client {
	// 设置Kafka的配置
	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Version = sarama.V3_6_0_0
	kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll         //生产者在发送消息时需要等待的确认数
	kafkaConfig.Producer.Return.Successes = true                  // 设置生产者是否返回成功发送的消息。
	kafkaConfig.Producer.Flush.Messages = 10                      // 设置缓存消息数量
	kafkaConfig.Producer.Flush.Frequency = 500 * time.Millisecond // 设置缓存时间间隔
	// Kafka集群地址
	//brokers := []string{"xxx.xxx.xxx.xxx:19092", "xxx.xxx.xxx.xxx:29092", "xxx.xxx.xxx.xxx:39092"}
	client, err := sarama.NewClient(brokers, kafkaConfig)
	if err != nil {
		log.Fatalln(err)
	}
	return client
}

2.5 Topic 的分区

  • 创建多个分区的 Topic

func Create_Topic() {
	// 首先获取集群连接
	_, clusterAdmin := GetKafkaConn()
	defer func(clusterAdmin sarama.ClusterAdmin) {
		err := clusterAdmin.Close()
		if err != nil {
			log.Fatalln(err)
		}
	}(clusterAdmin)

	// 定义TopicDetail
	topicDetail := &sarama.TopicDetail{
		// 分区数量
		NumPartitions: 5,
		// 副本因子数量
		ReplicationFactor: 1,
	}
	topicName := "Test_Topic2"
	err := clusterAdmin.CreateTopic(topicName, topicDetail, false)
	if err != nil {
		log.Println(err)
	}
	fmt.Println("分区创建成功")


	// 创建之后可以遍历查看topic和所分区信息
	topicNames := []string{"Test_Topic"}
	metadata, err := clusterAdmin.DescribeTopics(topicNames)
	for _, topicMeta := range metadata {
		fmt.Printf("Topic: %s\n", topicMeta.Name)
		// 遍历每个Topic的分区信息
		for _, partition := range topicMeta.Partitions {
			fmt.Printf("  Partition ID: %d\n", partition.ID)
			fmt.Printf("  Leader: %d\n", partition.Leader)
			fmt.Printf("  Replicas: %v\n", partition.Replicas)
			fmt.Printf("  ISR (In-Sync Replicas): %v\n", partition.Isr)
		}
	}
	return
}

2.6 分区选择策略

生产者的分区选择策略

通过修改配置来决定分区的选择策略

// 随机选择
conf.Producer.Partitioner = sarama.NewRandomPartitioner
// 轮循
conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner
// Hash
conf.Producer.Partitioner = sarama.NewHashPartitioner
// 自定义Hash算法
conf.Producer.Partitioner = sarama.NewCustomHashPartitioner
// 手动指定
conf.Producer.Partitioner = sarama.NewManualPartitioner
// 自定义
conf.Producer.Partitioner = sarama.NewCustomPartitioner()

  • 轮询

  • 指定分区

指定分区需要再定义消息的时候指定发送的分区

value := "Message" + strconv.Itoa(i+10)
message := &sarama.ProducerMessage{
	Topic: "Test",
	Value: sarama.StringEncoder(value),
	Partition: 1,
}

  • 随机

  • Hash

会根据对应的 key 计算出Hash 值然后取余

		message := &sarama.ProducerMessage{
			Topic:     "Test",
			Value:     sarama.StringEncoder(value),
			Key:       sarama.StringEncoder("Key"),
		}

消费者的分区选择策略

针对于同一个主题的多个分区,每个分区构建一个分区消费者并发执行,即可完成主题下全部分区的消息消费。

func ConsumerMessage() {
	conn, _ := Kafka.GetKafkaConn()
	defer func(conn sarama.Client) {
		err := conn.Close()
		if err != nil {
			log.Fatalln(err)
		}
	}(conn)

	Consumer, err := sarama.NewConsumerFromClient(conn)
	if err != nil {
		log.Fatalln(err)
	}

	// 二,获取某个topic下的全部分区
	topic := "async_send"
	partitions, err := Consumer.Partitions(topic)
	if err != nil {
		log.Fatalln(err)
	}

	// 三,遍历全部分区id,每个分区,完成分区消费
	for _, partition := range partitions {
		partitionConsumer, err := Consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
		if err != nil {
			log.Fatalln(err)
		}

		// 每个分区消费者,在独立的goroutine中完成消费
		go func(pc sarama.PartitionConsumer) {
			defer pc.AsyncClose()

			// 消费
			for msg := range pc.Messages() {
				log.Printf("Consumed message, partition: %d, offset: %d, topic: %s, value: %s\n", msg.Partition, msg.Offset, msg.Topic, msg.Value)
			}
		}(partitionConsumer)
	}

	time.Sleep(time.Second * 5)
}

2.7 消费组

消费组完成消费

func ConsumerGroup() {
	conn, _ := Kafka.GetKafkaConn()
	// 1.创建消费者组
	groupsId := "Sakuras_Group"
	group, err := sarama.NewConsumerGroupFromClient(groupsId, conn)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		group.Close()
	}()

	// 2.处理错误
	go func() {
		for err := range group.Errors() {
			log.Println(err)
		}
	}()

	// 3.完成组消费
	ctx, cancelFunc := context.WithCancel(context.Background())
	topic := []string{"Test"}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// 具体的消费操作要在Handler中定义
			consumer := GroupConsumer{}

			err := group.Consume(ctx, topic, consumer)
			if err != nil {
				log.Println(err)
			}
			if ctx.Err() != nil {
				log.Println("goroutine canel")
				return
			}
		}
	}()

	// 终止
	ch := make(chan os.Signal)
	signal.Notify(ch, os.Interrupt)
	select {
	case <-ch:
		cancelFunc()
	}
	wg.Wait()
}

// GroupConsumer 定义组消费处理器 group, consume,handler
type GroupConsumer struct {
}

// Setup 重新消费时执行
func (g GroupConsumer) Setup(session sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup 在消费者组停止处理消息之后被调用(当组内消费者退出时执行)
func (g GroupConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
	fmt.Println("cleanup 方法调用")
	return nil
}

// 组消费的核心方法
func (g GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		fmt.Printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))

	}
	return nil
}

消费者组 handler 接口方法的执行

// GroupConsumer 定义组消费处理器 group, consume,handler
type GroupConsumer struct {
}

// Setup 重新消费时执行(新加入消费者时执行)
func (g GroupConsumer) Setup(session sarama.ConsumerGroupSession) error {
    log.Println("setup")
    log.Println(session.Claims()) // 打印该消费者的获得的分区
	// 可以重置(修改)起始分区
	session.ResetOffset("Test", 0, 2048, "")
 "")
    return nil
}

// Cleanup 在消费者组停止处理消息之后被调用(当组内消费者退出时执行)
func (g GroupConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
	fmt.Println("cleanup 方法调用")
	return nil
}

// ConsumeClaim 组消费的核心方法
func (g GroupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		fmt.Printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))

	}
	return nil
}

消费者组的分配策略

Sticky:

一种更加优化的分配方案,目的:

  1. 分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个

  2. 分区的分配尽可能的与上次分配的保持相同,也就是当消费者变化时,Reblance操作,尽量保证原来分配的分区还在

RoundRobin:

将全部订阅的 Topic 和 partition 作为一个整体,轮流分配:

1Topic,10Partition,3Consumer

C1, T-0, T-3, T-6, T-9

C2, T-1, T-4, T-7

C3, T-2, T-5, T-8

Range:

Range,根据分区数量,确定每个消费者分配几个分区,有多余的分区,前面的分区先分配。

1Topic,10Partition,3Consumer

C1, T-0, T-1, T-2, T-3

C2, T-4, T-5, T-6

C3, T-7, T-8, T-9

kafkaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
	sarama.NewBalanceStrategySticky(), // 粘性分区,新的优化方案
	sarama.NewBalanceStrategyRoundRobin(), // 轮询
	sarama.NewBalanceStrategyRange(), // 默认方案
}

有多个分区策略是因为有些消费者 ( 其他语言的客户端 ) 可能不支持其他的分区策略

0

评论区