自学内容网 自学内容网

k8s-Informer之Reflector的解析

Reflect 概述

Reflector 从 kube-apiserver 中 list&watch 资源对象,用于监听指定资源的 Kubernetes 。当资源对象发生变化时(如:添加和删除等事件),Reflector 会将其这些资源对象的变化包装成Delta并将其丢到DeltaFIFO中。其实就是将 Etcd 的对象及其变化反射到DeltaFIFO中,实时更新本地缓存,确保本地数据和 ETCD 数据一致。

源码位置:k8s.io/client-go/tools/cache/reflector.go
(1)Reflector 它的数据结构如下:

 type Reflector struct {
   name string
   expectedTypeName string
   expectedType reflect.Type // 放到Store中(即DeltaFIFO中)的对象类型
   expectedGVK *schema.GroupVersionKind
   store Store // 与 Watch 源同步的⽬标,会赋值为 DeltaFIFO
   listerWatcher ListerWatcher // ListerWatcher是个interface(含list和watch)
   backoffManager wait.BackoffManager
   initConnBackoffManager wait.BackoffManager
   MaxInternalErrorRetryDuration time.Duration
   resyncPeriod time.Duration // 重新同步周期
   ShouldResync func() bool
   clock clock.Clock
   paginatedResult bool
   lastSyncResourceVersion string
   isLastSyncResourceVersionUnavailable bool
   lastSyncResourceVersionMutex sync.RWMutex
   WatchListPageSize int64
   watchErrorHandler WatchErrorHandler
}

(2)Reflector 初始化

通过 NewReflector 实例化 Reflector 对象,在实例中需要传入的 ListerWatcher 数据接口对象,这个包含核心 List 和 Watch 方法,主要是负责 List 和 Watch 指定的 Kubernetes APIServer 资源。


func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {  
   return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)  
}

// NewNamedReflector same as NewReflector, but with a specified name for logging  
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {  
   realClock := &clock.RealClock{}  
   r := &Reflector{  
      name:          name,  
      listerWatcher: lw,  
      store:         store,  
      initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),  
      resyncPeriod:           resyncPeriod,  
      clock:                  realClock,  
      watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),  
   }  
   r.setExpectedType(expectedType)  
   return r  
}

(3)ListerWatcher interface

type Lister interface {  
   List(options metav1.ListOptions) (runtime.Object, error)  
}  

type Watcher interface {  
Watch(options metav1.ListOptions) (watch.Interface, error)  
}  

type ListerWatcher interface {  
   Lister  
   Watcher
}

(4)ListWatch struct

type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

type ListWatch struct {  
   ListFunc  ListFunc  
   WatchFunc WatchFunc  
   DisableChunking bool  
}

(5)Reflector 启动

创建 Reflector 对象后, Run 方法启动监听并处理事件,通过 wait.BackoffUntil 不断调用 ListAndWatch 方法,如果该方法 return 了,那么就会发生re-list,watch过程则被嵌套在for循环中。 Run() 中最核心的就是 List-Watch 方法。

func (r *Reflector) Run(stopCh <-chan struct{}) {  
   klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)  
   wait.BackoffUntil(func() {  
      if err := r.ListAndWatch(stopCh); err != nil {  
         r.watchErrorHandler(r, err)  
      }  
   }, r.backoffManager, true, stopCh)  
   klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)  
}

ListAndWatch 核心代码:

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {  
   // ...
   // list 获取资源下的所有对象的数据  
   err := r.list(stopCh)  
   if err != nil {  
      return err  
   }  
  
   // go 部分
   go func() {  
      // 返回重新同步的定时通道  
      resyncCh, cleanup := r.resyncChan()  
      // ...
      for {  
         select {  
         case <-resyncCh:  
         case <-stopCh:  
            return  
         case <-cancelCh:  
            return  
         }  
         // 判断是否需要执行Resync操作,即重新同步  
         if r.ShouldResync == nil || r.ShouldResync() {  
            klog.V(4).Infof("%s: forcing resync", r.name)  
            // Resync 机制会将本地存储(LocalStore)的资源对象同步到 DeltaFIFO 中  
            if err := r.store.Resync(); err != nil {  
               resyncerrc <- err  
               return  
            }  
         }  
         cleanup()  
         // 重新启⽤定时器定时触发  
         resyncCh, cleanup = r.resyncChan()  
      }  
   }()  
  
   // for 部分 
   for {  
      // 1、stopCh处理,判断是否需要退出循环 
      select {  
      case <-stopCh:  
         return nil  
      default:  
      }  
      
      // 2、将resourceVersion为最新的resourceVersion,即从list回来的最新resourceVersion开始执行watch操作
      timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))  
      options := metav1.ListOptions{  
         ResourceVersion: r.LastSyncResourceVersion(),
         TimeoutSeconds: &timeoutSeconds,
         AllowWatchBookmarks: true}   
  
  // 3、 开始监听
      start := r.clock.Now()  
  w, err := r.listerWatcher.Watch(options)
  
      // 4、Reflctor 组件的功能: 事件处理函数  
      // 事件处理函数,当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新  
      err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)  
      // ... 
   }  
}

