Kafka使用指南4——Kafka日志索引分析


目录

  1. Kafka简介和安装以及原生命令行客户端

    这块主要是一个Kafka所包含的内部元素的介绍以及简单的使用。

  2. Java客户端以及Spring集成

    Java客户端其实就是原生命令行的一种封装,学习如何使用即可,Spring就是在这个基础上再进行了对象的管理,使用起来并不难。

  3. Kafka集群架构设计

    Kafka在设计之初就是为了高吞吐、高性能、高可扩展,所以它的集群架构是非常值得学习的。

  4. Kafka日志索引详解(本篇)

    Broker能够高效地处理和保存消息,是Kafka高性能的保障。我们从可见的log文件入手,去研究一下Kafka是如何保证消息高效的流转。

主要内容

Kafka的高性能,有很大一部分就是由他的消息存储实现的。只有设计一套高效的处理和存储方案,才能支撑起它的高吞吐量。


Kafka的Log日志

这一部分数据主要包含当前Broker节点的消息数据,这些数据是无状态的,也就是说每个Broker都以相同的逻辑运行,可以将旧Broker的数据迁移到另一个Broker上去继续运行。迁移可以使用kafka-reassign-partitions.sh脚本。

Topic下的消息的存储

在搭建Kafka服务时,我们在server.properties配置文件中通过log.dir属性指定了Kafka的日志存储目录。 实际上,Kafka的所有消息就全都存储在这个目录下。

1
2
3
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/data/kafka1-logs

我们查看这个目录下的文件:

1
2
3
4
5
6
7
8
9
10
[root@localhost ~]# cd /usr/local/data/kafka1-logs
[root@localhost kafka1-logs]# ls
cleaner-offset-checkpoint __consumer_offsets-15 __consumer_offsets-22 __consumer_offsets-3 __consumer_offsets-37 __consumer_offsets-44 __consumer_offsets-7 test2-0
__consumer_offsets-0 __consumer_offsets-16 __consumer_offsets-23 __consumer_offsets-30 __consumer_offsets-38 __consumer_offsets-45 __consumer_offsets-8 test-java-client-0
__consumer_offsets-1 __consumer_offsets-17 __consumer_offsets-24 __consumer_offsets-31 __consumer_offsets-39 __consumer_offsets-46 __consumer_offsets-9 test-java-client-1
__consumer_offsets-10 __consumer_offsets-18 __consumer_offsets-25 __consumer_offsets-32 __consumer_offsets-4 __consumer_offsets-47 log-start-offset-checkpoint test-multi-1-0
__consumer_offsets-11 __consumer_offsets-19 __consumer_offsets-26 __consumer_offsets-33 __consumer_offsets-40 __consumer_offsets-48 meta.properties test-multi-1-1
__consumer_offsets-12 __consumer_offsets-2 __consumer_offsets-27 __consumer_offsets-34 __consumer_offsets-41 __consumer_offsets-49 recovery-point-offset-checkpoint
__consumer_offsets-13 __consumer_offsets-20 __consumer_offsets-28 __consumer_offsets-35 __consumer_offsets-42 __consumer_offsets-5 replication-offset-checkpoint
__consumer_offsets-14 __consumer_offsets-21 __consumer_offsets-29 __consumer_offsets-36 __consumer_offsets-43 __consumer_offsets-6 test-0

可以看到这个Broker下所分配到的所有partition都有一个对应的文件夹,如test-multi-1-0代表了topic为test-multi-1下partition为0的分区,test-multi-1-1代表了topic为test-multi-1下partition为1的分区。每一个分区下的有三个文件和消息相关,后缀分别为.index.log.timeindex

1
2
[root@localhost test-multi-1-0]# ls
00000000000000000004.index 00000000000000000004.log 00000000000000000004.timeindex leader-epoch-checkpoint partition.metadata

其中.log文件记录了具体的消息内容,固定大小为1G,由参数log.segment.bytes决定,写满后就会新增一个新的文件继续写入。而一个文件的文件名表示当前日志文件记录的第一条消息的偏移量。.index.timeindex是日志文件对应的索引文件,其中.index文件是以偏移量为索引来记录对应的.log日志文件中的偏移量,而.timeindex则是以时间戳为索引。

另外的两个文件,partition.metadata简单记录当前Partition所属的cluster和Topic。leader-epochcheckpoint文件参见第三篇的epoch机制。

这些文件都是二进制的文件,无法使用文本工具直接查看。但是,Kafka提供了工具可以用来查看这些日志文件的内容。

我们使用了kafka提供的测试脚本用生产者给服务器发送了300多条数据:

