Kafka使用指南1——简介和安装、原生命令行的使用


目录

  1. Kafka简介和安装以及原生命令行客户端(本篇)

    这块主要是一个Kafka所包含的内部元素的介绍以及简单的使用。

  2. Java客户端以及Spring集成

    Java客户端其实就是原生命令行的一种封装,学习如何使用即可,Spring就是在这个基础上再进行了对象的管理,使用起来并不难。

  3. Kafka集群架构设计

    Kafka在设计之初就是为了高吞吐、高性能、高可扩展,所以它的集群架构是非常值得学习的。

  4. Kafka日志索引详解

    Broker能够高效地处理和保存消息,是Kafka高性能的保障。我们从可见的log文件入手,去研究一下Kafka是如何保证消息高效的流转。

主要内容

这一部分主要是接触Kafka并熟悉Kafka的使用方式。从安装搭建开始到基础使用,快速感受Kafka的功能,可以帮助理解基于Kafka的解决方案。


Kafka简介和安装

简介

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写, Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

应用场景

日志收集

公司可以利用Kafka收集各种服务的日志,通过kafka以统一接口服务的方式开放给各种consumer。

简单的架构就是nginx记录访问日志,由logstash收集日志然后将日志发送给Kafka,再由spark等大数据处理框架处理日志数据,最终以一定的形式保存到日志储存层并由应用客户端展示。

消息系统

当做传统的消息队列使用,解耦生产者和消费者,削峰填谷。

用户活动跟踪

和日志收集类似,只不过数据来源是各web应用或app应用记录的用户各种活动,如点击、搜索、浏览等操作。这些活动信息被各个服务器发送到kafka,然后订阅者通过订阅这些topic来做实时的监控分析,或者使用大数据模型进行离线分析和数据挖掘。

Kafka基本概念

在之前学习过RabbitMQ和RocketMQ之后,对消息队列的理解应该已经很深了。Kafka主要的特点就是分布式和分区消息存储。主要的几个概念如下:

节点Broker

消息中间件的处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群。

主题Topic

和传统消息中间件一样,主题是对消息的分类,生产者发布消息需要指定一个Topic。

生产者Producer

消息生产者,向Broker发送消息的客户端。

消费者Consumer

消息消费者,从Broker读取消息的客户端。

消费者组ConsumerGroup

每个消费者属于一个特定的ConsumerGroup,一个消息可以被不同的ConsumerGroup消费,但是一个ConsumerGroup中的消费者只有一个Consumer可以消费到消息。

分区Partition

一个Topic可以被分为多个Partition,生产者发送消息是往指定的Topic中的某一个(可以指定,不指定的话使用Hash)Partition发送消息,消费者也是从Topic中的某一个(或多个)Partition中消费消息,一个Partition最多只能被一个消费者消费。

Partition的存在是为了实现高并发,每一个Partition能够存在不同的Broker中,这样多个消费者可以从不同的Broker上消费消息。

Kafka安装

依赖环境安装

Kafka由Scala语言开发,运行在JVM上,所以首先需要安装Jdk:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 下载jdk包之后解压
[root@localhost ~]# tar -zxvf jdk-8u362b09-linux-x64.tar.gz -C /opt/
# 将/opt/jdk-8u362b09-linux-x64 建立符号链接到/usr/local/jdk
[root@localhost ~]# ln -s /opt/jdk1.8.0_362-b09 /usr/local/jdk
# 写入环境变量
[root@localhost ~]# echo "export JAVA_HOME=/usr/local/jdk" >> /etc/profile
[root@localhost ~]# echo "export JRE_HOME=\$JAVA_HOME/jre" >> /etc/profile
[root@localhost ~]# echo "export CLASSPATH=.:\$JAVA_HOME/lib:\$JRE_HOME/lib" >> /etc/profile
[root@localhost ~]# echo "export PATH=\$JAVA_HOME/bin:\$PATH" >> /etc/profile
# 使配置文件生效。
[root@localhost ~]# source /etc/profile
# 赋予 jdk 中 bin 文件可执行权限
[root@localhost ~]# chmod -R +x /usr/local/jdk/bin
# 安装完毕,验证
[root@localhost ~]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_362-b09)
OpenJDK 64-Bit Server VM (Temurin)(build 25.362-b09, mixed mode)

