自学内容网 自学内容网

kubernetes存储架构之PV controller源码解读

kubernetes存储之PV controller源码解读

摘要

本文介绍kubernetes存储架构的基础,并重点对PV controller的源码进行了学习

引入

从一个业务场景出发,假如你所在的公司,有个物理机,上面部署了web服务器,随着业务增长,本地磁盘空间不够使用了。业务使用方想你所在的基础设施部门申请加1T的块设备存储空间。于是你给出了解决方案:

  • 方案一:给物理机添加一块新磁盘

实施步骤如下:

  1. 获取一块磁盘(买、借都可以)

  2. 将磁盘插入服务器插槽

  3. 物理机OS层面识别磁盘,并格式化后再mount到目录

  • 方案二: 假如已经一个ceph存储集群了,也可以从ceph存储划一个卷,映射给物理机使用

实施步骤如下:

  1. rbd create 创建一个卷
  2. rbd map将卷映射给物理机
  3. 物理机OS层面识别磁盘,并格式化后再mount到目录

再进一步设想,假如业务使用的容器POD,而不是物理机呢?

方案一:我们把pod当成物理机,与物理机一样,在pod内部安装ceph rbd客户端,再把卷rbd map给pod使用,最后进入pod 把卷mount到用户需要的目录。一切都手动完成,效率低、也是非主流方式。

方案二:使用kubernetes+ceph csi,实现为pod添加持久化存储的自动化

kubernetes存储架构

存储基础

在kubernetes环境中,通过PVC与PV对存储卷进行抽象。

PV与PVC

  • PersistentVolumeClaim (简称PVC): 是用户存储的请求。它和Pod类似。Pod消耗Node资源,PVC消耗PV资源。Pod可以请求特定级别的资源(CPU和MEM)。PVC可以请求特定大小和访问模式的PV。

  • PersistentVolume (简称PV): 由管理员设置的存储,它是集群的一部分。就像节点(Node)是集群中的资源一样,PV也是集群中的资源。它包含存储类型,存储大小和访问模式。它的生命周期独立于Pod,例如当使用它的Pod销毁时对PV没有影响。

PV提供方式

  • 静态PV:集群管理员创建许多PV,它们包含可供集群用户使用的实际存储的详细信息。
  • 动态PV:当管理员创建的静态PV都不匹配用户创建的PersistentVolumeClaim时,集群会为PVC动态的配置卷。此配置基于StorageClasses:PVC必须请求存储类(storageclasses),并且管理员必须已创建并配置该类,以便进行动态创建。

PV 的访问模式

  • ReadWriteOnce - 卷以读写方式挂载到单个节点
  • ReadOnlyMany - 卷以只读方式挂载到多个节点
  • ReadWriteMany - 卷以读写方式挂载到多个节点

PVC回收策略

  • Retain - 手动回收。在删除pvc后PV变为Released不可用状态, 若想重新被使用,需要管理员删除pv,重新创建pv,删除pv并不会删除存储的资源,只是删除pv对象而已;若想保留数据,请使用该Retain。
  • Recycle - 基本擦洗(rm -rf /thevolume/)。 删除pvc自动清除PV中的数据,效果相当于执行 rm -rf /thevolume/。删除pvc时,pv的状态由Bound变为Available。此时可重新被pvc申请绑定。
  • Delete - 删除存储上的对应存储资源。关联的存储资产(如AWS EBS,GCE PD,Azure磁盘或OpenStack Cinder卷)将被删除。NFS不支持delete策略。

PV的状态

  • Available(可用状态) - 一块空闲资源还没有被任何声明绑定
  • Bound(绑定状态) - 声明分配到PVC进行绑定,PV进入绑定状态
  • Released(释放状态) - PVC被删除,PV进入释放状态,等待回收处理
  • Failed(失败状态) - PV执行自动清理回收策略失败

图片来自于网络
(图片来自于网络)

PVC的状态

  • Pending(等待状态) - 等待绑定PV
  • Bound(绑定状态) - PV已绑定PVC
  • Lost(绑定丢失) - 再次绑定PV后进入Bound状态

在这里插入图片描述
(图片来自于网络)

PVC yaml 字段描述

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  annotations:
    # 这个注解用于标记 PV 是否已经成功绑定到一个 PVC。当 PV 和 PVC 成功绑定时,Kubernetes 会在 PV 上添加这个注解
    # Kubernetes pv controller 控制器会检查这个注解来确定绑定过程的状态,从而进行相应的管理操作。
    pv.kubernetes.io/bind-completed: "yes"
    # 这个注解用于标记 PV 是否是由 Kubernetes 的存储控制器自动绑定到 PVC 的。如果 PV 是通过 StorageClass 动态创建的,或者是由控制器自动绑定的,这个注解会被设置为 "yes"。
    pv.kubernetes.io/bound-by-controller: "yes"
    # 这个注解用于指定创建 PV 的存储预配置器(storageClass)的名称。在动态存储管理中,存储预配置器会根据 PVC 的请求自动创建和配置 PV,而不需要管理员手动创建 PV
    volume.beta.kubernetes.io/storage-provisioner: rbd.csi.obs.com
  creationTimestamp: 2024-08-05T06:29:03Z
  finalizers:
  - kubernetes.io/pvc-protection
  # PVC 的名称
  name: basepvc-dg03dev8jxdh
  # PVC所在的命名空间
  namespace: public
  resourceVersion: "11508437467"
  selfLink: /api/v1/namespaces/public/persistentvolumeclaims/basepvc-dg03dev80407964jxdh-swap
  # 用于唯一表示PVC,同时用于管理 PVC 与 PersistentVolume (PV) 之间的绑定关系。
  uid: 020270a5-52f4-11ef-b4be-e8ebd398881c
spec:
  # PVC 的访问模式
  accessModes:
  - ReadWriteOnce
  # 如果是克隆卷,这里用于标识由哪个快照创建而来的
  dataSource: null
  resources:
    requests:
      # 请求卷的空间大小
      storage: 51Gi
  storageClassName: csi-virt-rbd
  # block 或 filesystem,是否需要文件系统格式化
  volumeMode: Filesystem
  # 绑定PV的名称,规范是"pvc-<pvc的uid>"
  volumeName: pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
status:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 51Gi
  # PVC当前的状态  
  phase: Bound

PV yaml 字段描述

---
apiVersion: v1
kind: PersistentVolume
metadata:
  annotations:
    # 这个注解用于指定创建 PV 的存储预配置器的名称。
    pv.kubernetes.io/provisioned-by: rbd.csi.obs.com
  creationTimestamp: 2024-08-05T06:29:03Z
  finalizers:
  - kubernetes.io/pv-protection
  labels:
    baymax.io/rbd-cluster-name: ceph10
  name: pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  resourceVersion: "10132283575"
  selfLink: /api/v1/persistentvolumes/pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  uid: 0220ec79-52f4-11ef-b4be-e8ebd398881c
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 51Gi
  # claimRef 用于记录 PV 绑定到的 PVC 的信息,包括 PVC 的名称、命名空间和 UID。
  claimRef:
    apiVersion: v1
    kind: PersistentVolumeClaim
    # pvc的名称
    name: basepvc-dg03dev80407964jxdh-swap
    namespace: public
    resourceVersion: "10132283544"
    # pvc的uid
    uid: 020270a5-52f4-11ef-b4be-e8ebd398881c
  # PV 对应的 volume卷由csi进行生命周期管理  
  csi:
  # 存储驱动的名称
    driver: rbd.csi.obs.com
    # 卷格式化时指定的文件系统类型
    fsType: ext4
    volumeAttributes:
      adminKeyring: AQBdZFtkGyvfxxxxxxxxxxxxxxxxxxxxxxx (密码隐藏)
      baymax.io/rbd-pvc-name: basepvc-dg03dev80407964jxdh-swap
      baymax.io/rbd-pvc-namespace: public
      clustername: ceph10
      monitors: 10.x.x.x:6789,10.x.x.x:6789,10.x.x.x:6789 (IP隐藏)
      storage.kubernetes.io/csiProvisionerIdentity: 1678171935067-8081-rbd.csi.obs.com
      userKeyring: AQBKqlxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (密码隐藏)
    # volumeHandle 是由 CSI 存储驱动程序生成的唯一标识符,用于标识唯一标识一个特定的存储卷。  
    # 在 PV 和 PVC 成功绑定后,volumeHandle 确保 Kubernetes 能够正确地将 PV 与 PVC 关联起来。
    volumeHandle: csi-rbd-pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  # 回收策略  
  persistentVolumeReclaimPolicy: Delete
  # storageClass的名称
  storageClassName: csi-virt-rbd
  volumeMode: Filesystem
