k8s-Informer之Reflector的解析
Reflect 概述
Reflector 从 kube-apiserver 中 list&watch 资源对象,用于监听指定资源的 Kubernetes 。当资源对象发生变化时(如:添加和删除等事件),Reflector 会将其这些资源对象的变化包装成Delta并将其丢到DeltaFIFO中。其实就是将 Etcd 的对象及其变化反射到DeltaFIFO中,实时更新本地缓存,确保本地数据和 ETCD 数据一致。
源码位置:k8s.io/client-go/tools/cache/reflector.go
(1)Reflector 它的数据结构如下:
type Reflector struct {
name string
expectedTypeName string
expectedType reflect.Type // 放到Store中(即DeltaFIFO中)的对象类型
expectedGVK *schema.GroupVersionKind
store Store // 与 Watch 源同步的⽬标,会赋值为 DeltaFIFO
listerWatcher ListerWatcher // ListerWatcher是个interface(含list和watch)
backoffManager wait.BackoffManager
initConnBackoffManager wait.BackoffManager
MaxInternalErrorRetryDuration time.Duration
resyncPeriod time.Duration // 重新同步周期
ShouldResync func() bool
clock clock.Clock
paginatedResult bool
lastSyncResourceVersion string
isLastSyncResourceVersionUnavailable bool
lastSyncResourceVersionMutex sync.RWMutex
WatchListPageSize int64
watchErrorHandler WatchErrorHandler
}
(2)Reflector 初始化
通过 NewReflector 实例化 Reflector 对象,在实例中需要传入的 ListerWatcher 数据接口对象,这个包含核心 List 和 Watch 方法,主要是负责 List 和 Watch 指定的 Kubernetes APIServer 资源。
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{}
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
}
r.setExpectedType(expectedType)
return r
}
(3)ListerWatcher interface
type Lister interface {
List(options metav1.ListOptions) (runtime.Object, error)
}
type Watcher interface {
Watch(options metav1.ListOptions) (watch.Interface, error)
}
type ListerWatcher interface {
Lister
Watcher
}
(4)ListWatch struct
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
DisableChunking bool
}
(5)Reflector 启动
创建 Reflector 对象后, Run 方法启动监听并处理事件,通过 wait.BackoffUntil 不断调用 ListAndWatch 方法,如果该方法 return 了,那么就会发生re-list,watch过程则被嵌套在for循环中。 Run() 中最核心的就是 List-Watch 方法。
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
ListAndWatch 核心代码:
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// ...
// list 获取资源下的所有对象的数据
err := r.list(stopCh)
if err != nil {
return err
}
// go 部分
go func() {
// 返回重新同步的定时通道
resyncCh, cleanup := r.resyncChan()
// ...
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
// 判断是否需要执行Resync操作,即重新同步
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
// Resync 机制会将本地存储(LocalStore)的资源对象同步到 DeltaFIFO 中
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
// 重新启⽤定时器定时触发
resyncCh, cleanup = r.resyncChan()
}
}()
// for 部分
for {
// 1、stopCh处理,判断是否需要退出循环
select {
case <-stopCh:
return nil
default:
}
// 2、将resourceVersion为最新的resourceVersion,即从list回来的最新resourceVersion开始执行watch操作
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options := metav1.ListOptions{
ResourceVersion: r.LastSyncResourceVersion(),
TimeoutSeconds: &timeoutSeconds,
AllowWatchBookmarks: true}
// 3、 开始监听
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
// 4、Reflctor 组件的功能: 事件处理函数
// 事件处理函数,当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)
// ...
}
}
r.watchHandler() 函数:
watchHandler()
函数是 reflector
组件的一个重要函数,它负责监听Kubernetes API server中的对象变更事件。如当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新。
主要逻辑:
(1)从watch操作返回来的结果中获取event事件;
(2)接收到每个事件后,watchHandler()
函数会判断该事件是否为错误事件及根据事件类型作出处理;
(3)获得当前watch到资源的ResourceVersion;
(4)判断不同类型的event事件作出相应处理;
(5)调用r.setLastSyncResourceVersion,更新Reflector对象中存储的最新的资源版本号。
循环操作,直至event事件处理完毕。
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface. defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok { // 错误事件,可能与客户端的连接已断开,则重试机制下尝试重新连接
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
continue
}
}
if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
continue
}
// 获得当前watch到资源的ResourceVersion
resourceVersion := meta.GetResourceVersion()
switch event.Type {
// 不同类型的event事件,调用不同函数处理。如事件为Added则调用 store.Add 处理
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this. err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
}
// 记录Reflector对象已经处理过的最新的资源版本号,以便在下次请求资源数据时能够从该版本号开始监听资源变更。
setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(resourceVersion)
}
eventCount++
}
}
watchDuration := clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
return nil
}
到这里 Reflector 组件的功能基本就结束了,接下来分析 DeltaFIFO 组件。
原文地址:https://blog.csdn.net/realize_dream/article/details/144310749
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!