Kafka使用指南2——Java客户端以及Spring集成


目录

  1. Kafka简介和安装以及原生命令行客户端

    这块主要是一个Kafka所包含的内部元素的介绍以及简单的使用。

  2. Java客户端以及Spring集成(本篇)

    Java客户端其实就是原生命令行的一种封装,学习如何使用即可,Spring就是在这个基础上再进行了对象的管理,使用起来并不难。

  3. Kafka集群架构设计

    Kafka在设计之初就是为了高吞吐、高性能、高可扩展,所以它的集群架构是非常值得学习的。

  4. Kafka日志索引详解

    Broker能够高效地处理和保存消息,是Kafka高性能的保障。我们从可见的log文件入手,去研究一下Kafka是如何保证消息高效的流转。

主要内容

这一部分主要是介绍Kafka在Java客户端上的使用,包括Java客户端和Spring的集成,从客户端的角度去更深入的理解Kafka。同时还会有不同调用参数的介绍,用来实现不同的业务细节。


基础的客户端

要把Kafka在Java中使用起来是非常简单的,只需要引入一个maven依赖即可:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>

构建Topic,该Topic有两个分区:

1
[root@localhost ~]# kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 2 --topic test-java-client

消息生产者

整体来说,构建一个生产者Producer的过程分为三个步骤:

  1. 设置Producer的核心属性:Producer所有的可选属性都由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG代表了服务器地址,这边是Kafka服务器的地址。对于大部分重要的属性,在ProducerConfig类中都写了对应参数名+_Doc的注解,介绍还是非常详细的。
  2. 构建消息:作为内容的载体,Kafka的消息结构为Key-Value形式的ProducerRecord类。其中Key会用于在Topic中分配Partition,而Value就是具体的消息内容。
  3. 使用Producer发送消息:发送消息一般有单向发送、同步发送和异步发送三种,对应着不同的安全性和效率。

设置Producer的核心属性

1
2
3
4
5
6
7
8
9
10
Properties props = new Properties();
// 配置kafka的端口,集群可以节点全配置也可以配置其中的部分节点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 配置个性化拦截器类,可以在发送的三个环节(doSend/onAcknowledgement/close)进行自定义的操作
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.zm.basic.MyInterceptor");
// 配置key的序列化类
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 配置value的序列化类
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = new KafkaProducer<>(props);

构建消息

1
2
private static final String TOPIC = "test-java-client";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer Send Msg" + i);

发送消息

单向发送

这种发送方式不关心服务端的应答,是速度最快的模式,但是如果消息未发送成功也不会管。

1
2
producer.send(record);
System.out.println("message " + i + " sent");

同步发送

这种发送方式会阻塞当前线程,等待发送成功之后才会返回RecordMetadata类,可以从返回对象中拿出对应的分区和offset信息。

1
2
3
4
5
6
RecordMetadata recordMetadata = producer.send(record).get();
String topic = recordMetadata.topic();
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();
String metadata = recordMetadata.toString();
System.out.println("metadata:["+ metadata+"] sent with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);

异步发送

这种发送方式发送消息后不阻塞,服务端有应答后会触发回调函数。同样,返回的RecordMetadata类包含对应的分区和offset信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 需要定义一个CountDownLatch判断消息是否都发送成功
CountDownLatch latch = new CountDownLatch(5);
// 构建消息
//...
// 发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
System.out.println("消息发送失败,"+e.getMessage());
e.printStackTrace();
}else{
String topic = recordMetadata.topic();
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();
String metadata = recordMetadata.toString();
System.out.println("metadata:["+ metadata+"] sent with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);
}
latch.countDown();
}
});
// 消息处理完才停止发送者。
latch.await(); // 会阻塞直到countDown到0
producer.close();

尝试执行生产者发送5条消息,输出控制台如下:

1
2
3
4
5
metadata:[test-java-client-0@0] sent with topic:test-java-client; partition:0;offset:0
metadata:[test-java-client-0@1] sent with topic:test-java-client; partition:0;offset:1
metadata:[test-java-client-1@0] sent with topic:test-java-client; partition:1;offset:0
metadata:[test-java-client-1@1] sent with topic:test-java-client; partition:1;offset:1
metadata:[test-java-client-1@2] sent with topic:test-java-client; partition:1;offset:2

