Kafka使用指南2——Java客户端以及Spring集成
目录
Kafka简介和安装以及原生命令行客户端
这块主要是一个Kafka所包含的内部元素的介绍以及简单的使用。
Java客户端以及Spring集成(本篇)
Java客户端其实就是原生命令行的一种封装,学习如何使用即可,Spring就是在这个基础上再进行了对象的管理,使用起来并不难。
Kafka集群架构设计
Kafka在设计之初就是为了高吞吐、高性能、高可扩展,所以它的集群架构是非常值得学习的。
Kafka日志索引详解
Broker能够高效地处理和保存消息,是Kafka高性能的保障。我们从可见的log文件入手,去研究一下Kafka是如何保证消息高效的流转。
主要内容
这一部分主要是介绍Kafka在Java客户端上的使用,包括Java客户端和Spring的集成,从客户端的角度去更深入的理解Kafka。同时还会有不同调用参数的介绍,用来实现不同的业务细节。
基础的客户端
要把Kafka在Java中使用起来是非常简单的,只需要引入一个maven依赖即可:
1 | <dependency> |
构建Topic,该Topic有两个分区:
1 | [root@localhost ~]# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 2 --topic test-java-client |
消息生产者
整体来说,构建一个生产者Producer的过程分为三个步骤:
- 设置Producer的核心属性:Producer所有的可选属性都由
ProducerConfig
类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
代表了服务器地址,这边是Kafka服务器的地址。对于大部分重要的属性,在ProducerConfig
类中都写了对应参数名+_Doc
的注解,介绍还是非常详细的。 - 构建消息:作为内容的载体,Kafka的消息结构为Key-Value形式的
ProducerRecord
类。其中Key会用于在Topic中分配Partition,而Value就是具体的消息内容。 - 使用Producer发送消息:发送消息一般有单向发送、同步发送和异步发送三种,对应着不同的安全性和效率。
设置Producer的核心属性
1 | Properties props = new Properties(); |
构建消息
1 | private static final String TOPIC = "test-java-client"; |
发送消息
单向发送
这种发送方式不关心服务端的应答,是速度最快的模式,但是如果消息未发送成功也不会管。
1 | producer.send(record); |
同步发送
这种发送方式会阻塞当前线程,等待发送成功之后才会返回RecordMetadata
类,可以从返回对象中拿出对应的分区和offset信息。
1 | RecordMetadata recordMetadata = producer.send(record).get(); |
异步发送
这种发送方式发送消息后不阻塞,服务端有应答后会触发回调函数。同样,返回的RecordMetadata
类包含对应的分区和offset信息。
1 | // 需要定义一个CountDownLatch判断消息是否都发送成功 |
尝试执行生产者发送5条消息,输出控制台如下:
1 | metadata:[test-java-client-0@0] sent with topic:test-java-client; partition:0;offset:0 |
可以看到消息被均匀的发送到了分区0和分区1中。
消息消费者
使用Kafka提供的Consumer类进行快速的消息消费,主要过程也分为三个步骤:
- 设置Consumer的核心属性:Consumer所有的可选属性都由
ConsumerConfig
类管理。同样比如ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
代表了服务器地址,这边是Kafka服务器的地址。对于大部分重要的属性,在ConsumerConfig
类中也都写了对应参数名+_Doc
的注解,介绍还是非常详细的。 - 拉取消息:Kafka采用消费者主动pull的模式去拉取消息,由消费者去决定自己拉取哪一部分消息。
- 处理消息并提交偏移量:消费者拉取到消息之后就可以由业务处理消息了,处理完成之后需要向Broker提交偏移量Offset,这样才能告诉Broker消息消费成功,否则Broker会认为消息消费失败。
设置Consumer的核心属性
1 | Properties props = new Properties(); |
拉取消息并处理
1 | while (true) { |
提交Offset
1 | //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。 |
尝试使用两个消费者(因为有个两个分区,可以启动两个消费者,数量关系具体在下一个小节说明)消费刚刚生产者发送的5条消息:
1 | consumer1: |
可以看到消费者1分配到去消费分区0的消息,而消费者2分配到去消费分区1的消息。如果只启动一个消费者的话,那么该消费者会消费到两个分区所有的消息。
从生产者和消费者的属性设置来理解Kafka工作机制
消费者分组机制
我们在第一篇文章就提到了每一个消费者都有一个对应的消费组,可能一个消费组下有多个消费者,也可能只有一个消费者。
在Consumer的属性中,有一个ConsumerConfig.GROUP_ID_CONFIG
属性就是用来标识消费组的,在需要使用消费者管理功能或者Kafka提供的Offset管理策略时,ConsumerConfig.GROUP_ID_CONFIG
是必填的。还有一个GROUP_INSTANCE_ID_DOC
属性可以给消费者一个唯一的标识,一个消费组只会有一个该标识的消费者,这个属性设置可以有效的防止特殊性情况下消费组内不必要的rebalance(比如重启)。
生产者往一个Topic发送消息时,会尽量均匀地把消息发送的每个Partition中。Partition接收到消息会推送给对应的消费组,每个消费组只会推送一份,也就是一个消费组中只有一个消费组可以接收到消息,而不同的消费组可以都接受到消息。与之相关的还有Offset偏移量,这个偏移量记录了一个消费组在Partition中的消费进度。
我们将两个消费者的GROUP_INSTANCE_ID_DOC
属性分别设置为id1
和id2
,启动消费者。使用--describe
命令查看消费组信息,可以看到test-group-1
消费组下两个消费者的消费情况。
1 | [root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group-1 |
其中CURRENT-OFFSET
就是当前消费的偏移量,LEO
在上一篇文章中提到过,就是总的消息偏移量,LAG
是还剩多少消息未消费。
CURRENT-OFFSET
需要消费组消费之后主动向Broker提交,提交完成之后Broker会更新这个Partition的消费进度,表示这个消息已经被处理完成。若消费者没有提交Offset,那么Broker会认为这个消息没有处理过,会往对应消费组重新推送,不过会尽量推送给这个消费组中的其他消费者。
在文章上一节中的示例代码中,我们使用了手动提交Offset的方法,也可以通过设置ENABLE_AUTO_COMMIT_CONFIG
为true
来开启自动提交。
Offset的存储
Offset是和Partition和Group相关的,所以一个Partition一个Group只能记录一个Offset。由此可以推导出,一个Partition只能有一个消费者进行消费。所以在一个有4个副本的Topic下,一个消费组最多能有4个消费者实例,多的消费者是无法订阅到消息的。
Offset是在服务端管理的,但是Offset的更新是由客户端发起的。AUTO_OFFSET_RESET_CONFIG
参数可以设置服务端在客户端提交的Offset不存在(如数据文件被删除)时的处理方式。设置为earliest
将Offset设置为最早的Offset,设置为latest
将Offset设置为最新的Offset,设置为none
向客户端抛出异常。
Offset的更新
在文章上一节中的示例代码中,我们展示了手动提交Offset的两种方法:同步提交和异步提交。
同步提交可以保证提交成功Offset之后才确认处理完成,但是会导致消息速度降低。
异步提交可以提交处理效率,但是如果处理完成之后提交Offset失败,会导致重复消费。
当然追求效率还可以在接收到消息马上异步提交,同时处理消息,但是如果消息处理失败就会丢失这条消息。
消费者各种处理Offset的方式各有优缺点,甚至还可以将Offset自行用Redis处理,根据自己的业务处理来更新Offset向前进。
生产者拦截机制
在消息生产者中,INTERCEPTOR_CLASSES_CONFIG
属性可以设置自定义拦截器类。
1 | public class MyInterceptor implements ProducerInterceptor<String, String> { |
在设置了拦截器之后,我们使用异步发送重新调用生产者发送消息。根据控制台打印的顺序,可以清楚地看到拦截器在整个流程中进行拦截的节点:
1 | 处理配置项 |
可以设置多个拦截器,用逗号隔开。
拦截器可以用来给消息批量赋值,比如给一个对象设置一个发送时间的属性。
消息序列化机制
在设置Producer的属性的时候,有两个属性ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
和ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
,分别对应了消息Key的序列化和Value的序列化。
我们在使用消息队列时,虽然消息是String的可能性非常大,但是还是有很多场景需要传输pojo类型。如果需要把数据进行网络传输到Kafka,那么就需要把这个对象转换成byte[]进行传输,对于常见的类型如String,Double,Long等都实现了默认的序列化类,而业务上自己的对象就需要自行编写序列化类。
假设拥有一个User对象,姓名:张三,年龄:18,性别:男。我们可以简单粗暴的使用json转换,但是更加效率的方法可以是将数据拼成一个字符串,如value=男018张三
。性别只会有男/女,年龄也不会超过999,而姓名放在最后是因为长度不确定,可以从年龄的后一位一直读到最后。经过这样的处理,value值就会尽可能的缩小,提高传输效率和Kafka存储效率。
其他对于非定长的属性也可以使用数据长度+数据的形式,如value=4:male,2:18,7:zhangsan,8:engineer,
表示性别:男,年龄:18,姓名:张三,职业:工程师。
在生产者设置了序列化方式之后,对应的在消费者也需要反序列化,根据value的特定格式去将byte[]还原成业务对象。
序列化机制在高并发场景中非常重要的优化机制,除了Kafka,像Netty在进行网络调用时也会定制化序列化机制防止粘包。
消息分区路由机制
在前面的示例中,多分区的Topic的每个分区总是会分到数量均衡的消息,那么消息是如何分区发送的,而消费者又是怎么选择分区去消费的呢?
我们把问题分为两个:
- Producer根据Key将消息分发到不同的Partition上
- Consumer选择消费特定的Partition的消息
分发路由
首先我们来看生产者端。在Producer中我们可以指定一个Partitioner对象来对消息进行分配。使用ProducerConfig.PARTITIONER_CLASS_CONFIG
参数来指定个性化的分发实现类。
Kafka之前提供了三种实现机制,分别是RoundRobinPartitioner
,DefaultPartitioner
和UniformStickyPartitioner
,目前后两个已经被标记为过时,默认使用BuiltInPartitioner
(默认实现类没有继承Partitioner接口)。
默认的BuiltInPartitioner
使用了Sticky的策略,如果在给一个生产者分配了一个分区后,会尽可能一直使用这个分区。等待该分区的batch.size(默认16K)已满,或者这个分区的消息已完成linger.ms
(默认0毫秒,表示如果batch.size迟迟没有满后的等待时间)后批量发送消息,之后重新选择分区。
RoundRobinPartitioner
是在各个Partition中进行轮询发送,这种方式没有考虑到消息大小以及各个Broker性能差异,用得比较少。
另外可以自行指定一个Partitioner实现类,定制分区逻辑。在Partitioner接口中,核心要实现的就是partition
方法。根据相关信息,选择一个Partition。比如用key对partition的个数取模之类的。而Topic下的所有Partition信息都在cluster参数中。
1 | // 获取所有的Partition信息。 |
消费绑定
消费者端可以使用ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
参数来指定个性化的分配策略。查看抽象类AbstractPartitionAssignor
的实现类,可以看到Kafka默认实现了4种策略:
- RangeAssignor:范围策略,比如一个Topic有10个Partiton(partition 0~9),一个消费者组下有三个 Consumer(consumer13)。Range策略就会将分区03分给一个Consumer,46给一个Consumer,79给一个Consumer。
- RoundRobinAssignor:轮询分配策略,可以理解为在Consumer中一个一个轮流分配分区。比如0,3,6,9分区给一个Consumer,1,4,7分区给一个Consumer,然后2,5,8给一个Consumer。
- StickyAssignor:粘性策略,第一,在开始分区时,尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。 第二、分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下,第三个Consumer的服务宕机了,那么按照sticky策略,就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性,不需要将所有的Comsumer重新分配。
默认情况下,Kafka使用了RangeAssignor,CooperativeStickyAssignor两种分配策略。同时我们也可以实现AbstractPartitionAssignor
类来个性化分配。比如,我们在配置消费者服务器时性能差异明显,那么我们就可以让性能比较高的消费者处理更多的分区消息。这些各种实现最终的目的都是为了消费者更均匀的消费,提高整体的消费速度。
生产者消息缓存机制
生产者在对消息分区完成之后,就要准备向服务端发送消息了,为了提升发送消息的速度,Kafka的发送的时候也做了优化。
类似于后端执行sql时使用批处理一下,为了减少段时间的网络请求次数,在短时间内发往Kafka的消息会被缓存起来一起批量发送。这种缓存并批量发送的方法在高并发的设计中非常常见。
具体在Kafka客户端中,负责实现这部分功能的是KafkaProducer里的accumulator
和sender
组件。
accumulator缓存
accumulator
是发送的累加器,发送消息都会在这里面缓存起来。在KafkaProducer
类的构造方法里,可以看到对accumulator
的初始化,其中batchSize
表示批量发送(缓存)的最多消息数量。
1 | // As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable |
在分区的实现类完成分区之后,会调用org.apache.kafka.clients.producer.internals.RecordAccumulator#append
方法,使用accumulator
组件把消息添加到缓存中。
在accumulator
中,根据每一个Partition维护一个Deque来保存消息:
1 | Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>()); |
每个Deque就会存放对应所有的发送批次,也就是一个或多个ProducerBatch
。有一条消息需要被发送时,会加到Deque中的某个ProducerBatch
的recordsBuilder
变量的DataOutputStream
中,最终被sender
组件发送。以下是ProducerBatch
类中调用org.apache.kafka.common.record.MemoryRecordsBuilder#append(long, byte[], byte[], org.apache.kafka.common.header.Header[])
方法添加具体消息的代码:
1 | private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, |
在accumulator
的设置中,有几个变量值得注意一下。ProducerConfig.BUFFER_MEMORY_CONFIG
用来设置整个accumulator
的缓存大小,可以在构建RecordAccumulator
的时候看到我们使用了这个参数。而另一个使用的参数BATCH_SIZE_CONFIG
就是指定了每一个ProducerBatch
的大小,在一个ProducerBatch
中的消息的总大小要超过这个值时就会触发发送。
接下来,sender
就是KafkaProducer
中用来发送消息的一个单独的线程。从这里可以看到,每个 KafkaProducer
对象都对应一个sender
线程。他会负责将RecordAccumulator
中的消息发送给Kafka。
sender发送器
sender
是用来把accumulator
中的消息发送到Kafka的执行器,是一个独立于Producer的线程,每一个Producer都会有一个sender。在KafkaProducer
类的构造方法里,可以看到对sender
的初始化。因为sender
是最终与服务端交互的,可以看到初始化的参数中有服务端的相关信息:
1 | List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config); |
sender
也并不是一次就把RecordAccumulator
中缓存的所有消息都发送出去,而是每次只拿一部分消息。他只获取RecordAccumulator
中缓存内容达到BATCH_SIZE_CONFIG
大小的ProducerBatch
消息。当然,如果消息比较少,ProducerBatch
中的消息大小长期达不到BATCH_SIZE_CONFIG
的话,sender
也不会一直等待,最多等待LINGER_MS_CONFIG
时长,然后就会将ProducerBatch
中的消息读取出来。
因为Sender
类是一个单独的线程,所以必然继承了Thread类或Runable接口,我们可以在Sender
类中找到run
方法查看其具体的实现逻辑。
在org.apache.kafka.clients.producer.internals.Sender#runOnce
方法中,调用了org.apache.kafka.clients.producer.internals.Sender#sendProducerData
方法。该方法中调用accumulator
的ready
方法判断哪些partitions是已经准备好的,再调用drain
方法获取对应所有准备好的ProducerBatch
,把这些ProducerBatch
加入到inflightBatchList
变量中等待发送:
1 | private long sendProducerData(long now) { |
最后加入到inflightBatchList
变量的消息在sendProduceRequests
方法中交给KafkaClient client
这个selector去执行发送。
因为sender
是独立线程,所以每次Producer执行doSend()
方法最后都会在需要发送的时候调用this.sender.wakeup();
方法去触发selector的发送。
Kafka的生产者缓存机制是Kafka面对海量消息时非常重要的优化机制。合理优化这些参数,对于Kafka集群性能提升是非常重要的。比如,如果你的消息体比较大,那么应该考虑加大batch.size
,尽量提升batch的缓存效率。而如果Producer要发送的消息确实非常多,那么就需要考虑加大total.memory
参数,尽量避免缓存不够造成的阻塞。如果发现生产者发送消息比较慢,那么可以考虑提升max.in.flight.requests.per.connection
参数,这样能加大消息发送的吞吐量。
发送应答机制
和其他消息队列一样,生产者发送的消息需要应答机制才可以确保消息被正确的发送到服务端。
我们在上一小节介绍sender
组件的代码中可以看到,初始化sender
传入了一个ack
参数,这个是获取的配置项acks
的值。官方在ProducerConfig#ACKS_DOC
中给出了相当详细的说明:
- acks=0:生产者不关心Broker是否将消息写入Partition,只管发送消息。此时的吞吐量最高,但是数据安全性最低。
- acks=all/-1:生产者需要等Broker端所有的Partition(Leader和Follower)都写完了消息才会返回结果,此时数据安全性最高,但是每次发送消息需要等待更长的时间。
- acks=1,折中的方案。Broker在Leader Partition写成功数据之后返回结果。
我们在acks为0的时候,无法从RecordMetadata获取offset信息。
在生产环境中,acks=0的可靠性太差,基本不使用。acks=1一般用于日志传输,允许个别数据丢失但是吞吐量巨大的场景。acks=-1一般用于金融相关的敏感数据,必须保证数据安全性。
如果acks=-1,Kafka也不是强制所有的Partition都写完下次才会返回结果,根据Broker配置文件broker.conf
中的参数min.insync.replicas
,控制Broker在写入多少个Partition之后给Producer响应。
生产者消息幂等性
在ProducerConfig#ACKS_DOC
中,最后一句写到:
1 | Note that enabling idempotence requires this config value to be 'all'. |
其中idempotence
就是幂等性。当acks=-1或1时,Producer每次发送消息都是需要获取Broker端返回的RecordMetadata的。如果要保证消息安全,那么对于每次消息的传递,发送给Broker和返回给Producer两次网络传输必须是幂等的。如果Producer没有接收到Broker的返回,那么就会认为这条消息发送失败,此时Producer就会根据重试次数(参数ProducerConfig.RETRIES_CONFIG
)进行重发。此时就可能出现消息被重复发送到Broker的问题,那么Kafka是如何解决的呢?
我们先查看一下幂等性相关的属性:
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
:如果设置为true
则开启幂等性,能够保证消息发送到Broker只有一份。
这里的只有一份官方解释的英文用的是exactly one
。相应的,分布式数据传递过程中有三种数据语义:at-least-once
:至少一次;at-most-once
:最多一次;exactly once
:精确一次。
从字面意思可以知道,最多一次和至少一次都是不精确的,在部分业务场景下是可以使用的,并且只实现一种相对比较简单。但是对于一些敏感数据,必须要求不重复也不丢失,此时就需要去实现精确一次。
在Kafka中at-most-once
可以通过ack=0
实现,at-least-once
可以通过ack=-1
实现,exactly once
就需要设置idempotence
为true
实现。
为了实现exactly-once
,Kafka做了以下几个设计:
- PID:对于每一个Producer在初始化过程中会被分配一个唯一的PID。
- SN:Sequence Number。对于每一个PID,这个Producer针对Partition会维护一个SN。这是一个从0开始单调递增的数字。当Producer要往同一个Partition发送消息时,这个SN就会加1。然后会随着消息一起发往Broker。
- Broker端则会针对每个<PID,Partition>维护一个序列号(SN),只有当对应的SequenceNumber = SN+1时,Broker才会接收消息,同时将SN更新为SN+1。否则,SequenceNumber过小就认为消息已经写入了,不需要再重复写入,直接应答即可。而如果SequenceNumber过大,就会认为中间可能有数据丢失了。对生产者就会抛出一个OutOfOrderSequenceException。
所以Kafka在打开idempotence
之后,Broken就会保证一条消息最多只保存一条,实现at-most-once
,再加上acks=1或-1可以实现at-least-once
,这样就整体上保证了exactly-once
。
生产者消息事务
消息幂等性可以保证一条消息发送到对应的Partition是安全的,那么如果是多条消息要发送到不同的Partition,此时要保证这一组消息的原子性,即全部成功才成功,就需要用到事务。
使用事务的英文transaction
可以在KafkaProducer
类中搜到如下几个方法:
1 | // 1 初始化事务 |
其中在initTransactions
方法的说明中,官方写到如果要使用事务,那么必须给生产者设置transactional.id
。如果当前一个Producer的事务没有提交,而另一个新的Producer保持相同的transactional.id
,这时旧的生产者会立即失效,无法继续发送消息。如果当前一个Producer宕机了但是事务没有提交,新的transaction.id
相同的Producer会对旧事务补齐,要么提交事务,要么终止事务。这样新的Producer就可以继续正常工作。
所以,使用以下的方法去发送消息是比较安全的。
1 | public class MyTransactionProducer { |
生产者的事务消息机制保证了Producer发送消息的安全性,但是,他并不保证已经提交的消息就一定能被所有消费者消费。
SpringBoot集成Kafka
这部分的应用本来就非常简单,而且他的本质也是在框架中构建Producer和Consumer。当了解了 kafka的核心消息流转流程,对这些应用参数就可以进行合理的组装。
pom中引用Kafka
1 | <dependency> |
配置springboot配置文件
1 | spring.application.name=springboot-client |
这些配置文件在基础的客户端中其实都可以找到对应的说明,在学习了基础的之后再学习配置并不难。
应用中使用框架注入的KafkaTemplate发送消息
1 |
|
使用@KafkaListener注解声明消息消费者
1 |
|