1
2
[root@localhost ~]# kafka-producer-perf-test.sh --topic test-multi-1 --num-records 500 --throughput -1 --record-size 512 --producer-props bootstrap.servers=localhost:9092
500 records sent, 702.247191 records/sec (0.34 MB/sec), 93.96 ms avg latency, 565.00 ms max latency, 93 ms 50th, 134 ms 95th, 135 ms 99th, 565 ms 99.9th.

再使用查看日志的功能查看对应的日志文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 查看时间戳文件,展示了时间戳和对应offset的数据
[root@localhost test-multi-1-0]# kafka-dump-log.sh --files 00000000000000000004.timeindex
Dumping 00000000000000000004.timeindex
timestamp: 1716209246182 offset: 115
timestamp: 1716209246193 offset: 146
timestamp: 1716209246198 offset: 177
timestamp: 1716209246200 offset: 208
timestamp: 1716209246201 offset: 239
timestamp: 1716209246202 offset: 270
timestamp: 1716209246204 offset: 301
# 查看索引文件,展示了offset和对应位置的数据
[root@localhost test-multi-1-0]# kafka-dump-log.sh --files 00000000000000000004.index
Dumping 00000000000000000004.index
offset: 115 position: 17425
offset: 146 position: 33637
offset: 177 position: 49849
offset: 208 position: 66061
offset: 239 position: 82273
offset: 270 position: 98485
offset: 301 position: 114697
# 查看日志文件
[root@localhost test-multi-1-0]# kafka-dump-log.sh --files 00000000000000000004.log
Dumping 00000000000000000004.log
Log starting offset: 4
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: 0 lastSequence: 0 producerId: 6000 producerEpoch: 0 partitionLeaderEpoch: 20 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1716208489605 size: 71 magic: 2 compresscodec: none crc: 1260142644 isvalid: true
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: 1 lastSequence: 1 producerId: 6000 producerEpoch: 0 partitionLeaderEpoch: 20 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 71 CreateTime: 1716208493986 size: 71 magic: 2 compresscodec: none crc: 3230703479 isvalid: true
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: 2 lastSequence: 2 producerId: 6000 producerEpoch: 0 partitionLeaderEpoch: 20 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 142 CreateTime: 1716208497627 size: 71 magic: 2 compresscodec: none crc: 3885604520 isvalid: true
baseOffset: 7 lastOffset: 12 count: 6 baseSequence: 0 lastSequence: 5 producerId: 6001 producerEpoch: 0 partitionLeaderEpoch: 20 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 213 CreateTime: 1716208718160 size: 125 magic: 2 compresscodec: none crc: 1930225591 isvalid: true
baseOffset: 13 lastOffset: 18 count: 6 baseSequence: 6 lastSequence: 11 producerId: 6001 producerEpoch: 0 partitionLeaderEpoch: 20 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 338 CreateTime: 1716208719224 size: 117 magic: 2 compresscodec: none crc: 2369992212 isvalid: true
baseOffset: 19 lastOffset: 25 count: 7 baseSequence: 12 lastSequence: 18 producerId: 6001 producerEpoch: 0 partitionLeaderEpoch: 20 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 455 CreateTime: 1716208720377 size: 127 magic: 2 compresscodec: none crc: 1818211086 isvalid: true
baseOffset: 26 lastOffset: 32 count: 7 baseSequence: 19 lastSequence: 25 producerId: 6001 producerEpoch: 0 partitionLeaderEpoch: 20 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 582 CreateTime: 1716208721560 size: 125 magic: 2 compresscodec: none crc: 720610688 isvalid: true
baseOffset: 33 lastOffset: 38 count: 6 baseSequence: 26 lastSequence: 31 producerId: 6001 producerEpoch: 0 partitionLeaderEpoch: 20 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 707 CreateTime: 1716208722632 size: 117 magic: 2 compresscodec: none crc: 115064756 isvalid: true
...

学习这些数据的存储方式,就是我们探究kafka存储理念的关键。

log文件追加

首先在每个文件内部,Kafka都会以追加的形式写入新的消息日志,position是这批消息的起点,size是消息序列化后的长度。Kafka中的消息日志,只允许追加,不支持删除和修改。所以,文件名最大的一个log文件是当前写入消息的日志文件,其他文件都是不可修改的历史消息文件。

每个Log文件都保持固定的大小。如果当前文件记录不下了,就会重新创建一个log文件,并以这个 log文件写入的第一条消息的偏移量命名。这种设计其实是为了更方便进行文件映射,加快读消息的效率。

index和timeindex加速读取log消息日志

首先,根据日志名称我们可以获取到绝对偏移量。如日志目录中有两组日志文件:

1
2
3
[root@localhost test-multi-1-0]# ls
00000000000000000004.index 00000000000000000004.log 00000000000000000004.timeindex leader-epoch-checkpoint partition.metadata
00000000000000006005.index 00000000000000006005.log 00000000000000006005.timeindex