可以看到消息被均匀的发送到了分区0和分区1中。

消息消费者

使用Kafka提供的Consumer类进行快速的消息消费,主要过程也分为三个步骤:

  1. 设置Consumer的核心属性:Consumer所有的可选属性都由ConsumerConfig类管理。同样比如ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG代表了服务器地址,这边是Kafka服务器的地址。对于大部分重要的属性,在ConsumerConfig类中也都写了对应参数名+_Doc的注解,介绍还是非常详细的。
  2. 拉取消息:Kafka采用消费者主动pull的模式去拉取消息,由消费者去决定自己拉取哪一部分消息。
  3. 处理消息并提交偏移量:消费者拉取到消息之后就可以由业务处理消息了,处理完成之后需要向Broker提交偏移量Offset,这样才能告诉Broker消息消费成功,否则Broker会认为消息消费失败。

设置Consumer的核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Properties props = new Properties();
// 配置kafka地址,和生产者一样
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// 每个消费者要指定一个group
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-1");
// 配置key序列化类
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置value序列化类
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 1.直接设置订阅的主题
consumer.subscribe(Arrays.asList(TOPIC));
// 2.如需可以自行调整Offset
// 设置分配分区
// List<TopicPartition> topicPartitionList = new ArrayList<>();
// topicPartitionList.add(new TopicPartition(TOPIC,0));
// topicPartitionList.add(new TopicPartition(TOPIC,1));
// consumer.assign(topicPartitionList);
// 自行调整Offset从头开始消费
// consumer.seekToBeginning(topicPartitionList);

拉取消息并处理

1
2
3
4
5
6
7
while (true) {
// 100毫秒超时时间
ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition = " + record.partition() + "; offset = " + record.offset() + "; key = " + record.key() + "; value= " + record.value());
}
}

提交Offset

1
2
3
4
//同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
consumer.commitSync();
//异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
consumer.commitAsync();

尝试使用两个消费者(因为有个两个分区,可以启动两个消费者,数量关系具体在下一个小节说明)消费刚刚生产者发送的5条消息:

1
2
3
4
5
6
7
8
consumer1:
partition = 0; offset = 2; key = 0; value= MyProducer0
partition = 0; offset = 3; key = 2; value= MyProducer2

consumer2:
partition = 1; offset = 3; key = 1; value= MyProducer1
partition = 1; offset = 4; key = 3; value= MyProducer3
partition = 1; offset = 5; key = 4; value= MyProducer4

可以看到消费者1分配到去消费分区0的消息,而消费者2分配到去消费分区1的消息。如果只启动一个消费者的话,那么该消费者会消费到两个分区所有的消息。

从生产者和消费者的属性设置来理解Kafka工作机制

消费者分组机制

我们在第一篇文章就提到了每一个消费者都有一个对应的消费组,可能一个消费组下有多个消费者,也可能只有一个消费者。

在Consumer的属性中,有一个ConsumerConfig.GROUP_ID_CONFIG属性就是用来标识消费组的,在需要使用消费者管理功能或者Kafka提供的Offset管理策略时,ConsumerConfig.GROUP_ID_CONFIG是必填的。还有一个GROUP_INSTANCE_ID_DOC属性可以给消费者一个唯一的标识,一个消费组只会有一个该标识的消费者,这个属性设置可以有效的防止特殊性情况下消费组内不必要的rebalance(比如重启)。

生产者往一个Topic发送消息时,会尽量均匀地把消息发送的每个Partition中。Partition接收到消息会推送给对应的消费组,每个消费组只会推送一份,也就是一个消费组中只有一个消费组可以接收到消息,而不同的消费组可以都接受到消息。与之相关的还有Offset偏移量,这个偏移量记录了一个消费组在Partition中的消费进度。

我们将两个消费者的GROUP_INSTANCE_ID_DOC属性分别设置为id1id2,启动消费者。使用--describe命令查看消费组信息,可以看到test-group-1消费组下两个消费者的消费情况。

1
2
3
4
[root@localhost ~]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group-1
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group-1 test-java-client 1 3 3 0 id2-c0130d1e-45f2-4020-8903-846919537124 /192.168.56.1 consumer-test-group-1-id2
test-group-1 test-java-client 0 2 2 0 id1-5fd9b499-7329-4aaa-86f8-08f779a404f7 /192.168.56.1 consumer-test-group-1-id1