status:
  phase: Bound

** storageClass yaml 字段描述**

# kubectl get storageclasses.storage.k8s.io  ceph-nbd
NAME       PROVISIONER            AGE
ceph-nbd   ceph-nbd.csi.obs.com   53d
# kubectl get storageclasses.storage.k8s.io  ceph-nbd -oyaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"storage.k8s.io/v1","kind":"StorageClass","metadata":{"annotations":{},"name":"ceph-nbd"},"provisioner":"ceph-nbd.csi.obs.com","reclaimPolicy":"Delete"}
  creationTimestamp: 2024-10-25T02:03:28Z
  name: ceph-nbd
  resourceVersion: "10961600994"
  selfLink: /apis/storage.k8s.io/v1/storageclasses/ceph-nbd
  uid: 53c697ad-9275-11ef-88f0-e8ebd3986310
provisioner: ceph-nbd.csi.obs.com
# volume回收模式
reclaimPolicy: Delete
# 绑定策略,立即还是延迟
volumeBindingMode: Immediate

kubernetes存储结构

在kubernetes中,与存储相关的组件如下:

在这里插入图片描述
(图片来自于网络)

  • PV Controller: 负责 PV/PVC 的绑定、生命周期管理,并根据需求进行数据卷的 Provision/Delete 操作;

  • AD Controller:负责存储设备的 Attach/Detach 操作,将设备挂载到目标节点;

  • Volume Manager:管理卷的Mount/Unmount操作、卷设备的格式化以及挂载到一些公用目录上的操作;

  • Volume Plugins:它主要是对上面所有挂载功能的实现;

PV Controller、AD Controller、Volume Manager 主要是进行操作的调用,而具体操作则是由 Volume Plugins 实现的。

接下来,本文将对PV controller的源码进行分析,深入学习PV controller是如何实现的。

PV controller 的启动

如果对 kube-controller-manager 组件有一定了解,kube-controller-manager 包括了多种controller,包括node contrller、deployment controller、service controller、Daemonset controller、PV controller和AD controller等。

在kube-controller-manager进程启动时,会依次启动这些controller.

pv controller 启动流程如下:

main()—> NewHyperKubeCommand()—>kubecontrollermanager.NewControllerManagerCommand()—>Run() —> StartControllers() —> NewControllerInitializers() —> startPersistentVolumeBinderController(ctx) —>NewController()—> go volumeController.Run(ctx.Stop)

kube-controller-manager 在启动Run()时,会调用startPersistentVolumeBinderController(ctx)函数,在函数内部会调用函数NewController(),先实例化一个PV controller,再通过协程启动PV controller: go volumeController.Run(ctx.Stop)

源码路径: k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go

备注: 本文代码是v1.12版本

NewController

PV controller 结构体包括的如下字段,重要的有:各种lister,pv controller通过lister 可以从本地cache中查询资源的信息和状态;包括2个本地cache缓存空间,用来分别保存k8s已有的volume和claims的信息与状态。另外定义了2个消息队列,当有新的claim和volume 对应的创建、删除、更新等事件时,就将对应事件放到队列,等待PV controller进一步处理。

在这里插入图片描述

startPersistentVolumeBinderController(ctx) 源码中使用这个函数实例化一个PV controller并通过协程方式启动。

// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go


func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
params := persistentvolumecontroller.ControllerParameters{
KubeClient:                ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
SyncPeriod:                ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
VolumePlugins:             ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
Cloud:                     ctx.Cloud,
ClusterName:               ctx.ComponentConfig.KubeCloudShared.ClusterName,
VolumeInformer:            ctx.InformerFactory.Core().V1().PersistentVolumes(),
ClaimInformer:             ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
ClassInformer:             ctx.InformerFactory.Storage().V1().StorageClasses(),
PodInformer:               ctx.InformerFactory.Core().V1().Pods(),
NodeInformer:              ctx.InformerFactory.Core().V1().Nodes(),
EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
}
  // 实例化一个pv controller
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
if volumeControllerErr != nil {
return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
}
  // 协程方式启动 PV controller
go volumeController.Run(ctx.Stop)
return nil, true, nil
}

NewController() 创建一个新的pv controller

// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go


// NewController creates a new PersistentVolume controller
func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
  // 初始化一个 事件记录器
eventRecorder := p.EventRecorder
if eventRecorder == nil {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
}

  // 实例化一个PV controller
controller := &PersistentVolumeController{
volumes:           newPersistentVolumeOrderedIndex(),
claims:            cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
kubeClient:        p.KubeClient,
eventRecorder:     eventRecorder,
runningOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
cloud:             p.Cloud,
enableDynamicProvisioning:     p.EnableDynamicProvisioning,
clusterName:                   p.ClusterName,
createProvisionedPVRetryCount: createProvisionedPVRetryCount,
createProvisionedPVInterval:   createProvisionedPVInterval,
claimQueue:                    workqueue.NewNamed("claims"),
volumeQueue:                   workqueue.NewNamed("volumes"),
resyncPeriod:                  p.SyncPeriod,
}

// Prober is nil because PV is not aware of Flexvolume.
if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
}

  /// 定义volume事件处理函数,当有新的volume事件时会放到消息队列 controller.volumeQueue 中去
p.VolumeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
},
)
controller.volumeLister = p.VolumeInformer.Lister()
controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced

  // 定义claim事件处理函数,当有新的volume事件时会放到消息队列 controller.volumeQueue 中去
p.ClaimInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
},
)
controller.claimLister = p.ClaimInformer.Lister()
controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced

controller.classLister = p.ClassInformer.Lister()
controller.classListerSynced = p.ClassInformer.Informer().HasSynced
controller.podLister = p.PodInformer.Lister()
controller.podListerSynced = p.PodInformer.Informer().HasSynced
controller.NodeLister = p.NodeInformer.Lister()
controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced
return controller, nil
}

ctrl.Run()

Run() 是pv controller启动入口, run() 的作用是启动了三个工作协程:

ctrl.resync: 定期重新同步控制器的状态,确保控制器的状态与集群中的实际状态一致,具体的实现是:定时循环查询出pv和pvc列表,然后放入到队列volumeQueue和claimQueue中,让volumeWorker和claimWorker进行消费。

ctrl.volumeWorker:通过消费队列volumeQueue,来处理与 PersistentVolume (PV) 相关的操作,包括创建、绑定、释放和删除 PV

ctrl.claimWorker:通过消费队列claimQueue,来处理与 PersistentVolumeClaim (PVC) 相关的操作,包括创建、绑定和删除 PVC。
任务:

// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go

func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer ctrl.claimQueue.ShutDown()
defer ctrl.volumeQueue.ShutDown()

klog.Infof("Starting persistent volume controller")
defer klog.Infof("Shutting down persistent volume controller")

if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
return
}

ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

  // 核心代码! 启动三个协程
