1. Kafka概述

在 Kafka 架构中,有几个术语需要了解下。

  • Zookeeper:Kafka 通过 ZooKeeper 存储集群的 Meta 信息等;

  • Controller:Kafka 集群中的一个服务器,用来进行 Leader Election 以及各种 Fail Over;

  • Broker:Kafka 集群由多个 Kafka 实例(Server)组成,每个实例构成一个 Broker,其实就是服务器;

  • Producer:生产者,即消息发送者,Push 消息到 Kafka 集群的 Broker(就是 Server)中;

  • Topic:Producer 向 Kafka 集群 Push 的消息会被归于某一类别,即 Topic。本质上,这只是一个逻辑概念,面向的对象是 Producer 和 Consumer,Producer 只需关注将消息 Push 到哪一个 Topic 中,而 Consumer 只需关心自己订阅了哪个 Topic;

  • Partition:每个 Topic 又被分为多个 Partition,即物理分区。出于负载均衡的考虑,同一个 Topic 的 Partition 分别存储于 Kafka 集群的多个 Broker 上。而为了提高可靠性,这些 Partition 可以由 Kafka 机制中的 Replicas 来设置备份的数量;

  • Replicas:Partition 的副本,保障 Partition 的高可用;

  • Consumer:消费者,从 Kafka 集群的 Broker 中 Pull 消息、消费消息;

  • Consumer Group:High-Level Consumer API 中,每个 Consumer 都属于一个 Consumer Group,每条消息只能被 Consumer Group 中的一个 Consumer 消费,但可以被多个 Consumer Group 消费;

  • Leader:Replicas 中的一个角色, Producer 和 Consumer 只与 Leader 交互;

  • Follower:Replicas 中的一个角色,从 Leader 中复制数据,作为它的副本,同时一旦某 Leader 挂掉,便会从它的所有 Follower 中选举出一个新的 Leader 继续提供服务;

架构示意图

image-20241013082917813

特性讨论

  1. 一个topic多个partition,一个partition一主多从,leader承担所有的读写,然后同步到follower。
  2. 老版本producer/consumer需要直接和zk交互,新版本直接和broker交互,因为zk写性能不好。
  3. producer内部维护内存队列,异步一定间隔或数量发送到broker,失败的话,默认重试3次;同步发送,需要实时调用flush。
  4. 每个partition分布在不同server,一个partition只能被同组的一个consumer消费,一个consumer可以消费多个partition,所以一个topic在多个partition的情况下是无法保证有序的,为了简单和有序,consumer的数量不能大于partition的数量。
  5. 某个消息只有当所有follower都保存成功,这条消息才能被消费。
  6. 投递保证支持 at least one / at most one / exactly once
  7. 一旦出现leader出现故障,follower选举新leader时,可能遇到所有follower都落后于leader的情况,这时候可以选择最新的follower成为leader,但是可能碰到partition过多的问题。
  8. kafka使用pull模式,好处是消费速度可控,消费进度也可控。
  9. kafka消息投递机制推荐至少一次投递,如果未接收成功重新投递直到成功,对于服务端而言,重复消费要比丢数据好处理。
  10. kafka写入高性能(1. 单个文件顺序写,多个文件或者磁盘会出现随机写;2. 先写buff再flush到磁盘,减小io次数)
  11. 存储上使用了顺序访问磁盘和零拷贝技术(将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中),使得其具有非常强大的吞吐性能
  12. kafka读取指定offset(1. 文件大小指定,可以通过offset和文件大小计算出数据在哪个文件,2. 通过chunk size以及基于有序消息的二分查找,可以快速定位到指定offset)
  13. producer/consumer信息存储zookeeper,kafka的consumer本地记录offset,一定时间间隔注册zk。
  14. 据说机械硬盘顺序写比内存随机读要快。
  15. partition删数据:partition其实是个文件夹,segment是文件夹里面的小文件,过期或者磁盘不足可以删除比较老的segment。
  16. 消息路由默认实现,消息轮流选择partition,好处:负载均衡,坏处:同一个用户的消息可能放到不同的partition。
  17. 自治rebalance机制(旧):每次group的consumer出现变更,所有consumer都退出group然后,重新加入group触发分配partition。自治rebalance,通过zk保存状态,自身计算自己应该消费partition,坏处:这种情况依赖了zk的数据一致性,可能出现脑裂,不可控;每次变更都触发计算;好处:简单。
  18. 集中rebalance机制(新):zk为每个group选定一个leader,由leader计算分配方案,存放在syncGroup,然后每个consumer主动syncGroup拉去方案并且执行监听。
  19. ISR机制:partition的follower异步同步,但是需要所有follower成功commited,消息才能被消费。这里的异步指的是leader已经接收到消息并且写入磁盘,但是还没commited,目前可以配置时间间隔,如果落后时间大于指定配置,把备份从ISR剔除,可以保证吞吐率和一致性。如果leader挂了,从ISR中选择活的节点选为leader,原本故障节点恢复时,首先先剔除未commit的消息,然后同步当前leader的节点数据,直到一致才能重新加入ISR。如果ISR都不可用,两种策略:1. 等待以前ISR中的任何一个备份恢复(慢,但是数据不丢失),2. 等待任何一个备份恢复无论以前是否在ISR(默认策略,快,数据丢失,比如故障节点)