其中CURRENT-OFFSET就是当前消费的偏移量,LEO在上一篇文章中提到过,就是总的消息偏移量,LAG是还剩多少消息未消费。

CURRENT-OFFSET需要消费组消费之后主动向Broker提交,提交完成之后Broker会更新这个Partition的消费进度,表示这个消息已经被处理完成。若消费者没有提交Offset,那么Broker会认为这个消息没有处理过,会往对应消费组重新推送,不过会尽量推送给这个消费组中的其他消费者。

在文章上一节中的示例代码中,我们使用了手动提交Offset的方法,也可以通过设置ENABLE_AUTO_COMMIT_CONFIGtrue来开启自动提交。

Offset的存储

Offset是和Partition和Group相关的,所以一个Partition一个Group只能记录一个Offset。由此可以推导出,一个Partition只能有一个消费者进行消费。所以在一个有4个副本的Topic下,一个消费组最多能有4个消费者实例,多的消费者是无法订阅到消息的。

Offset是在服务端管理的,但是Offset的更新是由客户端发起的。AUTO_OFFSET_RESET_CONFIG参数可以设置服务端在客户端提交的Offset不存在(如数据文件被删除)时的处理方式。设置为earliest将Offset设置为最早的Offset,设置为latest将Offset设置为最新的Offset,设置为none向客户端抛出异常。

Offset的更新

在文章上一节中的示例代码中,我们展示了手动提交Offset的两种方法:同步提交和异步提交。

同步提交可以保证提交成功Offset之后才确认处理完成,但是会导致消息速度降低。

异步提交可以提交处理效率,但是如果处理完成之后提交Offset失败,会导致重复消费。

当然追求效率还可以在接收到消息马上异步提交,同时处理消息,但是如果消息处理失败就会丢失这条消息。

消费者各种处理Offset的方式各有优缺点,甚至还可以将Offset自行用Redis处理,根据自己的业务处理来更新Offset向前进。

生产者拦截机制

在消息生产者中,INTERCEPTOR_CLASSES_CONFIG属性可以设置自定义拦截器类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord record) {
System.out.println("发送时拦截:key=" + record.key() + ",partition=" + record.partition() + ",topic=" + record.topic() + ",value=" + record.value());
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("发送确认时拦截:partition=" + metadata.partition() + ",topic=" + metadata.topic() + ",offset=" + metadata.offset());
}

@Override
public void close() {
System.out.println("关闭连接时拦截");
}

@Override
public void configure(Map<String, ?> configs) {
System.out.println("处理配置项");
}
}

在设置了拦截器之后,我们使用异步发送重新调用生产者发送消息。根据控制台打印的顺序,可以清楚地看到拦截器在整个流程中进行拦截的节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
处理配置项
发送时拦截:key=0,partition=null,topic=test-java-client,value=MyProducer0
发送时拦截:key=1,partition=null,topic=test-java-client,value=MyProducer1
发送时拦截:key=2,partition=null,topic=test-java-client,value=MyProducer2
发送时拦截:key=3,partition=null,topic=test-java-client,value=MyProducer3
发送时拦截:key=4,partition=null,topic=test-java-client,value=MyProducer4
发送确认时拦截:partition=0,topic=test-java-client,offset=4
metadata:[test-java-client-0@4] sent with topic:test-java-client; partition:0;offset:4
发送确认时拦截:partition=0,topic=test-java-client,offset=5
metadata:[test-java-client-0@5] sent with topic:test-java-client; partition:0;offset:5
发送确认时拦截:partition=1,topic=test-java-client,offset=6
metadata:[test-java-client-1@6] sent with topic:test-java-client; partition:1;offset:6
发送确认时拦截:partition=1,topic=test-java-client,offset=7
metadata:[test-java-client-1@7] sent with topic:test-java-client; partition:1;offset:7
发送确认时拦截:partition=1,topic=test-java-client,offset=8
metadata:[test-java-client-1@8] sent with topic:test-java-client; partition:1;offset:8
关闭连接时拦截

可以设置多个拦截器,用逗号隔开。

