kafka3.0
为什么要抛弃使用了十年的 ZooKeeper?
ZooKeeper 是 Hadoop 的一个子项目,一般用来管理较大规模、结构复杂的服务器集群,具有自己的配置文件语法、管理工具和部署模式。Kafka 最初由 LinkedIn 开发,随后于 2011 年初开源,2014 年由主创人员组建企业 Confluent。
Broker 是 Kafka 集群的骨干,负责从生产者(producer)到消费者(consumer)的接收、存储和发送消息。在当前架构下,Kafka 进程在启动的时候需要往 ZooKeeper 集群中注册一些信息,比如 BrokerId,并组建集群。ZooKeeper 为 Kafka 提供了可靠的元数据存储,比如 Topic/ 分区的元数据、Broker 数据、ACL 信息等等。
同时 ZooKeeper 充当 Kafka 的领导者,以更新集群中的拓扑更改;根据 ZooKeeper 提供的通知,生产者和消费者发现整个 Kafka 集群中是否存在任何新 Broker 或 Broker 失败。大多数的运维操作,比如说扩容、分区迁移等等,都需要和 ZooKeeper 交互。
也就是说,Kafka 代码库中有很大一部分是负责实现在集群中多个 Broker 之间分配分区(即日志)、分配领导权、处理故障等分布式系统的功能。而早已经过业界广泛使用和验证过的 ZooKeeper 是分布式代码工作的关键部分。
假设没有 ZooKeeper 的话,Kafka 甚至无法启动进程。腾讯云中间件 – 微服务产品中心技术总监韩欣对 InfoQ 说,“在以前的版本中,ZooKeeper 可以说是 Kafka 集群的灵魂。”
但严重依赖 ZooKeeper,也给 Kafka 带来了掣肘。Kafka 一路发展过来,绕不开的两个话题就是集群运维的复杂度以及单集群可承载的分区规模,韩欣表示,比如腾讯云 Kafka 维护了上万节点的 Kafka 集群,主要遇到的问题也还是这两个。
首先从集群运维的角度来看,Kafka 本身就是一个分布式系统。但它又依赖另一个开源的分布式系统,而这个系统又是 Kafka 系统本身的核心。这就要求集群的研发和维护人员需要同时了解这两个开源系统,需要对其运行原理以及日常的运维(比如参数配置、扩缩容、监控告警等)都有足够的了解和运营经验。否则在集群出现问题的时候无法恢复,是不可接受的。所以,ZooKeeper 的存在增加了运维的成本。
其次从集群规模的角度来看,限制 Kafka 集群规模的一个核心指标就是集群可承载的分区数。集群的分区数对集群的影响主要有两点:ZooKeeper 上存储的元数据量和控制器变动效率。
Kafka 集群依赖于一个单一的 Controller 节点来处理绝大多数的 ZooKeeper 读写和运维操作,并在本地缓存所有 ZooKeeper 上的元数据。分区数增加,ZooKeeper 上需要存储的元数据就会增加,从而加大 ZooKeeper 的负载,给 ZooKeeper 集群带来压力,可能导致 Watch 的延时或丢失。
当 Controller 节点出现变动时,需要进行 Leader 切换、Controller 节点重新选举等行为,分区数越多需要进行越多的 ZooKeeper 操作:比如当一个 Kafka 节点关闭的时候,Controller 需要通过写 ZooKeeper 将这个节点的所有 Leader 分区迁移到其他节点;新的 Controller 节点启动时,首先需要将所有 ZooKeeper 上的元数据读进本地缓存,分区越多,数据量越多,故障恢复耗时也就越长。
Kafka 单集群可承载的分区数量对于一些业务来说,又特别重要。韩欣举例补充道,“腾讯云 Kafka 主要为公有云用户以及公司内部业务提供服务。我们遇到了很多需要支持百万分区的用户,比如腾讯云 Serverless、腾讯云的 CLS 日志服务、云上的一些客户等,他们面临的场景是一个客户需要一个 topic 来进行业务逻辑处理,当用户量达到百万千万量级的情况下,topic 带来的膨胀是非常恐怖的。在当前架构下,Kafka 单集群无法稳定承载百万分区稳定运行。这也是我对新的 KIP-500 版本感到非常兴奋的原因。”
Kafka中zk的作用
/brokers/ids:临时节点,保存所有broker节点信息,存储broker的物理地址、版本信息、启动时间 等,节点名称为brokerID,broker定时发送心跳到zk,如果断开则该brokerID会被删除
/brokers/topics:临时节点,节点保存broker节点下所有的topic信息,每一个topic节点下包含一个固 定的partitions节点,partitions的子节点就是topic的分区,每个分区下保存一个state节点、保存着当 前leader分区和ISR的brokerID,state节点由leader创建,若leader宕机该节点会被删除,直到有新的 leader选举产生、重新生成state节点
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]:维护消费者和分区的注册关系
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]:分区消息的消费进度Offset
client通过topic找到topic树下的state节点、获取leader的brokerID,到broker树中找到broker的物理 地址,但是client不会直连zk,而是通过配置的broker获取到zk中的信息
总结:
Zookeeper加kafka 的架构,有三层角色,一层是zookeeper ,提供基础的状态持久化和状态通知服务。一层是controller ,基于zookeeper 提供的服务,给松散的broker 提供统一的状态服务,但它本身是没有状态服务的,它是依赖zookeeper 的服务来做主控。一层是broker ,无状态服务,因为他们无状态,无法自发组织起来,所以需要controller 为他们做主控。其中有一个broker 兼职了controller 角色。
这个架构本来没什么问题,但是如果要优化,也是可以的。zookeeper 本来提供了状态服务,但是因为它不是kafka 的一部分,所以kafka 不得不设计一个controller 来做主控。假如controller 本身就可以提供状态服务,那上面的三层架构就可以简化成两层,也就是一层controller ,提供主控服务,一层broker ,无状态工作服务。
Kafka 抛弃zookeeper 就是做了这样的优化,自己开发了一个基于raft 共识算法的一致性服务kraft,为集群提供之前zookeeper 的状态服务的同时,也为broker 提供主控服务,也就是controller。这样相比之前的架构还有一个很大的优点,那就是controller 的故障切换会非常快,而且切换时间几乎不随集群规模而线性增长。
原因是,以前的架构,controller 只有一个,如果做controller 的故障切换,那么新controller 需要全量从zookeeper同步集群所有元数据信息并构建到内存,来为做主控服务做准备,这些元数据信息包括topic 和分区信息,如果是一个大规模集群,topic 和分区很多,这个过程将会很耗时,也就会造成更久的停机时间。而kraft 的架构,controller有多个,只是只有一个是leader 提供主控服务,其他的作为follower ,会实时同步leader的元数据信息,也就是元数据在多个controller 里面是几乎保持一致的(raft 协议保证的),所以故障切换的时候,几乎不需要再同步元数据,就可以完成controller 切换。
Kafka的rebalance机制
consumer group中的消费者与topic下的partion重新匹配的过程 何时会产生rebalance:
-
consumer group中的成员个数发生变化
-
consumer消费超时
-
group订阅的topic个数发生变化
-
group订阅的topic的分区数发生变化
-
理解 Rebalance
-
Rebalance 是 Kafka 消费者组(Consumer Group)中的一个重要概念。当消费者组中的消费者成员数量发生变化(比如有新的消费者加入或者旧的消费者离开),或者订阅的主题分区(Topic Partition)数量发生变化时,Kafka 会进行 Rebalance 操作。
-
其目的是重新分配主题分区到消费者,以保证每个消费者处理的分区数量相对均衡,从而高效地消费消息。例如,一个消费者组有 3 个消费者,订阅了一个有 6 个分区的主题,在理想情况下,每个消费者会分配到 2 个分区。但如果有一个消费者退出,那么剩下的 2 个消费者就会重新分配分区,可能每个消费者会分到 3 个分区。
-
-
Rebalance 引发的问题
-
数据重复消费:在 Rebalance 过程中,消费者可能会在尚未提交偏移量(Offset)的情况下重新分配分区。这就可能导致部分消息被重复消费。比如,消费者 A 正在处理分区 1 中的消息,还没来得及提交偏移量就发生了 Rebalance,分区 1 被重新分配给消费者 B,那么消费者 B 可能会从分区 1 的起始位置开始消费,从而导致部分消息被重复处理。
-
数据丢失:如果消费者在 Rebalance 之前已经提交了偏移量,但实际上还没有完成消息的处理,那么在 Rebalance 之后,这些未处理完的消息可能会被新分配到该分区的消费者忽略,从而导致数据丢失。
-
性能影响:Rebalance 过程本身会消耗一定的资源和时间。频繁的 Rebalance 会导致消费者频繁地暂停消息消费,重新分配分区,从而影响整个系统的消费性能。
-
-
解决 Rebalance 问题的策略
-
合理配置参数
-
max.poll.interval.ms:这个参数定义了消费者两次调用
poll
方法的最大间隔时间。如果超过这个时间,消费者会被认为已经失效,从而可能引发 Rebalance。在实际应用中,需要根据消息处理的复杂度来合理设置这个参数。例如,如果消息处理时间较长,应该适当增大这个参数的值,避免消费者因为处理消息时间过长而被踢出消费者组导致 Rebalance。假设消息处理的平均时间是 5 分钟,那么可以将max.poll.interval.ms
设置为大于 5 分钟的值,如 300000(5 分钟换算成毫秒)。 -
session.timeout.ms:这个参数定义了消费者在没有发送心跳(Heartbeat)到协调器(Coordinator)的最长时间间隔。如果消费者在这个时间内没有发送心跳,协调器会认为消费者已经失效,进而触发 Rebalance。一般情况下,可以适当增大这个参数的值来减少因为网络抖动等原因导致的虚假失效情况。但是,也不能设置得过大,否则会导致消费者真正失效后不能及时被发现。通常可以将其设置为 10 – 30 秒左右,具体要根据网络环境和应用场景来调整。
-
-
手动提交偏移量
-
-
自动提交偏移量可能会在 Rebalance 时导致数据丢失或重复消费的问题。通过手动提交偏移量,可以更好地控制消息消费的进度。在代码中,可以使用
commitSync
或commitAsync
方法来手动提交偏移量。 -
例如,在 Java 中使用 Kafka 消费者 API:
-
java
-
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class KafkaConsumerRebalanceExample { public static void main(String[] args) { // 配置消费者属性 Properties props = new Properties(); props.put(“bootstrap.servers”, “localhost:9092”); props.put(“group.id”, “test-group”); props.put(“enable.auto.commit”, “false”); // 关闭自动提交偏移量 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(“test-topic”)); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (TopicPartition partition : records.partitions()) { long lastOffset = -1; for (ConsumerRecord<String, String> record : records.records(partition)) { // 处理消息 System.out.println(“Received message: ” + record.value() + ” from partition ” + partition.partition()); lastOffset = record.offset(); } // 手动提交偏移量 Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); consumer.commitSync(offsets); } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } } -
-
消费者分区分配策略优化
-
Kafka 提供了多种分区分配策略,如
Range
、RoundRobin
等。默认是Range
策略,在某些情况下可能会导致分区分配不均衡。可以根据实际需求选择更合适的分配策略。 -
例如,
RoundRobin
策略可以使分区在消费者之间更均匀地分配。如果消费者组中的消费者处理能力相对均衡,使用RoundRobin
策略可以更好地实现负载均衡。可以通过设置partition.assignment.strategy
参数来选择分配策略。在消费者配置属性中添加partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
来启用RoundRobin
分配策略。
-
-
Kafka的性能好在什么地方
Kafka不基于内存,而是硬盘存储,因此消息堆积能力更强
顺序写:利用磁盘的顺序访问速度可以接近内存,kafka的消息都是append操作,partition是有序的, 节省了磁盘的寻道时间,同时通过批量操作、节省写入次数,partition物理上分为多个segment存储, 方便删除
传统:
读取磁盘文件数据到内核缓冲区
将内核缓冲区的数据copy到用户缓冲区
将用户缓冲区的数据copy到socket的发送缓冲区
将socket发送缓冲区中的数据发送到网卡、进行传输
零拷贝:
直接将内核缓冲区的数据发送到网卡传输
使用的是操作系统的指令支持
Kafka不太依赖JVM,主要理由操作系统的pageCache,如果生产消费速率相当,则直接用pageCache 交换数据,不需要经过磁盘IO
简单配置
KRaft介绍
定义与用途:Raft协议是一种一致性算法,用于在分布式系统中保持整个系统的一致性。它允许分布式系统中各节点在出现故障时可以针对一系列的数值达成一致,以可靠、复制、冗余、容错而闻名。
核心角色:Raft协议包含三种角色,分别是Leader(领导者)、Follower(跟随者)和Candidate(候选者)。其中,Leader负责处理客户端请求,Follower只被动响应来自Leader和Candidate的请求,Candidate则主要用于选举新的Leader。
Leader选举:通过心跳机制和选举超时机制来触发,选举出Leader节点来对外提供服务。
日志复制:Leader将接收到的客户端请求包装为日志,并复制给其他Follower节点,以保持状态的一致性。
安全性措施:通过一系列规则来确保服务的可用性和数据的一致性。
应用实例:Raft协议被广泛应用于各种分布式系统中,如etcd、TiKV/TiDB等。
KRaft协议
背景与来源:KRaft协议并非一个广泛认知的、与Raft协议并列的协议。在某些上下文中,KRaft可能指的是Kafka Raft Metadata mode的缩写,这是Kafka为了增强其元数据管理而引入的一种基于Raft协议的机制。然而,这并不是一个独立的、与Raft协议并列的协议。
与Raft的关系:如果KRaft指的是Kafka中的这种机制,那么它与Raft协议的关系是应用与基础的关系。即,Kafka利用了Raft协议的一致性和容错性来增强其元数据管理的能力。
应用场景:在Kafka中,KRaft模式被设计为一种替代ZooKeeper的方案,用于管理Kafka集群的元数据,如主题、分区、副本等。
KRaft协议(如果指的是Kafka Raft Metadata mode)是Kafka中基于Raft协议的一种元数据管理机制,它并不是一个独立的协议,而是Raft协议在Kafka中的一个应用实例。
Raft协议则是一种广泛应用的分布式一致性算法,它提供了在分布式系统中保持系统一致性的方法。
因此,从严格意义上讲,KRaft协议并不是与Raft协议并列的一个协议,而是Raft协议在特定应用场景(如Kafka元数据管理)中的实现或变种。
KRaft选举机制
Raft是一种强领导者模型,通过领导者实现各个节点日志一致的一种共识算法。在Raft中,节点有三种身份:领导者(Leader)、跟随者(Follower)和候选人(Candidate)。
跟随者(Follower):在Raft中只有领导者才会与客户端交互,因此在不发生选举时,跟随者仅默默地处理来自领导者发送的消息,充当数据冗余的作用。当领导者心跳超时,跟随者就会主动推荐自己当选候选人。
候选人(Candidate):成为候选人之后,就会向其他节点发送请求投票消息,以获取其他节点的投票。如果获得了大多数选票,则当选领导者。
领导者(Leader):领导者是与客户端交互的唯一角色,负责处理请求、管理日志的复制,并不断地发送心跳信息给跟随者,以刷新跟随者节点的超时时间,防止跟随者发起新的选举。
KRaft选举原理:
在Kafka中,KRaft选举原理基于Raft算法,但进行了适应Kafka环境的调整。以下是KRaft选举的关键点:
节点状态:在KRaft协议下,Quorum(仲裁)中的一个节点可以处于以下四种状态之一:Candidate(候选者)、Leader(领导者)、Follower(跟随者)和Observer(观察者,没有投票权的Follower,通常不考虑)。
选举触发条件:当满足以下三个条件之一时,Quorum中的某个节点就会触发选举:
向Leader发送Fetch请求后,在超时阈值(如quorum.fetch.timeout.ms)之后仍然没有得到Fetch响应,表示Leader疑似失败。
从当前Leader收到了EndQuorumEpoch请求,表示Leader已退位。
Candidate状态下,在超时阈值(如quorum.election.timeout.ms)之后仍然没有收到多数票,也没有Candidate赢得选举,表示此次选举作废,需要重新进行选举。
投票流程:选举过程中的投票流程与经典Raft协议相同。候选人发送Vote请求来获取选票,当某个候选人获得多数票时,它将成为新的领导者。领导者随后会发送BeginQuorumEpoch消息来告知其他节点当前的Leader信息。
日志复制与一致性:在KRaft中,领导者负责将日志复制给跟随者。为了保持一致性,KRaft协议采用了拉模式(Pull-based)进行日志复制。跟随者通过发送Fetch请求从领导者那里拉取日志。这种方式可以将一致性检查的工作放在领导者端,并允许更快速地启动一个新的跟随者(直接从偏移量0开始复制日志)。
任期与日志管理:在KRaft中,每次选举都会有一个新的任期(Epoch)开始。领导者在任期开始时可能会提交一条空的日志来确保日志的一致性。如果新的领导者在提交属于它的任期的日志之前崩溃,那么之前的日志仍然被视为未提交。这有助于防止数据丢失和确保数据的一致性。