okkey 我们知道,再上一个 Reflector 从 API Server 监听(watch)特定类型的资源,拿到变更通知后,将其放入到 DeltaFIFO 队列中。
DeltaFIFO 是 Kubernetes 为我们提供了一个存储。
不仅仅是 DeltaFIFO , Kubernetes 还为我们提供了很多存储
cache 主要实现了 Store,利用了 threadSafeMap
type Store interface {
// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error
// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error
// List returns a list of all the currently non-empty accumulators
List() []interface{}
// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey returns the accumulator associated with the given key
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
📜 对上面的解释:
Add(obj interface{}) error
: 将给定对象添加到与给定对象的键相关联的累加器中Update(obj interface{}) error
: 更新与给定对象的键相关联的累加器中的给定对象Delete(obj interface{}) error
: 从与给定对象的键相关联的累加器中删除给定对象List() []interface{}
: 返回所有当前非空累加器的列表ListKeys() []string
: 返回当前与非空累加器关联的所有键的列表Get(obj interface{}) (item interface{}, exists bool, err error)
: 返回与给定对象的键相关联的累加器中的累加器GetByKey(key string) (item interface{}, exists bool, err error)
: 返回与给定键相关联的累加器Replace([]interface{}, string) error
: 删除存储中的内容,使用给定的列表替换。Store 获取该列表的所有权,调用此函数后不应再引用该列表。Resync() error
: 在此处出现的术语中毫无意义,但在某些具有非平凡附加行为(例如 DeltaFIFO)的实现中具有意义。
Add(obj interface{}) error
- 功能:将给定对象添加到与给定对象的键相关联的累加器中
- 参数:
- obj:要添加的对象
- 返回值:无
Update(obj interface{}) error
- 功能:更新与给定对象的键相关联的累加器中的给定对象
- 参数:
- obj:要更新的对象
- 返回值:无
Delete(obj interface{}) error
- 功能:从与给定对象的键相关联的累加器中删除给定对象
- 参数:
- obj:要删除的对象
- 返回值:无
List() []interface{}
- 功能:返回所有当前非空累加器的列表
- 参数:无
- 返回值:所有当前非空累加器的列表
ListKeys() []string
- 功能:返回当前与非空累加器关联的所有键的列表
- 参数:无
- 返回值:当前与非空累加器关联的所有键的列表
Get(obj interface{}) (item interface{}, exists bool, err error)
- 功能:返回与给定对象的键相关联的累加器中的累加器
- 参数:
- obj:要获取的对象
- 返回值:
- item:与给定对象的键相关联的累加器中的累加器
- exists:是否存在该累加器
- err:错误信息(如果有)
GetByKey(key string) (item interface{}, exists bool, err error)
- 功能:返回与给定键相关联的累加器
- 参数:
- key:要获取的键
- 返回值:
- item:与给定键相关联的累加器
- exists:是否存在该累加器
- err:错误信息(如果有)
Replace([]interface{}, string) error
- 功能:删除存储中的内容,使用给定的列表替换。Store 获取该列表的所有权,调用此函数后不应再引用该列表。
- 参数:
- []interface{}:要替换的列表
- string:用于记录日志的字符串
- 返回值:错误信息(如果有)
Resync() error
- 功能:在此处出现的术语中毫无意义,但在某些具有非平凡附加行为(例如 DeltaFIFO)的实现中具有意义。
- 参数:无
- 返回值:错误信息(如果有)
实现了 Store
,利用了 cache 存放数据,数据变更的时候通过 PushFunc 发送当前完整的状态。
type UndeltaStore struct {
PushFunc func([]interface{})
可以看到 UndeltaStore
比如说 Add:
func (u *UndeltaStore) Add(obj interface{}) error {
if err := u.Store.Add(obj); err != nil {
return err
return nil
Heap 通过实现 Store,利用 data 数据结构存放数据,实现堆数据结构,用于优先级队列。
// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData struct {
// items is a map from key of the objects to the objects and their index.
// We depend on the property that items in the map are in the queue and vice versa.
items map[string]*heapItem
// queue implements a heap data structure and keeps the order of elements
// according to the heap invariant. The queue keeps the keys of objects stored
// in "items".
queue []string
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc
// lessFunc is used to compare two objects in the heap.
lessFunc LessFunc
FIFO 实现了 Queue (包含 Store),利用自己内部的 Items 数据结构存放数据。
// Queue extends Store with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {
// Pop blocks until there is at least one key to process or the
// Queue is closed. In the latter case Pop returns with an error.
// In the former case Pop atomically picks one key to process,
// removes that (key, accumulator) association from the Store, and
// processes the accumulator. Pop returns the accumulator that
// was processed and the result of processing. The PopProcessFunc
// may return an ErrRequeue{inner} and in this case Pop will (a)
// return that (key, accumulator) association to the Queue as part
// of the atomic processing and (b) return the inner error from
// Pop.
Pop(PopProcessFunc) (interface{}, error)
// AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
AddIfNotPresent(interface{}) error
// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, AddIfNotPresent,
// Update, or Delete; otherwise the first batch is empty.
HasSynced() bool
// Close the queue
接下来就是重点 DeltaFIFO 的部分了
DeltaFIFO 的主要应用场景如下:
- 当你希望处理每一个对象的变化最多一次
- 当你处理一个对象的时候,希望知道这个对象与你上一次处理时,发生了哪些变化。
- 当你希望一个对象删除的时候,你仍然能够处理它
- 能够周期性的处理所有的对象
,它不仅仅存储了数据保证了先进先出,而且存储有K8s 资源对象的类型。其是连接Reflector
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas
// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known" --- affecting Delete(),
// Replace(), and Resync()
knownObjects KeyListerGetter
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRUD operations.
closed bool
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
// Called with every object if non-nil.
transformer TransformFunc
📜 对上面的解释:
是计算的 key,value 是一个 Deltas 的结构体,queue
不指定 namespace 时候用<name>
: 专门用来存放数据的地方,其实就是Indexer
- Reflector 的 List
- Reflector 的 Watch
- Reflector 的 Resync
- 事件派发到 work queue
- 刷新本地缓存
Indexer 主要提供了一个对象根据一定检索能力,典型的实现就是通过 namespace 来构建 Key,通过 Thread Safe Store 来存储对象。
✴️版权声明 © :本书所有内容遵循CC-BY-SA 3.0协议(署名-相同方式共享)©