Topic & Partition

为了便于区分消息,Producer 向 Kafka 集群 Push 的消息会被归于某一类别,即 Topic。为了负载均衡、增强可扩展性,Topic 又被分为多个 Partition。从存储层面来看,每一个 Partition 都是一个有序的、不可变的记录序列,通俗点就是一个追加日志(Append Log)文件。每个 Partition 中的记录都会被分配一个称为偏移量(Offset)的序列 ID 号,该序列 ID 号唯一地标识 Partition 内的每个记录。

Kafka 机制中,Producer Push 的消息是追加(Append)到 Partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下图所示:

image-20240929123944042

Kafka 为什么要将 Topic 分区

简而言之:负载均衡 + 水平扩展。

前已述及,Topic 只是逻辑概念,面向的是 Producer 和 Consumer,而 Partition 则是物理概念。可以想象,如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 Broker,那么该 Topic 的所有读写请求都将由这个 Broker 处理,吞吐量很容易陷入瓶颈,这显然不适合高吞吐量应用场景。

有了 Partition 概念,假设一个 Topic 被分为 10 个 Partition,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀地分布到不同的 Broker(服务器)上。当 Producer 发布消息时,Producer 客户端可以采用 Random、Key-Hash 及轮询等算法选定目标 Partition。若不指定,Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大地提高吞吐量,并且使系统具备良好的水平扩展能力。

2. Kafka 高可靠性实现基础解读

谈及可靠性,最常规、最有效的策略就是“副本(Replication)机制”,Kafka 实现高可靠性同样采用了该策略。通过调节副本相关参数,可使 Kafka 在性能和可靠性之间取得平衡。

Kafka 文件存储机制

事实上Partition 并不是最终的存储粒度,Partition 还可以进一步细分为 Segment。换言之,一个 Partition 物理上由多个 Segment 组成,Segment 才是实际的存储粒度。消息队列搭建好后,通过命令创建一个 Topic,名为“mytopic_test”,Partition 的数量配置为 4。之后,可以在 Kafka 日志目录中看到新生成的文件夹,这只是文件夹而不是具体的文件,如下所示:

1
2
3
4
drwxr-xr-x 2 root root 4096 Oct 15 13:21 mytopic_test-0
drwxr-xr-x 2 root root 4096 Oct 15 13:21 mytopic_test-1
drwxr-xr-x 2 root root 4096 Oct 15 13:21 mytopic_test-2
drwxr-xr-x 2 root root 4096 Oct 15 13:21 mytopic_test-3

问题1:为什么不能以 Partition 作为存储单位?

任何一个 Topic 中的 Partition 数量受限于 Kafka Broker 的数量,不可能太多。如果以 Partition 为存储粒度,随着消息源源不断写入,数量有限的 Partition 将急剧扩张,会对消息文件的维护以及对已消费消息的清理工作带来严重影响。

Partition 的数量为什么受限于 Kafka Broker 的数量?为了保证可靠性,每个 Partition 都应有若干个副本(Replica),其中一个 Replica 为 Leader,其它都为 Follower。Leader 负责处理 Partition 的所有读写请求,Follower 则负责被动地复制 Leader 上的数据。不难理解,对于任意一个 Topic,有多少个 Partition 就有多少个 Leader,并且,这些 Leader 应尽量分散到不同的 Broker 上,否则,一旦某台 Broker(如果它部署有大量 Leader)故障下线,势必引起连锁反应:大量的 Partition 需要重选 Leader,而这期间是不可用的。

鉴于上述原因,有必要对 Partition 进一步细分,细分的单位就是 Segment。很容易理解,一个 Partition(文件夹)相当于一个巨大的文件,通过细分,被切分为多个大小相等的 Segment 文件,但由于消息大小不同,每个 Segment 中的消息数量通常是不同的。基于 Segment 机制,可以很方便地清理那些陈旧的消息(已被消费的消息),从而提高磁盘的利用率。关于 Segment,Kafka Server 有如下配置参数:

1
2
3
4
5
6
7
8
# 指定segment文件的大小,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824
# 默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
# 每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息,如果有则删除过期消息
log.retention.check.interval.ms=300000
# 是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false

问题2:Segment 的工作原理是什么?

Partition 的细分单位为 Segment,接下来我们看下它的工作原理。打开一个 Partition 文件夹(目录),如下所示:

1
2
3
4
5
6
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

并没有发现“Segment”文件!事实上,Segment 文件由两部分组成,即 .index 文件和 .log 文件,分别为 Segment 索引文件和数据文件。观察上面的文件名,很容易理解它们的命规则,对于索引文件,Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的偏移量(Offset)值,Offset 的数值由 20 位数字字符表示,没有数字的位置用 0 填充。对于数据文件,其命名与对应的索引文件保持一致即可。

为了便于读者理解,以上面的其中一“对” Segment 文件为例:00000000000000170410.index00000000000000170410.log,绘制其关系图,如下所示:

image-20240929130142152

.index 文件作为索引文件,存储的是元数据;.log 文件作为数据文件,存储的是消息。

