Kafka 依赖 Zookeeper 来维护集群成员的信息:
每个 broker 都有一个唯一标识符 ID,这个标识符可以在配置文件里指定,也可以自动生成。
在 broker 启动的时候,它通过在 Zookeeper 的 /brokers/ids
路径上创建临时节点,把自己的 ID 注册 Zookeeper。
Kafka 组件会订阅 Zookeeper 的 /brokers/ids
路径,当有 broker 加入集群或退出集群时,这些组件就可以获得通知。
在 broker 停机、出现网络分区或长时间垃圾回收停顿时,会导致其 Zookeeper 会话失效,导致其在启动时创建的临时节点会自动被移除。
监听 broker 列表的 Kafka 组件会被告知该 broker 已移除,然后处理 broker 崩溃的后续事宜。
在完全关闭一个 broker 之后,如果使用相同的 ID 启动另一个全新的 broker,它会立即加入集群,并拥有与旧 broker 相同的分区和主题。
controller 其实就是一个 broker,它除了具有一般 broker 的功能之外,还负责分区 leader 的选举。
为了在整个集群中指定一个唯一的 controller,broker 集群需要进行选举,该过程依赖以下两个 Zookeeper 节点:
// 临时节点 controller(保存最新的 controller 节点信息,保证唯一性)
object ControllerZNode {
def path = "/controller"
def encode(brokerId: Int, timestamp: Long): Array[Byte] = {
Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString).asJava)
}
def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js =>
js.asJsonObject("brokerid").to[Int]
}
}
// 永久节点 controller_epoch(保存最新 controller 对应的任期号,用于避免脑裂)
object ControllerEpochZNode {
def path = "/controller_epoch"
def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8)
def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).toInt
}
broker 启动后会发起一轮选举,选举通过 Zookeeper 提供的创建节点功能来实现:
/controller
,创建成功的 broker 将成为 controller。/controller_epoch
中的任期号,其他 broker 可以根据任期号忽略已过期 controller 的消息。Watcher
实时监控/controller
节点。broker 中的 KafkaController 对象负责发起选举:
private def elect(): Unit = {
// 检查集群中是否存在可用 controller (activeControllerId == -1)
try {
// 当前 broker 通过 KafkaZkClient 发起选举,并选举自己为新的 controller
val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
// 如果 broker 当选,则更新对应的 controller 相关信息
controllerContext.epoch = epoch
controllerContext.epochZkVersion = epochZkVersion
activeControllerId = config.brokerId
info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} and epoch zk version is now ${controllerContext.epochZkVersion}")
onControllerFailover() // 选举成功后触发维护操作
} catch {
case e: ControllerMovedException =>
maybeResign()
if (activeControllerId != -1) // 其他 broker 被选为 controller
debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
else // 本轮选举没有产生 controller
warn("A controller has been elected but just resigned, this will result in another round of election", e)
case t: Throwable =>
error(s"Error while electing or becoming controller on broker ${config.brokerId}. Trigger controller movement immediately", t)
triggerControllerMove()
}
}
KafkaZkClient 中更新 Zookeeper 的逻辑如下:
def registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int) = {
val timestamp = time.milliseconds()
// 从 /controller_epoch 获取当前 controller 对应的 epoch 与 zkVersion
// 若 /controller_epoch 不存在则尝试创建
val (curEpoch, curEpochZkVersion) = getControllerEpoch
.map(e => (e._1, e._2.getVersion))
.getOrElse(maybeCreateControllerEpochZNode())
// 创建 /controller 并原子性更新 /controller_epoch
val newControllerEpoch = curEpoch + 1
val expectedControllerEpochZkVersion = curEpochZkVersion
debug(s"Try to create ${ControllerZNode.path} and increment controller epoch to $newControllerEpoch with expected controller epoch zkVersion $expectedControllerEpochZkVersion")
// 处理 /controller 节点已存在的情况,直接返回最新节点信息
def checkControllerAndEpoch(): (Int, Int) = {
val curControllerId = getControllerId.getOrElse(throw new ControllerMovedException(
s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " +
s"Aborting controller startup procedure"))
if (controllerId == curControllerId) {
val (epoch, stat) = getControllerEpoch.getOrElse(
throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it"))
// 如果最新的 epoch 与 newControllerEpoch 相等,则可以推断 zkVersion 与当前 broker 已知的 zkVersion 一致
if (epoch == newControllerEpoch)
return (newControllerEpoch, stat.getVersion)
}
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
}
// 封装 zookeeper 请求
def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = {
val response = retryRequestUntilConnected(
MultiRequest(Seq(
// 发送 CreateRequest 创建 /controller 临时节点
CreateOp(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.EPHEMERAL),
// 发送 SetDataRequest 更新 /controller_epoch 节点信息
SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion)))
)
response.resultCode match {
case Code.NODEEXISTS | Code.BADVERSION => checkControllerAndEpoch()
case Code.OK =>
val setDataResult = response.zkOpResults(1).rawOpResult.asInstanceOf[SetDataResult]
(newControllerEpoch, setDataResult.getStat.getVersion)
case code => throw KeeperException.create(code)
}
}
// 向 zookeepr 发起请求
tryCreateControllerZNodeAndIncrementEpoch()
}
一个 Kafka 分区本质上就是一个备份日志,通过利用多份相同的冗余副本replica
保持系统高可用性。
Kafka 把分区的所有副本均匀地分配到所有 broker上,并从这些副本中挑选一个作为 leader 副本对外提供服务。
而其他副本被称为 follower 副本,不对外提供服务,只能被动地向 leader 副本请求数据,保持与 leader 副本的同步。
当 controller 发现一个 broker 加入集群时,它会使用 broker.id 来检查新加入的 broker 是否包含现有分区的副本。
如果有,controller 就把变更通知发送所有 broker,新 broker 中的分区作为 follower 副本开始从 leader 那里复制消息。
Kafka 为每个主题维护了一组同步副本集合in-sync replicas
(其中包含 leader 副本)。
只有被 ISR 中的所有副本都接收到的那部分生产者写入的消息才对消费者可见,这意味着 ISR 中的所有副本都会与 leader 保持同步状态。
为了避免出现新 leader 数据不完整导致分区数据丢失的情况,只有 ISR 中 follower 副本才有资格被选举为 leader。
若 follower 副本无法在 replica.lag.time.max.ms
毫秒内向 leader 请求数据,那么该 follower 就会被视为不同步,leader 会将其剔除出 ISR。
leader 会在 ISR 集合发生变更时,会在/isr_change_notification
下创建一个永久节点并写入变更信息。
当监控/isr_change_notification
的 controller 接收到通知后,会更新其他 broker 的元数据,最后删除已处理过的节点。
当出现瞬时峰值流量,只要 follower 不是持续性落后,就不会反复地在 ISR 中移进、移出,避免频繁访问 Zookeeper 影响性能。
创建主题时,Kafka 会为每个分区选定一个初始分区 leaderpreferred leader
,其对应的副本被称为首选副本preferred replica
。
controller 在创建主题时会保证 leader 在 broker 之间均衡分布,因此当 leader 按照初始的首选副本分布时,broker 间的负载均衡状态最佳。
然而 broker 失效是难以避免的,重启后的首选副本只能作为 follower 副本加入 ISR 中,不能再对外提供服务。
随着集群的不断运行,leader 不均衡现象会愈发明显:集群中的一小部分 broker 上承载了大量的分区 leader 副本。
可以设置 auto.leader.rebalance.enable = true
解决这一问题:
leader.imbalance.per.broker.percentage
时会自动执行一次 leader 均衡操作。
当一个新的 broker 刚加入集群时,不会自动地分担己有 topic 的负载,它只会对后续新增的 topic 生效。
如果要让新增 broker 为己有的 topic 服务,用户必须手动地调整现有的 topic 的分区分布,将一部分分区搬移到新增 broker 上。这就是所谓的分区重分配reassignment
操作。
除了处理 broker 扩容导致的不均衡之外,再均衡还能用于处理 broker 存储负载不均衡的情况,在单个或多个 broker 之间的日志目录之间重新分配分区。 用于解决多个代理之间的存储负载不平衡。
触发分区 leader 选举的几种场景:
当上述几种情况发生时,controller 会遍历所有相关的主题分区并从为其指定新的 leader。
然后向所有包含相关主题分区的 broker 发送更新请求,其中包含了最新的 leader 与 follower 副本分配信息。
更新完毕后,新 leader 会开始处理来自生产者和消费者的请求,而follower 开始从新 leader 那里复制消息。
分区状态信息在对应的节点信息:
// 节点 /brokers/topics/{topic-name}/partitions/{partition-no}/state 保存分区最新状态信息的
object TopicPartitionStateZNode {
def path(partition: TopicPartition) = s"${TopicPartitionZNode.path(partition)}/state"
def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr.asJava).asJava)
}
def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
Json.parseBytes(bytes).map { js =>
val leaderIsrAndEpochInfo = js.asJsonObject
val leader = leaderIsrAndEpochInfo("leader").to[Int]
val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
val zkPathVersion = stat.getVersion
LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)
}
}
}
PartitionStateMachine 管理分区选举的代码:
private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {
// 请求 Zookeeper 获取 partition 当前状态
val getDataResponses = try {
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
}
val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
getDataResponses.foreach { getDataResponse =>
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
val currState = partitionState(partition)
if (getDataResponse.resultCode == Code.OK) {
// 剔除状态已失效或不存在的 partition
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
case Some(leaderIsrAndControllerEpoch) =>
if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
s"already written by another controller. This probably means that the current controller $controllerId went through " +
s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
} else {
validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
}
case None =>
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn‘t exist for partition $partition in $currState state")
failedElections.put(partition, Left(exception))
}
} else if (getDataResponse.resultCode == Code.NONODE) {
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn‘t exist for partition $partition in $currState state")
failedElections.put(partition, Left(exception))
} else {
failedElections.put(partition, Left(getDataResponse.resultException.get))
}
}
// 如果全部 partition 均失效,则跳过此次选举
if (validLeaderAndIsrs.isEmpty) {
return (failedElections.toMap, Seq.empty)
}
// 根据指定的选举策略选择 partition leader
val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
// Elect leaders for new or offline partitions.
case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(validLeaderAndIsrs, allowUnclean)
leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
// Elect leaders for partitions that are undergoing reassignment.
case ReassignPartitionLeaderElectionStrategy =>
leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
// Elect preferred leaders.
case PreferredReplicaPartitionLeaderElectionStrategy =>
leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
// Elect leaders for partitions whose current leaders are shutting down.
case ControlledShutdownPartitionLeaderElectionStrategy =>
leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
}
partitionsWithoutLeaders.foreach { electionResult =>
val partition = electionResult.topicPartition
val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
}
// 将选举结果同步到 TopicPartitionStateZNode 对应的 Zookeeper 节点
val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
finishedUpdates.forKeyValue { (partition, result) =>
result.foreach { leaderAndIsr =>
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
}
}
(finishedUpdates ++ failedElections, updatesToRetry)
}
日志复制中的一些重要偏移概念:
base offset
:副本所含第一条消息的 offsethigh watermark
:副本最新一条己提交消息的 offsetlog end offset
:副本中下一条待写入消息的 offset
每个副本会同时维护 HW 与 LEO 值:
Kafka 中的复制流程大致如下:
在前面我们提到 follower 在重启后会对日志进行截断,这可能导致消息会丢失:
假设某个分区分布在 A 和 B 两个 broker 上,且最开始时 B 是分区 leader
为了解决这一问题,Kafka 为每一届 leader 分配了一个唯一的 epoch,由其追加到日志的消息都会包含这个 epoch。
然后每个副本都在本地维护一个 epoch 快照文件,并在其中保存 (epoch, offset)
:
回到之前的场景,增加了 leader epoch 之后的行为如下:
LeaderEpochRequest
请求最新的 leader epoch更多的细节可以参考这篇文章。
创建主题时,Kafka 会为主题的每个分区在文件系统中创建了一个对应的子目录,命名格式为主题名-分区号
,每个日志子目录的文件构成如下:
[lhop@localhost log]$ tree my-topic-*
my-topic-0
├── 00000000000050209130.index
├── 00000000000050209130.log
├── 00000000000050209130.snapshot
├── 00000000000050209130.timeindex
└── leader-epoch-checkpoint
my-topic-1
├── 00000000000048329826.index
├── 00000000000048329826.log
├── 00000000000048329826.timeindex
└── leader-epoch-checkpoint
其中的 leader-epoch-checkpoint
文件用于存储 leader epoch 快照,用于协助崩溃的副本执行恢复操作,在此就不详细展开。我们重点关注剩余的两类文件。
日志段文件(.log)的文件保存着真实的 Kafka 记录。
Kafka 使用该文件第一条记录对应的 offset 来命名此文件。
每个日志段文件是有上限大小的,由 broker 端参数log.segment.bytes
控制。
除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳。时间戳可以是生产者发送消息的时间,也可以是消息到达 broker 的时间,这个是可配置的。
如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起。broker 会原封不动的将消息存入磁盘,然后再把它发送给消费者。消费者在解压这个消息之后,会看到整个批次的消息,它们都有自己的时间戳和偏移量。
这意味着 broker 可以使用zero-copy
技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压和再压缩。
位移索引文件(.index)与时间戳索引(.timeindex)是两个特殊的索引文件:
它们都属于稀疏索引文件,每写入若干条记录后才增加一个索引项。写入间隔可以 broker 端参数 log.index.interval.bytes
设置。
索引文件严格按照时间戳顺序保存,因此 Kafka 可以利用二分查找算法提高查找速度。
原文:https://www.cnblogs.com/buttercup/p/14398617.html