Preface
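In the previous article we saw that when a Kafka producer sends messages, they first flow into the record accumulator, RecordAccumulator. A separate thread, the Sender thread, then sends the accumulated messages to the Kafka server. This article walks through how the Sender thread works.
The Sender thread is started in the KafkaProducer constructor: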
```java
//...
```
The following is based on Kafka 2.5.1.
Sender implements the Runnable interface, and its main logic lives in the run method:
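Once the Kafka specifics are removed, starting the Sender comes down to wrapping a Runnable in a daemon thread and starting it. This is a minimal sketch under two assumptions. First, the thread name follows a `kafka-producer-network-thread | <clientId>` pattern. Second, a plain `java.lang.Thread` is used here, while Kafka uses its own thread wrapper class. `ProducerIoThreadSketch` is a hypothetical name, not a Kafka class.

```java
// Hypothetical helper: illustrates the "Runnable on a daemon thread" pattern only.
final class ProducerIoThreadSketch {
    // Assumed naming pattern, for illustration.
    static String threadName(String clientId) {
        return "kafka-producer-network-thread | " + clientId;
    }

    // Wraps the sender in a daemon thread and starts it.
    static Thread startIoThread(String clientId, Runnable sender) {
        Thread ioThread = new Thread(sender, threadName(clientId));
        ioThread.setDaemon(true); // the I/O thread alone should not keep the JVM alive
        ioThread.start();
        return ioThread;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = startIoThread("producer-1",
                () -> System.out.println("running on " + Thread.currentThread().getName()));
        t.join(); // wait so the daemon thread gets to print before main exits
    }
}
```

Because the thread is a daemon in this sketch, the JVM can exit without waiting for it. Any records still buffered at that point would simply be lost, which is why the producer should be closed explicitly.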
```java
//...
```
The run method calls runOnce in a loop, so let's go straight to the implementation of runOnce.
The runOnce method
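The loop shape can be sketched independently of Kafka. The class below is hypothetical and shows two ideas. A volatile `running` flag is cleared from another thread to stop the loop. A try/catch keeps one failing iteration from killing the I/O thread. The real Sender also finishes outstanding work after the loop exits, and this sketch leaves that part out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical Sender-style loop; not Kafka's class.
final class LoopingSender implements Runnable {
    private volatile boolean running = true; // cleared by initiateClose(), possibly from another thread
    final AtomicInteger iterations = new AtomicInteger();
    final AtomicInteger errors = new AtomicInteger();
    private final Runnable onceBody;

    LoopingSender(Runnable onceBody) {
        this.onceBody = onceBody;
    }

    void runOnce() {
        iterations.incrementAndGet();
        onceBody.run();
    }

    @Override
    public void run() {
        // Main loop: keep calling runOnce() until a close is requested.
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                // Count the failure and keep going; one bad iteration must not end the thread.
                errors.incrementAndGet();
            }
        }
    }

    void initiateClose() {
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        LoopingSender s = new LoopingSender(() -> { });
        Thread t = new Thread(s, "sender");
        t.start();
        Thread.sleep(10);
        s.initiateClose();
        t.join();
        System.out.println("iterations: " + s.iterations.get());
    }
}
```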
```java
void runOnce() {
    // ...
}
```
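Assuming the non-transactional path, runOnce does two things. It first calls sendProducerData, which stages produce requests and returns a poll timeout. It then passes that timeout to the network client's poll, which performs the I/O. In this minimal sketch, the hypothetical `DataSender` and `NetworkPoller` interfaces stand in for Sender and NetworkClient.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for Sender#sendProducerData.
interface DataSender {
    long sendProducerData(long nowMs); // stages requests, returns how long poll may block
}

// Hypothetical stand-in for NetworkClient#poll.
interface NetworkPoller {
    void poll(long timeoutMs, long nowMs); // performs the network I/O
}

final class RunOnceSketch {
    private final LongSupplier clock;
    private final DataSender sender;
    private final NetworkPoller client;

    RunOnceSketch(LongSupplier clock, DataSender sender, NetworkPoller client) {
        this.clock = clock;
        this.sender = sender;
        this.client = client;
    }

    void runOnce() {
        long now = clock.getAsLong();
        long pollTimeout = sender.sendProducerData(now); // 1) build and queue produce requests
        client.poll(pollTimeout, now);                    // 2) write requests, read responses
    }

    public static void main(String[] args) {
        RunOnceSketch r = new RunOnceSketch(System::currentTimeMillis,
                now -> 0L,
                (timeout, now) -> System.out.println("poll(" + timeout + ")"));
        r.runOnce();
    }
}
```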
sendProducerData: issuing requests
```java
private long sendProducerData(long now) {
    // ...
}
```
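The overall flow is as follows:
Get the cluster metadata from the local cache;
Find the nodes that requests can be sent to, by calling the record accumulator's RecordAccumulator#ready method: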
```java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    // Delay until ready() should be called again
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();
    // Are any threads blocked waiting for buffer memory to be freed?
    boolean exhausted = this.free.queued() > 0;
    // Iterate over the batch queue of every partition
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        Deque<ProducerBatch> deque = entry.getValue();
        synchronized (deque) {
            // When producing to a large number of partitions, this path is hot and deques are often empty.
            // We check whether a batch exists first to avoid the more expensive checks whenever possible.
            ProducerBatch batch = deque.peekFirst();
            if (batch != null) {
                TopicPartition part = entry.getKey();
                Node leader = cluster.leaderFor(part);
                // Without a known leader the data cannot be sent, and the cluster metadata must be updated
                if (leader == null) {
                    // This is a partition for which leader is not known, but messages are available to send.
                    // Note that entries are currently not removed from batches when deque is empty.
                    unknownLeaderTopics.add(part.topic());
                } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {
                    // How long the batch has been waiting (since its last send attempt)
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    // Is the batch still within the retry backoff period?
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    // How long the batch may wait before it has to be sent
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    // Is there a full batch (more than one batch queued, or the head batch is full)?
                    boolean full = deque.size() > 1 || batch.isFull();
                    // Has the batch waited at least timeToWaitMs?
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    // Should this batch be sent?
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        // If so, add its leader to readyNodes
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
```

The method iterates over all batch queues to build the set of nodes that have data ready to send. A partition's leader is added to that set when its head batch is not in retry backoff and at least one of the following holds:
- the batch queue is full (more than one batch is queued, or the first batch is full);
- the batch has waited long enough (linger.ms, or retry.backoff.ms for a batch being retried);
- some other thread is waiting for buffer memory to be freed;
- the producer is closing;
- a flush (send immediately) has been triggered;
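The per-batch decision inside ready() can be isolated into a pure function. The sketch below mirrors the booleans in the code above. `ReadyCheckSketch`, `Decision`, and the parameter names are hypothetical, and `full` stands for `deque.size() > 1 || batch.isFull()`.

```java
// Hypothetical extraction of the per-batch readiness check shown above.
final class ReadyCheckSketch {
    static final class Decision {
        final boolean ready;
        final long timeLeftMs; // meaningful only when !ready; feeds nextReadyCheckDelayMs
        Decision(boolean ready, long timeLeftMs) {
            this.ready = ready;
            this.timeLeftMs = timeLeftMs;
        }
    }

    static Decision check(int attempts, long waitedTimeMs, long retryBackoffMs, long lingerMs,
                          boolean full, boolean exhausted, boolean closed, boolean flushing) {
        boolean backingOff = attempts > 0 && waitedTimeMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean expired = waitedTimeMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed || flushing;
        if (sendable && !backingOff) {
            return new Decision(true, 0);
        }
        // Not ready yet: report how long until this batch should be looked at again.
        return new Decision(false, Math.max(timeToWaitMs - waitedTimeMs, 0));
    }

    public static void main(String[] args) {
        Decision d = check(0, 2, 100, 5, false, false, false, false);
        System.out.println(d.ready + " " + d.timeLeftMs);
    }
}
```

For example, a new batch with linger.ms = 5 that has waited 2 ms is not ready, and the check reports 3 ms left. A retried batch still inside its backoff window is held back even when it is full.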
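Update the cluster metadata for topics whose leader is unknown. At this point the metadata is only marked as needing an update;
Check whether each node in readyNodes can actually accept a request, using NetworkClient#ready: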
```java
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node " + node);
    // Connected, and the number of in-flight (sent but unanswered) requests is below the limit?
    if (isReady(node, now))
        return true;
    // Otherwise start connecting, if a connection attempt is currently allowed
    if (connectionStates.canConnect(node.idString(), now))
        // if we are interested in sending to a node and we don't have a connection to it, initiate one
        initiateConnect(node, now);
    return false;
}
```

Get the batches to send. This is done by the record accumulator's RecordAccumulator#drain method:
```java
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    // Result: node id -> batches to send to that node
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}
```

```java
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    // Partitions whose leader is this node
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
    List<ProducerBatch> ready = new ArrayList<>();
    /* to make starvation less likely this loop doesn't start at 0 */
    // drainIndex remembers where the previous drain stopped, so each call does not start at 0;
    // otherwise the first partitions would always be served and later ones could starve
    int start = drainIndex = drainIndex % parts.size();
    do {
        // Partition info
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        this.drainIndex = (this.drainIndex + 1) % parts.size();
        // Only proceed if the partition has no in-flight batches.
        // (Partitions are muted when send ordering must be guaranteed)
        if (isMuted(tp, now))
            continue;
        Deque<ProducerBatch> deque = getDeque(tp);
        if (deque == null)
            continue;
        synchronized (deque) {
            // invariant: !isMuted(tp,now) && deque != null
            // Only the first batch in the queue is considered
            ProducerBatch first = deque.peekFirst();
            if (first == null)
                continue;
            // first != null
            // Still within the retry backoff period?
            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
            // Only drain the batch if it is not during backoff period.
            if (backoff)
                continue;
            // Would this batch push the request over maxSize? If other batches are already
            // collected, stop here; a single oversized batch is still sent on its own
            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                // there is a rare case that a single batch size is larger than the request size due to
                // compression; in this case we will still eventually send this batch in a single request
                break;
            } else {
                if (shouldStopDrainBatchesForPartition(first, tp))
                    break;
                // Transaction-related code omitted...
                // Remove the head batch from the queue
                ProducerBatch batch = deque.pollFirst();
                batch.close();
                size += batch.records().sizeInBytes();
                ready.add(batch);
                batch.drained(now);
            }
        }
    } while (start != drainIndex);
    return ready;
}
```

In effect, this converts the original <partition, Deque<ProducerBatch>> layout into a <node id, List<ProducerBatch>> layout. The reason is that, at the network level, the producer only needs connections to specific broker nodes: it sends messages to a broker, not to a partition.
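The rotating start index is the part of drainBatchesForOneNode that is easiest to get wrong. In this minimal sketch, each partition's queue is modelled as a `Deque<Integer>` of batch sizes in bytes. `DrainSketch` is a hypothetical class, and muting, backoff, and transactions are left out. Each call takes at most the head batch of each partition and stops once the next batch would exceed `maxSize`. The only exception is the first batch of a call, which is always taken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical round-robin drain over the partitions led by one node.
final class DrainSketch {
    private int drainIndex = 0; // survives across calls, so each call resumes where the last one stopped

    // Returns the indexes of the partitions whose head batch was drained, in drain order.
    List<Integer> drainOneNode(List<Deque<Integer>> queues, int maxSize) {
        List<Integer> drained = new ArrayList<>();
        if (queues.isEmpty()) return drained;
        int size = 0;
        int start = drainIndex = drainIndex % queues.size();
        do {
            int current = drainIndex;
            drainIndex = (drainIndex + 1) % queues.size(); // advance before any check, as in the code above
            Deque<Integer> deque = queues.get(current);
            Integer first = deque.peekFirst();
            if (first == null) continue; // empty queue: move on to the next partition
            if (size + first > maxSize && !drained.isEmpty()) break; // request is full
            size += deque.pollFirst();
            drained.add(current);
        } while (start != drainIndex);
        return drained;
    }

    public static void main(String[] args) {
        List<Deque<Integer>> queues = new ArrayList<>();
        for (int i = 0; i < 3; i++) queues.add(new ArrayDeque<>(Arrays.asList(60)));
        DrainSketch sketch = new DrainSketch();
        for (int call = 0; call < 3; call++) {
            System.out.println(sketch.drainOneNode(queues, 100));
        }
    }
}
```

Suppose there are three partitions, each holding one 60-byte batch, and `maxSize` is 100. Successive calls then drain partition 0, then partition 2, then partition 1. Because the index advances before the size check, the partition that caused the break is visited last on the next call.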
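Record the batches about to be sent in inFlightBatches;
If message ordering must be guaranteed (max.in.flight.requests.per.connection set to 1), add the corresponding partitions to the muted set;
Handle expired batches. getExpiredInflightBatches returns the batches in inFlightBatches that have expired. expiredBatches returns the expired batches that are still in the RecordAccumulator. "Expired" here is measured from when the message entered the accumulator, with the limit set by delivery.timeout.ms. Each expired batch is then handled by failBatch: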
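The bookkeeping in these two steps can be sketched with a map and a set. This is a hypothetical tracker that identifies partitions and batches by plain strings such as `"orders-0"`. It assumes a partition is unmuted once its batch is finally completed. When ordering is required, a muted partition is skipped by drain, so at most one batch per partition is in flight.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical tracker for in-flight batches and muted partitions.
final class InFlightSketch {
    private final boolean guaranteeMessageOrder; // true when max.in.flight.requests.per.connection == 1
    private final Map<String, List<String>> inFlightBatches = new HashMap<>(); // partition -> batch ids
    private final Set<String> muted = new HashSet<>();

    InFlightSketch(int maxInFlightRequestsPerConnection) {
        this.guaranteeMessageOrder = maxInFlightRequestsPerConnection == 1;
    }

    boolean isMuted(String partition) {
        return muted.contains(partition);
    }

    // After drain: remember the batch and, if ordering is required, block the partition from further drains.
    void onDrained(String partition, String batchId) {
        inFlightBatches.computeIfAbsent(partition, p -> new ArrayList<>()).add(batchId);
        if (guaranteeMessageOrder) muted.add(partition);
    }

    // Once the batch is finally completed (success or failure): forget it and unmute the partition.
    void onCompleted(String partition, String batchId) {
        List<String> batches = inFlightBatches.get(partition);
        if (batches != null) {
            batches.remove(batchId);
            if (batches.isEmpty()) inFlightBatches.remove(partition);
        }
        if (guaranteeMessageOrder) muted.remove(partition);
    }

    int inFlightCount() {
        int n = 0;
        for (List<String> batches : inFlightBatches.values()) n += batches.size();
        return n;
    }

    public static void main(String[] args) {
        InFlightSketch tracker = new InFlightSketch(1);
        tracker.onDrained("orders-0", "b1");
        System.out.println("muted after drain: " + tracker.isMuted("orders-0"));
        tracker.onCompleted("orders-0", "b1");
        System.out.println("muted after completion: " + tracker.isMuted("orders-0"));
    }
}
```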
```java
private void failBatch(ProducerBatch batch,
                       long baseOffset,
                       long logAppendTime,
                       RuntimeException exception,
                       boolean adjustSequenceNumbers) {
    if (transactionManager != null) {
        transactionManager.handleFailedBatch(batch, exception, adjustSequenceNumbers);
    }
    this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
    // Complete the batch with the exception; this invokes the callbacks of its records
    if (batch.done(baseOffset, logAppendTime, exception)) {
        maybeRemoveAndDeallocateBatch(batch);
    }
}
```

This marks the batch as done with the given exception, which invokes the callback of every message in the batch. The batch is then released through maybeRemoveAndDeallocateBatch.
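The expiry rule described above can be sketched as follows. This is a minimal sketch with a hypothetical `Batch` type. It assumes a batch expires once `now - createdMs >= deliveryTimeoutMs`, and that completing a batch fires its callbacks exactly once. It uses `java.util.concurrent.TimeoutException` so that it needs no Kafka dependency, whereas Kafka has its own TimeoutException class.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical expiry handling in the spirit of failBatch.
final class ExpirySketch {
    static final class Batch {
        final long createdMs; // when the batch entered the accumulator
        final List<Consumer<Exception>> callbacks = new ArrayList<>();
        private boolean done;

        Batch(long createdMs) {
            this.createdMs = createdMs;
        }

        boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long nowMs) {
            return nowMs - createdMs >= deliveryTimeoutMs;
        }

        // Completes the batch once; later calls are ignored and return false.
        boolean done(Exception error) {
            if (done) return false;
            done = true;
            for (Consumer<Exception> cb : callbacks) cb.accept(error);
            return true;
        }
    }

    // Removes expired batches from the list, fails them, and returns how many were failed.
    static int expireAndFail(List<Batch> batches, long deliveryTimeoutMs, long nowMs) {
        int failed = 0;
        for (Iterator<Batch> it = batches.iterator(); it.hasNext(); ) {
            Batch b = it.next();
            if (b.hasReachedDeliveryTimeout(deliveryTimeoutMs, nowMs)) {
                it.remove();
                Exception e = new TimeoutException("batch expired after " + (nowMs - b.createdMs) + " ms");
                if (b.done(e)) failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Batch> batches = new ArrayList<>();
        Batch b = new Batch(0);
        b.callbacks.add(e -> System.out.println("callback: " + e.getMessage()));
        batches.add(b);
        System.out.println("failed: " + expireAndFail(batches, 120_000, 120_000));
    }
}
```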
Compute pollTimeout. This is the time until the nearest batch expires, and it is also the longest a single runOnce iteration will wait;
Call sendProduceRequests:
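Here is one way such a timeout could be combined, as a minimal sketch. Beyond the expiry time described above, it assumes two more inputs: the `nextReadyCheckDelayMs` returned by ready(), and a hypothetical `notReadyTimeoutMs` for nodes that were not ready yet. It also assumes the timeout drops to 0 when some node already has data to send. Those extra inputs are assumptions for illustration and are not taken from this article.

```java
// Hypothetical combination of deadlines into a single poll timeout.
final class PollTimeoutSketch {
    // Long.MAX_VALUE means "no deadline" for any input.
    static long pollTimeout(long nextReadyCheckDelayMs, long notReadyTimeoutMs,
                            long nextExpiryTimeMs, long nowMs, boolean anyNodeReady) {
        if (anyNodeReady) {
            return 0; // data can go out right away, so poll must not block
        }
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
        timeout = Math.min(timeout, nextExpiryTimeMs - nowMs); // wake up in time to expire the nearest batch
        return Math.max(timeout, 0); // a deadline already in the past means "do not block"
    }

    public static void main(String[] args) {
        System.out.println(pollTimeout(Long.MAX_VALUE, Long.MAX_VALUE, 1_300, 1_000, false));
    }
}
```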
```java
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
```

It calls sendProduceRequest once for each destination node:
```java
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }
    // Fill produceRecordsByPartition and recordsByPartition, keyed by partition
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();
        //...
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }
    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    // Callback invoked when the response arrives
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    String nodeId = Integer.toString(destination);
    // Build the client request object
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // Hand clientRequest to NetworkClient; this only stages the request, the network I/O happens later
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
```
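In the end, a client request object is built and handed to NetworkClient, which then performs the actual network I/O.
So how does the Sender handle the response once it arrives?
handleProduceResponse: handling responses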
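The magic-byte handling in sendProduceRequest comes down to two steps. First, it finds the lowest record-format version among the batches, starting from the highest version the destination supports. Second, it down-converts any batch written with a newer format. In this sketch, the hypothetical `Batch` type carries only a partition name and a magic byte.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of choosing one record format for a whole produce request.
final class MagicSketch {
    static final class Batch {
        final String partition;
        final byte magic; // record-format version the batch was written with
        Batch(String partition, byte magic) {
            this.partition = partition;
            this.magic = magic;
        }
    }

    // Lowest magic across the batches, capped by what the destination supports.
    static byte minUsedMagic(byte maxUsableMagic, List<Batch> batches) {
        byte min = maxUsableMagic;
        for (Batch b : batches) {
            if (b.magic < min) min = b.magic;
        }
        return min;
    }

    // Partitions whose batch would have to be down-converted to minMagic before sending.
    static List<String> needsDownConversion(List<Batch> batches, byte minMagic) {
        List<String> result = new ArrayList<>();
        for (Batch b : batches) {
            if (b.magic != minMagic) result.add(b.partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Batch> batches = Arrays.asList(new Batch("p0", (byte) 2), new Batch("p1", (byte) 1));
        byte min = minUsedMagic((byte) 2, batches);
        System.out.println("min magic " + min + ", down-convert " + needsDownConversion(batches, min));
    }
}
```

A single request carries a single record format, so one batch written with an older format forces every other batch in the same request down to that format.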
```java
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
    // ...
}
```
When a normal response comes back, each partition's result is handled by completeBatch:
```java
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           /* ... */) {
    // ...
}
```
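As the code shows, response handling mostly consists of telling the expected error cases apart and handling each one. A successful response is passed to the two-argument completeBatch: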
```java
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    // ...
}
```
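It also calls the batch's done() method, then removes the batch from the in-flight collection and frees the memory it occupies.
That concludes this look at how the Sender thread sends messages and processes the responses.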
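The branching in completeBatch can be summarized as a small decision table. This is a simplified, hypothetical sketch and not Kafka's exact conditions, which also cover transactional and sequence-number cases. It assumes four rules. A success completes the batch. A "message too large" error on a multi-record batch splits and re-enqueues it. A retriable error is retried while attempts remain and the delivery timeout has not passed. Anything else fails the batch.

```java
// Hypothetical, simplified outcome table for a per-partition produce response.
final class CompletionSketch {
    enum Outcome { COMPLETE, SPLIT_AND_RETRY, RETRY, FAIL }

    static Outcome decide(boolean success, boolean messageTooLarge, int recordCount,
                          boolean retriable, int attempts, int maxRetries, boolean deliveryTimedOut) {
        if (success) {
            return Outcome.COMPLETE; // done() + release, as in the two-argument completeBatch
        }
        if (messageTooLarge && recordCount > 1) {
            return Outcome.SPLIT_AND_RETRY; // smaller batches may fit
        }
        if (retriable && attempts < maxRetries && !deliveryTimedOut) {
            return Outcome.RETRY; // put back into the accumulator
        }
        return Outcome.FAIL; // failBatch: callbacks get the exception
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, 10, true, 0, 3, false));
    }
}
```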