go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
go wait.Until(ctrl.claimWorker, time.Second, stopCh)

metrics.Register(ctrl.volumes.store, ctrl.claims)

<-stopCh
}

简单图示

在这里插入图片描述

ctrl.resync

作用是定时从cache中获取 pvc列表数据并放入到消费队列 ctrl.claimQueue,之后ctrl.claimWorker 循环任务来消费处理内部数据;同时定时从cache中获取 PV列表数据,并放入消费队列ctrl.volumeQueue,之后ctrl.volumeWorker 循环任务来消费处理内部数据

//kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go

func (ctrl *PersistentVolumeController) resync() {
klog.V(4).Infof("resyncing PV controller")

  // 遍历pvc ,将pvc 放入 ctrl.claimQueue,等待处理
pvcs, err := ctrl.claimLister.List(labels.NewSelector())
if err != nil {
klog.Warningf("cannot list claims: %s", err)
return
}
  // 将 pvc 放入消费队列 ctrl.claimQueue
for _, pvc := range pvcs {
ctrl.enqueueWork(ctrl.claimQueue, pvc)
}

  // 遍历pv ,将 pv 放入 ctrl.volumeQueue,等待处理
pvs, err := ctrl.volumeLister.List(labels.NewSelector())
if err != nil {
klog.Warningf("cannot list persistent volumes: %s", err)
return
}
  // 将 pv 放入消费队列 ctrl.volumeQueue
for _, pv := range pvs {
ctrl.enqueueWork(ctrl.volumeQueue, pv)
}
}

简单图示:

在这里插入图片描述

这是一个典型的”生产者与消费者“模型,ctrl.resync负责将数据放入2个队列,之后ctrl.volumeWorker与ctrl.claimWorker消费队列里面的数据。

ctrl.volumeWorker

volumeWorker会不断循环消费volumeQueue队列里面的数据,然后获取到相应的PV执行updateVolume操作。

volumeWorker()函数的代码比较长,我们展开分析一下。


// volumeWorker processes items from volumeQueue. It must run only once,
// syncVolume is not assured to be reentrant.
func (ctrl *PersistentVolumeController) volumeWorker() {
workFunc := func() bool {
    // 从消费队列 ctrl.volumeQueue 取一个 PV(或称为volume)
keyObj, quit := ctrl.volumeQueue.Get()
if quit {
return true
}
defer ctrl.volumeQueue.Done(keyObj)
key := keyObj.(string)
glog.V(5).Infof("volumeWorker[%s]", key)

_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
return false
}
    // 通过 volume name 查询本地cache,从cache中获取到 volume 对象
volume, err := ctrl.volumeLister.Get(name)
if err == nil {
// The volume still exists in informer cache, the event must have
// been add/update/sync
      // 如果 volume 存在,则用updateVolume() 函数处理这个volume
ctrl.updateVolume(volume)
return false
}
if !errors.IsNotFound(err) {
glog.V(2).Infof("error getting volume %q from informer: %v", key, err)
return false
}

// The volume is not in informer cache, the event must have been
// "delete"
volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
if err != nil {
glog.V(2).Infof("error getting volume %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the volume from its cache
glog.V(2).Infof("deletion of volume %q was already processed", key)
return false
}
volume, ok := volumeObj.(*v1.PersistentVolume)
if !ok {
glog.Errorf("expected volume, got %+v", volumeObj)
return false
}
    // 如果volume资源不存在,说明需要删除volume,则调用方法ctrl.deleteVolume(volume),让底层删除卷 
ctrl.deleteVolume(volume)
return false
}
  // 不断执行 workFunc() 函数
for {
if quit := workFunc(); quit {
glog.Infof("volume worker queue shutting down")
return
}
}
}

在这里插入图片描述

updateVolume() 方法,新了一个check后调用ctrl.syncVolume(volume)进一步处理。

// updateVolume runs in worker thread and handles "volume added",
// "volume updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
// Store the new volume version in the cache and do not process it if this
// is an old version.
new, err := ctrl.storeVolumeUpdate(volume)
if err != nil {
glog.Errorf("%v", err)
}
if !new {
return
}

  // 根据当前 PV 对象的规格对 PV 和 PVC 进行绑定或者解绑
err = ctrl.syncVolume(volume)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
glog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
} else {
glog.Errorf("could not sync volume %q: %+v", volume.Name, err)
}
}
}

ctrl.syncVolume(volume)

是一个重要的函数,需要再仔细解读

说明: 代码中的 “volume” 等价于 PV, “claim” 等价于 “PVC”

syncVolume方法为核心方法,主要调谐更新pv的状态:
(1)如果spec.claimRef未设置,则是未使用过的pv,则调用updateVolumePhase函数更新状态设置 phase 为 available;
(2)如果spec.claimRef不为空,则该pv已经与pvc bound过了,此时若对应的pvc不存在,则更新pv状态为released;
(3)如果pv对应的pvc被删除了,调用ctrl.reclaimVolume根据pv的回收策略进行相应操作,如果是retain,则不做操作,如果是delete,则调用volume plugin来删除底层存储,并删除pv对象(当volume plugin为csi时,将走out-tree逻辑,pv controller不做删除存储与pv对象的操作,由external provisioner组件来完成该操作)。


// syncVolume is the main controller method to decide what to do with a volume.
// It's invoked by appropriate cache.Controller callbacks when a volume is
// created, updated or periodically synced. We do not differentiate between
// these events.
func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {
glog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))

// [Unit test set 4]
  // 如果 volume.Spec.ClaimRef 为 nil,说明 volume 是未使用的,那么就调用ctrl.updateVolumePhase()函数
  // 将 volume 的状态更新为 Available
if volume.Spec.ClaimRef == nil {
// Volume is unused
glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
// Nothing was saved; we will fall back into the same
// condition in the next call to this method
return err
}
return nil
} else /* pv.Spec.ClaimRef != nil */ {
// Volume is bound to a claim.
    // 如果  volume.Spec.ClaimRef.UID 为 ”“,说明 volume 是未使用的,那么就调用ctrl.updateVolumePhase()函数
  // 将 volume 的状态更新为 Available
if volume.Spec.ClaimRef.UID == "" {
// The PV is reserved for a PVC; that PVC has not yet been
// bound to this PV; the PVC sync will handle it.
glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
// Nothing was saved; we will fall back into the same
// condition in the next call to this method
return err
}
return nil
}
glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
// Get the PVC by _name_
    // 通过 volume.Spec.ClaimRef 从本地 cache 中查询到对应的 PVC 对象
var claim *v1.PersistentVolumeClaim
claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
obj, found, err := ctrl.claims.GetByKey(claimName)
if err != nil {
return err
}
    // 如果没有在本地 cache 中查找到 PVC,同时 PV 的 annotation 
    // 又是包括"pv.kubernetes.io/bound-by-controller"的
    // 需要再去 apiserver 做double-check
if !found && metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
// If PV is bound by external PV binder (e.g. kube-scheduler), it's
// possible on heavy load that corresponding PVC is not synced to
// controller local cache yet. So we need to double-check PVC in
//   1) informer cache
//   2) apiserver if not found in informer cache
// to make sure we will not reclaim a PV wrongly.
// Note that only non-released and non-failed volumes will be
// updated to Released state when PVC does not eixst.
if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
        // 从 cache 中查找 pvc
obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
if err != nil && !apierrs.IsNotFound(err) {
return err
}
found = !apierrs.IsNotFound(err)
if !found {
          // 如果 cache 中不存在则再去 apiserver 查询是否存在
obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name, metav1.GetOptions{})
if err != nil && !apierrs.IsNotFound(err) {
return err
}
found = !apierrs.IsNotFound(err)
}
}
}
    // 如果都没找到 PVC,则抛出错误
