1.概述
最近有同学留言在使用Kafka的过程中遇到一些问题,比如在拉取的Topic中的数据时会抛出一些异常,今天笔者就为大家来分享一下Kafka的Fetch流程。
2.内容
2.1 背景
首先,我们来了解一下,Fetch Session的目标。Kafka在1.1.0以后的版本中优化了Fetch问题,引入了Fetch Session,Kafka由Broker来提供服务(通信、数据交互等)。每个分区会有一个Leader Broker,Broker会定期向Leader Broker发送Fetch请求,来获取数据,而对于分区数较大的Topic来说,需要发出的Fetch请求就会很大。这样会有一个问题:
- Follower感兴趣的分区集很少改变,然而每个FetchRequest必须枚举Follower感兴趣的所有分区集合;
- 当上一个FetchRequest只会分区中没有任何改变,仍然必须发回关于该分区的所有元数据,其中包括分区ID、分区的起始Offset、以及能够请求的最大字节数等。
并且,这些问题与系统中现存分区的数量成线性比例,例如,假设Kafka集群中有100000个分区,其中大多数分区很少接收新消息。该系统中的Broker仍然会来回发送非常大的FetchRequest和FetchResponse,即使每秒添加的实际消息数据很少。随着分区数量的增长,Kafka使用越来越多的网络带宽来回传递这些消息。
当Kafka被调整为较低延迟时,这些低效会变得更严重。如果我们将每秒发送的FetchRequest数量增加一倍,我们应该期望在缩短的轮询间隔内有更多的分区没有改变。而且,我们无法在每个FetchRequest和FetchResponse中分摊每个分区发送元数据的所需要的带宽资源,这将会导致Kafka需要使用更多的网络带宽。
2.2 优化
为了优化上述问题,Kafka增加了增量拉取分区的概念,从而减少客户端每次拉取都需要拉取全部分区的问题。Fetch Session与网络编程中的Session类似,可以认为它是有状态的,这里的状态值的是知道它需要拉取哪些分区的数据,比如第一次拉取的分区0中的数据,后续分区0中没有了数据,就不需要拉取分区0了,FetchSession数据结构如下
class FetchSession(val id: Int, // sessionid是随机32位数字,用于鉴权,防止客户端伪造 val privileged: Boolean, // 是否授权 val partitionMap: FetchSession.CACHE_MAP,// 缓存数据CachedPartitionMap val creationMs: Long, // 创建Session的时间 var lastUsedMs: Long, // 上次使用会话的时间,由FetchSessionCache更新 var epoch: Int) // 获取会话序列号
需要注意的是,Fetch Epoch是一个单调递增的32位计数器,它在处理请求N之后,Broker会接收请求N+1,序列号总是大于0,在达到最大值后,它会回到1。
如果Fetch Session支持增量Fetch,那么它将维护增量Fetch中每个分区的信息,关于每个分区,它需要维护:
- Topic名称
- 分区ID
- 该分区的最大字节数
- Fetch偏移量
- HighWaterMark
- FetcherLogStartOffset
- LeaderLogStartOffset
其中,Topic名称、分区ID来自于TopicPartition,最大字节数、Fetch偏移量、FetcherLogStartOffset来自于最近的FetcherRequest,HighWaterMark、LocalLogStartOffset来自于本地的Leader。因为Follower活着Consumer发出的请求都会与分区Leader进行交互,所以FetchSession也是记录在Leader节点上的。
对于客户端来说,什么时候一个分区会被包含到增量的拉取请求中:
- Client通知Broker,分区的maxBytes,fetchOffset,LogStartOffset改变了;
- 分区在之前的增量拉取会话中不存在,Client想要增加这个分区,从而来拉取新的分区;
- 分区在增量拉取会话中,Client要删除。
对于服务端来说,增量分区包含到增量的拉取响应中:
- Broker通知Client分区的HighWaterMark或者brokerLogStartOffset改变了;
- 分区有新的数据
在Fetch.java类中,方法sendFetches(): prepareFetchRequests创建FetchSessionHandler.FetchRequestData。 构建拉取请求通过FetchSessionHandler.Builder,builder.add(partition, PartitionData)会添加next: 即要拉取的分区。构建时调用Builder.build(),针对Full进行拉取,代码片段如下:
FetchSessionHandler.java
if (nextMetadata.isFull()) { // epoch为0或者-1 if (log.isDebugEnabled()) { log.debug("Built full fetch {} for node {} with {}.", nextMetadata, node, partitionsToLogString(next.keySet())); } sessionPartitions = next; // next为之前动态增加的分区 next = null; // 本地全量拉取,下次next为null Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata); }
收到响应结果后,调用FetchSessionHandler.handleResponse()方法。 假如第一次是全量拉取,响应结果没有出错时,nextMetadata.isFull()仍然为true。 服务端创建了一个新的session(随机的唯一ID),客户端的Fetch SessionId会设置为服务端返回的sessionId, 并且epoch会增加1。这样下次客户端的拉取就不再是全量,而是增量了(toSend, toForget两个集合容器,分别表示要拉取的和不需要拉取的)。 当服务端正常处理(这次不会生成新的session),客户端也正常处理响应,则sessionId不会增加,但是epoch会增加1。
public boolean handleResponse(FetchResponse<?> response) { if (response.error() != Errors.NONE) { log.info("Node {} was unable to process the fetch request with {}: {}.", node, nextMetadata, response.error()); // 当集群session超过最大阀值,会出现这个异常信息 if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) { nextMetadata = FetchMetadata.INITIAL; } else { nextMetadata = nextMetadata.nextCloseExisting(); } return false; } if (nextMetadata.isFull()) { if (response.responseData().isEmpty() && response.throttleTimeMs() > 0) { // Normally, an empty full fetch response would be invalid. However, KIP-219 // specifies that if the broker wants to throttle the client, it will respond // to a full fetch request with an empty response and a throttleTimeMs // value set. We don't want to log this with a warning, since it's not an error. // However, the empty full fetch response can't be processed, so it's still appropriate // to return false here. if (log.isDebugEnabled()) { log.debug("Node {} sent a empty full fetch response to indicate that this " + "client should be throttled for {} ms.", node, response.throttleTimeMs()); } nextMetadata = FetchMetadata.INITIAL; return false; } String problem = verifyFullFetchResponsePartitions(response); if (problem != null) { log.info("Node {} sent an invalid full fetch response with {}", node, problem); nextMetadata = FetchMetadata.INITIAL; return false; } else if (response.sessionId() == INVALID_SESSION_ID) { if (log.isDebugEnabled()) log.debug("Node {} sent a full fetch response{}", node, responseDataToLogString(response)); nextMetadata = FetchMetadata.INITIAL; return true; } else { // The server created a new incremental fetch session. if (log.isDebugEnabled()) log.debug("Node {} sent a full fetch response that created a new incremental " + "fetch session {}{}", node, response.sessionId(), responseDataToLogString(response)); nextMetadata = FetchMetadata.newIncremental(response.sessionId()); return true; } } else { String problem = verifyIncrementalFetchResponsePartitions(response); if (problem != null) { log.info("Node {} sent an invalid incremental fetch response with {}", node, problem); nextMetadata = nextMetadata.nextCloseExisting(); return false; } else if (response.sessionId() == INVALID_SESSION_ID) { // The incremental fetch session was closed by the server. if (log.isDebugEnabled()) log.debug("Node {} sent an incremental fetch response closing session {}{}", node, nextMetadata.sessionId(), responseDataToLogString(response)); nextMetadata = FetchMetadata.INITIAL; return true; } else { // The incremental fetch session was continued by the server. // We don't have to do anything special here to support KIP-219, since an empty incremental // fetch request is perfectly valid. if (log.isDebugEnabled()) log.debug("Node {} sent an incremental fetch response with throttleTimeMs = {} " + "for session {}{}", node, response.throttleTimeMs(), response.sessionId(), responseDataToLogString(response)); nextMetadata = nextMetadata.nextIncremental(); return true; } } }
Broker处理拉取请求是,会创建不同类型的FetchContext,类型如下:
- SessionErrorContext:拉取会话错误(例如,epoch不相等)
- SessionlessFetchContext:不需要拉取会话
- IncrementalFetchContext:增量拉取
- FullFetchContext:全量拉取
在KafkaApis#handleFetchRequest()中,代码片段如下:
val fetchContext = fetchManager.newContext( fetchRequest.metadata, fetchRequest.fetchData, fetchRequest.toForget, fetchRequest.isFromFollower) // ...... if (fetchRequest.isFromFollower) { // We've already evaluated against the quota and are good to go. Just need to record it now. unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader) quotas.leader.record(responseSize) trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData.size}, " + s"metadata=${unconvertedFetchResponse.sessionId}") requestHelper.sendResponseExemptThrottle(request, createResponse(0), Some(updateConversionStats))
}
2.3 Fetch Session缓存
因为Fetch Session使用的是Leader上的内存,所以我们需要限制在任何给定时间内的内存量,因此,每个Broker将只创建有限数量的增量Fetch Session。以下,有两个公共参数,用来配置Fetch Session的缓存:
- max.incremental.fetch.session.cache.slots:用来限制每台Broker上最大Fetch Session数量,默认1000
- min.incremental.fetch.session.eviction.ms:从缓存中逐步增量获取会话之前等待的最短时间,默认120000
这里需要注意的时候,该属性属于read-only。Kafka Broker配置中有三种类型,它们分别是:
类型 | 说明 |
read-only | 修改参数值后,需要重启Broker才能生效 |
per-broker | 修改参数值后,只会在对应的Broker上生效,不需要重启,属于动态参数 |
cluster-wide | 修改参数值后,整个集群范围内会生效,不需要重启,属于动态参数 |
当服务器收到创建增量Fetch Session请求时,它会将新的Session与先有的Session进行比较,只有在下列情况下,新Session才会有效:
- 新Session在Follower里面;
- 现有Session已停止,且超过最小等待时间;
- 现有Session已停止,且超过最小等待时间,并且新Session有更多的分区。
这样可以实现如下目标:
- Follower优先级高于消费者;
- 随着时间的推移,非活跃的Session将被替换;
- 大请求(从增量中收益更多)被优先处理;
- 缓存抖动是有限的,避免了昂贵的Session重建时。
2.4 公共接口
新增了如下错误类型:
- FetchSessionIdNotFound:当客户端请求引用服务器不知道的Fetch Session时,服务器将使用此错误代码进行响应。如果存在客户端错误,或者服务器退出了Fetch Session,也会出现这种错误;
- InvalidFetchSessionEpochException:当请求的Fetch Session Epoch与预期不相同时,服务器将使用此错误代码来进行响应。
2.5 FetchRequest元数据含义
请求SessionID | 请求SessionEpoch | 含义 |
0 | -1 | 全量拉取(没有使用或者创建Session时) |
0 | 0 | 全量拉取(如果是新的Session,Epoch从1开始) |
$ID | 0 | 关闭表示为$ID的增量Fetch Session,并创建一个新的全量Fetch(如果是新的Session,Epoch从1开始) |
$ID | $EPOCH | 如果ID和EPOCH是正确的,创建一个增量Fetch |
2.6 FetchResponse元数据含义
Request SessionID | 含义 |
0 | 没有Fetch Session是创建新的 |
$ID | 下一个请求会使增量Fetch请求,并且SessionID是$ID |
3.总结
Client和Broker的Fetch过程可以总结如下图所示:
4.结束语
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。