如何通过索引访问具体的消息呢?事实上,索引文件中的元数据指向的是对应数据文件中消息的物理偏移地址,有了消息的物理地址,自然也就可以访问对应的消息了。

其中以 .index 索引文件中的元数据 [2, 365] 为例,在 .log 数据文件表示第 2 个消息,即在全局 Partition 中表示 170410+2=170412 个消息,该消息的物理偏移地址为 365。

问题3:如何从 Partition 中通过 Offset 查找 Message?

基于问题 2 中的数据和图形,如何读取 offset=170425 的 Message(消息)呢?关键在于通过 Offset 定位出消息的物理偏移地址。首先,列出各个 Segment 的索引文件及其偏移量范围,如下:

索引文件1:00000000000000000000.index,偏移量范围:[0,170410];
索引文件2:00000000000000170410.index,偏移量范围:[170410+1,239430];
索引文件3:00000000000000239430.index,偏移量范围:[239430+1,239430+1+X];

根据索引文件的偏移量范围,170410 < offset=170425 < 239430,因此,Offset=170425 对应的索引文件为 00000000000000170410.index。索引文件是“有序的”,通过二分查找(又称折半查找)便可快速定位具体文件位置。此后,根据 00000000000000170410.index 文件中的 [15,2369] 定位到数据文件 00000000000000170410.log 中的 2369 位置,这便是目标消息的位置,读取即可。

问题4:读取一条消息时,如何确定何时读完本条消息呢?

通过对问题 3 的解答,相信读者已经理解了如何根据 Offset 定位一条消息的物理偏移地址。但这个物理偏移地址实际上是一个起始地址,如何确定本条消息的结尾(终止地址)呢?

这个问题可通过消息在磁盘上的物理存储结构来解决(其中包含偏移量、消息体长度等可度量消息终止地址的数据),消息的存储结构如下(取自官网):

1
2
3
4
5
baseOffset: int64 				# 偏移量
batchLength: int32 				# 消息体长度
partitionLeaderEpoch: int32
......
records: [Record]

也就是磁盘起始位置前几个字节固定就表示了消息体的结构,从中解析出消息体的长度并最终读取即可。

磁盘存储更全面的解析,参考:

https://zhuanlan.zhihu.com/p/680960774

复制原理和同步方式

副本机制初探

提高可用性,冗余设计(也称副本机制)是常用的策略,Kafka 也不例外。在 Kafka 中,副本是以 Partition 为单位,换言之,每个 Partition 有 N 个副本(Replica),副本数量可在 Kafka 配置文件 server.properties 中设置:

1
2
# 复制因子,默认值为2,如果一个副本失效了,另一个还可以继续提供服务,从而保证可用性。
default.relication.factor=2  

基于多副本机制,Kafka 可实现故障自动转移(Fail Over)。副本根据角色的不同可分为以下三类。

  • Leader 副本:响应 Client 端读写请求的副本,Producer 和 Consumer 只跟 Leader 交互;
  • Follower 副本:被动地备份 Leader 副本中的数据,不能响应 Client 端读写请求。一旦 Leader 挂掉,会从它的 Follower 中选举出一个新的 Leader 继续提供服务;
  • ISR 副本:包含了 Leader 副本和所有与 Leader 副本保持同步的 Follower 副本。如何判定是否与 Leader 同步,后面详述。

为了便于读者更好的理解副本概念,我们看下面这个例子。

一个具有 4 个 Broker 的 Kafka 集群,TopicA 有 3 个 Partition,每个 Partition 有 3 个副本(Leader+Follower)。

image-20240929131821279

如果 Leader 所在的 Broker 发生故障或宕机,对应 Partition 将因无 Leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 Leader 将从 Follower 中被选举出来并继续处理客户端的请求。

如何确保新选举出的 Leader 是最优秀的?

一个 Partition 有多个副本(Replica),为了提高可靠性,这些副本分散在不同的 Broker 上。由于带宽、读写性能、网络延迟等因素,同一时刻,这些副本的状态通常是不一致的,即 Follower 与 Leader 的状态不一致。那么,如何保证新 Leader 是优选出来的呢?

Kafka 机制中,Leader 将负责维护和跟踪一个 ISR(In-Sync Replicas)列表,即同步副本队列,这个列表里面的副本与 Leader 保持同步,状态一致。如果新的 Leader 从 ISR 列表里的副本中选出,那么就可以保证新 Leader 是优秀的。当然,这不是唯一的策略,下文将继续解读。

同步副本ISR

同步副本队列 ISR(In-Sync Replicas),即 Leader 副本 + 所有与 Leader 副本保持同步的 Follower 副本。虽然副本可以提高可用性,但副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 Replica 数量为 2(部分 Kafka 版本默认值为 1),即每个 Partition 都有一个 Leader,一个 Follower。所有的副本(Replica)统称为 Assigned Replicas(AR)。显然,ISR 是 AR 中的一个子集,由 Leader 维护 ISR 列表,Follower 从 Leader 那里同步数据会有一些延迟(由参数 replica.lag.time.max.ms 设置超时阈值),超过阈值的 Follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 Follower 也会先存放在 OSR 中,即有关系式 AR=ISR+OSR。

LEO & HW