if !found {
glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
// Fall through with claim = nil
      // 如果 PVC 找到了, 将 cache 中的 PVC 转换为 PVC 对象
} else {
var ok bool
claim, ok = obj.(*v1.PersistentVolumeClaim)
if !ok {
return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
}
glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
}
    // 如果 PVC 的 UID 与 PV 中的 UID 不同,说明绑定错误了
if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
// The claim that the PV was pointing to was deleted, and another
// with the same name created.
glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
// Treat the volume as bound to a missing claim.
claim = nil
}

    // 如果 PVC 不存在,而且 PV 的状态是 ”Released“ 或 "Failed", 则将 apiserver 和 本地 cache中的PV 状态都更新为 "Released"
if claim == nil {
// If we get into this block, the claim must have been deleted;
// NOTE: reclaimVolume may either release the PV back into the pool or
// recycle it or do nothing (retain)

// Do not overwrite previous Failed state - let the user see that
// something went wrong, while we still re-try to reclaim the
// volume.
if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
// Also, log this only once:
glog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
// Nothing was saved; we will fall back into the same condition
// in the next call to this method
return err
}
}

if err = ctrl.reclaimVolume(volume); err != nil {
// Release failed, we will fall back into the same condition
// in the next call to this method
return err
}
return nil
      // 如果 claim 不为 nil 而且 claim.Spec.VolumeName == "",则再检查 volumeMode 是否匹配
      // 如果不匹配,则写 evenRecorder 后返回
} else if claim.Spec.VolumeName == "" {
if isMisMatch, err := checkVolumeModeMisMatches(&claim.Spec, &volume.Spec); err != nil || isMisMatch {
// Binding for the volume won't be called in syncUnboundClaim,
// because findBestMatchForClaim won't return the volume due to volumeMode mismatch.
volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
// Skipping syncClaim
return nil
}
      // 如果 volumeMode 是匹配的,则继续往下走: 判断 PV 是否有 annotation: "pv.kubernetes.io/bound-by-controller" 
      // 如果有这个annotation, 后续会交给 PVC sync 来处理
      // 如果没有这个annotation, 等待用户自行修复
if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
// The binding is not completed; let PVC sync handle it
glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
} else {
// Dangling PV; try to re-establish the link in the PVC sync
glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
}
// In both cases, the volume is Bound and the claim is Pending.
// Next syncClaim will fix it. To speed it up, we enqueue the claim
// into the controller, which results in syncClaim to be called
// shortly (and in the right worker goroutine).
// This speeds up binding of provisioned volumes - provisioner saves
// only the new PV and it expects that next syncClaim will bind the
// claim to it.
      // 把 PVC 加入 claimQueue 队列,等待 syncClaim() 任务来处理
ctrl.claimQueue.Add(claimToClaimKey(claim))
return nil
      // 如果 claim 不为空,而且 claim.Spec.VolumeName == volume.Name,说明 binding 是正确的。
      // 这是将volume 的状态更新为 Bound
} else if claim.Spec.VolumeName == volume.Name {
// Volume is bound to a claim properly, update status if necessary
glog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
// Nothing was saved; we will fall back into the same
// condition in the next call to this method
return err
}
return nil
} else {
// Volume is bound to a claim, but the claim is bound elsewhere
      
      // 如果PV 的 annotation 包括 "pv.kubernetes.io/provisioned-by" 并且回收策略 volume.Spec.PersistentVolumeReclaimPolicy 是 Delete时,
      // 再判断 PV 的phase 如果不是 Released 和 Failed,则更新 PV 的Phase为 Released
if metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
// This volume was dynamically provisioned for this claim. The
// claim got bound elsewhere, and thus this volume is not
// needed. Delete it.
// Mark the volume as Released for external deleters and to let
// the user know. Don't overwrite existing Failed status!
if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
// Also, log this only once:
glog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
          // 更新 PV 的 phase
if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
// Nothing was saved; we will fall back into the same condition
// in the next call to this method
return err
}
}
        // 之后再 调用 ctrl.reclaimVolume(volume) 对PV 进行回收处理
if err = ctrl.reclaimVolume(volume); err != nil {
// Deletion failed, we will fall back into the same condition
// in the next call to this method
return err
}
return nil
        // 执行 unbindVolume()操作
} else {
// Volume is bound to a claim, but the claim is bound elsewhere
// and it's not dynamically provisioned.
if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
// This is part of the normal operation of the controller; the
// controller tried to use this volume for a claim but the claim
// was fulfilled by another volume. We did this; fix it.
glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
if err = ctrl.unbindVolume(volume); err != nil {
return err
}
return nil
} else {
// The PV must have been created with this ptr; leave it alone.
glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
// This just updates the volume phase and clears
// volume.Spec.ClaimRef.UID. It leaves the volume pre-bound
// to the claim.
if err = ctrl.unbindVolume(volume); err != nil {
return err
}
return nil
}
}
}
}
}

由于代码较长,梳理出主要逻辑的流程图大致如下:

在这里插入图片描述

updateVolumePhase()

  1. 更新etcd中 volume 的phase状态
  2. 更新本地informer cache中的phase状态
// updateVolumePhase saves new volume phase to API server.
func (ctrl *PersistentVolumeController) updateVolumePhase(volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, message string) (*v1.PersistentVolume, error) {
glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s", volume.Name, phase)
  // 如果 Phase 已经满足,就什么也不做
if volume.Status.Phase == phase {
// Nothing to do.
glog.V(4).Infof("updating PersistentVolume[%s]: phase %s already set", volume.Name, phase)
return volume, nil
}

   // 核心代码!
volumeClone := volume.DeepCopy()
volumeClone.Status.Phase = phase
volumeClone.Status.Message = message

  // 核心代码!请求 api server 对 volume 的状态进行修改
newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().UpdateStatus(volumeClone)
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
return newVol, err
}
  // 之后再修改本地 informer 中 cache 中的数据
_, err = ctrl.storeVolumeUpdate(newVol)
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return newVol, err
}
glog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)
return newVol, err
}

unbindVolume(volume)

作用是将 一个 Bound 状态的 volume 执行 unbound 操作

  1. 将本地cache中的 volume 进行unbond操作:1) volumeClone.Spec.ClaimRef = nil 2) volumeClone.Annotations 中删除 “pv.kubernetes.io/bound-by-controller” 字段; 3) volume phase 修改为 “Available”
  2. 发起http请求,对apiserver中的 volume 进行unbound操作(同上)。
// unbindVolume rolls back previous binding of the volume. This may be necessary
// when two controllers bound two volumes to single claim - when we detect this,
// only one binding succeeds and the second one must be rolled back.
// This method updates both Spec and Status.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) unbindVolume(volume *v1.PersistentVolume) error {
glog.V(4).Infof("updating PersistentVolume[%s]: rolling back binding from %q", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

// Save the PV only when any modification is necessary.
volumeClone := volume.DeepCopy()

if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
// The volume was bound by the controller.
    // 将volume中的 ClaimRef 置为 nil
volumeClone.Spec.ClaimRef = nil
    
    // 从删除volume的annotation对应字段
delete(volumeClone.Annotations, annBoundByController)
if len(volumeClone.Annotations) == 0 {
// No annotations look better than empty annotation map (and it's easier
// to test).
volumeClone.Annotations = nil
}
} else {
// The volume was pre-bound by user. Clear only the binging UID.
    // 如果是用户手动创建的volume 则把 UID 设置为""
volumeClone.Spec.ClaimRef.UID = ""
}

newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(volumeClone)
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)
return err
}
_, err = ctrl.storeVolumeUpdate(newVol)
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return err
}
glog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name)

// Update the status
_, err = ctrl.updateVolumePhase(newVol, v1.VolumeAvailable, "")
return err
}

ctrl.reclaimVolume

// reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
// starts appropriate reclaim action.
func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolume) error {
switch volume.Spec.PersistentVolumeReclaimPolicy {
    
  // 如果回收策略是 Retain ,则什么也不做
case v1.PersistentVolumeReclaimRetain:
glog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", volume.Name)
    
  // 如果回收策略是 recycle, 调用plugin 执行 volume 的清理操作(清理后,该PV后续可以被重复使用)
case v1.PersistentVolumeReclaimRecycle:
glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
ctrl.scheduleOperation(opName, func() error {
ctrl.recycleVolumeOperation(volume)
return nil
})
  // 如果回收策略是 Delete,调用plugin 执行 volume 的 delete操作
case v1.PersistentVolumeReclaimDelete:
glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
startTime := time.Now()
    // 调用plugin 执行 volume 的 delete操作
ctrl.scheduleOperation(opName, func() error {
pluginName, err := ctrl.deleteVolumeOperation(volume)
timeTaken := time.Since(startTime).Seconds()
metrics.RecordVolumeOperationMetric(pluginName, "delete", timeTaken, err)
return err
})

default:
// Unknown PersistentVolumeReclaimPolicy
if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy"); err != nil {
return err
}
}
return nil
}


ctrl.deleteVolumeOperation()


// deleteVolumeOperation deletes a volume. This method is running in standalone
// goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) deleteVolumeOperation(volume *v1.PersistentVolume) (string, error) {
glog.V(4).Infof("deleteVolumeOperation [%s] started", volume.Name)

// This method may have been waiting for a volume lock for some time.
// Previous deleteVolumeOperation might just have saved an updated version, so
// read current volume state now.
newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
if err != nil {
glog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
return "", nil
}
needsReclaim, err := ctrl.isVolumeReleased(newVolume)
if err != nil {
glog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)
return "", nil
}
if !needsReclaim {
glog.V(3).Infof("volume %q no longer needs deletion, skipping", volume.Name)
return "", nil
}
  // 核心! 执行 delete volume 操作
pluginName, deleted, err := ctrl.doDeleteVolume(volume)
if err != nil {
// Delete failed, update the volume and emit an event.
glog.V(3).Infof("deletion of volume %q failed: %v", volume.Name, err)
if vol.IsDeletedVolumeInUse(err) {
// The plugin needs more time, don't mark the volume as Failed
// and send Normal event only
ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeDelete, err.Error())
} else {
// The plugin failed, mark the volume as Failed and send Warning
// event
if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, events.VolumeFailedDelete, err.Error()); err != nil {
glog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
// Save failed, retry on the next deletion attempt
return pluginName, err
}
}

// Despite the volume being Failed, the controller will retry deleting
// the volume in every syncVolume() call.
return pluginName, err
}
if !deleted {
// The volume waits for deletion by an external plugin. Do nothing.
return pluginName, nil
}

glog.V(4).Infof("deleteVolumeOperation [%s]: success", volume.Name)
// Delete the volume
  // 当底层 volume 被删除后,则将apiserver中的 volume 
if err = ctrl.kubeClient.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
// Oops, could not delete the volume and therefore the controller will
// try to delete the volume again on next update. We _could_ maintain a
// cache of "recently deleted volumes" and avoid unnecessary deletion,
// this is left out as future optimization.
glog.V(3).Infof("failed to delete volume %q from database: %v", volume.Name, err)
return pluginName, nil
}
return pluginName, nil
}




// doDeleteVolume finds appropriate delete plugin and deletes given volume, returning
// the volume plugin name. Also, it returns 'true', when the volume was deleted and
// 'false' when the volume cannot be deleted because of the deleter is external. No
// error should be reported in this case.
func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolume) (string, bool, error) {
glog.V(4).Infof("doDeleteVolume [%s]", volume.Name)
var err error
  // 查到插件 volume plugin
plugin, err := ctrl.findDeletablePlugin(volume)
if err != nil {
return "", false, err
}
if plugin == nil {
// External deleter is requested, do nothing
glog.V(3).Infof("external deleter for volume %q requested, ignoring", volume.Name)
return "", false, nil
}

// Plugin found
pluginName := plugin.GetPluginName()
glog.V(5).Infof("found a deleter plugin %q for volume %q", pluginName, volume.Name)
spec := vol.NewSpecFromPersistentVolume(volume, false)
  
  // 创建一个plugin 的 deleter
deleter, err := plugin.NewDeleter(spec)
if err != nil {
// Cannot create deleter
return pluginName, false, fmt.Errorf("Failed to create deleter for volume %q: %v", volume.Name, err)
}
   
  //调用 Delete()方法执行 存储层面volume的删除
opComplete := util.OperationCompleteHook(pluginName, "volume_delete")
err = deleter.Delete()
opComplete(&err)
if err != nil {
// Deleter failed
return pluginName, false, err
}

glog.V(2).Infof("volume %q deleted", volume.Name)
return pluginName, true, nil
}



// findDeletablePlugin finds a deleter plugin for a given volume. It returns
// either the deleter plugin or nil when an external deleter is requested.
func (ctrl *PersistentVolumeController) findDeletablePlugin(volume *v1.PersistentVolume) (vol.DeletableVolumePlugin, error) {
// Find a plugin. Try to find the same plugin that provisioned the volume
var plugin vol.DeletableVolumePlugin
if metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) {
provisionPluginName := volume.Annotations[annDynamicallyProvisioned]
if provisionPluginName != "" {
      // 通过 volume 中 annotation的 "pv.kubernetes.io/provisioned-by" 查找对应的 plugin 名称
plugin, err := ctrl.volumePluginMgr.FindDeletablePluginByName(provisionPluginName)
if err != nil {
if !strings.HasPrefix(provisionPluginName, "kubernetes.io/") {
// External provisioner is requested, do not report error
return nil, nil
}
return nil, err
}
return plugin, nil
}
}

  // 如果上面没找到再去 volume 的 Spec中查找
// The plugin that provisioned the volume was not found or the volume
// was not dynamically provisioned. Try to find a plugin by spec.
spec := vol.NewSpecFromPersistentVolume(volume, false)
plugin, err := ctrl.volumePluginMgr.FindDeletablePluginBySpec(spec)
if err != nil {
// No deleter found. Emit an event and mark the volume Failed.
return nil, fmt.Errorf("Error getting deleter volume plugin for volume %q: %v", volume.Name, err)
}
return plugin, nil
}

ctrl.claimWorker

ctrl.claimWorker() 方法是通过协程方式启动,对claim(PVC) 进行生命周期管理,即负责处理 PVC 的绑定、创建和删除等操作。

(1) 先从本地informer cache中查询claim, 如果 informer cache中存在, 表示PVC是新来的事件待处理。 交给updateClaim()进行处理;

(2) 如果 informer cache中没有, 表示PVC是已经被删除了的。需要做删除处理

// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {
workFunc := func() bool {
keyObj, quit := ctrl.claimQueue.Get()
if quit {
return true
}
defer ctrl.claimQueue.Done(keyObj)
key := keyObj.(string)
glog.V(5).Infof("claimWorker[%s]", key)

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
return false
}
claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
if err == nil {
// The claim still exists in informer cache, the event must have
// been add/update/sync
      // 核心代码! 如果 informer cache中存在, 表示PVC是新来的事件待处理。 交给updateClaim()进行处理
ctrl.updateClaim(claim)
return false
}
if !errors.IsNotFound(err) {
glog.V(2).Infof("error getting claim %q from informer: %v", key, err)
return false
}