拦截器可以用来给消息批量赋值,比如给一个对象设置一个发送时间的属性。

消息序列化机制

在设置Producer的属性的时候,有两个属性ProducerConfig.KEY_SERIALIZER_CLASS_CONFIGProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,分别对应了消息Key的序列化和Value的序列化。

我们在使用消息队列时,虽然消息是String的可能性非常大,但是还是有很多场景需要传输pojo类型。如果需要把数据进行网络传输到Kafka,那么就需要把这个对象转换成byte[]进行传输,对于常见的类型如String,Double,Long等都实现了默认的序列化类,而业务上自己的对象就需要自行编写序列化类。

假设拥有一个User对象,姓名:张三,年龄:18,性别:男。我们可以简单粗暴的使用json转换,但是更加效率的方法可以是将数据拼成一个字符串,如value=男018张三。性别只会有男/女,年龄也不会超过999,而姓名放在最后是因为长度不确定,可以从年龄的后一位一直读到最后。经过这样的处理,value值就会尽可能的缩小,提高传输效率和Kafka存储效率。

其他对于非定长的属性也可以使用数据长度+数据的形式,如value=4:male,2:18,7:zhangsan,8:engineer,表示性别:男,年龄:18,姓名:张三,职业:工程师。

在生产者设置了序列化方式之后,对应的在消费者也需要反序列化,根据value的特定格式去将byte[]还原成业务对象。

序列化机制在高并发场景中非常重要的优化机制,除了Kafka,像Netty在进行网络调用时也会定制化序列化机制防止粘包。

消息分区路由机制

在前面的示例中,多分区的Topic的每个分区总是会分到数量均衡的消息,那么消息是如何分区发送的,而消费者又是怎么选择分区去消费的呢?

我们把问题分为两个:

  • Producer根据Key将消息分发到不同的Partition上
  • Consumer选择消费特定的Partition的消息

分发路由

首先我们来看生产者端。在Producer中我们可以指定一个Partitioner对象来对消息进行分配。使用ProducerConfig.PARTITIONER_CLASS_CONFIG参数来指定个性化的分发实现类。

Kafka之前提供了三种实现机制,分别是RoundRobinPartitionerDefaultPartitionerUniformStickyPartitioner,目前后两个已经被标记为过时,默认使用BuiltInPartitioner(默认实现类没有继承Partitioner接口)。

默认的BuiltInPartitioner使用了Sticky的策略,如果在给一个生产者分配了一个分区后,会尽可能一直使用这个分区。等待该分区的batch.size(默认16K)已满,或者这个分区的消息已完成linger.ms(默认0毫秒,表示如果batch.size迟迟没有满后的等待时间)后批量发送消息,之后重新选择分区。

RoundRobinPartitioner是在各个Partition中进行轮询发送,这种方式没有考虑到消息大小以及各个Broker性能差异,用得比较少。

另外可以自行指定一个Partitioner实现类,定制分区逻辑。在Partitioner接口中,核心要实现的就是partition方法。根据相关信息,选择一个Partition。比如用key对partition的个数取模之类的。而Topic下的所有Partition信息都在cluster参数中。

1
2
3
4
5
6
7
8
9
10
11
12
// 获取所有的Partition信息。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

// 默认的BuiltInPartitioner
BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());

// RoundRobinPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// 具体实现
}

消费绑定

消费者端可以使用ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG参数来指定个性化的分配策略。查看抽象类AbstractPartitionAssignor的实现类,可以看到Kafka默认实现了4种策略:

  • RangeAssignor:范围策略,比如一个Topic有10个Partiton(partition 0~9),一个消费者组下有三个 Consumer(consumer13)。Range策略就会将分区03分给一个Consumer,46给一个Consumer,79给一个Consumer。
  • RoundRobinAssignor:轮询分配策略,可以理解为在Consumer中一个一个轮流分配分区。比如0,3,6,9分区给一个Consumer,1,4,7分区给一个Consumer,然后2,5,8给一个Consumer。
  • StickyAssignor:粘性策略,第一,在开始分区时,尽量保持分区的分配均匀。比如按照Range策略分(这一步实际上是随机的)。 第二、分区的分配尽可能的与上一次分配的保持一致。比如在range分区的情况下,第三个Consumer的服务宕机了,那么按照sticky策略,就会保持consumer1和consumer2原有的分区分配情况。然后将consumer3分配的7~9分区尽量平均的分配到另外两个consumer上。这种粘性策略可以很好的保持Consumer的数据稳定性,不需要将所有的Comsumer重新分配。