前面提到 Kafka 中,Topic 的每个 Partition 可能有多个副本(Replica)用于实现冗余,从而实现高可用。每个副本又有两个重要的属性 LEO 和 HW。

在 Kafka 的存储机制中,Partition 可以细分为 Segment,而 Segment 是最终的存储粒度。不过,对于上层应用来说,仍然可以将 Partition 看作最小的存储单元,即 Partition 可以看作是由一系列的 Segment 组成的粒度更粗的存储单元,它由一系列有序的消息组成,这些消息被连续的追加到 Partition 中。

LEO、HW 以及 Offset 的关系图如下:

image-20240929132600621

  • LEO:即日志末端位移(Log End Offset),表示每个副本的 Log 最后一条 Message 的位置。比如 LEO=10、HW=7,则表示该副本保存了 10 条消息,而后面 3 条处于 Uncommitted 状态。
  • HW:即水位值(High Watermark)。对于同一个副本而言,其 HW 值不大于 LEO 值。小于等于 HW 值的所有消息都被认为是“已备份”的(Replicated),对于任何一个 Partition,取其对应的 ISR 中最小的 LEO 作为 HW,Consumer 最多只能消费到 HW 所在的位置。

此外,需要说明的是,每个 Replica (包含 Leader 和 Follower)都有 HW,它们各自负责更新自己的 HW 的状态。

注:HW 并不是 Kafka 特有的概念,HW 通常被用在流式处理领域(比如 Apache Flink、Apache Spark 等),以表征元素或事件在时间层面上的进度。在 Kafka 中,对于 Leader 新写入的消息,Leader 会等待该消息被 ISR 中所有的 Replica 同步后再更新 HW,之后该消息才能被 Consumer 消费。这种机制有一个好处,确保 HW 及其之前的消息(Committed 状态)都是已备份的,即便 Leader 所在的 Broker 因故障下线,那么 Committed 状态的消息仍然可以从新选举出的 Leader 中获取。

下面我们举例说明。

某个 Partition 的 ISR 列表包括 3 个副本(1 个 Leader+2 个 Follower),当 Producer 向其 Leader 写入一条消息后,HW 和 LEO 有如下变化过程:

image-20240929132751591

由上图可以看出,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。同步复制要求所有能工作的 Follower 都复制完,这条消息才会被置为 Committed 状态,该复制方式受限于复制最慢的 Follower,会极大地影响吞吐率,因而极少应用于生产环境。而异步复制方式下,Follower 异步地从 Leader 复制数据,数据只要被 Leader 写入 Log 就被认为已经 Committed(类似 Redis,主从异步复制)。如果在 Follower 尚未复制完成的情况下,Leader 宕机,则必然导致数据丢失,很多时候,这是不可接受的。

相较于完全同步复制和异步复制,Kafka 使用 ISR 的策略则是一种较“中庸”的策略,在可靠性和吞吐率方面取得了较好的平衡。某种意义上,ISR 策略与 Raft 共识算法所采用的“多数派原则”类似,不过 ISR 更为灵活。

Kafka 消息生产的可靠性

就 Kafka 而言,可靠性贯穿消息的生产、发送、存储及消费等过程,在客户端,可以通过 request.required.acks 参数来设置数据可靠性的级别。

(1)request.required.acks = 1

这是默认情况,即消息的强制备份数量为 1,Producer 发送数据到 Leader,只要 Leader 成功写入本地日志,即成功返回客户端,不要求 ISR 中的其它副本与 Leader 保持同步。对于 Producer 发来的消息,如果 Leader 刚写入并成功返回后便宕机,此次发送的消息就会丢失。

(2)request.required.acks = 0

即消息的强制备份数量为 0,Producer 不停向 Leader 发送数据,而不需要 Leader 反馈成功消息,这种情况下数据传输效率最高,与此同时,可靠性也是最低的。可能在发送过程中丢失数据,可能在 Leader 宕机时丢失数据。

(3)request.required.acks = -1(all)

即消息的强制备份数量为 ISR 列表中副本的数量,Producer 发送数据给 Leader,Leader 收到数据后要等到 ISR 列表中的所有副本都完成数据同步后(强一致性),才向生产者返回成功消息。如果一直收不到成功消息,则认为发送数据失败会自动重发数据。这是可靠性最高的方案,当然,性能也会受到一定影响。

单纯设置 request.required.acks 保障不了消息生产的可靠性

回顾之前讲得,由于 Follower 从 Leader 同步数据有一些延迟(由参数 replica.lag.time.max.ms 设置超时阈值),超过阈值的 Follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,因此,ISR 列表实际上是动态变化的。

由于 ISR 列表是动态变化的,如果 ISR 中的副本因网络延迟等原因被踢出,只剩下 Leader,即便设置参数 request.required.acks=-1 也无法保证可靠性。鉴于此,需要对 ISR 列表的最小副本数加以约束,即 ISR 列表中的副本数不得小于一个阈值。Kafka 提供了这样一个参数:min.insync.replicas,该参数用于设定 ISR 中的最小副本数,默认值为 1,当且仅当 request.required.acks 参数设置为 -1 时,此参数才生效。当 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回如下异常:

1
2
org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected
since there are fewer in-sync replicas than required。

