Kafka使用指南3——Kafka集群架构设计
目录
Kafka简介和安装以及原生命令行客户端
这块主要是一个Kafka所包含的内部元素的介绍以及简单的使用。
Java客户端以及Spring集成
Java客户端其实就是原生命令行的一种封装,学习如何使用即可,Spring就是在这个基础上再进行了对象的管理,使用起来并不难。
Kafka集群架构设计(本篇)
Kafka在设计之初就是为了高吞吐、高性能、高可扩展,所以它的集群架构是非常值得学习的。
Kafka日志索引详解
Broker能够高效地处理和保存消息,是Kafka高性能的保障。我们从可见的log文件入手,去研究一下Kafka是如何保证消息高效的流转。
主要内容
这一部分主要是理解Kafka的服务端重要原理。但是Kafka为了保证高吞吐,高性能,高可扩展的三高架构,很多具体设计都是相当复杂的。我们会从数据存储入手一起探讨Kafka的集群设计。
Kafka在Zookeeper中存储的数据
Kafka将状态信息保存在Zookeeper中,这些状态信息保存了每个Broker服务的状态和信息。这些信息需要在每个Broker之间达成共识,所以统一保存在独立于所有Broker的中间件中。
这些共识数据需要保证强一致性,这样才能保证各Broker之间分工明确。基于CP的Zookeeper就成为了Kafka的首要选择,目前Kafka为了实现云原生推出的Kraft还有一段实践的要走。另外Zookeeper的watcher机制也可以很好的减少Broker读取Zookeeper的次数。
我们会回忆一下在第一篇文章中介绍的Zookeeper的架构,Kafka集群中最为主要的状态信息有两个,一个是多个Broker需要有一个Controller,一个是多个Partition需要有一个Leader。
- 选举一个Broker作为Controller。由Controller节点来管理整个集群中的分区和副本状态。
- 一个Topic下选举一个Partition作为Leader。由Leader角色的Partition来负责和客户端交互。
我们使用PreetyZoo工具查看Zookeeper中的数据如下,其中带▼
的说明有子节点:
1 | ▼admin |
对于Kafka保存在Zookeeper中的数据,大部分是比较明了的。比如/brokers
目录下,/ids
目录就保存了对应集群中broker的brokerId,/topics
目录就保存了各个topic及topic下partition的信息。
我们做一个简单的验证,在每启动一个新节点的时候查看/ids
目录,当启动完brokerId分别为0和1的两个节点之后,我们查看/ids节点为[0, 1]
,此时手动往/ids
目录下加上2
这个节点,会导致brokerId为3无法启动,启动日志中提示NodeExists
。
Broker的Controller选举
在Kafka集群开始工作之前,需要从Broker中选举出一个Controller来充当管理角色,负责管理集群中分区和副本状态。
选举是通过占用Zookeeper的controller
节点实现的。
当一个集群内的Kafka节点启动时,就会尝试往Zookeeper上创建一个/controller
临时节点,并将自己的brokerId写入这个节点,我们可以查看目前集群下的/controller
节点数据看一下:
1 | { |
其中brokerid
标识当前的controller为id为0的broker,timestamp为时间戳。version是为Kraft而修改的值,之前为1,kraftControllerEpoch也是为Kraft新增的值,这两项修改感兴趣可以自行查看KAFKA-14304 Add RPC changes, records, and config from KIP-866,和Kraft相关的这边就先不详细研究了。
Zookeeper会保证一个集群中只有一个broker能够成功创建这个节点。这个注册成功的broker就成了这个集群中的Controller节点。
当一个应用在Zookeeper上创建了一个临时节点之后,Zookeeper需要这个应用一直保持连接状态。如果Zookeeper长时间检测不到应用的心跳信息,就会删除节点。同时Zookeeper还允许应用监听节点的状态,当应用状态有变化时,会向该节点对应的所有监听器广播节点变化信息。
这样,如果集群中的Controller节点宕机了,Zookeeper就会删除/controller
节点,而其他未注册成功的Broker节点就会感知到这一事件并开始竞争,直到一个新的broker成功创建/controller
节点。这就是Kafka基于Zookeeper选举Controller的机制。
选举产生的Controller节点会负责监听Zookeeper中一些关键数据的变化,触发集群的相关管理工作。
- 监听
/brokers/ids
节点,感知Broker的增减变化。 - 监听
/brokers/topics
节点,感知Topic以及对应partition的增减变化。 - 监听
/admin/delete_topic
节点,处理删除Topic事件。
另外Controller还负责把元数据推送给其他Broker。
Partition的Leader选举
在Kafka中,一个Topic下的所有消息,是分开存储在不同的Partition中的。在使用kafka-topics.sh
脚本创建Topic时,可以通过--partitions
参数指定Topic下包含多少个Partition,还可以通过--replication-factors
参数指定每个Partition有几个备份。而在一个Partition的众多备份中,需要选举出一个Leader Partition,负责对接所有的客户端请求,并将消息优先保存,然后再通知其他Follower Partition来同步消息。
在理解Partition的Leader选举之前,我们回顾一下第一篇文章中的一些基本概念:
- AR:Assigned Replicas。表示Kafka分区中所有的副本(存活或者不存活的)。
- ISR:In Sync Replicas。表示在所有AR中,服务正常,保持与Leader同步的副本集合。如果Follower长时间没有向Leader发送通信请求(超时时间由
replica.lag.time.max.ms
决定,默认30s),那么这个Follower就会被提出ISR。
可以通过kafka-topics.sh
的--describe
指令查看AR和ISR。
1 | [root@localhost ~]# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-multi-1 |
这个结果中,AR就是Replicas的集合。这些数据的来源都是Zookeeper,我们在Zookeeper中再来看一下这些数据。
1 | ▼topics |
在对应的/topic
下找到对应的topic,我们查看/topic
节点数据如下:
1 | { |
可查看对应的partition为0信息,我们查看/state
节点数据如下:
1 | { |
在进行Partition的选举的时候,Kafka会从该Topic的AR中选择排名靠前的优先选举,也就是我们看到/topic
节点下"partition":"0"
的列表中靠前的优先选举。如上面的实例,当Leader节点(brokerId=2)宕机时,brokerId=1的节点就会被选举成Leader。
我们从Kafka的--describe
指令来看一下选举的过程,初始状态如下:
1 | [root@localhost ~]# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-multi-1 |
如果此时我们关闭brokerId为2的机器上的kafka服务,再执行命令查看状态:
1 | [root@localhost ~]# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-multi-1 |
可以看到AR中并没有删除id为2的broker,而Isr的列表中id为2的记录已经没有了,同时Leader也从id为2的broker变为id为1的broker。
此时再去查看zookeeper中/state
节点的数据,其中Isr列表可以看到变化:
1 | { |
那么如果这时候再把brokerId为2的机器上的kafka服务启动呢,很明显能想到的就是brokerId=2重新加入了Isr的列表,但是Leader节点会有变化吗?
1 | # 重新启动broker.id为2机器上的服务后执行 --describe命令 |
我们可以看到Leader还是brokerId为1的机器,并没有因为brokerId为1的机器重新上线而把Leader交还给2,这样的好处是显而易见的,只有在某一个Broker宕机(网络断开)时进行一次选举,而加入时不用重新选举。同时这也会带来一定的风险,如果经过某一次宕机,大量Partition的Leader都被切换到某一个broker上,那么会导致单个broker的压力明显大于其他broker,这时候就需要用到Partition的自动平衡机制了。
Partition的Leader自动平衡
在一组Partition中,Leader Partition通常是比较繁忙的节点,因为一个Leader节点既要负责和客户端通信,又要负责向Follower同步数据。一般情况下,Kafka会尽量均匀地把Leader分配的不同的Broker上,用以保证整个集群的压力均衡。
但是经过了Leader选举之后这种平衡就有可能被打破,如同上一小节最后的示例,两个分区的Leader节点在重新选举后变成了同一个Broker,并且在故障节点重新加入之后依然是未发生故障的节点作为两个分区的Leader节点。当一个集群中Leader Partition过多的集中在同一个Broker节点上时,这个Broker的压力就会明显高于其他Broker,从而影响到集群的整体性能。
为此,Kafka设计了Leader Partition的自动平衡机制,当发现Leader分配不均衡时,自动进行Leader Partition的调整。
在Kafka官方文档的Balancing leadership中,详细描述了Kafka进行Leader Partition自动平衡的机制:
- Kafka默认从AR列表中取出第一个作为Leader节点。
- Controller会定期检测集群的Partition平衡情况,如果发现某个Broker中Partition里作为Leader节点比例大于某个阈值(
leader.imbalance.per.broker.percentage
),那么就会触发一次自动平衡。自动平衡时判断是否处在Isr列表中,并根据AR列表的顺序优先确定为Leader节点。
Kafka的自动平衡涉及到了一下几个参数的配置:
1 | #1 自平衡开关。默认true |
这几个参数可以到broker的server.properties文件中修改。但是注意要修改集群中所有broker的文件,并且要重启Kafka服务才能生效。
另外,也可以通过手动调用kafka-leader-election.sh
脚本,触发一次自平衡。效果如下:
1 | # Leader平衡之前查看状态 |
要注意的是,Leader Partition自平衡过程是一个比较重的操作,因为要涉及大量的消息转移和同步,并且在这个过程中可能会丢失消息。所以在对性能要求比较高的生产环境中,我们会把自动平衡设置为false
,在业务比较少的时候由维护人员手动执行自平衡,尽量减少对业务的影响。
Partition故障恢复机制
为什么Leader Partition自平衡过程会导致消息丢失呢。这我们就要从一个Leader所在的Broker发生宕机,一直到对应Partition选举出新的Leader过程中去细细分析了。
从初始状态开始,一组Partition选举出一个Leader节点,这个Leader节点负责和客户端通信以及同步数据给其他Follower。当Leader节点所在的Broker宕机时,Kafka就会触发重新选举。
在介绍此时的数据处理过程前,我们先来了解一下Kafka在多个副本中同步消息的机制。
- LEO(Log End Offset):在一个Partition的每个Replica中,都会保存自己的消息偏移量。Leader接收到消息并记录了生产者发布的一条消息之后就将LEO加1。接下来Follower每从Leader节点同步一条消息会把自己的LEO加1。
- HW(High Watermark):一组Partition中最小的LEO。Follower每从Leader节点同步一条消息时,会把自己的LEO给Leader,这样Leader根据Follower的LEO计算出HW的值,而最终HW又会同步给Follower。对于Leader来说,HW之前的消息都是被副本同步过的,是安全的消息。而从HW到LEO的消息都是不安全的,并且这些消息有可能被部分Follower同步,此时就可能出现消息数据的不一致。
我们可以从下面表格中的结构直观的看一下。该Partition有3个副本,其中BrokerId为0的节点为Leader,此时Leader节点上接收到了最新的消息,共8条,LEO=8。BrokerId为1上的副本只同步了4条消息,LEO=4。BrokerId为2上的副本同步了6条消息,LEO=6。此时HW为4。
Replicas | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | |
---|---|---|---|---|---|---|---|---|---|
Broker_0(Leader) | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | LEO=8 |
Broker_1 | 0 | 1 | 2 | 3 | LEO=4 | ||||
Broker_2 | 0 | 1 | 2 | 3 | 4 | 5 | LEO=6 |
在这种情况下,只有LEO=4之前的消息可以被消费者感知,而后面的消息虽然写入了Leader,但是消费者是消费不到的。
注:这个无法感知和生产者的ack参数是不一样的。
当服务出现故障时,如果是Follower发生故障,这不会影响消息写入,只不过是少了一个备份而已。处理相对简单一点。Kafka会做如下处理:
- 将故障的Follower节点临时提出ISR集合。而其他Leader和Follower继续正常接收消息。
- 出现故障的Follower节点恢复后,不会立即加入ISR集合。该Follower节点会读取本地记录的上一次的HW,将自己的日志中高于HW的部分信息全部删除掉,然后从HW开始,向Leader进行消息同步。
- 等到该Follower的LEO大于等于整个Partiton的HW后,就重新加入到ISR集合中。这也就是说这个Follower的消息进度追上了Leader。
如果是Leader发生故障则会比较复杂:
- Leader节点发生故障后进行重新选举,将原本是Follower的节点升级为Leader。这时消息可能还没同步完,所以新的Leader的LEO会低于原Leader的LEO。
- 整个Partition都以Leader节点为准,所以其他Follower将高于新Leader的LEO的消息都清除,然后从新Leader同步数据。
- 旧的Leader恢复之后,作为Follower节点加入集群并从Leader同步消息。
我们拿上面的示例举例:
如果此时Broker_0发生故障,根据ISR顺序[0, 1, 2]
Broker_1被选举为Leader,此时新Leader的LEO为4,所以Broker_2作为Follower需要根据新的Leader的LEO删除对应的标识为4和5的消息。而后新的标识为8的消息就会由Broker_1接收并同步给Follower。这样一来,编号为4,5,6,7的4条消息就丢失了。
所以在整个崩溃恢复过程中,Kafka优先保证了副本间数据的一致性,而舍弃了安全性。因此在数据比较重要的金融领域,会使用更能保证安全性的RocketMQ,而Kafka更适合用于极端高性能而可以忍受部分数据丢失的场景。
HW一致性保证
上一小节中可以看到LEO是分布式的数据,每个Broker保存自己的数据,那么HW在各个Broker上是如何保持一致的呢?
这里直接给出结论,HW在一个Partition中并不是总是一致的。
Leader要计算出HW值,需要保留所有Follower的LEO。对于Follower来说,他需要先把消息从Leader拉取到本地,才能更新自己的LEO值,然后将LEO上报给Leader。此时Follower是无法马上得知HW是否会更新,因为更新HW的动作是由Leader完成的,并且是需要获取其他Follower数据的。只有当Leader推进HW后,在Follower下一次拉取消息时才会更新本机的HW。所以HW在Follower上的更新相比Leader还是存在一定的延迟。
数据丢失问题
当HW在Leader和Follower之间不一致时出现Leader的切换,新Leader根据自己保存的HW进行数据恢复,那么新Leader就会截掉自己的HW之后的消息,那么此时就会出现消息丢失。我们可以模拟一下以上的场景:
当前状态:
节点 角色 LEO HW 消息列表 Broker_0 Leader 2 2 消息0,消息1 Broker_1 Follower 2 1 消息0,消息1 此时Follower还未从Leader同步到最新的HW。
Follower重启,重启后将LEO重置到HW的值,并删除HW后面的消息。开始尝试重新从Leader同步消息。
节点 角色 LEO HW 消息列表 Broker_0 Leader 2 2 消息0,消息1 Broker_1 Follower 1 1 消息0 ,消息1Leader宕机,选举Follower为新的Leader。此时HW是1。
节点 角色 LEO HW 消息列表 Broker_0 Leader(下线)2 2 消息0,消息1 Broker_1 Follower->Leader1 1 消息0 原Leader重启后,作为Follower加入会自动向Leader看齐,截断HW后的日志文件,将HW置为1。
节点 角色 LEO HW 消息列表 Broker_0 Leader->Follower1 1 消息0 Broker_1 Leader 1 1 消息0 经过这样一个故障恢复过程,消息1就从整个集群中彻底的移除了。
由此可见,即使Kafka使用了HW保证HW以后的消息不对消费者可见,但是由于HW在整个集群中的更新延迟,依然会导致消息丢失。
数据不一致问题
当HW在Leader和Follower之间不一致时Leader和Follower同时重启,Follower先启动会导致其成为新的Leader并负责消息的接收,而后原Leader再启动成为Follower加入集群时,由于新Leader的HW也进行了更新,导致两边的HW一致但是消息却不一致的情况。我们可以模拟一下以上的场景:
当前状态:
节点 角色 LEO HW 消息列表 Broker_0 Leader 2 2 消息0,消息1 Broker_1 Follower 2 1 消息0,消息1 此时Follower还未从Leader同步到最新的HW。
Leader和Follower重启,重启后Follower先启动成功,并成为Leader节点,负责收发消息。
节点 角色 LEO HW 消息列表 Broker_0 Leader(下线)2 2 消息0,消息1 Broker_1 Follower->Leader1 1 消息0 ,消息1Broker_0: Leader(下线) LEO=2 HW=2 消息0,消息1
Broker_1: Follower->Leader LEO=1 HW=1 消息0
新Leader接收到新的消息,由于集群内仅有其一个节点,故能够更新HW。
节点 角色 LEO HW 消息列表 Broker_0 Leader(下线)2 2 消息0,消息1 Broker_1 Leader 2 2 消息0,消息2 原Leader重启后,作为Follower加入会自动向Leader看齐,发现HW是一致的,无需进行任何操作。
节点 角色 LEO HW 消息列表 Broker_0 Leader->Follower2 2 消息0,消息1 Broker_1 Follower 2 2 消息0,消息2 经过这样一个重启过程,可以发现在两个副本中出现了消息的不一致。
问题的解决——Epoch
为了解决上面这两个问题,Kafka引入了Epoch机制。每个副本的Log下都有一个leader-epoch-checkpoint
文件,用来记录Leader的纪元(版本)和对应的首条消息,在需要进行数据同步和日志截断时使用leader epoch作为参考,不再使用HW。
leader-epoch概念: Epoch是一个单调递增的版本号,每当Leader发生变更时,该版本号就会更新。所以,当有多个Epoch时,只有最新的Epoch才是有效的,而其他Epoch对应的Leader就是过期的,无用的Leader。同时,Epoch中记录了一个新的Leader写入的第一个消息的偏移量,用来同步数据。
数据结构为Leader Epoch(Leader版本号) -> Start Offset(消息偏移量),可以从从具体文件中查看,其中第三行的两个数就是分别为Leader Epoch和Start Offset。
1 | [root@localhost ~]# cat leader-epoch-checkpoint |
解决数据丢失问题
我们使用Epoch代替HW之后,模拟一下上一小节中的数据丢失的问题:
当前状态:
节点 角色 LEO HW LE Offset 消息列表 Broker_0 Leader 2 2 0 0 消息0,消息1 Broker_1 Follower 2 1 0 0 消息0,消息1 此时Follower还未从Leader同步到最新的HW。
Follower重启,重启后不会将LEO重置到HW的值,而是根据LE向Leader获取LEO。由于Follower的LE与Leader的LE相同,Leader返回自己的LEO,此时Leader的LEO大于等于Follower的LEO,Follower不做截断。
节点 角色 LEO HW LE Offset 消息列表 Broker_0 Leader 2 2 0 0 消息0,消息1 Broker_1 Follower 2 1 0 0 消息0,消息1 Leader宕机,选举Follower为新的Leader。此时HW更新为2。而Epoch因为新的Leader产生了,所以生成了新的LE->Offset。
节点 角色 LEO HW LE Offset 消息列表 Broker_0 Leader(下线)2 2 0 0 消息0,消息1 Broker_1 Follower 2 2 1 2 消息0,消息1 原Leader重启后,作为Follower加入会自动向Leader看齐。
节点 角色 LEO HW LE Offset 消息列表 Broker_0 Leader->Follower2 2 1 2 消息0,消息1 Broker_1 Leader 2 2 1 2 消息0,消息1
解决数据不一致问题
我们使用Epoch代替HW之后,模拟一下上一小节中的数据不一致的问题:
当前状态:
节点 角色 LEO HW LE Offset 消息列表 Broker_0 Leader 2 2 0 0 消息0,消息1 Broker_1 Follower 1 1 0 0 消息0 此时Follower还未从Leader同步到最新的消息。
Leader和Follower重启,重启后Follower先启动成功,并成为Leader节点,负责收发消息。同时Epoch因为新的Leader产生了,所以生成了新的LE->Offset。
节点 角色 LEO HW LE Offset 消息列表 Broker_0 Leader(下线)2 2 0 0 消息0,消息1 Broker_1 Follower->Leader1 1 1 1 消息0 新Leader接收到新的消息,由于集群内仅有其一个节点,故能够更新HW。
节点 角色 LEO HW LE Offset 消息列表 Broker_0 Leader(下线)2 2 0 0 消息0,消息1 Broker_1 Leader 2 2 1 1 消息0,消息2 原Leader重启后,作为Follower加入会自动向Leader看齐,根据LE向Leader获取LEO。此时Leader的LE和Follower的LE不同,Leader根据自己最新的LE和Offset,返回对应Offset为1。Follower根据新版本Leader的Offset截取数据,所以删除了消息1,从Leader同步消息2。
节点 角色 LEO HW LE Offset 消息列表 Broker_0 Leader->Follower2 2 1 1 消息0 ,消息1,消息2Broker_1 Leader 2 2 1 1 消息0,消息2
小结
Kafka天生就是为了集群而生,单个节点也可以成为一个Kafka集群。在Apache Kafka官网上第一句描述就是Apache Kafka is an open-source distributed event streaming platform
。在服务器不稳定等复杂情况下,为了保证集群的高性能,高可用,高可扩展三高,做了非常多的设计。回看这一节中介绍的机制,都是为了保证整个集群中Partition内的数据的一致性,同时还能依靠其设计理念保证高性能和高可扩展。