自学内容网 自学内容网

[k8s源码]6.reflector

Reflector 和 Informer 是 Kubernetes 客户端库中两个密切相关但职责不同的组件。Reflector 是一个较低级别的组件,主要负责与 Kubernetes API 服务器进行交互,执行资源的初始列表操作和持续的监视操作,将获取到的数据放入队列中。而 Informer 是一个更高级别的抽象,它内部使用了 Reflector,但提供了更全面的功能。Informer 不仅负责数据同步,还维护了资源对象的本地缓存,并提供了事件处理机制,允许开发者注册自定义的事件处理函数。

Informer 实际上通过一个称为 Controller 的内部组件来管理 Reflector。在这个架构中,Reflector 负责从 Kubernetes API 服务器获取数据并将其放入一个 DeltaFIFO queue。Controller 则从这个 queue 中取出数据,更新 Informer 的 LocalStore(这是一个持久化的缓存),并触发相应的事件处理器。这种设计允许 Informer 提供一个高级抽象,隐藏了底层的复杂性。虽然 Informer 不直接调用 Reflector 的方法,但它们通过 Controller 紧密集成。这种架构确保了各个组件职责单一:Reflector 专注于 API 交互,DeltaFIFO 管理变更队列,Controller 协调数据流,而 Informer 则提供高级 API 和缓存管理。这种设计使得开发者可以方便地使用 Informer,而无需直接处理 Reflector 或了解内部 queue 和 store 的细节。

以kubernetes源码中的reflector的一个测试为例:
执行这个test-reflector.go文件,这种test文件可以封装一些测试的代码,更好的帮助我们理解reflector的工作原理。


func TestReflectorListAndWatch(t *testing.T) {
createdFakes := make(chan *watch.FakeWatcher)

// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
// to get called at the beginning of the watch with 1, and again with 3 when we
// inject an error.
expectedRVs := []string{"1", "3"}
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
rv := options.ResourceVersion
fw := watch.NewFake()
if e, a := expectedRVs[0], rv; e != a {
t.Errorf("Expected rv %v, but got %v", e, a)
}
expectedRVs = expectedRVs[1:]
// channel is not buffered because the for loop below needs to block. But
// we don't want to block here, so report the new fake via a go routine.
go func() { createdFakes <- fw }()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0)
go r.ListAndWatch(wait.NeverStop)

ids := []string{"foo", "bar", "baz", "qux", "zoo"}
var fw *watch.FakeWatcher
for i, id := range ids {
if fw == nil {
fw = <-createdFakes
}
sendingRV := strconv.FormatUint(uint64(i+2), 10)
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})
if sendingRV == "3" {
// Inject a failure.
fw.Stop()
fw = nil
}
}

// Verify we received the right ids with the right resource versions.
for i, id := range ids {
pod := Pop(s).(*v1.Pod)
if e, a := id, pod.Name; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a)
}
if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a {
t.Errorf("%v: Expected %v, got %v", i, e, a)
}
}

if len(expectedRVs) != 0 {
t.Error("called watchStarter an unexpected number of times")
}
}

第一部分初始化一个通道createdFakes,初始化一个watcher,有两个方法,为watch和list。watch方法会创建watcher,并传递到createdFakes通道里面。这里的watchFunc是一个闭包:

闭包(Closure)是指一个函数可以捕获并“记住”其作用域外部的变量,即使这个变量在闭包函数的作用域之外。这种特性使得闭包能够访问和操作其外部函数中的变量。捕获外部变量:闭包可以捕获外部函数的局部变量,并且在闭包函数内使用这些变量,即使外部函数已经执行完毕。持久性:闭包会“记住”它捕获的变量,即使外部函数已经返回。状态共享:通过闭包,可以在不同的函数调用之间共享状态。

这里的fw是闭包里面创建的watcher,并不和外面的fw冲突,如果不理解,这里的fw也可以改为别的名字。

fw := watch.NewFake()
go func() { createdFakes <- fw }()

 然后初始化reflector

s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0)
go r.ListAndWatch(wait.NeverStop)

然后下面是测试部分,首先为fw赋值为watcher,一开始为nil,而在上面r=NewReflector初始化的时候,就已经自动调用了watchFunc,创建了一个watcher。随后这个watcher被拿出来给了fw。sendingRV随机生成一个资源的版本,随着从ids拿出来的值,作为pod的属性用来创建pod,在测试中,fw.Add(...) 模拟了 Kubernetes API 服务器添加新 Pod 的行为。在实际环境中,当新 Pod 被创建时,Watch 操作会接收到这个事件。测试使用 fw.Add(...) 来模拟这个过程,向 FakeWatcher 发送一个新 Pod 被添加的事件。

 ids := []string{"foo", "bar", "baz", "qux", "zoo"}
    var fw *watch.FakeWatcher
    for i, id := range ids {
        if fw == nil {
            fw = <-createdFakes
        }
        sendingRV := strconv.FormatUint(uint64(i+2), 10)
        fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})
        if sendingRV == "3" {
            // Inject a failure.
            fw.Stop()
            fw = nil
        }
    }

如果sendingRV=3,那么就会将fw停止并设为nil。这模拟了如果resourceVersion错误会发生什么,但是reflector已经设为neverStop,那么此时ListAndWatch会重新调用watchFunc,然后创建新的watcher,放入createdFakes中,从而fw可以重新拿取。 


原文地址:https://blog.csdn.net/weixin_45396500/article/details/140617864

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!