不难理解,如果 min.insync.replicas 设置为 2,当 ISR 中实际副本数为 1 时(只有 Leader),将无法保证可靠性,此时拒绝客户端的写请求以防止消息丢失。

Leader 选举机制解读

关于 Kafka 的可靠性,前面介绍了消息生产过程中的可靠性保障机制:设置参数 request.required.acks=-1 时,对于任意一条消息,只有在它被对应 Partition 的 ISR 中的所有副本都复制完毕后,才会被认为已提交,并返回信息给 Producer。进一步,通过参数 min.insync.replicas 协同,可以提高消息生产过程中的可靠性。

问题1:如何在保证可靠性的前提下避免吞吐量下降?

根据官网的解释,如果 Leader 永远不会宕机,我们根本不需要 Follower,当然,这是不可能的。考虑这样一个问题:当 Leader 宕机了,怎样从 Follower 中选举出新的 Leader?由于各种原因,Follower 可能滞后很多或者直接崩溃掉,因此,我们必须确保选择 “最新(Up-to-Date)” 的 Follower 作为新的 Leader。

对于日志复制算法而言,必须提供这样一个基本的保证:如果 Leader 挂掉,新晋 Leader 必须拥有原来的 Leader 已经 Commit(即响应客户端 Push 消息成功)的所有消息。显然,ISR 中的副本都具有该特征。

但存在一个问题,ISR 列表维持多大的规模合适呢?换言之,Leader 在一条消息被 Commit 前需要等待多少个 Follower 确认呢?等待 Follower 的数量越多,与 Leader 保持同步的 Follower 就越多,可靠性就越高,但这也会造成吞吐率的下降。

少数服从多数”是最常用的选举原则,它也称为“多数派原则”,Raft 算法就采用了这种选举原则。不过,Kafka 并不是采用这种方式

基于该原则,如果有 2f+1 个副本,那么,在一条消息被 Commit 前必须保证有 f+1 个 Replica 复制完消息;同时,失败的副本数不能超过 f 个,否则将丧失选举基础。这种方式有个很出色的特点:延迟只取决于最快的服务器。也就是说,如果复制因子是 3,则延迟是由更快的 Follower 而不是较慢的 Follower 决定的。

当然,“少数服从多数”的选举策略也有一些不足,为了保证 Leader 选举正常进行,它所能容忍的失败 Follower 数比较少:如果要容忍 m 个 Follower 故障,至少要 2*m+1 个副本。换言之,在生产环境中为了保证较高的容错率,必须维持大量的副本,而过多的副本又会反过来制约性能,鉴于此,这种算法很少在需要保证高吞吐量的系统中使用。

问题2:Kafka 选举 Leader 的策略是怎样的?

Leader 选举有很多实用的算法,比如 ZooKeeper 的 Zab、Raft 以及 Viewstamped Replication。就目前而言,与 Kafka 所使用的 Leader 选举算法相似度最高的是微软的 PacificA 算法。

Kafka 通过 ZooKeeper 为每一个 Partition 动态维护了一个 ISR 列表,通过前面的学习,我们知道 ISR 里的所有 Replica 都与 Leader 保持同步,严格得讲,为了保证可靠性,只有 ISR 里的成员才能有被选为 Leader 的可能(通过参数配置:unclean.leader.election.enable=false)。基于该策略,如果有 f+1 个副本,一个 Kafka Topic 能在保证不丢失已经 Commit 消息的前提下容忍 f 个副本的失败,也就是说只有1个副本完全同步了Leader就足够了,在大多数使用场景下,这种折衷策略是合理的。

实际上,对于任意一条消息,只有它被 ISR 中的所有 Follower 都从 Leader 复制过去才会被认为 Committed,并返回信息给 Producer,从而保证可靠性。但与“少数服从多数”策略不同的是,Kafka ISR 列表中副本的数量不需要超过副本总数的一半,即不需要满足“多数派”原则,通常,ISR 列表副本数大于等于 2 即可,如此,便在可靠性和吞吐量方面取得平衡。

极端情况下的 Leader 选举策略

前已述及,当 ISR 中至少有一个 Follower 时(ISR 包括 Leader),Kafka 可以确保已经 Commit 的消息不丢失,但如果某一个 Partition 的所有 Replica 都挂了,所谓的保证自然也就不存在了。这种情况下如何进行 Leader 选举呢?通常有两种方案。

  1. 等待 ISR 中任意一个 Replica 恢复过来,并且选它作为 Leader(希望它仍然有它所有的数据);
  2. 选择第一个恢复过来的 Replica(不一定非在 ISR 中)作为 Leader。

如何选择呢?这就需要在可用性和一致性当中作出抉择。如果坚持等待 ISR 中的 Replica 恢复过来,不可用的时间可能相对较长。如果 ISR 中所有的 Replica 都无法恢复,或者数据丢失了,这个 Partition 将永远不可用。

如果选择方案 2,有可能存在这样的情况:首先恢复过来的 Replica 不是 ISR 中的 Replica,那么,它可能并不具备所有已经 Commit 的消息,从而造成消息丢失。默认情况下(自 0.11.0.0 版本起),Kafka 采用第一种策略,即 unclean.leader.election.enable=false,以保证一致性。unclean.leader.election.enable 这个参数对于 Leader 选举、系统可用性、一致性以及数据可靠性都有重要影响,在生产环境中应慎重权衡。

