作为 Zookeeper 的替代,Kafka 3.3.1 提供了 KRaft 元数据管理组件。KRaft 替换 ZK,并不是元数据存储重新造轮子,而核心是集群协调机制的演进。整个通信协调机制本质上是事件驱动模型,也就是 Metadata as an Event Log,Leader 通过 KRaft 生产权威的事件,Follower 和 Broker 通过监听 KRaft 来获得这些事件,并且顺序处理事件,达到集群状态和期望的最终一致。

Kafka集群中的节点类型

一个Kafka集群是由下列几种类型的节点构成的,它们充当着不同的作用:

  • Broker节点:即代理节点,是Kafka中的工作节点,充当消息队列的角色,负责储存和处理消息,每个Broker都是一个独立的Kafka服务器,可以在不同的机器上运行,除此之外Broker还负责分区(partition)的管理,将主题(topic)划分为多个分区,并分布在集群的不同Broker上
  • Controller节点:即控制器节点,是集群中的特殊节点,负责储存和管理整个集群元数据和状态,它能够监控整个集群中的Broker,在需要时还能够进行平衡操作
  • 混合节点:即同时担任Broker和Controller节点角色的节点

两种模式集群的搭建方式

(1) Zookeeper模式集群

在这种模式下,每个Kafka节点都是依赖于ZK的,使用ZK存储集群中所有节点的元数据。只要所有的Kafka节点连接到同一个ZK上面(或者同一个ZK集群),这些Kafka节点就构成了一个集群,哪怕只有一个Kafka节点。ZK节点(或者集群)就充当了Controller的角色,而所有的Kafka节点就充当着Broker的角色。

(2) KRaft模式集群

在上述传统方案中,Kafka需要依赖ZK完成元数据存放和共享,有两个主要问题:

  • 搭建Kafka集群时还需要额外搭建ZK,增加了运维成本
  • ZK是强一致性的组件(符合CP理论),任何集群状态变化需要至少超过一半同步完成,这样性能差。

KRaft模式是新版本Kafka中推出的集群模式,它完全不需要ZK,只需要数个Kafka节点就可以直接构成集群。我们可以手动配置Kafka节点充当Controller或Broker节点,亦或同时充当这两种角色。

在KRaft模式中,集群的节点会通过投票选举的方式,选择出一个主要的Controller节点,这个节点也称作领导者,它将负责维护整个集群的元数据和状态信息,那么其它的Controller节点或者混合节点就称之为追随者,它们会从领导者同步集群元数据和状态信息。如果领导者宕机了,所有的节点会重新投票选举一个新的领导者。在选举过程中,所有的节点都会参与投票过程,而候选节点只会是Controller节点或者混合节点(单纯Broker节点不会被选举为领导者)。

默认情况下Kafka集群中的Broker节点和Controller节点监听不同的端口:

  • Broker节点是Kafka集群中的数据节点(消息队列),监听9092端口,这个端口可以称作客户端通信端口
  • Controller节点是Kafka集群中的控制器节点,监听9093端口,这个端口可以称作控制器端口
  • 混合节点同时监听90929093端口

Docker安装Kafka

官方下载地址:https://kafka.apache.org/downloads

1
2
3
4
5
# 获取镜像文件
docker pull docker.m.daocloud.io/apache/kafka:3.8.0
docker tag docker.m.daocloud.io/apache/kafka:3.8.0 10.10.200.11:5000/apache/kafka:3.8.0
docker rmi docker.m.daocloud.io/apache/kafka:3.8.0
docker push 10.10.200.11:5000/apache/kafka:3.8.0

单机版部署

docker中kafka的主安装目录为:/opt/kafka,数据存放默认目录:/tmp/kafka-logs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
docker run -id --name=kafka \
	-p 9092:9092 \
	-v /k8s-data/kafka-sdx/kafka-config:/mnt/shared/config \
	-v /k8s-data/kafka-sdx/kafka-data:/tmp/kafka-logs \
	-v /k8s-data/kafka-sdx/kafka-secret:/etc/kafka/secrets \
	-e LANG=C.UTF-8 \
	-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
	-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
	-e CLUSTER_ID=Kafka-SdxTest \
	-e KAFKA_NODE_ID=1 \
	-e KAFKA_PROCESS_ROLES=broker,controller \
	-e KAFKA_CONTROLLER_QUORUM_VOTERS="1@127.0.0.1:9093" \
	-e KAFKA_LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093" \
	-e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://k8snode11:9092" \
	10.10.200.11:5000/apache/kafka:3.8.0