// The claim is not in informer cache, the event must have been "delete"
claimObj, found, err := ctrl.claims.GetByKey(key)
if err != nil {
glog.V(2).Infof("error getting claim %q from cache: %v", key, err)
return false
}
if !found {
// The controller has already processed the delete event and
// deleted the claim from its cache
glog.V(2).Infof("deletion of claim %q was already processed", key)
return false
}
claim, ok := claimObj.(*v1.PersistentVolumeClaim)
if !ok {
glog.Errorf("expected claim, got %+v", claimObj)
return false
}
    // 核心代码! 如果 informer cache中没有, 表示PVC是已经被删除了的。需要做删除处理
ctrl.deleteClaim(claim)
return false
}
  // 核心代码! 持续循环地从 claimQueue 里面获取到的 PersistentVolumeClaim,并进行相应处理
for {
if quit := workFunc(); quit {
glog.Infof("claim worker queue shutting down")
return
}
}
}

重点函数包括了ctrl.updateClaim(claim)ctrl.deleteClaim(claim)

ctrl.updateClaim(claim)

ctrl.updateClaim(claim) 用于处理 claim 的 ”创建“、”更新“以及周期性更新的事件

// updateClaim runs in worker thread and handles "claim added",
// "claim updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
// Store the new claim version in the cache and do not process it if this is
// an old version.
  // 先检查 claim 的版本号,如果不是"新的",就不处理
new, err := ctrl.storeClaimUpdate(claim)
if err != nil {
glog.Errorf("%v", err)
}
if !new {
return
}
  // 核心代码!如果通过上面检查,则使用函数ctrl.syncClaim(claim)进一步处理
err = ctrl.syncClaim(claim)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
glog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
} else {
glog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
}
}
}

ctrl.syncClaim(claim)

syncClaim是 pv controler的主要方法,它决定了具体如何处理claim。该方法又根据annotation中是否包括"pv.kubernetes.io/bind-completed",再决定将claim进一步交个ctrl.syncUnboundClaim(claim) 或 ctrl.syncBoundClaim(claim) 处理

// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
glog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

if !metav1.HasAnnotation(claim.ObjectMeta, annBindCompleted) {
    // 如果不包括annotation "pv.kubernetes.io/bind-completed",这个annotation用于标记 bind 是否完成。
    // 此时没有这个annotation,说明是还没有Bind操作的claim,比如处理pengding状态的claim,接下来交给 ctrl.syncUnboundClaim(claim) 处理
return ctrl.syncUnboundClaim(claim)
} else {
    // 如果包括annotation "pv.kubernetes.io/bind-completed",说明已经 bind 过的claim,接下来交给 ctrl.syncBoundClaim(claim) 处理
return ctrl.syncBoundClaim(claim)
}
}

ctrl.syncBoundClaim()

ctrl.syncBoundClaim() 作为 pv controller 最重要的方法,必须仔细分析。代码的逻辑如下:

  1. 如果 claim.Spec.VolumeName 的值为空,说明 claim 之前 Bound过的但是这个值确为空了,这种情况就将claim状态改为"lost"并抛出event并返回
  2. 从本地informer cache中查找对应的 claim.Spec.VolumeName
    • 如果没找对应的volume,说明claim bind了一个不存在的volume。这种情况就将claim状态改为"lost"并抛出event并返回
    • 如果找到对应的volume
      • 如果volume.Spec.ClaimRef == nil,说明claim 已经bind 了volume, 但volume确没有bind对应的claim,这种情况就调用 ctrl.bind(volume, claim)方法让volume 绑定 claim
      • 如果volume.Spec.ClaimRef.UID == claim.UID,说明逻辑都是正常的。接下来任然做一次ctrl.bind(),但是大多数情况会直接返回(因为所有的操作都已经做完了)
      • 其他情况,比如volume.Spec.ClaimRef.UID 不等于 claim.UID,说明bind()错了。这种情况就将claim状态改为"lost"并抛出event并返回
// syncBoundClaim is the main controller method to decide what to do with a
// bound claim.
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
// HasAnnotation(pvc, annBindCompleted)
// This PVC has previously been bound
// OBSERVATION: pvc is not "Pending"
// [Unit test set 3]
  
  // 第1种情况: 如果 claim.Spec.VolumeName 的值为空,说明 claim 之前 Bound过的但是这个值异常了,抛出event并返回 
if claim.Spec.VolumeName == "" {
// Claim was bound before but not any more.
if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
return err
}
return nil
}
  
  // 找到对应的 volume 
obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
if err != nil {
return err
}
  // 第2种情况: 如果claim.Spec.VolumeName 有值,claim之前Bound过的,但 volume 不存在了或许被删除了,抛出event并返回 
if !found {
// Claim is bound to a non-existing volume.
if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
return err
}
return nil
} else {
volume, ok := obj.(*v1.PersistentVolume)
if !ok {
return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
}

glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
    // 第3种情况:如果claim.Spec.VolumeName 有值,而且volume也存在,但是volume对应的值volume.Spec.ClaimRef为nil
    // 可能有2种场景:1)claim bound了但是volume 确是unbound的。2)claim bound了但是controller 没有来得及updated volume
    // 不论具体是哪种场景,接下来都再次执行一次ctrl.bind()操作
if volume.Spec.ClaimRef == nil {
// Claim is bound but volume has come unbound.
// Or, a claim was bound and the controller has not received updated
// volume yet. We can't distinguish these cases.
// Bind the volume again and set all states to Bound.
glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
if err = ctrl.bind(volume, claim); err != nil {
// Objects not saved, next syncPV or syncClaim will try again
return err
}
return nil
      // 第4种情况: 如果claim.Spec.VolumeName 有值,而且volume也存在,而且volume对应的值volume.Spec.ClaimRef 不为nil 
      // 而且 volume.Spec.ClaimRef.UID == claim.UID;说明逻辑都是正常的。接下来任然做一次ctrl.bind()
} else if volume.Spec.ClaimRef.UID == claim.UID {
// All is well
// NOTE: syncPV can handle this so it can be left out.
// NOTE: bind() call here will do nothing in most cases as
// everything should be already set.
glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
if err = ctrl.bind(volume, claim); err != nil {
// Objects not saved, next syncPV or syncClaim will try again
return err
}
return nil
      // 其他情况: volume.Spec.ClaimRef.UID 不等于 claim.UID,说明bind()错了。这时抛出event
} else {
// Claim is bound but volume has a different claimant.
// Set the claim phase to 'Lost', which is a terminal
// phase.
if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
return err
}
return nil
}
}
}

ctrl.bind(volume, claim)


// bind saves binding information both to the volume and the claim and marks
// both objects as Bound. Volume is saved first.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
var err error
var updatedClaim *v1.PersistentVolumeClaim
var updatedVolume *v1.PersistentVolume

  // 将 volume bind 到 claim
if updatedVolume, err = ctrl.bindVolumeToClaim(volume, claim); err != nil {
return err
}
volume = updatedVolume
  // 更新 volume 的phase 为Bound 状态
if updatedVolume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
return err
}
volume = updatedVolume
   
  // 将 volume claim 到 volume
if updatedClaim, err = ctrl.bindClaimToVolume(claim, volume); err != nil {
return err
}
claim = updatedClaim

  // 更新 claim 的phase 为Bound 状态
if updatedClaim, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume); err != nil {
return err
}
claim = updatedClaim

return nil
}