offset从4-6004的消息就可以根据00000000000000000004.index00000000000000000004.log00000000000000000004.timeindex去获取,而后面的消息则通过后面的文件获取。

其次,在一组消息文件中,可以根据.index文件中的offset直接获取到对应.log文件的位置,方便快速定位。而如果是根据时间戳则可以通过.timeindex文件从时间戳->offset->position这个路径获得。

这两个索引并不是对每一条消息都建立索引。而是Broker每写入40KB的数据,就建立一条index索引。由参数log.index.interval.bytes定制。所以如上面的文件中offset: 115 position: 17425 offset: 146 position: 33637,当我们想获取offset为120对应的消息时,就从115对应的17425位置开始向后查找。

index文件的作用类似于数据结构中的跳表,他的作用是用来加速查询log文件的效率。而timeindex文件的作用则是用来进行一些跟时间相关的消息处理。比如文件清理。 这两个索引文件也是Kafka的消费者能够指定从某一个offset或者某一个时间点读取消息的原因。

文件清理机制

Kafka为了防止过多的日志文件给服务器带来过大的压力,他会定期删除过期的log文件。Kafka的删除机制涉及到几组配置属性:

判断过期:

  • log.retention.check.interval.ms定时检测文件是否过期。默认是300000毫秒,也就是五分钟。
  • log.retention.hourslog.retention.minuteslog.retention.ms。 这一组参数表示文件保留多长时间。默认生效的是log.retention.hours,默认值是168小时,也就是7天。如果设置了更高的时间精度,以时间精度最高的配置为准。
  • 在检查文件是否超时时,是以每个.timeindex中最大那一条记录为准。所以我们在上面的例子中可以看到log文件夹下只有00000000000000000004.index文件而没有00000000000000000000.index文件,因为offset为0~3的消息都是写前几篇文章时测试的数据,已经被删除了。新写入的文件就以offset为4开始,并且这一份文件就是第一份文件。

过期的日志文件如何处理:

  • log.cleanup.policy:日志清理策略。有两个选项,delete是删除文件,compact为压缩日志文件。
  • log.cleanup.policy选择delete时,还有一个参数可以选择。log.retention.bytes:表示所有日志文件 的大小。当总的日志文件大小超过这个阈值后,就会删除最早的日志文件。默认是-1,表示无限大。

Kafka的文件高效读写机制

Kafka的文件结构

Kafka的文件结构可以加快日志文件的读取。从上面小节中可以学习到,同一个Topic的不同Partition使用不同的文件夹区分,每个日志文件使用index的稀疏索引结构加快log日志的检索速度。

顺序写磁盘

这个设计主要和硬盘结构相关。对每个log文件,Kafka都会提前规划固定的大小,这样在申请文件的时候,可以提前占据一块连续的磁盘空间。然后Kafka只能以追加的形式往文件末端写入数据,这样的写入方式成为顺序写。这样新数据的写入可以直接往申请好的磁盘空间中写入,不需要另外申请空间,另外申请空间会导致随机写,这样申请的空间不是连续的,会产生很多碎片。

零拷贝

零拷贝是Linux操作系统提供的一种IO优化机制,而Kafka大量的运用了零拷贝机制来加速文件读写。

传统情况下,一次读磁盘往网络传输的过程如下:

磁盘读取数据---DMA Copy--->页缓存---CPU Copy--->JVM内存---CPU Copy--->Socket缓冲区---DMA Copy--->网络

其中页缓存和Socket缓冲区都是内核层,由于需要从内核层将数据放到用户层的JVM中再放回去,消耗了时间和资源。所以零拷贝的技术,重点是要配合内核态的复制机制,减少用户态与内核态之间的内容拷贝。

具体有两种实现机制:

  • mmap文件映射机制:这种方式是在用户态不再缓存整个IO的内容,改为只持有文件的一些映射信息。通过这些映射,"遥控"内核态的文件读写。这样就减少了内核态与用户态之间的拷贝数据大小,提升了IO效率。

    也就是在磁盘把数据读到页缓存之后,JVM仅拥有对象的映射,然后直接使用CPU Copy将数据复制到Socket缓冲区。这种mmap文件映射方式,适合于操作不是很大的文件,通常映射的文件不建议超过2G。所以Kafka中的日志文件设计成为1G大小,便于对文件进行映射,从而加快对.log文件等本地文件的写入效率。

    注:可学习jdk中DirectByteBuffer的实现机制。

  • sendfile文件传输机制:这种方式用户态根本不需要对象的引用,直接通知CPU去复制文件。这样数据完全不会复制到用户态,从而实现零拷贝。

    这种方式适合于服务端不需要对数据进行任何处理的场景,如消费者从broker拉取消息时,Broker只要读取文件,复制到网卡的Socket缓冲区,然后发送即可。整个过程中,用户态只要往内核态发送sendfile指令。Kafka运用了大量sendfile机制用来加速本地文件数据的读取过程。

    注:可学习jdk中FileChannel里transferTo和transferFrom的实现机制。