混合模式集群部署

容器名 节点id 节点外网地址 节点类型 容器9092映射到宿主机 容器9093映射到宿主机
kafka-1 1 10.10.213.11 broker,controller 9001 10001
kafka-2 2 10.10.213.11 broker,controller 9002 10002
kafka-3 3 10.10.213.11 broker,controller 9003 10003
# 节点1
docker run -id --name=kafka-1 \
	-p 9001:9092 -p 10001:9093 \
	-v /k8s-data/kafka-sdx/kafka-config-1:/mnt/shared/config \
	-v /k8s-data/kafka-sdx/kafka-data-1:/tmp/kafka-logs \
	-v /k8s-data/kafka-sdx/kafka-secret-1:/etc/kafka/secrets \
	-e LANG=C.UTF-8 \
	-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
	-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
	-e CLUSTER_ID=Kafka-SdxTest \
	-e KAFKA_NODE_ID=1 \
	-e KAFKA_PROCESS_ROLES=broker,controller \
	-e KAFKA_CONTROLLER_QUORUM_VOTERS="1@10.10.213.11:10001,2@10.10.213.11:10002,3@10.10.213.11:10003" \
	-e KAFKA_LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093" \
	-e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://10.10.213.11:9001" \
	10.10.200.11:5000/apache/kafka:3.8.0

# 节点2
docker run -id --name=kafka-2 \
	-p 9002:9092 -p 10002:9093 \
	-v /k8s-data/kafka-sdx/kafka-config-2:/mnt/shared/config \
	-v /k8s-data/kafka-sdx/kafka-data-2:/tmp/kafka-logs \
	-v /k8s-data/kafka-sdx/kafka-secret-2:/etc/kafka/secrets \
	-e LANG=C.UTF-8 \
	-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
	-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
	-e CLUSTER_ID=Kafka-SdxTest \
	-e KAFKA_NODE_ID=2 \
	-e KAFKA_PROCESS_ROLES=broker,controller \
	-e KAFKA_CONTROLLER_QUORUM_VOTERS="1@10.10.213.11:10001,2@10.10.213.11:10002,3@10.10.213.11:10003" \
	-e KAFKA_LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093" \
	-e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://10.10.213.11:9002" \
	10.10.200.11:5000/apache/kafka:3.8.0

# 节点3
docker run -id --name=kafka-3 \
	-p 9003:9092 -p 10003:9093 \
	-v /k8s-data/kafka-sdx/kafka-config-3:/mnt/shared/config \
	-v /k8s-data/kafka-sdx/kafka-data-3:/tmp/kafka-logs \
	-v /k8s-data/kafka-sdx/kafka-secret-3:/etc/kafka/secrets \
	-e LANG=C.UTF-8 \
	-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
	-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
	-e CLUSTER_ID=Kafka-SdxTest \
	-e KAFKA_NODE_ID=3 \
	-e KAFKA_PROCESS_ROLES=broker,controller \
	-e KAFKA_CONTROLLER_QUORUM_VOTERS="1@10.10.213.11:10001,2@10.10.213.11:10002,3@10.10.213.11:10003" \
	-e KAFKA_LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093" \
	-e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://10.10.213.11:9003" \
	10.10.200.11:5000/apache/kafka:3.8.0

Broker + Controller集群部署

这是推荐的生产环境的集群部署方式,集群中不存在混合节点,每个节点要么是Broker类型,要么是Controller类型:

容器名 节点id 节点外网地址 节点类型 容器9092映射到宿主机 容器9093映射到宿主机
kafka-1 1 10.10.213.11 controller / 10001
kafka-2 2 10.10.213.11 broker 9002 /
kafka-3 3 10.10.213.11 broker 9003 /
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 节点1-Controller
docker run -id --name=kafka-1 \
	-p 10001:9093 \
	-v /k8s-data/kafka-sdx/kafka-config-1:/mnt/shared/config \
	-v /k8s-data/kafka-sdx/kafka-data-1:/tmp/kafka-logs \
	-v /k8s-data/kafka-sdx/kafka-secret-1:/etc/kafka/secrets \
	-e LANG=C.UTF-8 \
	-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
	-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
	-e CLUSTER_ID=Kafka-SdxTest2 \
	-e KAFKA_NODE_ID=1 \
	-e KAFKA_PROCESS_ROLES=controller \
	-e KAFKA_CONTROLLER_QUORUM_VOTERS="1@10.10.213.11:10001" \
	-e KAFKA_LISTENERS="CONTROLLER://:9093" \
	10.10.200.11:5000/apache/kafka:3.8.0

