深度了解flink(十) JobManager(4) ResourceManager HA
ResourceManager(ZK模式)的高可用启动流程
ResourceManager
启动流程在DefaultDispatcherResourceManagerComponentFactory#create
中
/**
 * Excerpt of the component factory's create method, reduced to the ResourceManager-related
 * steps: obtain the leader retrieval service, build the gateway retriever, create and start the
 * ResourceManagerService, then start leader retrieval.
 *
 * <p>NOTE(review): names such as resourceManagerFactory, hostname, webMonitorEndpoint,
 * dispatcherRunner, dispatcherLeaderRetrievalService and dispatcherOperationCaches are not
 * declared in this excerpt; presumably they come from code the excerpt leaves out.
 */
public DispatcherResourceManagerComponent create(
Configuration configuration,
ResourceID resourceId,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
DelegationTokenManager delegationTokenManager,
MetricRegistry metricRegistry,
ExecutionGraphInfoStore executionGraphInfoStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
Collection<FailureEnricher> failureEnrichers,
FatalErrorHandler fatalErrorHandler)
throws Exception {
// Leader retrieval service for the ResourceManager; assigned inside the try block below.
LeaderRetrievalService resourceManagerRetrievalService = null;
ResourceManagerService resourceManagerService = null;
try {
// Obtain the ResourceManager leader retrieval service from the HA services.
resourceManagerRetrievalService =
highAvailabilityServices.getResourceManagerLeaderRetriever();
// LeaderGatewayRetriever implements the LeaderRetrievalListener interface.
final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
ResourceManagerId::fromUuid,
new ExponentialBackoffRetryStrategy(
12, Duration.ofMillis(10), Duration.ofMillis(50)));
resourceManagerService =
ResourceManagerServiceImpl.create(
resourceManagerFactory,
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
delegationTokenManager,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
// Start the ResourceManagerService; its start() method triggers the leader election.
resourceManagerService.start();
// Start the retrieval service, passing the gateway retriever as its listener.
resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
return new DispatcherResourceManagerComponent(
dispatcherRunner,
resourceManagerService,
dispatcherLeaderRetrievalService,
resourceManagerRetrievalService,
webMonitorEndpoint,
fatalErrorHandler,
dispatcherOperationCaches);
} catch (Exception exception) {
// Unrelated code omitted.
}
}
初始化LeaderRetrievalService
// Ask the HA services for the ResourceManager leader retrieval service.
resourceManagerRetrievalService =
highAvailabilityServices.getResourceManagerLeaderRetriever();
getResourceManagerLeaderRetriever
会调用父类AbstractHaServices
的这个方法
/**
 * Returns the leader retrieval service for the ResourceManager by delegating to
 * createLeaderRetrievalService with the ResourceManager's leader path.
 */
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return createLeaderRetrievalService(getLeaderPathForResourceManager());
}
createLeaderRetrievalService
会调用具体子类的方法,zk模式下会走到ZooKeeperLeaderElectionHaServices
下
/**
 * Creates the leader retrieval service for the given component by delegating to
 * ZooKeeperUtils.createLeaderRetrievalService.
 *
 * @param componentId identifier that is turned into a leader path via ZooKeeperUtils.getLeaderPath
 * @return the retrieval service built from the Curator client, the leader path and the configuration
 */
@Override
protected LeaderRetrievalService createLeaderRetrievalService(String componentId) {
// Maybe use a single service for leader retrieval
return ZooKeeperUtils.createLeaderRetrievalService(
curatorFrameworkWrapper.asCuratorFramework(),
ZooKeeperUtils.getLeaderPath(componentId),
configuration);
}
继续方法跟进
public static DefaultLeaderRetrievalService createLeaderRetrievalService(
final CuratorFramework client, final String path, final Configuration configuration) {
return new DefaultLeaderRetrievalService(
createLeaderRetrievalDriverFactory(client, path, configuration));
最终调用DefaultLeaderRetrievalService的构造方法进行初始化,入参是一个LeaderRetrievalDriverFactory
接口,zk模式下是ZooKeeperLeaderRetrievalDriverFactory
初始化LeaderRetrievalListener
// Gateway retriever for the ResourceManager leader. It resolves the leader address into a
// ResourceManagerGateway, retrying with an exponential backoff strategy configured with
// (12, 10 ms, 50 ms).
final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
        new RpcGatewayRetriever<>(
                rpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                new ExponentialBackoffRetryStrategy(
                        12, Duration.ofMillis(10), Duration.ofMillis(50)));
RpcGatewayRetriever
UML类图如下
LeaderGatewayRetriever#notifyNewLeaderAddress
是对父类LeaderRetriever中回调方法的重写:LeaderRetriever实现了LeaderRetrievalListener#notifyLeaderAddress,
在收到新的leader地址后会回调notifyNewLeaderAddress
/**
 * Reacts to a new leader address by creating a gateway future for it and installing that
 * future as the current one.
 *
 * @param newLeaderAddressFuture future carrying the new leader's address and UUID
 */
@Override
public void notifyNewLeaderAddress(
CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {
// Build the gateway future for the new leader address.
final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);
// Atomically swap in the new future and keep a handle on the previous one.
final CompletableFuture<T> oldGatewayFuture =
atomicGatewayFuture.getAndSet(newGatewayFuture);
// Forward the new future's outcome (value or failure) to the previous future;
// presumably so that anyone still holding the previous future also gets completed.
newGatewayFuture.whenComplete(
(t, throwable) -> {
if (throwable != null) {
oldGatewayFuture.completeExceptionally(throwable);
} else {
oldGatewayFuture.complete(t);
}
});
}
创建ResourceManagerService
// Build the ResourceManagerService through the static factory method, handing over the HA
// services, heartbeat services and the other collaborators it needs.
resourceManagerService =
ResourceManagerServiceImpl.create(
resourceManagerFactory,
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
delegationTokenManager,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
ResourceManagerService
负责管理ResourceManager的生命周期,也负责启动高可用(HA)的leader选举
创建LeaderElection
/**
 * Excerpt of the ResourceManagerServiceImpl constructor showing where the LeaderElection
 * instance is obtained.
 *
 * @param resourceManagerFactory factory for the ResourceManager (its use is omitted here)
 * @param rmProcessContext process context that provides the HA services
 * @throws Exception as declared by the constructor
 */
private ResourceManagerServiceImpl(
        ResourceManagerFactory<?> resourceManagerFactory,
        ResourceManagerProcessContext rmProcessContext)
        throws Exception {
    // Other code omitted.
    // Initialize the LeaderElection from the HA services of the process context.
    this.leaderElection =
            rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElection();
}
LeaderElection选举
// Start the service; start() is where the leader election is kicked off.
resourceManagerService.start();
跳转到start
方法
/**
 * Starts the resource manager service. A repeated call is a no-op: if the service is already
 * running, the method only logs at debug level and returns.
 *
 * @throws Exception as declared; may be thrown when starting the leader election
 */
public void start() throws Exception {
// Check and set the running flag under the lock so the service is started only once.
synchronized (lock) {
if (running) {
LOG.debug("Resource manager service has already started.");
return;
}
running = true;
}
LOG.info("Starting resource manager service.");
// Register this service as the contender and begin the leader election.
leaderElection.startLeaderElection(this);
}
leaderElection.startLeaderElection(this)
zk模式下走的是DefaultLeaderElection
的startLeaderElection方法:
/**
 * Starts the leader election for the given contender by registering it with the parent
 * service under this election's componentId.
 *
 * @param contender the contender taking part in the election; must not be null
 * @throws Exception as declared; may be thrown by the registration
 */
@Override
public void startLeaderElection(LeaderContender contender) throws Exception {
Preconditions.checkNotNull(contender);
parentService.register(componentId, contender);
}
开始对参选者进行注册
protected void register(String componentId, LeaderContender contender) throws Exception {
checkNotNull(componentId, "componentId must not be null.");
checkNotNull(contender, "Contender must not be null.");
synchronized (lock) {
if (leaderElectionDriver == null) {
createLeaderElectionDriver();
}
//省略无关代码
}
register
会先判断选举驱动(leaderElectionDriver)是否已存在,如果不存在,则调用createLeaderElectionDriver按高可用模式创建选举驱动;zk模式下最终通过下面驱动工厂的create方法创建ZooKeeperLeaderElectionDriver
/**
 * Creates a ZooKeeperLeaderElectionDriver from this factory's Curator client and the given
 * listener.
 *
 * @param leaderElectionListener listener passed on to the new driver
 * @return the newly constructed driver
 * @throws Exception as declared; may be thrown by the driver's constructor
 */
@Override
public ZooKeeperLeaderElectionDriver create(
LeaderElectionDriver.Listener leaderElectionListener) throws Exception {
return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);
}
ZooKeeperLeaderElectionDriver初始化
/**
 * Builds the driver: stores its collaborators, creates the LeaderLatch and the TreeCache,
 * registers the listeners, and finally starts the latch and the cache.
 *
 * @param curatorFramework Curator client; must not be null
 * @param leaderElectionListener listener stored by the driver; must not be null
 * @throws Exception as declared; may be thrown while starting the latch or the cache
 */
public ZooKeeperLeaderElectionDriver(
CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)
throws Exception {
// Validate the arguments: both must be non-null.
this.curatorFramework = Preconditions.checkNotNull(curatorFramework);
this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);
// Derive the leader latch path from the Curator namespace and keep it in leaderLatchPath.
this.leaderLatchPath =
ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());
// Create the Curator LeaderLatch. Note that the path passed here comes from
// ZooKeeperUtils.getLeaderLatchPath(), not from the leaderLatchPath field assigned above.
this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
// Create a TreeCache rooted at "/" with a ConnectionInfoNodeSelector; the listener added
// below reacts to node changes reported by this cache.
this.treeCache =
ZooKeeperUtils.createTreeCache(
curatorFramework,
"/",
new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());
// Added/updated nodes are handled as changed leader information, removed nodes as removed
// leader information; in both cases the event data must be present.
treeCache
.getListenable()
.addListener(
(client, event) -> {
switch (event.getType()) {
case NODE_ADDED:
case NODE_UPDATED:
Preconditions.checkNotNull(
event.getData(),
"The ZooKeeper event data must not be null.");
handleChangedLeaderInformation(event.getData());
break;
case NODE_REMOVED:
Preconditions.checkNotNull(
event.getData(),
"The ZooKeeper event data must not be null.");
handleRemovedLeaderInformation(event.getData().getPath());
break;
}
});
// Register this driver as a listener on the leader latch.
leaderLatch.addListener(this);
// Register the connection state listener.
// NOTE(review): `listener` is declared outside this excerpt; presumably it forwards to handleStateChange.
curatorFramework.getConnectionStateListenable().addListener(listener);
// Start the latch and the cache only after all listeners are registered.
leaderLatch.start();
treeCache.start();
}
ZooKeeperLeaderElectionDriver.handleStateChange
状态变化时候根据不同状态打印日志
/**
 * Logs a message for the given ZooKeeper connection state. Every branch only logs; no other
 * action is taken here.
 *
 * @param newState the new connection state
 */
private void handleStateChange(ConnectionState newState) {
switch (newState) {
case CONNECTED:
LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
break;
case SUSPENDED:
LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");
break;
case RECONNECTED:
LOG.info(
"Connection to ZooKeeper was reconnected. Leader election can be restarted.");
break;
case LOST:
// Maybe we have to throw an exception here to terminate the JobManager
LOG.warn(
"Connection to ZooKeeper lost. None of the contenders participates in the leader election anymore.");
break;
}
}
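DefaultLeaderElectionService.notifyLeaderContenderOfLeadership
在把领导权授予参选者之前,会先校验该componentId是否已注册参选者,以及sessionID是否仍是当前签发的leader session ID,校验通过后才调用参选者的grantLeadership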
/**
 * Grants leadership to the contender registered under the given component, unless the
 * notification is obsolete.
 *
 * @param componentId component whose contender should be granted leadership
 * @param sessionID session ID the leadership notification was issued for
 */
private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {
// Drop the notification if no contender is registered for this component.
if (!leaderContenderRegistry.containsKey(componentId)) {
LOG.debug(
"The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
sessionID,
leaderElectionDriver);
return;
// Drop the notification if it belongs to a session other than the currently issued one.
} else if (!sessionID.equals(issuedLeaderSessionID)) {
LOG.debug(
"An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",
sessionID,
issuedLeaderSessionID);
return;
}
// Leadership must not already be confirmed for this component at this point.
Preconditions.checkState(
!confirmedLeaderInformation.hasLeaderInformation(componentId),
"The leadership should have been granted while not having the leadership acquired.");
LOG.debug(
"Granting leadership to the contender registered under component '{}' with session ID {}.",
componentId,
issuedLeaderSessionID);
// Hand the issued session ID to the registered contender.
leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);
}
原文地址:https://blog.csdn.net/qq_40689430/article/details/143418429
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!