默认情况下,Kafka使用了RangeAssignor,CooperativeStickyAssignor两种分配策略。同时我们也可以实现AbstractPartitionAssignor类来个性化分配。比如,我们在配置消费者服务器时性能差异明显,那么我们就可以让性能比较高的消费者处理更多的分区消息。这些各种实现最终的目的都是为了消费者更均匀的消费,提高整体的消费速度。

生产者消息缓存机制

生产者在对消息分区完成之后,就要准备向服务端发送消息了,为了提升发送消息的速度,Kafka的发送的时候也做了优化。

类似于后端执行sql时使用批处理一下,为了减少段时间的网络请求次数,在短时间内发往Kafka的消息会被缓存起来一起批量发送。这种缓存并批量发送的方法在高并发的设计中非常常见。

具体在Kafka客户端中,负责实现这部分功能的是KafkaProducer里的accumulatorsender组件。

accumulator缓存

accumulator是发送的累加器,发送消息都会在这里面缓存起来。在KafkaProducer类的构造方法里,可以看到对accumulator的初始化,其中batchSize表示批量发送(缓存)的最多消息数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
// batching which in practice actually means using a batch size of 1.
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
this.accumulator = new RecordAccumulator(logContext,
batchSize,
this.compressionType,
lingerMs(config),
retryBackoffMs,
retryBackoffMaxMs,
deliveryTimeoutMs,
partitionerConfig,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));

在分区的实现类完成分区之后,会调用org.apache.kafka.clients.producer.internals.RecordAccumulator#append方法,使用accumulator组件把消息添加到缓存中。

accumulator中,根据每一个Partition维护一个Deque来保存消息:

1
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());

每个Deque就会存放对应所有的发送批次,也就是一个或多个ProducerBatch。有一条消息需要被发送时,会加到Deque中的某个ProducerBatchrecordsBuilder变量的DataOutputStream中,最终被sender组件发送。以下是ProducerBatch类中调用org.apache.kafka.common.record.MemoryRecordsBuilder#append(long, byte[], byte[], org.apache.kafka.common.header.Header[])方法添加具体消息的代码:

1
2
3
4
5
6
7
8
9
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
ensureOpenForRecordAppend();
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
// 使用DefaultRecord往appendStream中写入key和value
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}

accumulator的设置中,有几个变量值得注意一下。ProducerConfig.BUFFER_MEMORY_CONFIG用来设置整个accumulator的缓存大小,可以在构建RecordAccumulator的时候看到我们使用了这个参数。而另一个使用的参数BATCH_SIZE_CONFIG就是指定了每一个ProducerBatch的大小,在一个ProducerBatch中的消息的总大小要超过这个值时就会触发发送。

接下来,sender就是KafkaProducer中用来发送消息的一个单独的线程。从这里可以看到,每个 KafkaProducer对象都对应一个sender线程。他会负责将RecordAccumulator中的消息发送给Kafka。

sender发送器

sender是用来把accumulator中的消息发送到Kafka的执行器,是一个独立于Producer的线程,每一个Producer都会有一个sender。在KafkaProducer类的构造方法里,可以看到对sender的初始化。因为sender是最终与服务端交互的,可以看到初始化的参数中有服务端的相关信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs,
retryBackoffMaxMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);

sender也并不是一次就把RecordAccumulator中缓存的所有消息都发送出去,而是每次只拿一部分消息。他只获取RecordAccumulator中缓存内容达到BATCH_SIZE_CONFIG大小的ProducerBatch消息。当然,如果消息比较少,ProducerBatch中的消息大小长期达不到BATCH_SIZE_CONFIG的话,sender也不会一直等待,最多等待LINGER_MS_CONFIG时长,然后就会将ProducerBatch中的消息读取出来。

因为Sender类是一个单独的线程,所以必然继承了Thread类或Runable接口,我们可以在Sender类中找到run方法查看其具体的实现逻辑。