Kafka依赖zookeeper,所以需要安装zookeeper:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 下载zookeeper包之后解压
[root@localhost ~]# tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz -C /opt/
[root@localhost ~]# cd /opt
# 在/usr/local/下创建一个不带版本号的zookeeper链接
[root@localhost opt]# ln -s /opt/zookeeper-3.4.11 /usr/local/zookeeper
[root@localhost opt]# cd /usr/local/zookeeper/conf/
# 依据默认的配置新建自己的配置文件
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
#配置zookeeper的数据文件和日志文件存放位置
[root@localhost conf]# echo -e "# append zk_env\nexport PATH=$PATH:/usr/local/zookeeper/bin" >> /etc/profile
[root@localhost conf]# sed -i 's/dataDir=\/tmp\/zookeeper/dataLogDir=\/usr\/local\/zookeeper\/logs\ndataDir=\/usr\/local\/zookeeper\/data/g' zoo.cfg
# 使配置文件生效。
[root@localhost conf]# source /etc/profile
# 新建zookeeper的日志文件和数据文件
[root@localhost conf]# mkdir -p /usr/local/zookeeper/{logs,data}
# 启动zookeeper
[root@localhost conf]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

注:zookeeper默认端口为2181,同时还会占用8080的管理端口。

安装Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 下载kafka包之后解压
[root@localhost ~]# tar -xzf kafka_2.13-3.7.0.tgz /opt/kafka_2.13-3.7.0
[root@localhost ~]# cd /opt/kafka_2.13-3.7.0/config/
# 进入目录修改配置文件
[root@localhost config]#vim server.properties
# 配置文件中内容如下:
#broker.id属性在kafka集群中必须要是唯一,多个broker设置为0,1,2...
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://127.0.0.1:9092
#kafka的消息存储文件
log.dir=/usr/local/data/kafka‐logs
#kafka连接zookeeper的地址 我这里zk和kafka在同一台上
zookeeper.connect=127.0.0.1:2181

# 启动Kafka 后台启动,不会打印日志到控制台
[root@localhost ~] kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# 启动后链接zookeeper验证broker的状态
[root@localhost ~]# zkCli.sh
Connecting to localhost:2181
...
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0] # 说明broker.id为0的节点已启动
# 关闭Kafka
[root@localhost /]# kafka-server-stop.sh

其中配置文件有大量参数可以设置,部分要点如下:

参数名称默认值说明
broker.id0每个broker都可以用一个唯一的非负整数id进行表示。可以选择任意数字作为id,只要id是唯一的即可。
log.dir或log.dirs/tmp/kafka-logsKafka存放数据的路径。路径不是唯一的,可以是多个,路径之间只需要逗号分割。
listenersPLAINTEXT://127.0.0.1:9092server接收客户端连接的端口,ip配置kafka本机ip即可。另外可以配置多个,如还需新增配置SSL的端口。
zookeeper.connect127.0.0.1:2181zookeeper的地址,如果是zookeeper集群则可以配置多个。如hostname1:port1, hostname2:port2, hostname3:port3。
log.retention.hours168每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样。
num.patitions1创建topic的默认分区数
default.replication.factor1自动创建topic的默认副本数量,建议设置为大于等于2。副本数量需要小于等于broker的数量。
min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定需要有多少个副本写数据是成功的,才认为消息发送成功。如果无法达到该数,那么生产者会抛出异常(NotEnoughReplicas)。

集群部署