集群节点变更再平衡

要剖析协调者端处理再平衡的全流程,我们就需要分几个场景来讨论。这几个场景分别是新成员加入组、组成员主动离组、组成员崩溃离组、组成员提交位移

集群节点扩容

Kafka集群扩容通常涉及以下步骤:

  1. 增加新的broker节点到集群。
  2. 修改配置文件(如server.properties),确保新的broker有唯一的broker.id。
  3. 启动新的broker。
  4. 使用kafka-topics.sh工具修改分区数增加分区,这会分散数据到新的broker。
  5. 重新平衡broker间的负载,通过kafka-reassign-partitions.sh工具。

3. Kafka 架构中 ZooKeeper 以怎样的形式存在

ZooKeeper 是一个分布式、开放源码的分布式应用程序协调服务,是 Google 的 Chubby 开源实现。分布式应用程序可以基于它实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。

在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有 Broker 注册、Topic 注册、Producer 和 Consumer 负载均衡、维护 Partition 与 Consumer 的关系、记录消息消费的进度以及 Consumer 注册等。

Broker 在 ZooKeeper 中的注册

为了便于大家理解,我首先解释下“注册”一词。ZooKeeper 是一个共享配置中心,我们可以将一些信息存放入其中,比如 Broker 信息,本质上就是存放一个文件目录。这个配置中心是共享的,分布式系统的各个节点都可以从配置中心访问到相关信息。同时,ZooKeeper 还具有 Watch 机制(参考Raft算法),一旦注册信息发生变化,比如某个注册的 Broker 下线,ZooKeeper 就会删除与之相关的注册信息,其它节点可以通过 Watch 机制监听到这一变化,进而做出响应。

其实,ZooKeeper 与 Etcd 有很多共性,如果读者已经阅读过 Etcd 相关内容,将很容易理解 ZooKeeper 相关内容。

言归正传,Broker注册,也就是 Kafka 节点注册,本质上就是在 ZooKeeper 中创建一个专属的目录(又称为节点),其路径为 /brokers。某一个具体的Broker路径为 /brokers/ids/{broker.id}。在专属节点创建好后,Kafka 会将该 Broker 相关的信息存入其中,包括 broker.name 、端口号。

需要特别说明的是,Broker 在 ZooKeeper 中注册的节点是“临时节点”,一旦 Broker 故障下线,ZooKeeper 就会将该节点删除。同时,可以基于 Watch 机制监听到这一删除事件,进而做出响应(如负载均衡)。

Topic 在 ZooKeeper 中的注册

在 Kafka 中,所有 Topic 与 Broker 的对应关系都由 ZooKeeper 来维护,在 ZooKeeper 中,通过建立专属的节点来存储这些信息,其路径为 /brokers/topics/{topic_name}

每个 Topic 的 Partition 实际上是存在备份的,并且备份的数量由 Kafka 机制中的 Replicas 来控制。那么问题来了,假设某个 TopicA 被分为 2 个 Partition,并且存在两个备份,由于这 2 个 Partition(1-2)被分布在不同的 Broker 上,同一个 Partiton 与其备份不能(也不应该)存储于同一个 Broker 上。以 Partition1 为例,假设它被存储于 Broker2,其对应的备份分别存储于 Broker1 和 Broker4,有了备份,可靠性得到保障,但数据一致性却是个问题。

为了保障数据的一致性,ZooKeeper 机制得以引入。基于 ZooKeeper,Kafka 为每一个 Partition 找一个节点作为 Leader,其余备份作为 Follower;就 TopicA 的 Partition1 而言,如果位于 Broker2(Kafka 节点)上的 Partition1 为 Leader,那么位于 Broker1 和 Broker4 上面的 Partition1 就充当 Follower,如下图:

image-20240929140825112

基于上图的架构,当 Producer Push 的消息写入 Partition(分区)时,作为 Leader 的 Broker(Kafka 节点)会将消息写入自己的分区,同时还会将此消息复制到各个 Follower,实现同步。如果某个 Follower 挂掉,Leader 会再找一个替代并同步消息;如果 Leader 挂了,Follower 们会选举出一个新的 Leader 替代,继续业务,这些都是由 ZooKeeper 完成的。

Consumer 在 ZooKeeper 中的注册

Consumer Group 注册

与 Broker、Topic 注册类似,Consumer Group 注册本质上也是在 ZooKeeper 中创建专属的节点,以记录相关信息,其路径为 /consumers/{group_id}

这里补充一点,在 ZooKeeper 中,/consumers/{group_id} 虽然被称为节点,但本质上是一个目录。既然是目录,在记录信息时,就可以根据信息的不同,进一步创建子目录(子节点),分别记录不同类别的信息。对于 Consumer Group 而言,有三类信息需要记录,因此,/consumers/{group_id} 下还有三个子目录,如下所示。

  • ids:Consumer Group 中有多个 Consumer,ids 用于记录这些 Consumer;
  • owners:记录该 Consumer Group 可消费的 Topic 信息;
  • offsets:记录 owners 中每个 Topic 的所有 Partition 的 Offset。