org.apache.kafka.clients.producer.internals.Sender#runOnce方法中,调用了org.apache.kafka.clients.producer.internals.Sender#sendProducerData方法。该方法中调用accumulatorready方法判断哪些partitions是已经准备好的,再调用drain方法获取对应所有准备好的ProducerBatch,把这些ProducerBatch加入到inflightBatchList变量中等待发送:

1
2
3
4
5
6
7
8
9
10
11
12
private long sendProducerData(long now) {
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadata, now);
// 省略...
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadata, result.readyNodes, this.maxRequestSize, now);
// 加入inflightBatchList
addToInflightBatches(batches);
// 省略...
// 执行发送
sendProduceRequests(batches, now);
}

最后加入到inflightBatchList变量的消息在sendProduceRequests方法中交给KafkaClient client这个selector去执行发送。

因为sender是独立线程,所以每次Producer执行doSend()方法最后都会在需要发送的时候调用this.sender.wakeup();方法去触发selector的发送。

Kafka的生产者缓存机制是Kafka面对海量消息时非常重要的优化机制。合理优化这些参数,对于Kafka集群性能提升是非常重要的。比如,如果你的消息体比较大,那么应该考虑加大batch.size,尽量提升batch的缓存效率。而如果Producer要发送的消息确实非常多,那么就需要考虑加大total.memory参数,尽量避免缓存不够造成的阻塞。如果发现生产者发送消息比较慢,那么可以考虑提升max.in.flight.requests.per.connection参数,这样能加大消息发送的吞吐量。

发送应答机制

和其他消息队列一样,生产者发送的消息需要应答机制才可以确保消息被正确的发送到服务端。

我们在上一小节介绍sender组件的代码中可以看到,初始化sender传入了一个ack参数,这个是获取的配置项acks的值。官方在ProducerConfig#ACKS_DOC中给出了相当详细的说明:

  • acks=0:生产者不关心Broker是否将消息写入Partition,只管发送消息。此时的吞吐量最高,但是数据安全性最低。
  • acks=all/-1:生产者需要等Broker端所有的Partition(Leader和Follower)都写完了消息才会返回结果,此时数据安全性最高,但是每次发送消息需要等待更长的时间。
  • acks=1,折中的方案。Broker在Leader Partition写成功数据之后返回结果。

我们在acks为0的时候,无法从RecordMetadata获取offset信息。

在生产环境中,acks=0的可靠性太差,基本不使用。acks=1一般用于日志传输,允许个别数据丢失但是吞吐量巨大的场景。acks=-1一般用于金融相关的敏感数据,必须保证数据安全性。

如果acks=-1,Kafka也不是强制所有的Partition都写完下次才会返回结果,根据Broker配置文件broker.conf中的参数min.insync.replicas,控制Broker在写入多少个Partition之后给Producer响应。

生产者消息幂等性

ProducerConfig#ACKS_DOC中,最后一句写到:

1
2
Note that enabling idempotence requires this config value to be 'all'.
If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.

其中idempotence就是幂等性。当acks=-1或1时,Producer每次发送消息都是需要获取Broker端返回的RecordMetadata的。如果要保证消息安全,那么对于每次消息的传递,发送给Broker和返回给Producer两次网络传输必须是幂等的。如果Producer没有接收到Broker的返回,那么就会认为这条消息发送失败,此时Producer就会根据重试次数(参数ProducerConfig.RETRIES_CONFIG)进行重发。此时就可能出现消息被重复发送到Broker的问题,那么Kafka是如何解决的呢?

我们先查看一下幂等性相关的属性:

ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG:如果设置为true则开启幂等性,能够保证消息发送到Broker只有一份。

这里的只有一份官方解释的英文用的是exactly one。相应的,分布式数据传递过程中有三种数据语义:at-least-once:至少一次;at-most-once:最多一次;exactly once:精确一次。

从字面意思可以知道,最多一次和至少一次都是不精确的,在部分业务场景下是可以使用的,并且只实现一种相对比较简单。但是对于一些敏感数据,必须要求不重复也不丢失,此时就需要去实现精确一次。

在Kafka中at-most-once可以通过ack=0实现,at-least-once可以通过ack=-1实现,exactly once就需要设置idempotencetrue实现。

