自学内容网 自学内容网

Kafka-Controller选举

一、上下文

Kafka-broker粗粒度启动流程》博客中我们分析了broker的大致启动流程,这个时候每个broker都不是controller角色,下面我们就来看下它是如何选举出来的吧

二、设置ZooKeeper

‌ZooKeeper是一个开源的分布式协调服务,主要用于分布式系统中各节点的协调和管理。Kafka的Controller选举也一样用到了它。

  override def startup(): Unit = {
    //....
    initZkClient(time)
    configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
    //....
  }

  private def initZkClient(time: Time): Unit = {
    //config.zkConnect
    //zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka
    info(s"Connecting to zookeeper on ${config.zkConnect}")
    _zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)
    //如果需要,在ZK中预先创建顶级路径。
    _zkClient.createTopLevelPaths()
  }

  def createTopLevelPaths(): Unit = {
    //创建 Persistent 持久化的 zk 路径
    ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)
  }

  //确保ZK中存在持久路径
  def makeSurePersistentPathExists(path: String): Unit = {
    createRecursive(path, data = null, throwIfPathExists = false)
  }

1、KafkaZkClient

KafkaZkClient是在Kafka.zookeeper.ZooKeeperClient之上提供更高级别的Kafka特定操作。

实现说明:此类包括各种组件(Controller, Configs, Old Consumer等)的方法,在某些情况下会从调用包中返回类的实例。

  def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {
    //...
    KafkaZkClient(...)
  }

2、AdminZkClient

它提供与ZooKeeper交互的管理员相关方法。

class AdminZkClient(...){

  //创建topic
  def createTopic(...){...}

  //获取broker元数据
  def getBrokerMetadatas(...){...}

  //创建主题并可选地验证其参数。请注意,TopicCommand也使用此方法。
  def createTopicWithAssignment(...){...}

  //验证主题创建参数
  def validateTopicCreate(...){...}

  //删除topic 
  //为给定主题创建删除路径
  def deleteTopic(...){...}

  //使用可选的副本分配向现有主题添加分区。请注意,TopicCommand使用此方法。
  def addPartitions(...){...}

  //将broker从实体名称解析为整数id
  def parseBroker(...){...}

  //.....
}

3、ZkConfigRepository

zookeeper的配置仓库,也就是kafka在zookeeper中配置信息。

class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {
  override def config(configResource: ConfigResource): Properties = {
    //....
    //从zookeeper的目录下读取数据,并封装成实体(topic、broker、client-id、user、user/clients/client-id、ip)
    adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)
  }
}

4、ZkData

object ZkData {

  //这些是kafka broker 启动时应该存在的持久ZK路径。
  val PersistentZkPaths: Seq[String] = Seq(
    ConsumerPathZNode.path, // old consumer path
    BrokerIdsZNode.path,
    TopicsZNode.path,
    ConfigEntityChangeNotificationZNode.path,
    DeleteTopicsZNode.path,
    BrokerSequenceIdZNode.path,
    IsrChangeNotificationZNode.path,
    ProducerIdBlockZNode.path,
    LogDirEventNotificationZNode.path
  ) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
}

//旧的consumer在zk上的路径
object ConsumerPathZNode {
  def path = "/consumers"
}

object BrokerIdsZNode {
  def path = s"${BrokersZNode.path}/ids"
  def encode: Array[Byte] = null
}

object TopicsZNode {
  def path = s"${BrokersZNode.path}/topics"
}

object ConfigEntityChangeNotificationZNode {
  def path = s"${ConfigZNode.path}/changes"
}

object DeleteTopicsZNode {
  def path = s"${AdminZNode.path}/delete_topics"
}

object BrokerSequenceIdZNode {
  def path = s"${BrokersZNode.path}/seqid"
}

object IsrChangeNotificationZNode {
  def path = "/isr_change_notification"
}

object ProducerIdBlockZNode {
  val CurrentVersion: Long = 1L

  def path = "/latest_producer_id_block"

  def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {
    Json.encodeAsBytes(Map("version" -> CurrentVersion,
      "broker" -> producerIdBlock.assignedBrokerId,
      "block_start" -> producerIdBlock.firstProducerId.toString,
      "block_end" -> producerIdBlock.lastProducerId.toString).asJava
    )
  }

object LogDirEventNotificationZNode {
  def path = "/log_dir_event_notification"
}

三、验证元数据属性集成是否有效

1、meta.properties文件是否始终设置了cluster.id

2、meta.properties文件是否始终设置了node.id或者 broker.id

initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)

