https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals
In a Kafka cluster, one of the brokers serves as the controller, which is
responsible for managing the states of partitions and replicas and for
performing administrative tasks like reassigning partitions.
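The controller was introduced together with the replication mechanism. In 0.7 the brokers were largely independent of each other. Now, for every topic partition, the cluster has to decide which brokers host its replicas, and it has to handle operations such as reassignment and deletion. That requires a master to coordinate, hence the controller.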
ControllerContext
Its key job is to record how the partitions and replicas of all topics relate to each other, covering both assignment and leadership.

package kafka.controller
class ControllerContext(val zkClient: ZkClient,
val zkSessionTimeout: Int) {
var controllerChannelManager: ControllerChannelManager = null
val controllerLock: ReentrantLock = new ReentrantLock()
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
val brokerShutdownLock: Object = new Object
var epoch: Int = KafkaController.InitialControllerEpoch - 1
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
val correlationId: AtomicInteger = new AtomicInteger(0)
// Relationships among topics, partitions and replicas
var allTopics: Set[String] = Set.empty
var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
// ... (remaining members elided)
}
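The two relationships are easier to see in a small, self-contained Scala sketch. This is not Kafka's code. TopicAndPartition and LeaderAndIsr are simplified stand-ins, and MiniControllerContext, partitionsLedBy and partitionsOnBroker are hypothetical names. The sketch shows how the assignment map (AR) and the leadership map answer the question the controller cares about when a broker dies: which partitions just lost their leader.

```scala
import scala.collection.mutable

// Simplified stand-ins, not Kafka's real TopicAndPartition / LeaderAndIsr classes.
final case class TopicAndPartition(topic: String, partition: Int)
final case class LeaderAndIsr(leader: Int, isr: List[Int])

// Hypothetical, trimmed-down context keeping only the two relationships discussed above.
class MiniControllerContext {
  // AR: the brokers assigned to hold each partition's replicas
  val partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
  // current leader and ISR of each partition
  val partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderAndIsr] = mutable.Map.empty

  // Partitions led by the broker: these lose their leader if the broker dies.
  def partitionsLedBy(brokerId: Int): Set[TopicAndPartition] =
    partitionLeadershipInfo.collect { case (tp, info) if info.leader == brokerId => tp }.toSet

  // Partitions with any replica (leader or follower) on the broker.
  def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] =
    partitionReplicaAssignment.collect { case (tp, replicas) if replicas.contains(brokerId) => tp }.toSet
}

object MiniContextDemo {
  val ctx = new MiniControllerContext
  val p0 = TopicAndPartition("orders", 0)
  val p1 = TopicAndPartition("orders", 1)
  ctx.partitionReplicaAssignment ++= Seq(p0 -> Seq(1, 2, 3), p1 -> Seq(2, 3, 1))
  ctx.partitionLeadershipInfo ++= Seq(p0 -> LeaderAndIsr(1, List(1, 2, 3)), p1 -> LeaderAndIsr(2, List(2, 3, 1)))
}
```

If broker 1 dies, only orders-0 loses its leader and would head toward OfflinePartition. Broker 1 also holds a replica of orders-1, but that partition keeps its leader.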
PartitionStateMachine
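Adding replication is a complex design. One important part of it is handling partitions and replicas correctly in each situation they can be in.
Let's first look at the partition states and the state machine that connects them: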
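NonExistentPartition: the partition does not exist or has been deleted; previous state is OfflinePartition
NewPartition: just created, but leader election has not happened yet; previous state is NonExistentPartition
OnlinePartition: has a leader, the normal state; previous state is NewPartition/OfflinePartition
OfflinePartition: when the leader dies, the partition goes offline; previous state is NewPartition/OnlinePartition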
Neither NewPartition nor OfflinePartition has a leader, so why keep them separate? See below.

/**
* This class represents the state machine for partitions. It defines the states that a partition can be in, and
* transitions to move the partition to another legal state. The different states that a partition can be in are -
* 1. NonExistentPartition: This state indicates that the partition was either never created or was created and then
* deleted. Valid previous state, if one exists, is OfflinePartition
* 2. NewPartition : After creation, the partition is in the NewPartition state. In this state, the partition should have
* replicas assigned to it, but no leader/isr yet. Valid previous states are NonExistentPartition
* 3. OnlinePartition : Once a leader is elected for a partition, it is in the OnlinePartition state.
* Valid previous states are NewPartition/OfflinePartition
* 4. OfflinePartition : If, after successful leader election, the leader for partition dies, then the partition
* moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
*/
sealed trait PartitionState { def state: Byte }
case object NewPartition extends PartitionState { val state: Byte = 0 }
case object OnlinePartition extends PartitionState { val state: Byte = 1 }
case object OfflinePartition extends PartitionState { val state: Byte = 2 }
case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
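The legal transitions can be written as a table that maps each target state to its allowed previous states. The sketch below is a hypothetical helper, and PartitionTransitions, isLegal and checkTransition are illustrative names. The table is copied from the assertValidPreviousStates calls in handleStateChange further down. Those calls also allow OnlinePartition -> OnlinePartition and OfflinePartition -> OfflinePartition, which the list above leaves out.

```scala
sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

object PartitionTransitions {
  // target state -> legal previous states (as in the assertValidPreviousStates calls)
  val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map[PartitionState, Set[PartitionState]](
    NewPartition -> Set[PartitionState](NonExistentPartition),
    OnlinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
    NonExistentPartition -> Set[PartitionState](OfflinePartition)
  )

  def isLegal(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates(to).contains(from)

  // Same spirit as assertValidPreviousStates: reject an illegal transition loudly.
  def checkTransition(from: PartitionState, to: PartitionState): Unit =
    if (!isLegal(from, to))
      throw new IllegalStateException(s"Illegal partition state transition $from -> $to")
}
```

For example, isLegal(NonExistentPartition, OnlinePartition) is false, because a partition must pass through NewPartition before it can get a leader.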
The core function is handleStateChanges. It calls the state machine function handleStateChange for each topicAndPartition and then sends the batched requests to the brokers.

/**
* This API is invoked by the partition change zookeeper listener
* @param partitions The list of partitions that need to be transitioned to the target state
* @param targetState The state that the partitions should be moved to
*/
def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
callbacks: Callbacks = (new CallbackBuilder).build) {
try {
brokerRequestBatch.newBatch()
partitions.foreach { topicAndPartition =>
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
} catch {
case e: Throwable => // error handling elided in this excerpt
}
}
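newBatch and sendRequestsToBrokers exist so that all partitions handled in one call share a single round of requests per broker. Below is a toy version of that pattern. It uses hypothetical types (PartitionUpdate, BrokerRequest, RequestBatch) rather than Kafka's real brokerRequestBatch, and its "send" only builds the per-broker requests instead of doing network I/O.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical request payload: tell a broker who leads a partition.
final case class PartitionUpdate(topic: String, partition: Int, leader: Int)
// Hypothetical per-broker request carrying the controller epoch and a correlation id.
final case class BrokerRequest(controllerEpoch: Int, correlationId: Int, updates: List[PartitionUpdate])

// Toy analogue of brokerRequestBatch: accumulate updates per broker, then flush once.
class RequestBatch {
  private val pending = mutable.Map.empty[Int, mutable.ArrayBuffer[PartitionUpdate]]

  // Start a batch; this toy refuses to start while an unsent batch is pending.
  def newBatch(): Unit =
    if (pending.nonEmpty) throw new IllegalStateException("previous batch was not sent")

  // Queue one partition update for each of the given brokers.
  def add(brokerIds: Seq[Int], update: PartitionUpdate): Unit =
    brokerIds.foreach(id => pending.getOrElseUpdate(id, mutable.ArrayBuffer.empty[PartitionUpdate]) += update)

  // Build one request per broker and clear the batch (no real network I/O here).
  def send(controllerEpoch: Int, correlationId: Int): Map[Int, BrokerRequest] = {
    val requests = pending.map { case (id, updates) =>
      id -> BrokerRequest(controllerEpoch, correlationId, updates.toList)
    }.toMap
    pending.clear()
    requests
  }
}

object BatchDemo {
  val correlationIds = new AtomicInteger(0)
  val batch = new RequestBatch
  batch.newBatch()
  // two partitions handled in the same call; broker 2 holds replicas of both
  batch.add(Seq(1, 2), PartitionUpdate("orders", 0, leader = 1))
  batch.add(Seq(2, 3), PartitionUpdate("orders", 1, leader = 2))
  val sent: Map[Int, BrokerRequest] = batch.send(controllerEpoch = 5, correlationId = correlationIds.getAndIncrement())
}
```

Broker 2 receives a single request that carries both updates, not two separate requests.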
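Below is the concrete logic of the state machine function.
Most transitions are simple.
The exception is the move to OnlinePartition, which has to tell the New and Offline cases apart; we look at both below.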

/**
* This API exercises the partition's state machine. It ensures that every state transition happens from a legal
* previous state to the target state. Valid state transitions are:
* NonExistentPartition -> NewPartition:
* --load assigned replicas from ZK to controller cache
*
* NewPartition -> OnlinePartition
* --assign first live replica as the leader and all live replicas as the isr; write leader and isr to ZK for this partition
* --send LeaderAndIsr request to every live replica and UpdateMetadata request to every live broker
*
* OnlinePartition,OfflinePartition -> OnlinePartition
* --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
* --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
*
* NewPartition,OnlinePartition,OfflinePartition -> OfflinePartition
* --nothing other than marking partition state as Offline
*
* OfflinePartition -> NonExistentPartition
* --nothing other than marking the partition state as NonExistentPartition
* @param topic The topic of the partition for which the state transition is invoked
* @param partition The partition for which the state transition is invoked
* @param targetState The end state that the partition should be moved to
*/
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
leaderSelector: PartitionLeaderSelector,
callbacks: Callbacks) {
val topicAndPartition = TopicAndPartition(topic, partition)
val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
try {
targetState match {
case NewPartition =>
// pre: partition did not exist before this
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition) // check that the previous state is valid
assignReplicasToPartitions(topic, partition) // read the assigned replicas from ZK
partitionState.put(topicAndPartition, NewPartition) // set the partition state to NewPartition
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",") // get the AR
// post: partition has been assigned replicas
case OnlinePartition =>
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
partitionState(topicAndPartition) match {
case NewPartition =>
// initialize leader and isr path for new partition
initializeLeaderAndIsrForPartition(topicAndPartition) // initialize the leader
case OfflinePartition =>
electLeaderForPartition(topic, partition, leaderSelector) // elect a new leader
case OnlinePartition => // invoked when the leader needs to be re-elected
electLeaderForPartition(topic, partition, leaderSelector)
case _ => // should never come here since illegal previous states are checked above
}
partitionState.put(topicAndPartition, OnlinePartition)
// post: partition has a leader
case OfflinePartition =>
// pre: partition should be in New or Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
// should be called when the leader for a partition is no longer alive
partitionState.put(topicAndPartition, OfflinePartition) // update the state
// post: partition has no alive leader
case NonExistentPartition =>
// pre: partition should be in Offline state
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
partitionState.put(topicAndPartition, NonExistentPartition) // update the state
// post: partition state is deleted from all brokers and zookeeper
}
} catch {
case e: Throwable => // error handling elided in this excerpt
}
}
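The New/Offline distinction only matters on the way to OnlinePartition. The toy state machine below has the same shape as handleStateChange and records which path each OnlinePartition transition took. Its names are hypothetical, partitions are keyed by plain strings, and a log entry stands in for the actual leader assignment.

```scala
import scala.collection.mutable

sealed trait PartitionState
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Toy state machine (hypothetical): instead of touching ZK it logs which
// leader-assignment path each move to OnlinePartition took.
class ToyPartitionStateMachine {
  private val states = mutable.Map.empty[String, PartitionState]
  val log = mutable.ArrayBuffer.empty[String]

  def state(p: String): PartitionState = states.getOrElse(p, NonExistentPartition)

  def handleStateChange(p: String, target: PartitionState): Unit = {
    val curr = states.getOrElseUpdate(p, NonExistentPartition)
    def illegal() = throw new IllegalStateException(s"$p: illegal transition $curr -> $target")
    target match {
      case NewPartition =>
        if (curr != NonExistentPartition) illegal()
      case OnlinePartition =>
        curr match {
          case NewPartition => log += s"$p: initialize leader/ISR from live AR"
          case OfflinePartition | OnlinePartition => log += s"$p: elect leader via selector"
          case NonExistentPartition => illegal()
        }
      case OfflinePartition =>
        if (curr == NonExistentPartition) illegal()
      case NonExistentPartition =>
        if (curr != OfflinePartition) illegal()
    }
    states(p) = target
  }
}

object DriverDemo {
  val sm = new ToyPartitionStateMachine
  sm.handleStateChange("orders-0", NewPartition)
  sm.handleStateChange("orders-0", OnlinePartition)  // first time online: initialize
  sm.handleStateChange("orders-0", OfflinePartition) // leader broker died
  sm.handleStateChange("orders-0", OnlinePartition)  // back online: elect
}
```

The target state is OnlinePartition both times. The first transition goes through initialization, and the second, after the partition has been offline, goes through the leader selector.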
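To initialize the leader, the controller simply takes the head of liveAssignedReplicas.
For an offline partition it prefers a live replica from the ISR and falls back to the AR only when no ISR member is alive.
A NewPartition has no leader and ISR path in ZK yet, so there is no ISR to prefer; this is why NewPartition and OfflinePartition are separate states.
(In most cases the AR and the ISR are identical. They differ, for example, during onPartitionReassignment (see the example later) or after a follower has dropped out of the ISR.)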
NewPartition->OnlinePartition

/**
* Invoked on the NewPartition->OnlinePartition state change. When a partition is in the New state, it does not have
* a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, its leader and isr
* path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the
* OfflinePartition state.
* @param topicAndPartition The topic/partition whose leader and isr path is to be initialized
*/
private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
liveAssignedReplicas.size match {
case 0 => // no live replica: the real code raises an error here (elided)
case _ =>
// make the first replica in the list of assigned replicas, the leader
val leader = liveAssignedReplicas.head // simply takes the first entry of liveAssignedReplicas as the leader
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
controller.epoch)
// ... (elided: write leader and ISR to ZK, then send LeaderAndIsr/UpdateMetadata requests)
}
}
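Without the ZK and request handling, the rule here is a pure function of the AR and the set of live brokers. In the sketch below, initialLeaderAndIsr is a hypothetical name, and Either stands in for the error the real code raises.

```scala
final case class LeaderAndIsr(leader: Int, isr: List[Int])

object InitLeader {
  // NewPartition -> OnlinePartition rule: the first live replica in AR order becomes
  // the leader and all live assigned replicas form the ISR. Left = no live replica.
  def initialLeaderAndIsr(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Either[String, LeaderAndIsr] = {
    val liveAssigned = assignedReplicas.filter(liveBrokerIds.contains)
    liveAssigned.headOption match {
      case Some(leader) => Right(LeaderAndIsr(leader, liveAssigned.toList))
      case None => Left(s"No live replica among assigned replicas [${assignedReplicas.mkString(",")}]")
    }
  }
}
```

No ISR is consulted here. A brand-new partition has no ISR yet, so AR order alone decides the leader.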
OfflinePartition,OnlinePartition->OnlinePartition

/**
* Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state change.
* It invokes the leader election API to elect a leader for the input offline partition
* @param topic The topic of the offline partition
* @param partition The offline partition
* @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.)
*/
def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
val topicAndPartition = TopicAndPartition(topic, partition)
// handle leader election for the partitions whose leader is no longer alive
try {
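// elect new leader or throw exception
// (elided here: currentLeaderAndIsr, the current leader and ISR, is read first)
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr) // use PartitionLeaderSelector.selectLeader to choose the leader
// (elided here: leaderAndIsr is written back to ZK)
val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(leaderAndIsr, controller.epoch)
} catch {
case e: Throwable => // error handling elided in this excerpt
}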
}
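selectLeader increments both leaderEpoch and the ZK path version (currentLeaderIsrZkPathVersion + 1). Carrying a version like this makes a conditional, compare-and-set write possible, where the update succeeds only if the stored version still matches the one that was read. The sketch below imitates that style of write with an in-memory map. VersionedStore and conditionalUpdate are hypothetical names modeled loosely on ZooKeeper's setData-with-expected-version semantics, and they are not Kafka's ZkUtils API.

```scala
import scala.collection.mutable

final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

// In-memory stand-in for a versioned store (think: a znode). Each successful write
// bumps the version; a conditional write succeeds only if the expected version matches.
class VersionedStore[K, V] {
  private val data = mutable.Map.empty[K, (V, Int)]

  def read(key: K): Option[(V, Int)] = data.get(key)

  def create(key: K, value: V): Unit = data.update(key, (value, 0))

  // Some(newVersion) on success, None if the stored version moved on since the read.
  def conditionalUpdate(key: K, value: V, expectedVersion: Int): Option[Int] =
    data.get(key) match {
      case Some((_, version)) if version == expectedVersion =>
        data.update(key, (value, version + 1))
        Some(version + 1)
      case _ => None
    }
}

object ElectDemo {
  val store = new VersionedStore[String, LeaderAndIsr]
  val key = "orders-0" // hypothetical key for one partition's leader/ISR record
  store.create(key, LeaderAndIsr(leader = 1, leaderEpoch = 0, isr = List(1, 2, 3)))

  val (current, version) = store.read(key).get
  // broker 1 died: take a live ISR member as leader and bump the leader epoch
  val elected = LeaderAndIsr(leader = 2, leaderEpoch = current.leaderEpoch + 1, isr = List(2, 3))
  val firstWrite = store.conditionalUpdate(key, elected, expectedVersion = version)
  // a second writer that still holds the old version is rejected
  val staleWrite = store.conditionalUpdate(key, current.copy(leader = 3), expectedVersion = version)
}
```

The stale write is rejected, so it cannot silently overwrite the newer leader and ISR.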
OfflinePartitionLeaderSelector

package kafka.controller // PartitionLeaderSelector.scala
/**
* Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
* 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
* isr as the new isr.
* 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
* 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException
* Replicas to receive LeaderAndIsr request = live assigned replicas
* Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
*/
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
case Some(assignedReplicas) =>
val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
case true =>
debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
.format(topicAndPartition, liveAssignedReplicas.mkString(",")))
liveAssignedReplicas.isEmpty match {
case true =>
throw new NoReplicaOnlineException(("No replica for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
ControllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicas.head // otherwise fall back to a replica from the AR
warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
.format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
}
case false =>
val newLeader = liveBrokersInIsr.head // prefer a replica from the ISR
debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
.format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
}
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
(newLeaderAndIsr, liveAssignedReplicas)
case None =>
throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + " replicas assigned to it")
}
}
}
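selectLeader only reads the AR, the live broker set and the current LeaderAndIsr, so its decision logic can be lifted out into a pure function. The port below keeps the same three rules: live ISR first, then an unclean pick from the live AR, and otherwise NoReplicaOnlineException. It uses simplified, locally defined LeaderAndIsr and NoReplicaOnlineException classes and a hypothetical OfflineSelectorSketch object, so treat it as a sketch rather than Kafka's code.

```scala
// Simplified stand-ins for the Kafka classes used by the selector.
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)
class NoReplicaOnlineException(msg: String) extends RuntimeException(msg)

object OfflineSelectorSketch {
  // Returns the new leader/ISR and the replicas that should receive LeaderAndIsr requests.
  def selectLeader(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int],
                   current: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val liveAssigned = assignedReplicas.filter(liveBrokerIds.contains)
    val liveIsr = current.isr.filter(liveBrokerIds.contains)
    val newLeaderAndIsr =
      if (liveIsr.nonEmpty) {
        // clean election: leader from the live ISR, ISR shrinks to its live members
        LeaderAndIsr(liveIsr.head, current.leaderEpoch + 1, liveIsr, current.zkVersion + 1)
      } else if (liveAssigned.nonEmpty) {
        // unclean election: no ISR member alive, so a live AR replica leads (possible data loss)
        LeaderAndIsr(liveAssigned.head, current.leaderEpoch + 1, List(liveAssigned.head), current.zkVersion + 1)
      } else {
        throw new NoReplicaOnlineException(
          s"No replica is alive. Assigned replicas are: [${assignedReplicas.mkString(",")}]")
      }
    (newLeaderAndIsr, liveAssigned)
  }
}
```

With broker 1 (the old leader) dead, broker 2 from the ISR wins. If only broker 3, which is outside the ISR, survives, it is chosen in an unclean election with an ISR containing just itself.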
To be continued...
Apache Kafka Source Code Analysis – Controller
Original: http://www.cnblogs.com/fxjwind/p/3569518.html