我们都知道,在使用 Kakfa 客户端发送消息时,只需要指定主题和消息的内容,再调用发送方法即可。那发送方法中具体包含了哪些逻辑呢,本文结合源码一起来看下。
1 | // 构建消息 |
以下内容基于Kafka 2.5.1版本
构建消息对象
实际上,Kafka 中的消息对象并不只有主题和消息内容两个属性:
1 | public class ProducerRecord<K, V> { |
其中 key 用来指定消息的键,可以用于计算分区号并且让消息发往特定的分区,同一个 key 的消息会被发往同一个分区,另外,由于 Kafka 可以保证同一个分区中的消息是有序的,相同 key 的消息的消费在一定程度上也能保证有序。
发送消息
构建消息后,接下来就是发送了:
1 | public Future<RecordMetadata> send(ProducerRecord<K, V> record) { |
从 send 方法的返回值不难看出,该方法是一个异步方法,如果希望使用同步发送,则可以在拿到返回值后使用 Future#get 方法等发送结果。
发送结果是一个 RecordMetadata 对象:
1 | public final class RecordMetadata { |
在 RecordMetadata 中记录了消息的元数据信息,如消息的主题、分区、分区中消息的偏移量、时间戳等。
另外,Kafka 也提供了具有回调参数的 send 方法:
1 | public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { |
回调函数包含两个参数:
1 | public interface Callback { |
当发送的消息被服务端确认成功后,消息的 metadata 将被返回;当发送出现异常时,可以通过 exception 拿到具体的异常,此时 metadata 对象中除 topicPartition 外的其他属性都将是-1。
来到 doSend 方法,消息发送的主逻辑得以展现:
1 | private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { |
可以看到,发送消息主要分为几个步骤:
- 等待获取 Kafka 集群元数据信息,这里会把最大阻塞时间参数 maxBlockTimeMs 传进去,方法返回后重新计算剩余等待时间 remainingWaitMs ;
- 使用 Serializer 器对消息的 key 和 value 序列化;
- 获取消息发往的目标分区 partition;
- 预估序列化后的消息大小,如果超过了限制,则抛出异常;
- 调用 RecordAccumulator#append 方法,将消息放入消息收集器中的消息批次;
- 如果消息批次满了,或者创建了新的批次,则唤醒 Sender 线程,后续由 Sender 线程从 RecordAccumulator 中批量发送消息到 Kafka。
生产者拦截器
在 KafkaProducer 里面,维护了一个生产者拦器集合 ProducerInterceptors :
1 | public interface ProducerInterceptor<K, V> extends Configurable { |
其 ProducerInterceptor 接口提供了三个方法,其中 onSend 方法可以用来对发送前的消息进行相应的定制化操作;onAcknowledgement 方法先于用户的 Callback 执行,可以用于对kafka集群响应进行预处理;close 方法则可以用于在关闭拦截器时执行一些资源的清理工作。
获取目标分区
在获取目标分区时:
1 | private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { |
如果指定了目标分区,则使用指定的;否则通过分区器 Partitioner#partition 计算得出,默认的分区器由 DefaultPartitioner 提供实现:
1 | public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { |
获取目标分区时分两种情况 :
- 如果消息存在 key ,则使用 murmur2 算法取 key 的 hash 值,然后对分区总数取模,得到目标分区;
- 如果消息没有 key ,则由 StickyPartitionCache 进一步获取。
那 StickyPartitionCache 的实现是什么呢?
1 | public int partition(String topic, Cluster cluster) { |
indexCache 中维护了 topic 到 分区的映射,如果为空的话,则通过 nextPartition 方法重新获取:
1 | public int nextPartition(String topic, Cluster cluster, int prevPartition) { |
初看起来 indexCache 中只要有值,那么向同一个 topic 发送消息时会一直使用同一个分区,其实不然,在 doSend 方法中,我们可以看到当需要创建一个新的消息批次时,也会触发 nextPartition 方法的执行。那么这样做的目的是什么呢?
前面我们提到发送消息时,会先流入消息收集器 RecordAccumulator 中将消息先缓存起来,后续由 Sender 线程发送,而触发发送的条件有两种:
- 消息批次被填满;
- 消息发送的等待时间超过了 linger.ms 的配置。
而 StickyPartitionCache 的作用其实是 “黏性选择”,它能尽可能地将消息发往同一个分区,使消息批次能尽快的填满被发送出去,这样就可以一定程度上降低消息发送的延迟,同时也降低了发送的频次。
总结
本文主要对 Kafka 客户端的消息发送方法作了主要逻辑的梳理,同时对 Kafka 的默认分区器作了简单的分析,以及使用 StickyPartitionCache 带来的好处。发送流程中涉及到的 RecordAccumulator 、Sender 线程将在后续文章中一起学习下。