
Switching the leader

Date: 2020-06-21 19:06:05

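Scenario: a 3-node cluster (b1, b2, b3). Partition tp1 has the replica list [1, 2, 3], with all three replicas in the ISR, and its current leader is 1. How do we switch tp1's leader to 3?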

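1. Using a ZooKeeper client, change the order of tp1's replica list in ZooKeeper to [3, 2, 1], so that broker 3 comes first. The preferred replica is the first entry in the partition's assigned replica list; the order of the ISR itself does not decide the election.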

2. Run the kafka-preferred-replica-election.sh command-line tool, which writes the partitions whose leader should change to the /admin/preferred_replica_election node in ZooKeeper.

// kafka.admin.PreferredReplicaLeaderElectionCommand#writePreferredReplicaElectionData
  def writePreferredReplicaElectionData(zkClient: KafkaZkClient,
                                        partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]) {
    try {
      zkClient.createPreferredReplicaElection(partitionsUndergoingPreferredReplicaElection.toSet)
      println("Created preferred replica election path with %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
    } catch {
      case _: NodeExistsException =>
        throw new AdminOperationException("Preferred replica leader election currently in progress for " +
          "%s. Aborting operation".format(zkClient.getPreferredReplicaElection.mkString(",")))
      case e2: Throwable => throw new AdminOperationException(e2.toString)
    }
  }
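
To make step 2 concrete, here is a minimal sketch of the JSON payload that ends up in /admin/preferred_replica_election. The `PartitionRef` and `PreferredReplicaElectionJson` names and the topic name `my-topic` are assumptions made for illustration; they are not Kafka classes, and the real encoding is done inside KafkaZkClient.

```scala
// Hypothetical stand-in for a topic/partition pair (not Kafka's own class).
final case class PartitionRef(topic: String, partition: Int)

object PreferredReplicaElectionJson {
  // Builds the payload by hand. Kafka topic names only allow letters, digits,
  // '.', '_' and '-', so no JSON escaping is attempted in this sketch.
  def encode(partitions: Seq[PartitionRef]): String = {
    val entries = partitions.map { p =>
      s"""{"topic":"${p.topic}","partition":${p.partition}}"""
    }
    s"""{"version":1,"partitions":[${entries.mkString(",")}]}"""
  }
}
```

For example, `PreferredReplicaElectionJson.encode(Seq(PartitionRef("my-topic", 0)))` produces `{"version":1,"partitions":[{"topic":"my-topic","partition":0}]}`.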

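3. KafkaController watches the /admin/preferred_replica_election node and re-elects the leader. It picks the preferred replica, that is, the first replica in the partition's replica list, provided that replica is alive and in the ISR. It then sends a LeaderAndIsrRequest to the other brokers, telling them to change their replica roles.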

// kafka.controller.KafkaController.PreferredReplicaLeaderElection
  case object PreferredReplicaLeaderElection extends ControllerEvent {
    override def state: ControllerState = ControllerState.ManualLeaderBalance

    override def process(): Unit = {
      if (!isActive) return

      // We need to register the watcher if the path doesn't exist in order to detect future preferred replica
      // leader elections and we get the `path exists` check for free
      if (zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
        val partitions = zkClient.getPreferredReplicaElection
        val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
        if (partitionsForTopicsToBeDeleted.nonEmpty) {
          error(s"Skipping preferred replica election for partitions $partitionsForTopicsToBeDeleted since the " +
            "respective topics are being deleted")
        }
        onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
      }
    }
  }
  
// kafka.controller.KafkaController#onPreferredReplicaElection
  private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) {
    info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}")
    try {
      partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
    } catch {
      case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e)
    } finally {
      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
    }
  }
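
The selection rule behind PreferredReplicaPartitionLeaderElectionStrategy can be sketched as a small pure function. This is a simplified model written for this article, not the controller's actual code; `PreferredLeaderSketch`, `electPreferred` and its parameter names are assumptions.

```scala
object PreferredLeaderSketch {
  // The preferred replica is the first entry of the assigned replica list.
  // It only becomes leader if its broker is alive and it is in the ISR;
  // otherwise this sketch elects nobody and returns None.
  def electPreferred(assignment: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): Option[Int] =
    assignment.headOption.filter(id => liveBrokers.contains(id) && isr.contains(id))
}
```

With the scenario above, `electPreferred(Seq(3, 2, 1), Seq(1, 2, 3), Set(1, 2, 3))` returns `Some(3)`, while the original order `Seq(1, 2, 3)` returns `Some(1)`. If broker 3 is down or has fallen out of the ISR, the result is `None`.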

 


Original: https://www.cnblogs.com/allenwas3/p/13173348.html