这些底层的优化机制都是操作系统提供的优化机制,其实针对任何上层应用语言来说,都是一个黑盒,只能去调用,但是控制不了具体的实现过程。而上层的各种各样的语言,也只能根据操作系统提供的支持进行 自己的实现。虽然不同语言的实现方式会有点不同,但是本质都是一样的。

合理配置刷盘频率

缓存的数据断电会丢失,但是缓存能够提高处理速度。如果缓存中的数据没有及时写入磁盘,那么这些数据就会丢失。所以合理配置刷盘频率是必要的,最安全的方式当然是写一条数据,刷一次盘。

刷盘操作在linux中对应为fsync的调用。

1
2
3
4
5
6
FSYNC(2)
Linux Programmer's Manual
FSYNC(2)

NAME
fsync, fdatasync - synchronize a file's in-core state with storage device

这里所提到的in-core state,并不是我们平常开发过程中接触到的缓存,而是操作系统内核态的缓存pageCache。这是应用程序接触不到的一部分缓存。比如我们用应用程序打开一个文件,实际上文件里的内容,是从内核态的PageCache中读取出来的。因为与磁盘这样的硬件交互,相比于内存,效率是很低的。操作系统为了提升性能,会将磁盘中的文件加载到PageCache缓存中,再向应用程序提供数据。修改文件时也是一样的。用记事本修改一个文件的内容,不管你保存多少次,内容都是写到PageCache里的。然后操作系统会通过他自己的缓存管理机制,在未来的某个时刻将所有的PageCache统一写入磁盘。这个操作就是刷盘。比如在操作系统正常关闭的过程中,就会触发一次完整的刷盘机制。

也就是说,这个刷盘的过程应用系统是无法插手的,只能尽量每次修改就去调用一遍系统的刷盘,具体是否能够成功,成功前断电导致缓存丢失等问题应用系统无法去避免。但是,每一次修改都去刷盘显然会影响系统的性能,并且就算调用了刷盘命令,仍然无法保证能够成功将数据写入磁盘。

Kafka其实在Broker端设计了一系列的参数,来控制刷盘操作的频率。如果对这些频率进行深度定制,是可以实现来一个消息就进行一次刷盘的同步刷盘效果的。但是,这样的定制显然会大大降低Kafka的执行效率,这与Kafka的设计初衷是不符合的。所以,在实际应用时,我们通常也只能根据自己的业务场景进行权衡。

  • flush.ms : 多长时间进行一次强制刷盘。
  • log.flush.interval.messages:表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX
  • log.flush.interval.ms:当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。 他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
  • log.flush.scheduler.interval.ms:检查是否有日志文件需要进行刷盘的频率。默认也是Long.MAX

这里可以看到,Kafka为了最大化性能,默认是将刷盘操作交由了操作系统进行统一管理,并没有手动去执行刷盘操作。

客户端消费进度管理

kafka为了实现分组消费的消息转发机制,需要在Broker端保持每个消费者组的消费进度。而这些消费进度,就被Kafka管理在自己的一个内置Topic中。这个Topic就是__consumer__offsets。这是Kafka内置的一 个系统Topic,在日志文件可以看到这个Topic的相关目录。Kafka默认会将这个Topic划分为50个分区。可以从下面的命令中看到,除了我们自己创建的topic以"topic" + "-" + "partition"命名的文件夹外,还有以"__consumer__offsets" + "-" + "partition"命名的Kafka自带的文件夹。

1
2
3
4
5
6
7
8
9
10
[root@localhost ~]# cd /usr/local/data/kafka1-logs
[root@localhost kafka1-logs]# ls
cleaner-offset-checkpoint __consumer_offsets-15 __consumer_offsets-22 __consumer_offsets-3 __consumer_offsets-37 __consumer_offsets-44 __consumer_offsets-7 test2-0
__consumer_offsets-0 __consumer_offsets-16 __consumer_offsets-23 __consumer_offsets-30 __consumer_offsets-38 __consumer_offsets-45 __consumer_offsets-8 test-java-client-0
__consumer_offsets-1 __consumer_offsets-17 __consumer_offsets-24 __consumer_offsets-31 __consumer_offsets-39 __consumer_offsets-46 __consumer_offsets-9 test-java-client-1
__consumer_offsets-10 __consumer_offsets-18 __consumer_offsets-25 __consumer_offsets-32 __consumer_offsets-4 __consumer_offsets-47 log-start-offset-checkpoint test-multi-1-0
__consumer_offsets-11 __consumer_offsets-19 __consumer_offsets-26 __consumer_offsets-33 __consumer_offsets-40 __consumer_offsets-48 meta.properties test-multi-1-1
__consumer_offsets-12 __consumer_offsets-2 __consumer_offsets-27 __consumer_offsets-34 __consumer_offsets-41 __consumer_offsets-49 recovery-point-offset-checkpoint
__consumer_offsets-13 __consumer_offsets-20 __consumer_offsets-28 __consumer_offsets-35 __consumer_offsets-42 __consumer_offsets-5 replication-offset-checkpoint
__consumer_offsets-14 __consumer_offsets-21 __consumer_offsets-29 __consumer_offsets-36 __consumer_offsets-43 __consumer_offsets-6 test-0