我们创建一个3个节点的集群,三个节点的脚本中对应的配置分别为:

1
2
3
4
5
6
7
8
9
10
11
12
# 第一个节点
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.56.100:9092
# 第二个节点
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.56.101:9092
# 第三个节点
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.56.102:9092

可以从zookeeper的数据中看到brokers有0,1,2三个节点:

1
2
3
4
5
6
7
8
[zk: localhost:2181(CONNECTED) 25] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 26] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.56.100:9092"],"jmx_port":-1,"port":9092,"host":"192.168.56.100","version":5,"timestamp":"1711979088489"}
[zk: localhost:2181(CONNECTED) 27] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.56.101:9092"],"jmx_port":-1,"port":9092,"host":"192.168.56.101","version":5,"timestamp":"1711978899480"}
[zk: localhost:2181(CONNECTED) 28] get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.56.102:9092"],"jmx_port":-1,"port":9092,"host":"192.168.56.102","version":5,"timestamp":"1711979031132"}

简单使用

以下命令行中的命令均有大量附加的选项,如果想要查看某一个命令的详细用法,可以不带任何参数运行即可查看帮助。

创建主题

使用以下命令创建一个名为test的主题,并且设置副本数为1,分区数为1,其中副本数一定要小于broker的数量:

1
2
[root@localhost ~]# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.

注:新版本可以使用--bootstrap-server代替--zookeeper来直接指定broker的地址,因为新版本的kafka可以不依赖于zookeeper,直接使用zookeeper内置的kraft进行元数据的存放和共享。

可以通过以下命令来查看kafka中目前存在的topic:

1
2
[root@localhost ~]# kafka-topics.sh --list --bootstrap-server localhost:9092
test

删除主题可以使用以下命令:

1
[root@localhost ~]# kafka-topics.sh --delete --topic test --bootstrap-server localhost:9092

发送消息

使用以下命令往名为test的topic中发送两条消息,如果topic不存在会自动创建:

1
2
3
[root@localhost ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic test
>msg1
>msg2

接收消息

使用以下命令从名为test的topic中接收消息,使用--from-beginning命令从头开始消费,否则默认消费新的消息。

1
2
3
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
msg1
msg2

消费多主题

1
2
3
4
5
6
7
8
9
10
11
12
# 先往test2主题中发送两条消息
[root@localhost ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic test2
>test2-1
>test2-2
# 启动消费者从test和test2主题中从头消费消息
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test|test2" --from-beginning
test2-msg1
test2-msg2
msg1
msg2
msg3
msg4

多播消费

实现类似于发布订阅(pub-sub)的模式,针对kafka同一条消息只能被统一消费组下的某一个消费者消费的特性,要实现多播只要保证这些消费者属于不同的消费组即可。再增加一个消费者,给其定义一个新的消费组testGroup2,这样两个消费者客户端均可消费到消息。

1
2
3
4
5
6
7
8
9
10
11
# 消费者1,使用testGroup1消费组
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup1
msg5

# 消费者2,使用testGroup1消费组
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup1
# 未消费到消息

# 消费者3,使用testGroup2消费组
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testGroup1
msg5

可以使用以下命令来查看消费者的名称,可以看到我们一开始不指定消费组创建的消费者会有默认创建一个消费组:

1
2
3
4
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
testGroup2
testGroup1
console-consumer-29003

可以使用以下命令来查看消费者的消费偏移量,其中current-offset表示当前消费组已消费的偏移量,log-end-offset表示当前分区最后一条消息的偏移量,lag表示当前消费组的未消费的消息量:

1
2
3
4
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup1

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testGroup1 test 0 57 57 0 console-consumer-19b01122-2844-484d-a14f-f67b57f42cc8 /127.0.0.1 console-consumer

分区、副本的理解

分区

我们从上面提到的消费偏移量以及消费者指定从头消费可以看出,所有的消息在一个分区中是按序存储的,所以才会有偏移量和从头消费。而每个分区是一个单独的存储空间,一个Topic是一组分区的一个类别名称。

Topic内部分区数据日志
Partition 01 | 2 | 3 | 4 | 5 | 6 | 7 |
Partition 11 | 2 | 3 | 4 | 5 |
Partition 21 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 |

消息每次发到对应分区后在分区的最后追加写入,所以分区Partition是一个有序的消息对序列,这些消息在物理上按顺序添加到一个叫做commit log的日志文件中。每个partition中的消息都有一个唯一的编号,称之为offset,用来唯一标识某个分区中的message。

每个partiton都对应一个commit log文件。一个partition中的message的offset都是唯一的,但是不同的partition中的message的offset可能是相同的,如上面的表格所示。

kafka一般不会删除消息,不管这些消息有没有被消费,这样不会有消费后删除消息导致的磁盘随机读取,只会根据配置的日志保留时间(log.retention.hours)确认消息多久被删除,而因为消息是连续存储的,等到删除的时候根据时间序列找到需要删除的第一条消息再向后遍历即可做到快速删除,默认kafka保存最近一周的日志消息。kafka的性能与保留的日志数据量大小关系不大,因为一般从队尾消费只需要从最新的offset读取数据,而从某个offset开始消费需要的也仅仅是根据日志索引找到某一个offset的位置之后向后遍历即可,因此保存大量的数据消息日志不会有什么影响。

每个consumer是基于自己在commit log中的消费offset进行工作的,消费的offset由consumer自己来维护。所以消费者可以通过指定offset来重复消费某些消息或者跳过某些消息。也因此,kafka集群中comsumer的数量对集群的影响并不大。

副本

为了保证可用性,创建一个或多个副本来备份分区数据。我们以一个3个broker的集群来看看消息在分区+备份存储下的情况。

我们在之前搭建好的集群中创建一个4个分区2个副本的topic:

1
2
[root@localhost kafka1-logs]# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 4 --topic test-multi-1
Created topic test-multi-1.

并查看这个topic的情况:

1
2
3
4
5
6
[root@localhost kafka1-logs]# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-multi-1
Topic: test-multi-1 TopicId: MSzmgHCqQwef_SaRWPCbKw PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-multi-1 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test-multi-1 Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: test-multi-1 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: test-multi-1 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

可以看到,我们创建了4个分区:

第一个分区(partition0):两个副本在broker0和broker1上,其中Leader为broker1。

第二个分区(partition1):两个副本在broker0和broker2上,其中Leader为broker0。

第三个分区(partition2):两个副本在broker1和broker2上,其中Leader为broker2。

第四个分区(partition3):两个副本在broker1和broker2上,其中Leader为broker1。

Leader节点负责处理这个分区的所有读写请求,Replicas副本会备份Leader的数据。Isr表示在副本中当前存活并且已经同步备份了的副本节点。

我们可以进去某一个服务器的kafka的日志目录下验证一下:

1
2
3
4
5
6
7
8
9
10
11
# 192.168.56.100
[root@localhost kafka1-logs]# cd /usr/local/data/kafka1-logs/
[root@localhost kafka1-logs]# ls
cleaner-offset-checkpoint __consumer_offsets-15 __consumer_offsets-22 __consumer_offsets-3 __consumer_offsets-37 __consumer_offsets-44 __consumer_offsets-7 test2-0
__consumer_offsets-0 __consumer_offsets-16 __consumer_offsets-23 __consumer_offsets-30 __consumer_offsets-38 __consumer_offsets-45 __consumer_offsets-8 test-multi-1-0
__consumer_offsets-1 __consumer_offsets-17 __consumer_offsets-24 __consumer_offsets-31 __consumer_offsets-39 __consumer_offsets-46 __consumer_offsets-9 test-multi-1-1
__consumer_offsets-10 __consumer_offsets-18 __consumer_offsets-25 __consumer_offsets-32 __consumer_offsets-4 __consumer_offsets-47 log-start-offset-checkpoint
__consumer_offsets-11 __consumer_offsets-19 __consumer_offsets-26 __consumer_offsets-33 __consumer_offsets-40 __consumer_offsets-48 meta.properties
__consumer_offsets-12 __consumer_offsets-2 __consumer_offsets-27 __consumer_offsets-34 __consumer_offsets-41 __consumer_offsets-49 recovery-point-offset-checkpoint
__consumer_offsets-13 __consumer_offsets-20 __consumer_offsets-28 __consumer_offsets-35 __consumer_offsets-42 __consumer_offsets-5 replication-offset-checkpoint
__consumer_offsets-14 __consumer_offsets-21 __consumer_offsets-29 __consumer_offsets-36 __consumer_offsets-43 __consumer_offsets-6 test-0

在56.100这台服务器上,也就是我们的broker0,应该只有分区0和分区1的数据。可以从日志文件中看到,文件夹下topic为test-multi-1的只有test-multi-1-0test-multi-1-1两个文件夹,这就是分区0和分区1的数据。同理,在56.101服务器上,也就是我们的broker1,只有test-multi-1-0test-multi-1-2两个文件夹。

进到test-multi-1-0目录下,可以看到多个日志文件,其中.log文件为消息日志文件,.index文件为基于offset的索引文件,.timeindex是基于时间的索引文件:

1
2
3
[root@localhost kafka1-logs]# cd test-multi-1-0/
[root@localhost test-multi-1-0]# ls
00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epoch-checkpoint partition.metadata

小结

可以这么来理解Topic,Partition和Broker:

1、一个topic,代表逻辑上的一个业务数据集,比如按数据库里不同表的数据操作消息区分放入不同topic,订单相关操作消息放入订单topic,用户相关操作消息放入用户topic。

2、对于大型网站来说,后端数据都是海量的,订单消息很可能是非常巨量的,比如有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限制问题,那么就可以在topic内部划分多个partition来分片存储数据,不同的partition可以位于不同的机器上,每台机器上都运行一个Kafka的进程Broker。每个partition还可以设置一个Leader和若干个Follower进行备份,partition的个数称为备份因子。

3、每一个partition可以理解为一个独立的队列,kafka可以使用一些机制保证这一组业务(topic)的各队列(partition)的消费平衡。

所以对Topic下数据进行分区存储的优点有以下两个: 1、commit log文件会受到所在机器的文件系统大小的限制,分区之后可以将不同的分区放在不同的机器上,相当于对数据做了分布式存储,理论上一个topic可以处理任意数量的数据。 2、为了提高并行度。

集群容灾

在上面创建的3个broker的集群中,我们通过zookeeper确认3个节点是否启动成功,注意,需要在3个节点的服务器上开启对应端口,包括kafka端口和zookeeper端口。

1
2
[zk: localhost:2181(CONNECTED) 15] ls /brokers/ids
[0, 1, 2]

我们创建一个新的topic,副本数设置为3,分区数设置为2,并查看topic的情况:

1
2
3
4
5
6
[root@localhost ~]# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 2 --topic test-multi-1
Created topic test-multi-1.
[root@localhost ~]# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-multi-1
Topic: test-multi-1 TopicId: uHfcZjY4T-SaBfR_ezgsXA PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: test-multi-1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: test-multi-1 Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

其中第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。

Leader节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾)。

Replicas表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。

Isr 是Replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。

此时我们向这个Topic发送几条消息试一下,此时broker-list可以加上kafka集群中所有的节点:

1
2
3
[root@localhost ~]# kafka-console-producer.sh --broker-list 192.168.56.100:9092,192.168.56.101:9092,192.168.56.102:9092 --topic test-multi-1
>msg1
>msg2

再启动一个消费者去消费这个topic的消息:

1
2
3
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server 192.168.56.100:9092,192.168.56.101:9092,192.168.56.102:9092 --topic test-multi-1 --from-beginning
msg1
msg2

由于分区0的Leader是2,我们关闭broker.id为2的节点,然后再查看topic信息。

1
2
3
Topic: test-multi-1     TopicId: uHfcZjY4T-SaBfR_ezgsXA PartitionCount: 2       ReplicationFactor: 3    Configs:
Topic: test-multi-1 Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0
Topic: test-multi-1 Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0

可以看到分区0的Leader已经自动变成了节点1,并且此时Isr中也没有了节点2,所以Leader的选举也是从Isr中选举出来的。

继续关闭broker.id为1的节点,然后查看topic信息。

1
2
3
4
[root@localhost ~]# kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test-multi-1
Topic: test-multi-1 TopicId: uHfcZjY4T-SaBfR_ezgsXA PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: test-multi-1 Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 0
Topic: test-multi-1 Partition: 1 Leader: 0 Replicas: 1,0,2 Isr: 0

此时,就只剩下broker.id为0的一个节点正在正常运行了。

继续执行发送消息,消费者能够正常消费到消息:

1
2
3
4
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server 192.168.56.100:9092,192.168.56.101:9092,192.168.56.102:9092 --topic test-multi-1 --from-beginning
msg1
msg2
msg3 #新发送的消息

由此可见,log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。kafka 集群支持配置一个partition备份的数量。 针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。 leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,不提供读写(主要是为了保证多 副本数据与消费的一致性)。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。

集群的生产与消费

生产者

生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载均衡,也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。

消费者

传统的消息队列传递模式有两种:队列(queue)和发布-订阅(publish-subscribe)。

  • queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。
  • pub-sub模式:消息会被广播给所有的consumer。

为此Kafka提供了消费组(Consumer Group)的概念。

  • queue模式:所有consumer位于同一个consumer group下,同一个consumer group内只会有一个消费者消费到消息。
  • pub-sub模式:每个消费者都属于自己唯一的consumer group。

举例说明,由两个broker组成的kafka集群中的某一个topic拥有4个partition(p0、p1、p2、p3),分别位于不用的broker上,broker0分有p0和p2,broker1分有p1和p3。

此时一个消费组groupA下有2个消费者A1和A2从这个topic中消费消息,A1被分配到消费p0和p1分区的消息,A2被分配到消费p2和p3分区的消息。

消费组groupB下有5个消费者B1,B2,B3,B4,B5从这个topic中消费消息,B1-B4分别被分配到消费p0-p3的分区的消息,B5没有分区可以消费。

此时如果有一条消息发送到p0分区,那么A1和B1都将会消费到这条消息,满足了不同消费组都可以消费到消息,消费组内部仅一个消费者可以消费到消息。

通常一个topic会有多个消费组去消费,比如系统日志需要被异常收集的系统去统计异常,同时也要被用户活动记录分析的系统去统计用户画像,每一个消费组都是逻辑上的一个订阅者(Logical subscribe)。每个消费组里面由多个消费者组成,提供可扩展和容灾的保证。

消费顺序

要保证消费顺序,需要保证一个partition同一时刻在一个consumer group中只能有一个consumer在消费,并且即使这样,在高并发环境下依然无法保证顺序消费。

consumer group中的consumer数量不能比partition数量多,否则多出来的消费者消费不到消息。

Kafka并不能保证同一个topic内多个partition总的消费顺序是一致的,只能保证一个partition内部是顺序,所以保证总体上的顺序消费,只能把partition数量设置为1,同时由于partition数量的限制,consumer数量也只能是1。即使是这样,由于consumer在消费时会有多个线程去获取数据,依然无法保证在高并发情况下的顺序消费。就算非高并发下能满足需求,但是影响性能,浪费了Kafka为了高并发的设计,所以用Kafka实现顺序消费比较少见。