syncUnboundClaim

  1. 如果claim.Spec.VolumeName == " ",说明该PVC 是一个还没有完成bingding的新的PVC,此时PVC处于"Pengding"状态

    • 这时就从集群中存在的 volume 中查询是否匹配的 claim(空间大小满足同时access模式满足而且处于”Available“ 状态的), 根据是否没找到了匹配的volume,分2中情况分配处理:

    • 如果没找到匹配的volume:

      • 再检查 PVC 对象 中的storageClass,看storageClass的volumeBindingMode 是否启用了延迟bingding,如果启用了,就返回,等该PVC消费者例如pod,需要时才会对这个claim进行bind处理。

      • 如果没有启用延迟bingding,也就是用了默认的Immediate模式,就判断 storageClass 是否设置为"",如果不为空就用claim中设置的 storageClass ,调用ctrl.provisionClaim(claim)方法创建一个新的 volume

    • 如果找到匹配的volume,则调用ctrl.bind(volume, claim)将volume与claim进行绑定,如果绑定正常就正常退出,如果bind错误就返回错误。

  2. 如果claim.Spec.VolumeName != " ",说明 claim 指定了绑定到一个特定的 VolumeName。那接下来先去本地cache中查找对应名称的 volume

    • 如果本地cache中没有找到,说明这时可能 volume 还没创建,那就将claim 状态设置为 “Pengding”,等下一个循环周期再处理,有可能下一个循环周期有volume 就创建好了。

    • 如果本地cache中找到了指定名称的 volume,那就检查volume 的字段volume.Spec.ClaimRef是不是为nil:

      • 如果为nil,说明 volume 是没有被使用的、处于"Availabel"的 volume ,接下来就将volume 与 claim 绑定,完成绑定后,claim 是 “Bound” 状态, pv 也是 "Bound"状态,这时就可以正常退出

      • 如果不为nil,调用方法 isVolumeBoundToClaim(volume, claim) 判断volume 是否已经绑定给这个 claim 了,“也就是说claim 指定要的是 volume,volume也要的是该claim,双向奔赴”,如果已经绑定了,则在次调用ctrl.bind(volume, claim),完成绑定


// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
// This is a new PVC that has not completed binding
// OBSERVATION: pvc is "Pending"
  
  // claim.Spec.VolumeName == "",说明该PVC 是一个还没有完成bingding的新的PVC,此时PVC处于"Pengding"状态
if claim.Spec.VolumeName == "" {
// User did not care which PV they get.
    // 检查 PVC 对象 中的storageClass,看storageClass的volumeBindingMode 是否启用了延迟bingding(默认值为Immediate)
delayBinding, err := ctrl.shouldDelayBinding(claim)
if err != nil {
return err
}

// [Unit test set 1]
    // 从集群中存在的 volume 中查询是否匹配的 claim
volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
if err != nil {
glog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
}
    
    // volume == nil,说明没有找到匹配claim的volume
if volume == nil {
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
// No PV could be found
// OBSERVATION: pvc is "Pending", will retry
switch {
      
      // 检查 storageClass 中的volumeBindingMode字段是否设置了delayBinding,如果为真,这里只是抛出event,不会再继续为PVC 创建对应的 PV
      // 需要等到消费者完成准备(比如需要挂载该PVC的pod已经OK了),才会去创建PV,并做绑定
case delayBinding:
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
      // 判断 storageClass 是否设置为"",如果不为空就用claim中设置的 storageClass 来创建一个新的 volume 
case v1helper.GetPersistentVolumeClaimClass(claim) != "":
        // ctrl.provisionClaim(claim)方法 用于创建新的 volume
if err = ctrl.provisionClaim(claim); err != nil {
return err
}
return nil
        // 如果没有找到,而且也没设置 storageClass ,则抛出event
default:
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
}

// Mark the claim as Pending and try to find a match in the next
// periodic syncClaim
      
      // 将 claim 状态设置为 "Pending",等待下一个循环周期到来时会再次处理
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
}
return nil
      // 如果 volume != nil,说明找到匹配claim的处于”Available“ 状态的 volume,这种情况就调用ctrl.bind(volume, claim),将volume 与 claim 绑定
} else /* pv != nil */ {
// Found a PV for this claim
// OBSERVATION: pvc is "Pending", pv is "Available"
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), volume.Name, getVolumeStatusForLogging(volume))
if err = ctrl.bind(volume, claim); err != nil {
// On any error saving the volume or the claim, subsequent
// syncClaim will finish the binding.
return err
}
// OBSERVATION: claim is "Bound", pv is "Bound"
      // 完成绑定后,claim 是 "Bound" 状态, pv 也是 "Bound"状态,这时就可以正常退出
return nil
}
    
    
    
    // 如果 pvc.Spec.VolumeName != nil ,说明 claim 指定了绑定到对应的 VolumeName;那边接下来先去本地cache中查找对应名称的 volume
} else /* pvc.Spec.VolumeName != nil */ {
// [Unit test set 2]
// User asked for a specific PV.
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
if err != nil {
return err
}
    
    // 如果本地cache中没有找到,说明这是可能 volume 还没创建,那就将claim 状态设置为 "Pengding",等下一个循环周期再处理,有可能下一个循环周期 
    // 有可能下一个循环周期 volume 就创建好了
if !found {
// User asked for a PV that does not exist.
// OBSERVATION: pvc is "Pending"
// Retry later.
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
      // 更新 claim 的状态为 'Pengding'
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
}
return nil
      // 如果本地cache中找到了指定名称的 volume,那就检查volume 的字段volume.Spec.ClaimRef不是为nil
      //如果为nil,说明 volume 是没有被使用的,处于"Availabel"的 volume ,接下来就将volume 与 claim 绑定
} else {
volume, ok := obj.(*v1.PersistentVolume)
if !ok {
return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
}
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
      // volume.Spec.ClaimRef == nil, 则将volume 与 claim 绑定
if volume.Spec.ClaimRef == nil {
// User asked for a PV that is not claimed
// OBSERVATION: pvc is "Pending", pv is "Available"
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
        // 绑定前做一些检查工作
if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
glog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
//send an event
msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
//volume does not satisfy the requirements of the claim
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
}
          // 执行 claim 与 volume的绑定
} else if err = ctrl.bind(volume, claim); err != nil {
// On any error saving the volume or the claim, subsequent
// syncClaim will finish the binding.
return err
}
// OBSERVATION: pvc is "Bound", pv is "Bound"
        // 如果完成绑定后,claim 是 "Bound" 状态, pv 也是 "Bound"状态,这时就可以正常退出
return nil
        
        // 接下来看看,如果volume.Spec.ClaimRef != nil时,
        // 而且isVolumeBoundToClaim(volume, claim)判断volume 如果已经绑定给 claim 了,而此时应为PVC 是 "Pengding"的,所以调用ctrl.bind(volume, claim),完成绑定
} else if isVolumeBoundToClaim(volume, claim) {
// User asked for a PV that is claimed by this PVC
// OBSERVATION: pvc is "Pending", pv is "Bound"
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))

// Finish the volume binding by adding claim UID.
if err = ctrl.bind(volume, claim); err != nil {
return err
}
// OBSERVATION: pvc is "Bound", pv is "Bound"
         // 如果完成绑定后,claim 是 "Bound" 状态, pv 也是 "Bound"状态,这时就可以正常退出
return nil
} else {
// User asked for a PV that is claimed by someone else
// OBSERVATION: pvc is "Pending", pv is "Bound"
if !metav1.HasAnnotation(claim.ObjectMeta, annBoundByController) {
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
// User asked for a specific PV, retry later
if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
return err
}
return nil
} else {
// This should never happen because someone had to remove
// annBindCompleted annotation on the claim.
glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
}
}
}
}
}

provisionClaim

接下来分析一下provisionClaim是如何为claim,创建volume的。方法调用流程如下:

provisionClaim(claim) —> ctrl.provisionClaimOperation(claim) —> provisionClaimOperation(claim)

provisionClaim(claim)的逻辑是先判断是否启用了动态dynamicProvisoning,如果没启动则退出。如果启用了调用ctrl.provisionClaimOperation(claim)方法继续处理。