同时这些消费进度的topic也会存到zookeeper中:

1
2
[zk: localhost:2181(CONNECTED) 9] get /brokers/topics/__consumer_offsets/partitions/0/state
{"controller_epoch":20,"leader":0,"version":1,"leader_epoch":20,"isr":[0]}

而对应的数据一样被存到对应的文件文件夹中:

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@localhost /]# cd /usr/local/data/kafka1-logs/__consumer_offsets-36
[root@localhost __consumer_offsets-36]# ls -l
total 32
-rw-r--r--. 1 root root 0 Apr 26 16:42 00000000000000000000.index
-rw-r--r--. 1 root root 317 Apr 26 16:42 00000000000000000000.log
-rw-r--r--. 1 root root 12 Apr 26 16:42 00000000000000000000.timeindex
-rw-r--r--. 1 root root 10485760 May 28 14:05 00000000000002531037.index
-rw-r--r--. 1 root root 174 May 20 15:02 00000000000002531037.log
-rw-r--r--. 1 root root 10 May 20 14:52 00000000000002531037.snapshot
-rw-r--r--. 1 root root 10485756 May 28 14:05 00000000000002531037.timeindex
-rw-r--r--. 1 root root 10 May 20 22:35 00000000000002531040.snapshot
-rw-r--r--. 1 root root 9 May 28 14:05 leader-epoch-checkpoint
-rw-r--r--. 1 root root 43 Apr 1 21:44 partition.metadata

其实这个用来储存消费者进度的topic也是可以被消费的:

1
2
3
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
[test-group-1,test-java-client,1]::OffsetAndMetadata(offset=15, leaderEpoch=Optional[4], metadata=, commitTimestamp=1716879294157, expireTimestamp=None)
[test-group-1,test-java-client,0]::OffsetAndMetadata(offset=10, leaderEpoch=Optional[7], metadata=, commitTimestamp=1716879294157, expireTimestamp=None)

可以看到其中记录了“消费者组+topic+partition”对应的offset,如果消费者手动修改了offset的值,这个队列里也会有新的消息来记录。

Kafka生产调优实践

通常在生产环境中,Kafka都是用来应对整个项目中最高峰的流量的。这种极高的请求流量,对任何服务都是一个很大的负担,因此如果在生产环境中部署Kafka,也可以从以下几个方面进行一些优化。

搭建监控平台

生产环境通常会对Kafka搭建监控平台。而Kafka-eagle就是一个可以监控Kafka集群整体运行情况的框架,在生产环境经常会用到。官网地址:https://www.kafka-eagle.org/ 以前叫做Kafka-eagle,现在用了个简写,EFAK(Eagle For Apache Kafka)。

合理规划Kafka部署环境

  • 机械硬盘:对于准备部署Kafka服务的服务器,建议配置大容量机械硬盘。Kakfa顺序读写的实现方式不太需要SSD这样高性能的磁盘。同等容量SSD硬盘的成本比机械硬盘要高出非常多,没有必要。将SSD的成本投入到MySQL这类的服务更合适。

  • 大内存:在Kafka的服务启动脚本bin/kafka-start-server.sh中,对于JVM内存的规划是比较小的,可以根据之前JVM调优专题中的经验进行适当优化。 脚本中的JVM内存参数默认只申请了1G内存。

    1
    KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
  • 高性能网卡:Kafka本身的服务性能非常高,单机就可以支持百万级的TPS。在高流量冲击下,网络非常有可能优先于服务,成为性能瓶颈。并且Kafka集群内部也需要大量同步消息。因此,对于Kafka服务器,建议配置高性能的网卡。成本允许的话,尽量选择千兆以上的网卡。