四、动态broker初始化

动态broker配置存储在ZooKeeper中,可以在两个级别定义:

1、每个代理的配置持久化在/configs/brokers/{brokerId} 这些可以使用AdminClient使用资源名称brokerId进行描述/更改

2、整个集群的默认值持续存在于/configs/brokers/<default> 这些可以使用AdminClient使用空资源名称进行描述/更改。

broker配置的优先级顺序为:

1、DYNAMIC_BROKER_CONFIG:存储在ZK中的/configs/brokers/{brokerId}

2、DYNAMIC_DEFAULT_BROKER_CONFIG: 存储在ZK中的//configs/brokers/<default>

3、STATIC_BROKER_CONFIG:启动代理时使用的属性,通常来自server.properties文件

4、DEFAULT_CONFIG:KafkaConfig中定义的默认配置

config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)

五、启动KafkaController

_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()

1、KafkaController结构

class KafkaController(...){

  //事件管理
  private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,controllerContext.stats.rateAndTimeMetrics)

  //如果brokerid = 当前的controllerid 那么就返回true
  def isActive: Boolean = activeControllerId == config.brokerId

  @volatile private var brokerInfo = initialBrokerInfo
  @volatile private var _brokerEpoch = initialBrokerEpoch

  //启动
  def startup(): Unit = {
    zkClient.registerStateChangeHandler(new StateChangeHandler {
      //ControllerHandler = "controller-state-change-handler"
      override val name: String = StateChangeHandlers.ControllerHandler
      override def afterInitializingSession(): Unit = {
        eventManager.put(RegisterBrokerAndReelect)
      }
      override def beforeInitializingSession(): Unit = {
        val queuedEvent = eventManager.clearAndPut(Expire)

        //阻止新会话的初始化,直到处理过期事件,这确保在创建新会话之前已处理所有挂起的事件
        queuedEvent.awaitProcessing()
      }
    })
    eventManager.put(Startup)
    eventManager.start()
  }


  override def process(event: ControllerEvent): Unit = {

    event match {
      //.....
      case RegisterBrokerAndReelect =>
          processRegisterBrokerAndReelect()
      case Startup =>
          processStartup()
      //.....
    }
  }
}

1、ControllerEventManager结构

class ControllerEventManager(...){

  //用串行化队列代替锁
  private val queue = new LinkedBlockingQueue[QueuedEvent]

  //ControllerEventThreadName = "controller-event-thread"
  private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)

  def start(): Unit = thread.start()

  def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
    val queuedEvent = new QueuedEvent(event, time.milliseconds())
    queue.put(queuedEvent)
    queuedEvent
  }

  class ControllerEventThread(name: String)  extends ShutdownableThread(...){

    override def doWork(): Unit = {
      //从队列获取事件,主要是controller相关的事件
      val dequeued = pollFromEventQueue()
      dequeued.event match {
        case controllerEvent =>
          def process(): Unit = dequeued.process(processor)
        }
      }
    }
  }
}

ControllerEventManager中的ControllerEventThread的父类是ShutdownableThread,它里面有真正的run()且调起了doWork(),doWork()又调起了process(),因此真正执行的是process()

public abstract class ShutdownableThread extends Thread {

    public abstract void doWork();

    public void run() {
        while (isRunning())
          doWork();
    }
}

这是一个死循环,也就是后面只要往队列中添加事件,会自动执行对应方法。从KafkaController的startup()中我们知道放了两个事件:RegisterBrokerAndReelect和Startup,下面我们来看看它们里面做了什么

2、RegisterBrokerAndReelect事件处理

  private def processRegisterBrokerAndReelect(): Unit = {
    _brokerEpoch = zkClient.registerBroker(brokerInfo)
    processReelect()
  }

1、向zookeeper注册broker