// provisionClaim starts new asynchronous operation to provision a claim if
// provisioning is enabled.
func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolumeClaim) error {
if !ctrl.enableDynamicProvisioning {
return nil
}
glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
startTime := time.Now()
  // 调用ctrl.scheduleOperation()方法,执行provisionClaim; 这里ctrl.scheduleOperation()方法是保证只有一个gorouting运行
ctrl.scheduleOperation(opName, func() error {
    // ctrl.provisionClaimOperation(claim) 为核心代码
pluginName, err := ctrl.provisionClaimOperation(claim)
timeTaken := time.Since(startTime).Seconds()
metrics.RecordVolumeOperationMetric(pluginName, "provision", timeTaken, err)
return err
})
return nil
}

provisionClaimOperation() 是创建卷的核心方法,它的逻辑如下

(1) 获取创建claim的storageclass

(2) 为 claim 添加一个 provisioner 的 annotation标识(volume.beta.kubernetes.io/storage-provisioner= class.Provisioner),这个标识表示这个claim 是由哪个plugin实现provision的

(3) 获取 Provisiong 的插件plugin,如果没有plugin==nil表示没有找到插件,则抛出event后退出。

(4) 如果找到plugin:

- 通过claim 的名称组装pv的名称(pv的名称就是"pvc"-claim.UID)
- 通过pvName 去k8s apisever查询是否存在volume, 如果已存在,则不需要再provision volume了,则退出
- 如果volume 不存在,准备开始创建volume,先准备创建volume需要的参数,之后创建一个provisioner 接口,provisioner实现了provision volume 的具体方法
- 执行 provision() 方法,这个方法会调用csi plugin,创建一个新的volume。具体什么volume,则要看plugin类型,比如ceph rbd或nfs等等
- 为创建出来的 volume 关联 pvc 对象(ClaimRef),尝试为volume 创建 pv 对象 (重复多次)

// provisionClaimOperation provisions a volume. This method is running in
// standalone goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (string, error) {
// 获取创建claim的storageclass
  claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
glog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

  // 获取 Provisiong 的插件plugin
plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
if err != nil {
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
glog.V(2).Infof("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
// The controller will retry provisioning the volume in every
// syncVolume() call.
return "", err
}

var pluginName string
if plugin != nil {
pluginName = plugin.GetPluginName()
}

// Add provisioner annotation so external provisioners know when to start
  // 为 claim 添加一个 provisioner annotation
newClaim, err := ctrl.setClaimProvisioner(claim, storageClass)
if err != nil {
// Save failed, the controller will retry in the next sync
glog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
return pluginName, err
}
claim = newClaim

  // 如果没有找到plugin,则抛出event后退出。
if plugin == nil {
// findProvisionablePlugin returned no error nor plugin.
// This means that an unknown provisioner is requested. Report an event
// and wait for the external provisioner
msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", storageClass.Provisioner)
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg)
glog.V(3).Infof("provisioning claim %q: %s", claimToClaimKey(claim), msg)
return pluginName, nil
}

// internal provisioning

//  A previous doProvisionClaim may just have finished while we were waiting for
//  the locks. Check that PV (with deterministic name) hasn't been provisioned
//  yet.

  // plugin != nil
  // 通过claim 的名称组装pv的名称(pv的名称就是"pvc"-claim.UID)
pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
  // 通过pvName 去k8s apisever查询是否存在volume, 如果已存在,则不需要再provision volume了,则退出函数,
volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
if err == nil && volume != nil {
// Volume has been already provisioned, nothing to do.
glog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
return pluginName, err
}

// Prepare a claimRef to the claim early (to fail before a volume is
// provisioned)
claimRef, err := ref.GetReference(scheme.Scheme, claim)
if err != nil {
glog.V(3).Infof("unexpected error getting claim reference: %v", err)
return pluginName, err
}

// Gather provisioning options
tags := make(map[string]string)
tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
tags[CloudVolumeCreatedForVolumeNameTag] = pvName

  // 准备创建volume需要的参数
options := vol.VolumeOptions{
PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
MountOptions:                  storageClass.MountOptions,
CloudTags:                     &tags,
ClusterName:                   ctrl.clusterName,
PVName:                        pvName,
PVC:                           claim,
Parameters:                    storageClass.Parameters,
}

// Refuse to provision if the plugin doesn't support mount options, creation
// of PV would be rejected by validation anyway
if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
glog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
}

// Provision the volume
  // 开始provision一个volume
  // 创建一个provisioner对象,provisioner实现了创建volume 的具体方法
provisioner, err := plugin.NewProvisioner(options)
if err != nil {
strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
glog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
return pluginName, err
}

var selectedNode *v1.Node = nil
if nodeName, ok := claim.Annotations[annSelectedNode]; ok {
selectedNode, err = ctrl.NodeLister.Get(nodeName)
if err != nil {
strerr := fmt.Sprintf("Failed to get target node: %v", err)
glog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
return pluginName, err
}
}
allowedTopologies := storageClass.AllowedTopologies

opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
  
  // 执行 provision() 方法,这个方法会调用csi plugin,创建一个新的volume。
  // 具体什么volume,则要看plugin类型,比如ceph rbd或nfs等等
volume, err = provisioner.Provision(selectedNode, allowedTopologies)
opComplete(&err)
if err != nil {
// Other places of failure have nothing to do with VolumeScheduling,
// so just let controller retry in the next sync. We'll only call func
// rescheduleProvisioning here when the underlying provisioning actually failed.
ctrl.rescheduleProvisioning(claim)

strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
return pluginName, err
}

glog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

// Create Kubernetes PV object for the volume.
if volume.Name == "" {
volume.Name = pvName
}
// Bind it to the claim
  // 将 volume bind 到 claim
volume.Spec.ClaimRef = claimRef
volume.Status.Phase = v1.VolumeBound
volume.Spec.StorageClassName = claimClass

// Add annBoundByController (used in deleting the volume)
  // 为 volume 添加必要的 annotation
metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annBoundByController, "yes")
metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, plugin.GetPluginName())

// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
var newVol *v1.PersistentVolume
if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
err = nil
} else {
glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

_, updateErr := ctrl.storeVolumeUpdate(newVol)
if updateErr != nil {
// We will get an "volume added" event soon, this is not a big error
glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
}
}
break
}
// Save failed, try again after a while.
glog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
time.Sleep(ctrl.createProvisionedPVInterval)
}

  // err != nil 说明创建volume失败了,失败后需要对残留的volume 进行删除清理
if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
glog.V(3).Info(strerr)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

var deleteErr error
var deleted bool
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
      // 删除 volume
_, deleted, deleteErr = ctrl.doDeleteVolume(volume)
if deleteErr == nil && deleted {
// Delete succeeded
glog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
break
}
if !deleted {
// This is unreachable code, the volume was provisioned by an
// internal plugin and therefore there MUST be an internal
// plugin that deletes it.
glog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
break
}
// Delete failed, try again after a while.
glog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
time.Sleep(ctrl.createProvisionedPVInterval)
}

if deleteErr != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
glog.V(2).Info(strerr)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
}
    // err == nil,表示provision创建成功,这是抛出event并退出
} else {
glog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
}
return pluginName, nil
}

总结

PV controller的作用是对PV和PVC资源生命周期管理: 首先,PV控制器负责将PVC与合适的PV进行绑定。它会根据PVC的请求和PV的可用性来匹配合适的存储资源。同时PV控制器管理PV的状态,确保PV在被使用时不会被其他PVC绑定,同时也会处理PV的回收和删除。其次,如果PVC配置了StorageClass,PV控制器可以动态地为PVC创建新的PV。

本文通过源码学习了控制器的原理是如何实现。

参考文献

  1. kube-controller-manager源码分析-PV controller分析

  2. 从零开始入门 K8s | Kubernetes 存储架构及插件使用

  3. kubernetes pv-controller 解析


原文地址:https://blog.csdn.net/xsw164711368/article/details/144647640

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!