合理优化Kafka集群配置

  • 合理配置Partition数量:我们在上面的log日志章节中看到,每一个Partition都会有一个专门的文件夹去存储。虽然单个Partition的读写效率非常高,但是如果Partition的数量过多,会严重影响Kafka的整体性能。

    首先,不要设置过多的Topic,通常建议不超过3个Topic。其次不要设置过多的副本,大部分情况下设置为2就行。至于Partition的数量,最好根据业务情况灵活调整。partition数量设置多一些,可以一定程度增加Topic的吞吐量。但是过多的partition数量还是同样会带来partition索引的压力。

    具体情况可以使用压测脚本来进行测试。

    1
    2
    3
    4
    [root@localhost ~]# kafka-producer-perf-test.sh --topic test-multi-1 --num-record 10000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=1
    376 records sent, 71.4 records/sec (0.07 MB/sec), 2326.7 ms avg latency, 3456.0 ms max latency.
    3480 records sent, 689.0 records/sec (0.67 MB/sec), 5571.2 ms avg latency, 7090.0 ms max latency.
    10000 records sent, 875.196919 records/sec (0.85 MB/sec), 6258.41 ms avg latency, 7393.00 ms max latency, 6670 ms 50th, 7239 ms 95th, 7378 ms 99th, 7390 ms 99.9th.
  • 合理对数据进行压缩:

    在生产者的ProducerConfig中,有一个配置COMPRESSION_TYPE_CONFIG是用来对消息进行压缩的。生产者配置了压缩策略后,会对生产的每个消息进行压缩,从而降低Producer到Broker的网络传输,也降低了Broker的数据存储压力。 从介绍中可以看到,Kafka的生产者支持四种压缩算法。这几种压缩算法中,zstd算法具有最高的数据压缩比,但是吞吐量不高。lz4在吞吐量方面的优势比较明显。在实际使用时,可以根据业务情况选择合适的压缩 算法。但是要注意下,压缩消息必然增加CPU的消耗,如果CPU资源紧张,就不要压缩了。

    关于Broker端数据压缩机制,在broker.conf文件中,也是可以配置压缩算法的。正常情况下,Broker从Producer端接收到消息后不会对其进行任何修改,但是如果Broker端和Producer端指定了不同的压缩算法,就会产生很多异常的表现。

    当然,如果在生产者开启了压缩,那么消费者是一定需要解压缩的。这里就要注意生产者、服务端、消费者的兼容问题。如果生产者压缩了消息,在消息中会带入压缩的算法,如果这个算法在消费者端没有实现,那么消费者端就无法正确的消费消息。

优化客户端

  • 生产者合理保证消息安全

    • 设置发送者应答参数:主要涉及到两个参数。生产者的ACKS_CONFIG配置和服务端的min.insync.replicas参数。这个在第二篇文章的发送应答机制中介绍过。
    • 打开生产者端的幂等性配置:ENABLE_IDEMPOTENCE_CONFIG,生产者将这个参数设置为true后, 服务端会根据生产者实例以及消息的目标Partition,进行重复判断,从而过滤掉生产者一部分重复发送的消息。这个在第二篇文章的生产者消息幂等性中介绍过。
    • 使用生产者事务机制发送消息:打开幂等性之后,如果一个生产者实例需要发送多条消息,而你能够确定这些消息都是发往同一个Partition的,那么你就不需要再过多考虑消息安全的问题。但是如果这些消息是发往不同的Partition,那么尽量使用异步发送机制+事务来进一步提高消息的安全性。尤其在使用Spring时,Producer往往使用单例放到Spring容器中,这时候更需要注意事务的使用。实际在SpringBoot中集成KafkaTemplate时,需要在springboot的配置文件中配置transaction-id-prefix来开启事务。
  • 消费者端合理保证消息安全

    • 消费者处理方式尽量不要使用异步:Kafka消费消息是有重试机制的,如果消费者没有主动提交事务(自动提交或者手动提交),那么这些失败的消息是可以交由消费者组进行重试的,所以正常情况下,消费者这一端是不会丢失消息的。但是如果消费者要使用异步方式进行业务处理,那么如果业务处理失败,此时消费者已经提交了Offset,这个消息就无法重试了,这就会造成消息丢失。因此在消费者端,尽量不要使用异步处理方式,在绝大部分场景下,就能够通过Kafka的消费者重试机制,保证消息安全处理。此时,在消费者端,需要更多考虑的问题就变成了消费重试机制造成的消息重复消费的问题。

    • 消费者防止消息重复消费:在大部分消费场景下,消费者消费完后提交offset不会有什么问题,但是如果消息处理时间过长,服务端认为消息消费失败了,此时就有可能让同组的其他消费者消费这条消息。此时就有可能造成消息消费重复。这时就需要一种统一的方式去解决消息幂等性问题。这里以普通的Java客户端为例写一段伪代码。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      // 循环获取消息
      while (true) {
      // 从redis中获取当前topic和partition未消费的offset(可能有正在处理的消息)
      int offset = redisUtil.get("topic_parition_processing");
      // 从redis中获取当前topic和partition已经处理完的offset
      int handled_offset = redisUtil.get("topic_parition_handled");
      // 如果这两个值不一致,说明上一个消费者消费消息未处理完成,被服务端认为处理失败,此时可以选择等待一段时间,如果等待某一特定时间还是失败,可以选择重新处理。
      wait();
      // 从offset开始拉取信息
      consumer.seek(new TopicPartition(topic, partition), offset);
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
      // 更新redis的topic_parition_processing为拉取到的最后一条信息
      int max_offset = getMaxOffsetFromRecords(records);
      redisUtil.set("topic_parition_processing", max_offset);
      // 处理消息
      handle(records);
      // 更新redis的topic_parition_handled标识已经处理完成
      redisUtil.set("topic_parition_handled", max_offset);
      // 提交服务端处理完成
      commit();
      }

      如果使用了SpringBoot那么需要在@KafkaListener的注解上个性化自己的@ContainerFactory,在个性化的ContainerFactory中个性化ConsumerRebalanceListener,在getOffsetcommitOffset方法中实现对Redis的操作。

