Implementation Analysis and Source Walkthrough of the DeltaFIFO Mechanism in informer
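DeltaFIFO is a key component of the informer. This article walks through the source code to see how DeltaFIFO is implemented.
Definition of DeltaFIFO
The source is in delta_fifo.go, located at client-go/tools/cache/delta_fifo.go.
The code is structured roughly as follows:
Store (declared in store.go in the same package) defines a generic storage interface: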
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)
	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error
	Resync() error
}
The Queue interface (declared in fifo.go) embeds Store and adds the important Pop() method, which turns the store into a queue:
// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
	Store
	// Pop blocks until it has something to process.
	// It returns the object that was process and the result of processing.
	// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
	// should be requeued before releasing the lock on the queue.
	Pop(PopProcessFunc) (interface{}, error)
	// AddIfNotPresent adds a value previously
	// returned by Pop back into the queue as long
	// as nothing else (presumably more recent)
	// has since been added.
	AddIfNotPresent(interface{}) error
	// HasSynced returns true if the first batch of items has been popped
	HasSynced() bool
	// Close queue
	Close()
}
The FIFO type implements the Queue interface:
type FIFO struct {
	lock sync.RWMutex
	cond sync.Cond
	// We depend on the property that items in the set are in the queue and vice versa.
	items map[string]interface{}
	queue []string
	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int
	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc
	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}
var (
	_ = Queue(&FIFO{}) // FIFO is a Queue
)
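To make the relationship between items and queue concrete, here is a minimal single-goroutine sketch of the same idea. The names miniFIFO, add, and pop are made up for this illustration, values are plain strings, and there is no locking, so this is a simplified model rather than client-go's implementation.

```go
package main

import "fmt"

// miniFIFO is a hypothetical, simplified model of FIFO's items/queue pair.
// It is not thread-safe and is not client-go code.
type miniFIFO struct {
	items map[string]string // key -> latest value for that key
	queue []string          // keys in arrival order, each key at most once
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: map[string]string{}}
}

// add stores the latest value and queues the key only if it is not already waiting.
func (f *miniFIFO) add(key, value string) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = value
}

// pop removes the oldest key and returns it with its latest value.
// ok is false when the queue is empty.
func (f *miniFIFO) pop() (key, value string, ok bool) {
	if len(f.queue) == 0 {
		return "", "", false
	}
	key = f.queue[0]
	f.queue = f.queue[1:]
	value = f.items[key]
	delete(f.items, key)
	return key, value, true
}

func main() {
	f := newMiniFIFO()
	f.add("default/one", "v1")
	f.add("default/two", "v1")
	f.add("default/one", "v2") // same key: value is replaced, queue position is kept
	for {
		k, v, ok := f.pop()
		if !ok {
			break
		}
		fmt.Println(k, v)
	}
}
```

The point of the sketch is the invariant stated in the struct comment: every key in items is in queue exactly once, and a second add for a waiting key only overwrites its value.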
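DeltaFIFO also implements the Queue interface. Unlike FIFO, which keeps only the latest object for each key, DeltaFIFO keeps a Deltas list for each key, and it has two extra delta types: Replaced and Sync. Replaced is emitted when the contents are replaced by a relist (for example, after a watch error), while Sync is generated by the periodic resync.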
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond
	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string
	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int
	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc
	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter
	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex
}
var (
	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)
The DeltaFIFO constructor
// NewDeltaFIFO returns a Queue which can be used to process changes to items.
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KeyFunction:  keyFunc,
		KnownObjects: knownObjects,
	})
}
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc // defaults to MetaNamespaceKeyFunc when no key function is given
	}
	f := &DeltaFIFO{
		items:        map[string]Deltas{}, // maps each key to its Deltas (a []Delta)
		queue:        []string{},          // object keys in FIFO order; a key is usually a namespace/name string
		keyFunc:      opts.KeyFunction,    // function that derives the key from an object
		knownObjects: opts.KnownObjects,
		// emitDeltaTypeReplaced and transformer are fields of newer client-go
		// versions; the struct listing above comes from an older version and
		// does not show them.
		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
		transformer:           opts.Transformer,
	}
	f.cond.L = &f.lock
	return f
}
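The default key function produces keys in namespace/name form. The following is a small sketch of that convention over a hypothetical object struct; namespaceKey and object are names invented here, and the empty-name check is an assumption of the sketch, not a statement about what MetaNamespaceKeyFunc validates.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a hypothetical stand-in for the metadata of a Kubernetes object.
type object struct {
	Namespace string
	Name      string
}

// namespaceKey follows the "namespace/name" convention: objects without a
// namespace (cluster-scoped objects) are keyed by name alone.
// Rejecting an empty name is an assumption made for this sketch.
func namespaceKey(o object) (string, error) {
	if o.Name == "" {
		return "", errors.New("object has no name")
	}
	if o.Namespace == "" {
		return o.Name, nil
	}
	return o.Namespace + "/" + o.Name, nil
}

func main() {
	for _, o := range []object{
		{Namespace: "default", Name: "one"},
		{Name: "node-1"},
	} {
		key, err := namespaceKey(o)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(key)
	}
}
```

Because the key is the only identity the queue sees, the function has to be deterministic: the same object must always map to the same key, otherwise its deltas would be split across several entries.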
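Definition of Delta
As the Delta struct shows, a delta records the type of change made to a resource object together with the object itself. Object holds the object's state after the change; for a deletion it holds the final state of the object before it was deleted.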
// Delta is a member of Deltas (a list of Delta objects) which
// in its turn is the type stored by a DeltaFIFO. It tells you what
// change happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
	Type   DeltaType   // the kind of change: Added, Updated, Deleted, Replaced, or Sync
	Object interface{} // the resource object, for example a pod named "one"
}
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string
// Change type definition
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)
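As a small illustration of the ordering rule (oldest delta at index 0, newest last), the sketch below redeclares the types locally and adds two helpers, oldest and newest. The helper names and the string objects are chosen for this example; client-go's Deltas type offers Oldest() and Newest() methods along the same lines.

```go
package main

import "fmt"

// Local copies of the types above so that the example is self-contained.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// oldest returns the first recorded delta, or nil for an empty list.
func oldest(d Deltas) *Delta {
	if len(d) == 0 {
		return nil
	}
	return &d[0]
}

// newest returns the most recently recorded delta, or nil for an empty list.
func newest(d Deltas) *Delta {
	if len(d) == 0 {
		return nil
	}
	return &d[len(d)-1]
}

func main() {
	// History of a single object while it waited in the queue.
	// The objects are plain strings here purely for illustration.
	history := Deltas{
		{Type: Added, Object: "pod one, v1"},
		{Type: Updated, Object: "pod one, v2"},
		{Type: Deleted, Object: "pod one, v2"},
	}
	fmt.Println("oldest:", oldest(history).Type)
	fmt.Println("newest:", newest(history).Type)
}
```

Note that the Deleted entry carries the last known state of the object (v2), which matches the footnote in the upstream comment.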
Enqueue analysis: DeltaFIFO's Add()
When the watch mechanism observes an event, the event is put on the queue.
// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	return f.queueActionLocked(Added, obj) // the actual work is done by queueActionLocked()
}
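queueActionLocked works as follows: it derives a key from obj (called id in the code), builds a new Delta from actionType and obj, and appends that Delta to the key's Deltas slice. It then stores the Deltas in the items map and, if the key is not already queued, appends the key to queue. Note that the Deltas slice is deduplicated after the new Delta is appended.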
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj) // derive the key from obj using keyFunc
	if err != nil {
		return KeyError{obj, err}
	}
	// Build a new Delta from actionType and obj and append it to the
	// Deltas slice currently stored under f.items[id].
	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas) // deduplicate the new Deltas slice
	if len(newDeltas) > 0 { // the slice still contains at least one Delta
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id) // queue the key only if it is not already queued
		}
		f.items[id] = newDeltas // store the new Deltas slice in the items map
		f.cond.Broadcast()      // wake up goroutines blocked in Pop()
	}
	return nil
}
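The enqueue logic can be reduced to a few lines. The sketch below is a simplified model under stated assumptions: the caller passes the key directly instead of going through a key function, objects are strings, and locking and deduplication are left out. miniDeltaQueue and enqueue are hypothetical names.

```go
package main

import "fmt"

// delta is a simplified stand-in for Delta with string fields.
type delta struct {
	Type string
	Obj  string
}

// miniDeltaQueue is a hypothetical model of DeltaFIFO's items/queue pair.
// No locking and no deduplication, for illustration only.
type miniDeltaQueue struct {
	items map[string][]delta // key -> accumulated deltas, oldest first
	queue []string           // keys in arrival order, each key at most once
}

func newMiniDeltaQueue() *miniDeltaQueue {
	return &miniDeltaQueue{items: map[string][]delta{}}
}

// enqueue appends a delta for key and queues the key the first time it is seen.
func (q *miniDeltaQueue) enqueue(action, key, obj string) {
	_, exists := q.items[key]
	q.items[key] = append(q.items[key], delta{Type: action, Obj: obj})
	if !exists {
		q.queue = append(q.queue, key)
	}
}

func main() {
	q := newMiniDeltaQueue()
	q.enqueue("Added", "default/one", "v1")
	q.enqueue("Added", "default/two", "v1")
	q.enqueue("Updated", "default/one", "v2")

	fmt.Println("queue:", q.queue)
	for _, key := range q.queue {
		fmt.Println(key, "->", q.items[key])
	}
}
```

The second event for default/one does not add a second queue entry. It only extends that key's delta list, which is why a later Pop hands the consumer the whole history of the object at once.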
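Delta deduplication
When Add() appends a delta, the resulting list is deduplicated. The logic currently handles only deletions: when the last two deltas in the list are both of type Deleted, one of them is dropped.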
// re-listing and watching can deliver the same update multiple times in any
// order. This will combine the most recent two deltas if they are the same.
func dedupDeltas(deltas Deltas) Deltas {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
	a := &deltas[n-1]
	b := &deltas[n-2]
	if out := isDup(a, b); out != nil {
		d := append(Deltas{}, deltas[:n-2]...)
		return append(d, *out)
	}
	return deltas
}
// If a & b represent the same event, returns the delta that ought to be kept.
// Otherwise, returns nil.
// TODO: is there anything other than deletions that need deduping?
func isDup(a, b *Delta) *Delta {
	if out := isDeletionDup(a, b); out != nil {
		return out
	}
	// TODO: Detect other duplicate situations? Are there any?
	return nil
}
// keep the one with the most information if both are deletions.
func isDeletionDup(a, b *Delta) *Delta {
	if b.Type != Deleted || a.Type != Deleted { // only handle the case where both a and b are Deleted
		return nil
	}
	// Do more sophisticated checks, or is this sufficient?
	// If b is only a DeletedFinalStateUnknown placeholder, keep a; otherwise keep b.
	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
		return a
	}
	return b
}
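The deduplication rule is easy to try out in isolation. The sketch below collapses the three functions into one and uses local stand-in types; finalStateUnknown is a hypothetical placeholder for DeletedFinalStateUnknown, and the objects are strings.

```go
package main

import "fmt"

// delta is a simplified stand-in for Delta.
type delta struct {
	Type string
	Obj  interface{}
}

// finalStateUnknown is a hypothetical stand-in for DeletedFinalStateUnknown:
// a placeholder used when the final state of a deleted object is not known.
type finalStateUnknown struct {
	Key string
}

// dedup merges the last two deltas when both are deletions. If the older of
// the two is only a placeholder, the newer one is kept; otherwise the older
// one is kept.
func dedup(ds []delta) []delta {
	n := len(ds)
	if n < 2 {
		return ds
	}
	a, b := ds[n-1], ds[n-2]
	if a.Type != "Deleted" || b.Type != "Deleted" {
		return ds
	}
	keep := b
	if _, ok := b.Obj.(finalStateUnknown); ok {
		keep = a
	}
	out := append([]delta{}, ds[:n-2]...)
	return append(out, keep)
}

func main() {
	ds := []delta{
		{Type: "Added", Obj: "pod-v1"},
		{Type: "Deleted", Obj: finalStateUnknown{Key: "default/one"}},
		{Type: "Deleted", Obj: "pod-v2"},
	}
	for _, d := range dedup(ds) {
		fmt.Println(d.Type, d.Obj)
	}
}
```

In this run the placeholder deletion is dropped and the deletion that carries a real object survives, so the consumer sees one Added and one Deleted delta.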
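Dequeue analysis: DeltaFIFO's Pop()
Dequeuing from DeltaFIFO works like dequeuing from an ordinary queue: it takes the resource key at the head of the queue and removes that key's Deltas from items.
During Pop, the PopProcessFunc passed as an argument is called to process the dequeued element.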
// Pop blocks until an item is added to the queue, and then returns it. If
// multiple items are ready, they are returned in the order in which they were
// added/updated. The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		// While the queue is empty, block in f.cond.Wait()
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}
			f.cond.Wait()
		}
		// Take the first key off f.queue
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		// Look up the Deltas for id in the items map
		item, ok := f.items[id]
		// If items has no Deltas for id, skip to the next iteration
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		// Remove the Deltas for id from the items map
		delete(f.items, id)
		// Let process() handle the Deltas taken from the items map
		err := process(item)
		// If process() returned ErrRequeue, put id back on queue and the Deltas back in items
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
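The blocking behavior of Pop rests on a condition variable tied to the queue's lock. The sketch below shows that pattern on its own with a queue of plain string keys; blockingQueue, add, shutdown, and pop are hypothetical names, and the sketch leaves out the items map and requeueing.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("queue is closed")

// blockingQueue is a hypothetical, minimal model of the lock/cond pattern
// used by Pop. It queues plain keys and has no items map.
type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	keys   []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// add appends a key and wakes up any goroutine blocked in pop.
func (q *blockingQueue) add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.keys = append(q.keys, key)
	q.cond.Broadcast()
}

// shutdown marks the queue closed and wakes up blocked goroutines so they can exit.
func (q *blockingQueue) shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// pop blocks until a key is available or the queue is closed and empty.
// process runs while the lock is held, as in the real Pop.
func (q *blockingQueue) pop(process func(string) error) (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.keys) == 0 {
		if q.closed {
			return "", errClosed
		}
		q.cond.Wait() // releases the lock while waiting, reacquires it on wakeup
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	return key, process(key)
}

func main() {
	q := newBlockingQueue()
	go func() {
		q.add("default/one")
		q.shutdown()
	}()
	show := func(key string) error {
		fmt.Println("processing", key)
		return nil
	}
	for {
		if _, err := q.pop(show); errors.Is(err, errClosed) {
			fmt.Println("queue closed, exiting")
			return
		}
	}
}
```

As in the real code, the closed check happens only when the queue is empty, so keys that were queued before shutdown are still delivered before pop starts returning the closed error.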
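Next, let's see what process() actually is.
If you are familiar with how an informer starts, you will know that newInformer() sets the process function (Config.Process) when it creates the informer. This function does two things: it syncs the data into the store, and it calls the handler functions registered by the user.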
func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
) Controller {
	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,
		// Process is the function that handles the data popped from the DeltaFIFO queue
		Process: func(obj interface{}) error {
			// from oldest to newest
			for _, d := range obj.(Deltas) {
				switch d.Type {
				case Sync, Replaced, Added, Updated:
					// Sync the data into the store; clientState is a Store
					if old, exists, err := clientState.Get(d.Object); err == nil && exists {
						if err := clientState.Update(d.Object); err != nil {
							return err
						}
						// Call back the user-defined handler
						h.OnUpdate(old, d.Object)
					} else {
						// Sync the data into the store
						if err := clientState.Add(d.Object); err != nil {
							return err
						}
						// Call back the user-defined handler
						h.OnAdd(d.Object)
					}
				case Deleted:
					// Sync the data into the store
					if err := clientState.Delete(d.Object); err != nil {
						return err
					}
					// Call back the user-defined handler
					h.OnDelete(d.Object)
				}
			}
			return nil
		},
	}
	return New(cfg)
}
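The decision logic of the process function can be exercised without any Kubernetes types. In the sketch below the store is a plain map, each delta carries its own key, and the handler callbacks are replaced by a list of strings that records which callback would fire. applyDeltas and the delta struct are hypothetical names for this illustration.

```go
package main

import "fmt"

// delta is a simplified stand-in for Delta that carries its key explicitly.
type delta struct {
	Type string
	Key  string
	Obj  string
}

// applyDeltas replays deltas from oldest to newest against store and returns
// the handler callbacks that would fire, in order.
func applyDeltas(store map[string]string, ds []delta) []string {
	var calls []string
	for _, d := range ds {
		switch d.Type {
		case "Sync", "Replaced", "Added", "Updated":
			// Whether the store already has the key decides between
			// update and add, regardless of the delta type.
			if _, exists := store[d.Key]; exists {
				calls = append(calls, "OnUpdate "+d.Key)
			} else {
				calls = append(calls, "OnAdd "+d.Key)
			}
			store[d.Key] = d.Obj
		case "Deleted":
			delete(store, d.Key)
			calls = append(calls, "OnDelete "+d.Key)
		}
	}
	return calls
}

func main() {
	store := map[string]string{}
	calls := applyDeltas(store, []delta{
		{Type: "Added", Key: "default/one", Obj: "v1"},
		{Type: "Updated", Key: "default/one", Obj: "v2"},
		{Type: "Deleted", Key: "default/one", Obj: "v2"},
	})
	for _, c := range calls {
		fmt.Println(c)
	}
	fmt.Println("objects left in store:", len(store))
}
```

One detail worth noticing is that the callback depends on the state of the store rather than on the delta type: a Sync or Updated delta for a key the store has never seen results in an add.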
Digging further: after the informer's Run() starts, it calls controller.Run(), and c.processLoop then pops from the queue in a loop. The flow is roughly:
informer.Run(stopCh) -> s.controller.Run(stopCh) -> c.processLoop -> c.config.Queue.Pop(PopProcessFunc(c.config.Process))
Source location: vendor/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	// Build a Reflector
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock
	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()
	var wg wait.Group
	wg.StartWithChannel(stopCh, r.Run)
	// Run c.processLoop until stopCh is closed
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}
func (c *controller) processLoop() {
	for {
		// Pop in a loop and hand each dequeued item to the PopProcessFunc
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
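The control flow of processLoop (exit when the queue is closed, optionally re-add on error) can be sketched as follows. This is a single-goroutine model under a loud assumption: the sketch treats an empty queue as closed, whereas the real Pop blocks until Close is called. retryQueue, errTransient, and the other names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errClosed    = errors.New("queue is closed")
	errTransient = errors.New("transient failure")
)

// retryQueue is a hypothetical single-goroutine queue of keys.
type retryQueue struct {
	keys    []string
	present map[string]bool
}

func newRetryQueue(keys ...string) *retryQueue {
	q := &retryQueue{present: map[string]bool{}}
	for _, k := range keys {
		q.addIfNotPresent(k)
	}
	return q
}

// addIfNotPresent queues key unless it is already waiting.
func (q *retryQueue) addIfNotPresent(key string) {
	if q.present[key] {
		return
	}
	q.present[key] = true
	q.keys = append(q.keys, key)
}

// pop hands the oldest key to process. Assumption of this sketch: an empty
// queue counts as closed, so pop never blocks.
func (q *retryQueue) pop(process func(string) error) (string, error) {
	if len(q.keys) == 0 {
		return "", errClosed
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	delete(q.present, key)
	return key, process(key)
}

// processLoop pops until the queue reports closed, re-adding failed keys
// when retryOnError is set.
func processLoop(q *retryQueue, process func(string) error, retryOnError bool) {
	for {
		key, err := q.pop(process)
		if err != nil {
			if errors.Is(err, errClosed) {
				return
			}
			if retryOnError {
				q.addIfNotPresent(key)
			}
		}
	}
}

func main() {
	q := newRetryQueue("default/one", "default/two")
	failedOnce := false
	processLoop(q, func(key string) error {
		if key == "default/two" && !failedOnce {
			failedOnce = true
			fmt.Println("failed:", key)
			return errTransient
		}
		fmt.Println("processed:", key)
		return nil
	}, true)
}
```

With retry enabled, a process function that never succeeds would keep this loop running forever, so the sketch fails only once. In the newInformer listing above, RetryOnError is set to false, so a failed item is not re-added by the loop.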
Flowchart of DeltaFIFO enqueue and dequeue
Original article: https://blog.csdn.net/xsw164711368/article/details/140859389