Consumer 注册

原理同 Consumer Group 注册,不过需要注意的是,其节点路径比较特殊,需在路径 /consumers/{group_id}/ids 下创建专属子节点,它是临时的。比如,某 Consumer 的临时节点路径为 / consumers/{group_id}/ids/my_consumer_for_test-1223234-fdfv1233df23

负载均衡

对于一条消息,订阅了它的 Consumer Group 中只能有一个 Consumer 消费它。那么就存在一个问题:一个 Consumer Group 中有多个 Consumer,如何让它们尽可能均匀地消费订阅的消息呢(也就是负载均衡)?这里不讨论实现细节,但要实现负载均衡,实时获取 Consumer 的数量显然是必要的,通过 Watch 机制监听 /consumers/{group_id}/ids 下子节点的事件便可实现。

Producers 负载均衡

为了负载均衡和避免连锁反应,Kafka 中,同一个 Topic 的 Partition 会尽量分散到不同的 Broker 上。而 Producers 则根据指定的 Topic 将消息 Push 到相应的 Partition,如何将消息均衡地 Push 到各个 Partition 呢?这便是 Producers 负载均衡的问题。

Producers 启动后同样也要进行注册(依然是创建一个专属的临时节点),为了负载均衡,Producers 会通过 Watcher 机制监听 Brokers 注册节点的变化。一旦 Brokers 发生变化,如增加、减少,Producers 可以收到通知并更新自己记录的 Broker 列表 。此外,基于ZooKeeper 提供的 Watcher 机制,还可以监听其它在 ZooKeeper 上注册的节点,如 Topic、Consumer 等。

Producer 向 Kafka 集群 Push 消息的时候,必须指定 Topic,但Partition 却是非必要的。事实上,目前高级的客户端已经不提供指定 Partition 的接口。虽然不提供,但并不代表无须指定 Partition,只是隐藏了细节。通常有两种方式用于指定 Partition。

  • 低级接口。在指定 Topic 的同时,需指定 Partition 编号(0、1、2……N),消息数据将被 Push 到指定的 Partition 中。从负载均衡的角度看,这并不是一种友好的方式。
  • 高级接口。不支持指定 Partition,隐藏相关细节,内部则采用轮询、对传入 Key 进行 Hash 等策略将消息数据均衡地发送到各个 Partition。此外,有一些 Kafka 客户端还支持自定义负载均衡策略。

Consumer 负载均衡

基于 Producer 的负载均衡策略,对于任意一个 Topic,其各个 Partition 中消息量相对均衡。进一步,对于 Topic 的任意一条消息,订阅了它的任何一个 Consumer Group 中都只能有一个 Consumer 消费它,在此约束下,如何实现 Consumer 均衡地消费消息呢?

一种最朴实的想法是,对于订阅的 Topic,既然 Partition 中的消息是均衡的,那么,可以为 Consumer Group 中的各个 Consumer 分别指定不同的 Partition,只要保证该过程“相对公平”即可。不过,需要注意的是,Consume Group 中 Consumer 的数量是动态变化的,Topic 的 Partition 数量也不是固定值,如何“均匀”分配呢?

借助 ZooKeeper 实现负载均衡

在 Consumer 消费消息时,高级别 API 只需指定 Topic 即可,隐藏了负载均衡策略;而低级别的 API 通常需要同时指定 Topic 和 Partition,需要自行实现负载均衡策略。高级别 API 的负载均衡策略需借助 ZooKeeper 实现,具体原理如下。

Consumer Group、Consumer、Broker 都会在 ZooKeeper 中注册节点,因此基于 ZooKeeper 提供的 Watcher,Consumer 可以监听同一 Group 中 Consumers 的变化,以及 Broker 列表的变化。进一步,根据 Consumer 列表,将 Partition 排序后,轮流进行分配。由于这是一个动态过程,相应的负载均衡被称为 Rebalance,其描述如下:

  1. 对任意一个 Topic 中所有的 Partirtion 进行排序,用数组 PT 记录;
  2. 某一 Consumer Group 订阅了上述 Topic,对它的所有 Consumer 排序,用数组 CG 记录,第 i 个 Consumer 记为 CG[i]
  3. 比例系数为 F=size(PT)/size(CG),向上取整;
  4. 解除 CG[i] 对原来分配的 Partition 的消费权(i 从 0 开始,最大值为 size(CG)-1);
  5. 将第 i*F(i+1)*F-1 个 Partition 分配给 CG[i]。 

集群负载均衡再平衡

触发再平衡的条件

  • 消费者组内成员发生变更,这个变更包含消费者的增加与减少。这里的减少有很大可能是被动的,就是某个消费者出现崩溃掉线了
  • 主题分区发生改变,kafka 目前只支持分区增加,当出现分区数增加的时候就会触发再平衡
  • 订阅的主题发生变化,当消费者使用正则表达式订阅主题时,而恰好增加的主题就是订阅的主题,就会触发再平衡机制

为什么说 kafka 再平衡为人诟病呢?因为再平衡的过程中,消费者无法从 kafka 消费消息,这对 kafka 的 TPS 影响极大,而如果 Kafka 集群内节点较多,那么再平衡可能耗时极多。数分钟到数小时都有可能,而这段时间,Kafka 是处于不可用状态。所以在实际环境中,应该尽量避免。