生产环境常见方案解析(总结)

消息零丢失方案

生产者发送消息到Broker不丢失

Kafka的消息生产者Producer,支持定制一个参数,ProducerConfig.ACKS_CONFIG

  • acks配置为0:生产者只负责往Broker端发消息,而不关注Broker的响应。也就是说不关心Broker端 有没有收到消息。性能高,但是数据会有丢消息的可能。
  • acks配置为1:当Broker端的Leader Partition接收到消息后,只完成本地日志文件的写入,然后就给生产者答复。其他Partiton异步拉取Leader Partiton的消息文件。这种方式如果其他Partiton拉取消息失败,也有可能丢消息。
  • acks配置为-1或者all:Broker端会完整所有Partition的本地日志写入后,才会给生产者答复。数据安全性最高,但是性能显然是最低的。

对于KafkaProducer,只要将acks设置成1或-1,那么Producer发送消息后都可以拿到Broker的反馈RecordMetadata,里面包含了消息在Broker端的partition、offset等信息。通过这这些信息可以判断消息是否发送成功。如果没有发送成功,Producer就可以根据情况选择重新进行发送。

Broker端保存消息不丢失

首先,合理优化刷盘频率,防止服务异常崩溃造成消息未刷盘。Kafka的消息都是先写入操作系统的PageCache缓存,然后再刷盘写入到硬盘。PageCache缓存中的消息是断电即丢失的。如果消息只在 PageCache中,而没有写入硬盘,此时如果服务异常崩溃,这些未写入硬盘的消息就会丢失。Kafka并不支持写一条消息就刷一次盘的同步刷盘机制,只能通过调整刷盘的执行频率,提升消息安全。主要涉及几个参数:

  • flush.ms : 多长时间进行一次强制刷盘。
  • log.flush.interval.messages:表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
  • log.flush.interval.ms:当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。

然后,配置多备份因子,防止单点消息丢失。在Kafka中,可以给Topic配置更大的备份因子replicationfactors。配置了备份因子后,Kafka会给每个Partition分配多个备份Partition。这些Partiton会尽量平均的分配到多个Broker上。并且,在这些Partiton中,会选举产生Leader Partition和Follower Partition。这样, 当Leader Partition发生故障时,其他Follower Partition上还有消息的备份。就可以重新选举产生Leader Partition,继续提供服务。

当然,我们在第三篇文章介绍崩溃恢复机制的时候说到,在服务经常崩溃的情况下,Kafka是为了高性能而牺牲了消息安全性的。

消费者端防止异步处理丢失消息

消费者端由于有消息重试机制,正常情况下是不会丢消息的。每次消费者处理一批消息,需要在处理完后给Broker应答,提交当前消息的Offset。Broker接到应答后,会推进本地日志的Offset记录。如果Broker没有接到应答,那么Broker会重新向同一个消费者组的消费者实例推送消息,最终保证消息不丢失。这时,消费者端采用手动提交Offset的方式,相比自动提交会更容易控制提交Offset的时机。

消费者端唯一需要注意的是,不要异步处理业务逻辑。因为如果业务逻辑异步进行,而消费者已经同步提交了Offset,那么如果业务逻辑执行过程中出现了异常,失败了,那么Broker端已经接收到了消费者的应答,后续就不会再重新推送消息,这样就造成了业务层面的消息丢失。

消息积压如何处理