r.watchHandler() 函数:

watchHandler()函数是 reflector组件的一个重要函数,它负责监听Kubernetes API server中的对象变更事件。如当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新。

主要逻辑:
(1)从watch操作返回来的结果中获取event事件;
(2)接收到每个事件后,watchHandler()函数会判断该事件是否为错误事件及根据事件类型作出处理;
(3)获得当前watch到资源的ResourceVersion;
(4)判断不同类型的event事件作出相应处理;
(5)调用r.setLastSyncResourceVersion,更新Reflector对象中存储的最新的资源版本号。

循环操作,直至event事件处理完毕。


func watchHandler(start time.Time,  
   w watch.Interface,  
   store Store,  
   expectedType reflect.Type,  
   expectedGVK *schema.GroupVersionKind,  
   name string,  
   expectedTypeName string,  
   setLastSyncResourceVersion func(string),  
   clock clock.Clock,  
   errc chan error,  
   stopCh <-chan struct{},  
) error {  
   eventCount := 0  
  
   // Stopping the watcher should be idempotent and if we return from this function there's no way  
   // we're coming back in with the same watch interface.   defer w.Stop()  
  
loop:  
   for {  
      select {  
      case <-stopCh:  
         return errorStopRequested  
      case err := <-errc:  
         return err  
      case event, ok := <-w.ResultChan():  
         if !ok {  // 错误事件,可能与客户端的连接已断开,则重试机制下尝试重新连接
            break loop  
         }  
         if event.Type == watch.Error {  
            return apierrors.FromObject(event.Object)  
         }  
         if expectedType != nil {  
            if e, a := expectedType, reflect.TypeOf(event.Object); e != a {  
               utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))  
               continue  
            }  
         }  
         if expectedGVK != nil {  
            if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {  
               utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))  
               continue  
            }  
         }  
         meta, err := meta.Accessor(event.Object)  
         if err != nil {  
            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))  
            continue  
         }  

         // 获得当前watch到资源的ResourceVersion
         resourceVersion := meta.GetResourceVersion()  
         
         switch event.Type {  
         // 不同类型的event事件,调用不同函数处理。如事件为Added则调用 store.Add 处理
         case watch.Added:  
            err := store.Add(event.Object)  
            if err != nil {  
               utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))  
            }  
         case watch.Modified:  
            err := store.Update(event.Object)  
            if err != nil {  
               utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))  
            }  
         case watch.Deleted:  
            // TODO: Will any consumers need access to the "last known  
            // state", which is passed in event.Object? If so, may need  
            // to change this.            err := store.Delete(event.Object)  
            if err != nil {  
               utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))  
            }  
         case watch.Bookmark:  
            // A `Bookmark` means watch has synced here, just update the resourceVersion  
         default:  
            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))  
         }  
         // 记录Reflector对象已经处理过的最新的资源版本号,以便在下次请求资源数据时能够从该版本号开始监听资源变更。
         setLastSyncResourceVersion(resourceVersion)  
         if rvu, ok := store.(ResourceVersionUpdater); ok {  
            rvu.UpdateResourceVersion(resourceVersion)  
         }  
         eventCount++  
      }  
   }  
  
   watchDuration := clock.Since(start)  
   if watchDuration < 1*time.Second && eventCount == 0 {  
      return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)  
   }  
   klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)  
   return nil  
}

到这里 Reflector 组件的功能基本就结束了,接下来分析 DeltaFIFO 组件。


原文地址:https://blog.csdn.net/realize_dream/article/details/144310749

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!