前言
前面我们提到,当通过 KafkaProducer#send 方法发送消息时,消息会先被写入到消息收集器—— RecordAccumulator 中缓存,当缓存的消息满足一定的条件后,将由 Sender 线程获取批量消息并将其发往 Kafka ,这样的好处是能够减少网络请求的次数,从而提升网络吞量。本篇文章一起来学习下 RecordAccumulator 的内部实现。
以下内容基于Kafka 2.5.1版本
append 方法
1 | public RecordAppendResult append(TopicPartition tp, |
整体流程如下:
获取或创建新的双端队列:
1
2
3
4
5
6
7
8
9
10
11private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
Deque<ProducerBatch> d = this.batches.get(tp);
if (d != null)
return d;
d = new ArrayDeque<>();
Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}其中 batches 类型为 CopyOnWriteMap ,线程安全类,存放了主题到消息批次队列的映射。
尝试往队列中追加数据,此时会取队列的最后一个消息批次,并通过 ProducerBatch#tryAppend 进行追加:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
// 是否超过 buffer 限制
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
// 往 buffer 追加消息
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compressionType(), key, value, headers));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
// we have to keep every future returned to the users in case the batch needs to be
// split to several new batches and resent.
// 将异步发送结果 Future 收集起来
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}计算消息批次大小(取批次大小和消息大小的最大值),申请新的内存空间 buffer;
再次尝试添加消息;
使用申请到的内存,构建新的消息批次,将消息追加到消息批次,同时将消息批次添加到主题分区对应的队列;
释放无用的 buffer。
这里有几点值得思考:
为什么对同一个锁对象,前后加了再次锁,能不能合并成一个呢?
首先,锁的粒度控制得越小,锁竞争造成的开销也就越小;另外,申请新的内在空间 buffer 是比较耗时操作,在并发情况下,生产者的消息体也并不都是一样的。如果合并成一个同步锁,假使生产者1在发送消息时,因为消息太大而需要申请新的内存空间,而此时的消息批次剩余的内在空间却能满足生产者2发送的消息时,由于生产者1先进来,会新构建一个消息批次,后来的生产者也会使用新构建的消息批次追加消息,而可能导致空间没能很好的利用。
流程的最后,有一个内存释放的操作,这个有作用是什么呢?
同样是并发情况下,如果两个生产者在第一次尝试添加消息失败时,会构建新的消息批次,在先到来的生产者1构建完成后并将消息追加进去,生产者2这里会先再次尝试追加,如果追回成功,那么此时生产者2则不需要构建一个新的消息批次,申请的内存也自然用不上了,所以需要及时地释放。
内存池 BufferPool
前面我们提到,构建一个新的 ProducerBatch 前会先申请一块内存,而内存的来源就是内存池,也叫 BufferPool。
我们都知道线程池、连接池等将资源池化的最大作用就是复用,BufferPool 也不例外,其通过对内存块的复用,可以避免频繁的创建内存和销毁内存,从而避免频繁地 GC。那 BufferPool 是怎么实现的呢,首先来看下其申请内存的方法实现。
allocate
1 | public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { |
首先需要了解的是,BufferPool 的可用空间分为两个部分:
- 池化内存可用区域 free(ByteBuffer 队列):初始值为空,由固定大小 poolableSize(等于消息批次大小)的 ByteBuffer 组成;
- 非池化可用内存大小 nonPooledAvailableMemory:初始值等于总内存大小 totalMemory;
当申请内存,按以下流程分配:
- 如果申请的目标空间等池化内存大小 poolableSize,则优先从池化内存可用区 free 分配;
- 如果池化区没有可用内存块,或者目标空间大小不等于池化内存大小,则从非池空间区域分配;
- 如果剩余空间(池化内存大小+非池化内存大小)能满足目标空间,则释放池化内存块,同时增加非池化内存大小,最后从非池化内存区分配;
- 否则,循环阻塞等待释放足够的可用空间出来;
- 返回分配到的内存块 ByteBuffer。
deallocate
从 BufferPool 中分配的 ByteBuffer 内存块在使用完之后,可以通过 deallocate 方法释放:
1 | public void deallocate(ByteBuffer buffer, int size) { |
总的来说,释放的 ByteBuffer 大小如果满足池化大小,则放入池化内存区域重复利用,否则增加非池化内存大小,而释放的 ByteBuffer 则等待 JVM 的 GC 进行自动回收。
总结
本文针对 Kafka 客户端的 RecordAccumulator 消息收集器的 append 添加消息方法的流程作了简单的分析学习,又借此引出了 BufferPool 内存池,并对申请内存和释放内存作了简单的分析学习,了解到 BufferPool 包括两个部分 free 和nonPooledAvailableMemory,而能够复用的是 free 这一部分。