通常情况下,Kafka本身是能够存储海量消息的,他的消息积压能力是很强的。但是,如果发现消息积压问题已经影响了业务处理进度,这时就需要进行一定的优化。

  1. 如果业务运行正常,只是因为消费者处理消息过慢,造成消息加压。那么可以增加Topic的Partition分区数,将消息拆分到更到的Partition。然后增加消费者个数,最多让消费者个数=Partition分区数,让一个Consumer负责一个分区,将消费进度提升到最大。

    另外,在发送消息时,还是要尽量保证消息在各个Partition中的分布比较均匀。比如,在原有Topic下,可以调整Producer的分区策略,让Producer将后续的消息更多的发送到新增的Partition里,这样可以让各个Partition上的消息能够趋于平衡。如果你觉得这样太麻烦,那就新增一个Topic,配置更多的Partition以及对 应的消费者实例。然后启动一批Consumer,将消息从旧的Topic搬运到新的Topic。这些Consumer不处理业务逻辑,只是做消息搬运,所以他们的性能是很高的。这样就能让新的Topic下的各个Partition数量趋于平衡。

  2. 如果是消费者的业务问题导致消息阻塞了,从而积压大量消息,并影响了系统正常运行。比如消费者序列化失败,或者业务处理全部异常。这时可以采用一种降级的方案,先启动一个Consumer将Topic下的消息先转发到其他队列中,然后再慢慢分析新队列里的消息处理问题。类似于死信队列的处理方式。

如何保证消息顺序

这也是一个常见的面试题。有时候业务上会需要消息按照顺序进行处理。例如QQ的聊天记录,一问一答必须有顺序,要是顺序乱了,就没法看了。这时应该怎么做?这个问题要交由Kafka来处理是很麻烦的,因为我们一直强调过,kafka设计的最优先重点是海量吞吐,所以他对于传统MQ面临的这些问题,处理是比较粗犷的。比如最典型的就是单partition,单Consumer组合其实并不能在Kafka上解决这个问题。

首先我们把这个问题分为两个部分:生产者和消费者

  • 生产者:因为kafka中各个Partition的消息是并发处理的,所以要保证消息顺序,对于Producer,要保证将一组有序的消息发到同一个Partition里。因为Partition的数据是顺序写的,所以自然就能保证消息是按顺序保存的。所以很显然,只要只配置一个Partition就能实现,但是这样实际上是放弃了Kafka多分片提高吞吐量的优势。所以我们对特定的业务,是可以依靠某些业务主键来保证局部的有序性的,如甲的操作ABC需要保证有序性,乙的操作DEF需要保证有序性,则我们可以根据甲和乙的唯一id进行分区发送,这种情况下不需要保证ABC和DEF的总体有序。

    是不是Producer都将消息往同一个Partition发,就能保证消息顺序呢?如果只追求答案,那么结果肯定是正确的,因为Partition就是FIFO的队列结构。但是,稍微深入想想怎么实现的,就没这么简单了。因为消息可能发送失败。比如Producer依次发送1,2,3三条消息。如果消息1因为网络原因发送失败了,2和3发送成功了,这样消息顺序就乱了。如果要每次只发送一次消息,或者再加上重试机制,那么整个效率基本没法看,所以Kafka其实在这个问题上是有做了考量的。

    回想一下在第二篇文章中提到幂等性,每一条消息都会有一个sn,这个sn是单调递增的。这个sn除了保证消息的唯一性,同时也可以用来检测消息是否有丢失。如果在消息2,3都成功之后,消息1的重试请求来了,会发现消息1的sn是在已经写入的消息2和3之前的,此时就会往Producer抛出一个 OutOfOrderSequenceException异常。

    我们可以在ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTIONRETRIES_CONFIG的描述中看到对排序的相关说明。

    max.in.flight.requests.per.connection指一个生产者在得到确认前能够向服务端发送的消息数量,如果配置了retries重试,并且该参数大于1,那么就有可能因为重试导致消息的乱序。

  • 消费者:Partition中的消息有序后,我们就要保证Consumer的消费顺序是有序的。

    根据ConsumerConfig.FETCH_MAX_BYTES_CONFIG的描述,Consumer其实是每次并行的拉取多个Batch批次的消息进行处理的。也就是说Consumer拉取过来的多批消息并不是串行消费的。所以在Kafka提供的客户端Consumer中,是没有办法直接保证消费的消息顺序。其实这也比较好理解,因为Kafka设计的重点是高吞吐量,所以他的设计是让Consumer尽最大的能力去消费消息。而只要对消费的顺序做处理,就必然会影响Consumer拉取消息的性能。

    所以这时候,我们能做的就是在Consumer的处理逻辑中,将消息进行排序。比如将消息按照业务独立性 收集到一个集合中,然后在集合中对消息进行排序。

    那么针对消费者顺序消费的问题,有没有其他的处理思路呢?在RocketMQ中提供了一个比较好的方式。 RocketMQ中提供了顺序消息的实现。他的实现原理是先锁定一个队列(在RocketMQ中称为 MessageQueue,类似于Kafka中的Partition,都是实际存储消息的队列结果),消费完这一个队列后,才开始锁定下一个队列,并消费队列中的消息。再结合MessageQueue中的消息有序性,就能保证整体消息的消费顺序是有序的。