为了实现exactly-once,Kafka做了以下几个设计:

  • PID:对于每一个Producer在初始化过程中会被分配一个唯一的PID。
  • SN:Sequence Number。对于每一个PID,这个Producer针对Partition会维护一个SN。这是一个从0开始单调递增的数字。当Producer要往同一个Partition发送消息时,这个SN就会加1。然后会随着消息一起发往Broker。
  • Broker端则会针对每个<PID,Partition>维护一个序列号(SN),只有当对应的SequenceNumber = SN+1时,Broker才会接收消息,同时将SN更新为SN+1。否则,SequenceNumber过小就认为消息已经写入了,不需要再重复写入,直接应答即可。而如果SequenceNumber过大,就会认为中间可能有数据丢失了。对生产者就会抛出一个OutOfOrderSequenceException。

所以Kafka在打开idempotence之后,Broken就会保证一条消息最多只保存一条,实现at-most-once,再加上acks=1或-1可以实现at-least-once,这样就整体上保证了exactly-once

生产者消息事务

消息幂等性可以保证一条消息发送到对应的Partition是安全的,那么如果是多条消息要发送到不同的Partition,此时要保证这一组消息的原子性,即全部成功才成功,就需要用到事务。

使用事务的英文transaction可以在KafkaProducer类中搜到如下几个方法:

1
2
3
4
5
6
7
8
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

其中在initTransactions方法的说明中,官方写到如果要使用事务,那么必须给生产者设置transactional.id。如果当前一个Producer的事务没有提交,而另一个新的Producer保持相同的transactional.id,这时旧的生产者会立即失效,无法继续发送消息。如果当前一个Producer宕机了但是事务没有提交,新的transaction.id相同的Producer会对旧事务补齐,要么提交事务,要么终止事务。这样新的Producer就可以继续正常工作。

所以,使用以下的方法去发送消息是比较安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class MyTransactionProducer {
private static final String BOOTSTRAP_SERVERS = "192.168.56.100:9092,192.168.56.101:9092,192.168.56.102:9092";
private static final String TOPIC = "test-java-client";

public static void main(String[] args) {
//PART1:设置发送者相关属性
Properties props = new Properties();
// 此处配置的是kafka的端口
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.zm.basic.MyInterceptor");
// 配置transaction.id
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "id_1");
// 配置key的序列化类
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 配置value的序列化类
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// 开启事务
producer.initTransactions();
producer.beginTransaction();
CountDownLatch latch = new CountDownLatch(5);
try {
for (int i = 0; i < 5; i++) {
//Part2:构建消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);
//Part3:发送消息
producer.send(record);
System.out.println("message " + i + " sent");
}
producer.commitTransaction();
}
catch (Exception e) {
producer.abortTransaction();
}
finally {
producer.close();
}
}
}

生产者的事务消息机制保证了Producer发送消息的安全性,但是,他并不保证已经提交的消息就一定能被所有消费者消费。

SpringBoot集成Kafka

这部分的应用本来就非常简单,而且他的本质也是在框架中构建Producer和Consumer。当了解了 kafka的核心消息流转流程,对这些应用参数就可以进行合理的组装。

pom中引用Kafka

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

配置springboot配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
spring.application.name=springboot-client

###########Kafka集群###########
spring.kafka.bootstrap-servers=192.168.56.100:9092,192.168.56.101:9092,192.168.56.102:9092
###########生产者配置?###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别ack(0或1或all/-1)
spring.kafka.producer.acks=1
# 批量发送的大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
###########【消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto-commit-interval=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer

这些配置文件在基础的客户端中其实都可以找到对应的说明,在学习了基础的之后再学习配置并不难。

应用中使用框架注入的KafkaTemplate发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
public class MyProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;

public MyProducer(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

@GetMapping("/kafka/normal/{message}")
public void sendMessage1(@PathVariable("message") String normalMessage) {
kafkaTemplate.send("topic1", normalMessage);
}
}

使用@KafkaListener注解声明消息消费者

1
2
3
4
5
6
7
@Component
public class MyConsumer {
@KafkaListener(topics = {"topic1"})
public void onMessage(ConsumerRecord<?, ?> record) {
System.out.println("消费消息:" + record.topic() + "-" + record.partition() + "-" + record.value());
}
}