再平衡过程如何通知到其他消费者

答案就是:依靠消费者端的心跳线程(Heartbeat Thread)

Kafka 消费者需要定期地发送心跳请求(Heartbeat Request) 到 Broker 端的协调者,以证明消费者还活着。再平衡就是通过这个心跳线程去通知其他消费者触发再平衡机制。

主动再平衡

在进行主动再均衡期间,所有消费者都会停止读取消息,放弃分区所有权,重新加入消费组,并获得重新分配到的分区。这样会导致整个消费组在一个很短的时间窗口内不可用。这个时间窗口的长短取决于消费组的大小和几个配置参数。两个阶段:

  1. 所有消费者都放弃分区所有权。
  2. 消费者重新加入群组,获得重新分配到的分区,并继续读取消息。

协作再均衡

协作再均衡(也称为增量再均衡)是指将一个消费者的部分分区重新分配给另一个消费者,其他消费者则继续读取没有被重新分配的分区。这种再均衡包含两个或多个阶段:

  1. 消费组leader会通知所有消费者,它们将失去部分分区的所有权,然后消费者会停止读取这些分区,并放弃对它们的所有权。
  2. 消费组leader会将这些没有所有权的分区分配给其他消费者。虽然这种增量再均衡可能需要进行几次迭代,直到达到稳定状态,但它避免了主动再均衡中出现的“Stop The World”。

记录消费进度 Offset

我们曾提及 Offset,它是 /consumers/[group_id] 下的一个子节点。Kafka 中,Consumer 采用 Pull 模式消费相应 Partition 中的消息,是一种异步消费模式。为了避免因 Consumer 故障、重启、Rebalance 等原因造成重复消费、遗漏消费消息,需要记录 Consumer 对 Partition 中消息的消费进度,即偏移量 Offset。Offset 在 ZooKeeper 中,有一个专属的节点(目录)用于记录 Offset,其路径样式如下:

1
2
# 节点内容就是Offset的值。
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

Kafka 的最新版本 Kafka 2.0 中,Offset 信息不再记录于 ZooKeeper,而是保存于 Kafka 的 Topic 中,路径如下:

1
__consumer_offsets(/brokers/topics/__consumer_offsets)

记录 Partition 与 Consumer 的关系

Consumer Group 在 ZooKeeper 上的注册节点为 /consumers/[group_id],而 Consumer Group 中的 Consumer 在 ZooKeeper 上的注册节点为 /consumers/[group_id] 下的子节点 owners,它们共享一个 Group ID。为了 Consumer 负载均衡,同一个 Group 订阅的 Topic 下的任一 Partition 都只能分配给一个 Consumer。Partition 与 Consumer 的对应关系也需要在 ZooKeeper 中记录,路径为:

1
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

补充:这个路径也是一个临时节点,进行 Rebalance 时会被删除,而后依据新的对应关系重建。此外,[broker_id-partition_id] 是一个消息分区的标识,其内容就是该消息分区消费者的 Consumer ID,通常采用 hostname:UUID 形式表示。

4. 全程解析(Producer-kafka-Consumer)

Producer 发布消息

Producer 采用 Push 模式将消息发布到 Kafka Broker,根据负载均衡算法(如轮询、Hash 等),这些消息将均衡写入到相应 Topic 对应的各个 Partition 中。在存储层面,采用顺序写磁盘(即 Append)模式写入。详细流程如下:

  1. Producer Push 消息,基于负载均衡算法获得目标 Partition 后,Producer 先从 ZooKeeper 的 /brokers/.../state 节点找到该 Partition 的 Leader;
  2. Producer 将消息发送给该 Leader;
  3. Leader 将消息写入本地 Log;
  4. 所有 Follower 主动从 Leader Pull 消息,写入本地 Log 后向 Leader 发送 ACK;
  5. Leader 收到所有 ISR 中所有 Replica 的 ACK 后,更新 HW(High Watermark,最后 Commit 的 Offset),并向 Producer 发送 ACK;
  6. Producer 接到 ACK,确认发送成功。

Broker 存储消息

Topic 是逻辑概念,而 Topic 对应的 Partition 则是物理概念,每个 Partition 在存储层面都对应一个文件夹(目录)。由于 Partition 并不是最终的存储粒度,该文件夹下还有多个 Segment(消息索引和数据文件,它们是真正的存储文件)。

Consumer 消费消息

前面介绍过,目前采用的高级 API,Consumer 在消费消息时,只需指定 Topic 即可,API 内部实现负载均衡,并将 Offset 记录到 ZooKeeper 上。

值得一提的是,Consumer 采用 Pull 模式从 Broker 中读取数据,这是一种异步消费模式,与 Producer 采用的 Push 模式全然不同。Push 模式追求速度,越快越好,当然它取决于 Broker 的性能,而 Pull 模式则是追求自适应能力,Consumer 根据自己的消费能力消费。

参考阅读

Kafka 官方文档

https://www.zhihu.com/question/43557507/answer/2914586926

kafka问答好文章:https://my.oschina.net/dataclub/blog/4712235

(完)