# 节点2-Broker
docker run -id --name=kafka-2 \
	-p 9002:9092 \
	-v /k8s-data/kafka-sdx/kafka-config-2:/mnt/shared/config \
	-v /k8s-data/kafka-sdx/kafka-data-2:/tmp/kafka-logs \
	-v /k8s-data/kafka-sdx/kafka-secret-2:/etc/kafka/secrets \
	-e LANG=C.UTF-8 \
	-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
	-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
	-e CLUSTER_ID=Kafka-SdxTest2 \
	-e KAFKA_NODE_ID=2 \
	-e KAFKA_PROCESS_ROLES=broker \
	-e KAFKA_CONTROLLER_QUORUM_VOTERS="1@10.10.213.11:10001" \
	-e KAFKA_LISTENERS="PLAINTEXT://:9092" \
	-e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://10.10.213.11:9002" \
	10.10.200.11:5000/apache/kafka:3.8.0

# 节点3-Broker
docker run -id --name=kafka-3 \
	-p 9003:9092 \
	-v /k8s-data/kafka-sdx/kafka-config-3:/mnt/shared/config \
	-v /k8s-data/kafka-sdx/kafka-data-3:/tmp/kafka-logs \
	-v /k8s-data/kafka-sdx/kafka-secret-3:/etc/kafka/secrets \
	-e LANG=C.UTF-8 \
	-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
	-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
	-e CLUSTER_ID=Kafka-SdxTest2 \
	-e KAFKA_NODE_ID=3 \
	-e KAFKA_PROCESS_ROLES=broker \
	-e KAFKA_CONTROLLER_QUORUM_VOTERS="1@10.10.213.11:10001" \
	-e KAFKA_LISTENERS="PLAINTEXT://:9092" \
	-e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://10.10.213.11:9003" \
	10.10.200.11:5000/apache/kafka:3.8.0

简单测试

命令行参数:

  • –partitions 每个topic的分区数
  • –replication-factor 每个分区的副本数(默认为1,意思是没有复制,数据只有一份,这是不安全的)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# kafka-3上做测试
# cd /opt/kafka/bin
# 1. 创建主题
./kafka-topics.sh --create --topic tpc-sdxtst1 --bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 3
# 2. 发送消息
./kafka-console-producer.sh --topic tpc-sdxtst1 --bootstrap-server localhost:9092
# 3. 接收消息
./kafka-console-consumer.sh --topic tpc-sdxtst1 --bootstrap-server localhost:9092 \
--from-beginning

其它命令示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 主题
./kafka-topics.sh --bootstrap-server localhost:9092 --list
./kafka-topics.sh --bootstrap-server localhost:9092 --topic tpc-sdxtst1 --describe
# 删除主题(注意其它连接必须全部关闭,删除操作才能删除干净)
./kafka-topics.sh --bootstrap-server localhost:9092 --topic tpc-sdxtst1 --delete
# 修改分区数(只能加大)
./kafka-topics.sh --bootstrap-server localhost:9092 --topic tpc-sdxtst1 --alter --partitions 4

# 消费组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group gpName
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group gpName

下面是3个分区3个副本的例子:

/opt/kafka/bin$ ./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic tpc-sdxtst1
Topic: tpc-sdxtst1 TopicId: pm6Yl3PPTPKluxh2dxEFQg PartitionCount: 3 ReplicationFactor: 3 Configs: 
Topic: tpc-sdxtst1 Partition: 0    Leader: 1 Replicas: 1,2,3 Isr: 1,2,3      Elr:    LastKnownElr: 
Topic: tpc-sdxtst1 Partition: 1    Leader: 2 Replicas: 2,3,1 Isr: 2,3,1      Elr:    LastKnownElr: 
Topic: tpc-sdxtst1 Partition: 2    Leader: 3 Replicas: 3,1,2 Isr: 3,1,2      Elr:    LastKnownElr: 

image-20241009232135726

参考阅读:

https://juejin.cn/post/7380421216019365897

(完)