class KafkaZkClient private[zk] (...{

  def registerBroker(brokerInfo: BrokerInfo): Long = {
    //brokers/ids/brokerid
    val path = brokerInfo.path
    //创建 对应的 brokerid 的 临时znode节点,说明:当该brokers挂掉后会随之消失
    val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
    info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +
      s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")
    //返回czxid (broker epoch)
    stat.getCzxid
  }

}

2、开始选举

class KafkaController(...){

  private def processReelect(): Unit = {
    maybeResign()
    elect()
  }

  private def maybeResign(): Unit = {
    val wasActiveBeforeChange = isActive
    //在zk上注册节点改变事件,当controller改变时触发,为下面的选举做铺垫
    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    if (wasActiveBeforeChange && !isActive) {
      //当当前broker辞去controller职务时触发
      onControllerResignation()
    }
  }

  private def elect(): Unit = {
    //获取 活动状态 contoller ,如果 集群已经启动了很长时间,新增了一台broker,那么此时会获得 当下的controller ,
    //如果此时集群刚刚启动,那么此时没有 活动状态的 controller ,返回的结果就是  -1
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    /*
     * 我们可以在初始启动和handleDeleted ZK回调期间到达这里。由于潜在的竞争条件,当我们到达这里时,控制器可能已经被选中了。如果此代理已经是控制器,则此检查将防止以下createEphemeralPath方法进入无限循环。
     */
    if (activeControllerId != -1) {
      //如果当下已经有 activeControllerId 那么就停止选举 ,否则继续往下走
      //Broker $activeControllerId 已被选为控制器,因此停止选举过程
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    }

    //try中会发生如下情况
    //1、正常运行:当选controller
    //2、异常:
    //    1、ControllerMovedException 
    //       1、其他broker成功当选controller
    //       2、controller已经当选,但刚刚离职,需要重新选举
    //    2、Throwable  该节点当选controller,但是就职时出错了。删除该controller,重新选举
    //       
    try {
      val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
      controllerContext.epoch = epoch
      controllerContext.epochZkVersion = epochZkVersion
      activeControllerId = config.brokerId

      //${config.brokerId}已成功当选为控制器。Epoch增加到${controllerContext.eepoch},Epoch zk版本现在是${controller Context.eepoch ZkVersion}”
      info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
        s"and epoch zk version is now ${controllerContext.epochZkVersion}")
       
      //成功当选controller,并开始履行作为该角色的责任
      onControllerFailover()
    } catch {
      case e: ControllerMovedException =>
        //重新开始监听目录变化
        maybeResign()

        if (activeControllerId != -1)
          debug(s"代理$activeControllerId被选为控制器,而不是代理${config.brokerId}", e)
        else
          warn("管制员已经当选,但刚刚辞职,这将导致另一轮选举", e)
      case t: Throwable =>
        error(s"在代理${config.brokerId}上选择或成为控制器时出错。立即触发控制器移动", t)
        triggerControllerMove()
    }
  }

}

在选举前zkClient注册的 controllerChangeHandler 事件其实就是观察 controller目录的变化

class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  //controller目录
  override val path: String = ControllerZNode.path

  override def handleCreation(): Unit = eventManager.put(ControllerChange)
  override def handleDeletion(): Unit = eventManager.put(Reelect)
  override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}

六、总结

1、设置zookeeper,如:zookeeper.connect=hostname1:2181,hostname2:2181,hostname2:2181/kafka并创建持久化目录:consumers、brokers/ids、brokers/topics、config/changes、admin/delete_topics、brokers/seqid、isr_change_notification、latest_producer_id_block、log_dir_event_notification

2、验证元数据属性集成是否有效,主要时看每个broker是否有了唯一的id

3、将每个broker的id注册到zookeeper

4、启动KafkaController

5、启动ControllerEventThread线程并不断消费LinkedBlockingQueue中事件

6、向队列注册RegisterBrokerAndReelect事件、Startup事件

7、首先处理RegisterBrokerAndReelect事件

8、向zookeeper注册broker,并建立临时znode

9、注册controllerChangeHandler 事件其实就是观察 controller目录的变化

10、每个broker开始向zookeeper将自己注册为controller

11、正常情况下只有一个broker成功注册成功,其他broker抛出ControllerMovedException继续监控controller目录的变化

12、如果选举controller成功,但是在就职时失败会里面进行卸任工作,并进行新一轮选举


原文地址:https://blog.csdn.net/lu070828/article/details/143669653

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!