k8s client-go源码解析之informer 一
创始人
2024-05-30 11:13:31
0

Informer(一)

注意:本文内容为学习笔记,内容为个人见解,不保证准确性,但欢迎大家讨论何指教。

本篇为先导篇, 介绍informer的入口工厂函数。
在这里插入图片描述

informer目录结构 (仅展示部分目录,省略的目录相似)

client-go|master⚡ ⇒ tree informers -L 2
informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2
├── core
│   ├── interface.go
│   └── v1
├── doc.go
├── factory.go
├── flowcontrol
├── generic.go
├── node
│   ├── interface.go
│   ├── v1
│   ├── v1alpha1
│   └── v1beta1
└── storage├── interface.go├── v1├── v1alpha1└── v1beta165 directories, 23 files

可以看到,factory.go为工厂函数的文件,作为调用的入口。每个资源类型为单独的文件夹, 按照版本号划分子文件夹。

factory

type sharedInformerFactory struct {client           kubernetes.Interfacenamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunclock             sync.MutexdefaultResync    time.DurationcustomResync     map[reflect.Type]time.Duration// 存放对应资源类型的informerinformers map[reflect.Type]cache.SharedIndexInformer// informer启动状态startedInformers map[reflect.Type]bool// 用于等待多个资源类型的informer启动wg sync.WaitGroupshuttingDown bool
}

对应资源的监听实现,通过InformerFor方法传入并记录。
sharedInformer 将多种资源放在map中保存。
重复监听相同资源的动作是安全的。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()informerType := reflect.TypeOf(obj)informer, exists := f.informers[informerType]// 如果资源已经监听过了,则什么都不做if exists {return informer}resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}informer = newFunc(f.client, resyncPeriod)f.informers[informerType] = informerreturn informer
}

多次调用Start()是安全的

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()if f.shuttingDown {return}for informerType, informer := range f.informers {// 只会run新的资源类型if !f.startedInformers[informerType] {f.wg.Add(1)informer := informergo func() {defer f.wg.Done()informer.Run(stopCh)}()f.startedInformers[informerType] = true}}
}

调用对应资源方法,对应的实现在上面的资源目录

func (f *sharedInformerFactory) Internal() apiserverinternal.Interface {return apiserverinternal.New(f, f.namespace, f.tweakListOptions)
}func (f *sharedInformerFactory) Apps() apps.Interface {return apps.New(f, f.namespace, f.tweakListOptions)
}

resource

以apps目录举例

informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2

apps在当前的存在3个版本,故对应三个文件夹。
interface.go为当前资源入口。

type group struct {// 传入的工厂对象factory          internalinterfaces.SharedInformerFactorynamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunc
}// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {// 当调用factory.Apps()时,工厂对象是传入的,不会创建新的工厂,也就不会创建新的liste/watch连接。// SharedInformer中的shared就是指这个。return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {return v1.New(g.factory, g.namespace, g.tweakListOptions)
}// V1beta1 returns a new v1beta1.Interface.
func (g *group) V1beta1() v1beta1.Interface {return v1beta1.New(g.factory, g.namespace, g.tweakListOptions)
}// V1beta2 returns a new v1beta2.Interface.
func (g *group) V1beta2() v1beta2.Interface {return v1beta2.New(g.factory, g.namespace, g.tweakListOptions)
}

当我们调用factory.Apps().V1().Deployments(),实现文件为:

informers
├── apps
│   ├── interface.go
│   ├── v1├── deployment.go

调用factory.Apps().V1().Deployments().Informer(), 会触发工厂函数的InformerFor()方法监听资源。
重复:InformerFor()方法,重复监听相同资源的动作是安全的。

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

任意资源有自己的监听函数的实现, Deployments的为:

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(// 定义 list/watch规则。// 实际上所有资源类型的informer, 最终都会走到cache.SharedIndexInforme。// 根据不同的ListWatch对象决定监听不同的资源。 这是informer实现的基础。// 这里 ListWatch 监听的是 AppsV1().Deployments&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)},},&appsv1.Deployment{},resyncPeriod,indexers,)
}
// 上面Informer()函数中传入的方法
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

总结

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8VqWCMFv-1678153878970)(/Users/chenyang/Projects/github.com/MyNotes/images/informer-1.png)]
informers/factory.go为工厂方法实现文件。
“工厂”产出不同资源类型的informer。
资源通过InformerFor记录到“工厂”中,通过Start方法启动监听。这两个方法重复调用均为安全操作。
最终的数据处理由cache.SharedIndexInforme实现。

Informer(一)

注意:本文内容为学习笔记,内容为个人见解,不保证准确性,但欢迎大家讨论何指教。

本篇为先导篇, 介绍informer的入口工厂函数。

informer目录结构 (仅展示部分目录,省略的目录相似)

client-go|master⚡ ⇒ tree informers -L 2
informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2
├── core
│   ├── interface.go
│   └── v1
├── doc.go
├── factory.go
├── flowcontrol
├── generic.go
├── node
│   ├── interface.go
│   ├── v1
│   ├── v1alpha1
│   └── v1beta1
└── storage├── interface.go├── v1├── v1alpha1└── v1beta165 directories, 23 files

可以看到,factory.go为工厂函数的文件,作为调用的入口。每个资源类型为单独的文件夹, 按照版本号划分子文件夹。

factory

type sharedInformerFactory struct {client           kubernetes.Interfacenamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunclock             sync.MutexdefaultResync    time.DurationcustomResync     map[reflect.Type]time.Duration// 存放对应资源类型的informerinformers map[reflect.Type]cache.SharedIndexInformer// informer启动状态startedInformers map[reflect.Type]bool// 用于等待多个资源类型的informer启动wg sync.WaitGroupshuttingDown bool
}

对应资源的监听实现,通过InformerFor方法传入并记录。
sharedInformer 将多种资源放在map中保存。
重复监听相同资源的动作是安全的。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()informerType := reflect.TypeOf(obj)informer, exists := f.informers[informerType]// 如果资源已经监听过了,则什么都不做if exists {return informer}resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}informer = newFunc(f.client, resyncPeriod)f.informers[informerType] = informerreturn informer
}

多次调用Start()是安全的

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()if f.shuttingDown {return}for informerType, informer := range f.informers {// 只会run新的资源类型if !f.startedInformers[informerType] {f.wg.Add(1)informer := informergo func() {defer f.wg.Done()informer.Run(stopCh)}()f.startedInformers[informerType] = true}}
}

调用对应资源方法,对应的实现在上面的资源目录

func (f *sharedInformerFactory) Internal() apiserverinternal.Interface {return apiserverinternal.New(f, f.namespace, f.tweakListOptions)
}func (f *sharedInformerFactory) Apps() apps.Interface {return apps.New(f, f.namespace, f.tweakListOptions)
}

resource

以apps目录举例

informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2

apps在当前的存在3个版本,故对应三个文件夹。
interface.go为当前资源入口。

type group struct {// 传入的工厂对象factory          internalinterfaces.SharedInformerFactorynamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunc
}// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {// 当调用factory.Apps()时,工厂对象是传入的,不会创建新的工厂,也就不会创建新的liste/watch连接。// SharedInformer中的shared就是指这个。return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {return v1.New(g.factory, g.namespace, g.tweakListOptions)
}// V1beta1 returns a new v1beta1.Interface.
func (g *group) V1beta1() v1beta1.Interface {return v1beta1.New(g.factory, g.namespace, g.tweakListOptions)
}// V1beta2 returns a new v1beta2.Interface.
func (g *group) V1beta2() v1beta2.Interface {return v1beta2.New(g.factory, g.namespace, g.tweakListOptions)
}

当我们调用factory.Apps().V1().Deployments(),实现文件为:

informers
├── apps
│   ├── interface.go
│   ├── v1├── deployment.go

调用factory.Apps().V1().Deployments().Informer(), 会触发工厂函数的InformerFor()方法监听资源。
重复:InformerFor()方法,重复监听相同资源的动作是安全的。

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

任意资源有自己的监听函数的实现, Deployments的为:

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(// 定义 list/watch规则。// 实际上所有资源类型的informer, 最终都会走到cache.SharedIndexInforme。// 根据不同的ListWatch对象决定监听不同的资源。 这是informer实现的基础。// 这里 ListWatch 监听的是 AppsV1().Deployments&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)},},&appsv1.Deployment{},resyncPeriod,indexers,)
}
// 上面Informer()函数中传入的方法
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

总结

informers/factory.go为工厂方法实现文件。
“工厂”产出不同资源类型的informer。
资源通过InformerFor记录到“工厂”中,通过Start方法启动监听。这两个方法重复调用均为安全操作。
最终的数据处理由cache.SharedIndexInforme实现。

相关内容

热门资讯

电视安卓系统哪个品牌好,哪家品... 你有没有想过,家里的电视是不是该升级换代了呢?现在市面上电视品牌琳琅满目,各种操作系统也是让人眼花缭...
安卓会员管理系统怎么用,提升服... 你有没有想过,手机里那些你爱不释手的APP,背后其实有个强大的会员管理系统在默默支持呢?没错,就是那...
安卓系统软件使用技巧,解锁软件... 你有没有发现,用安卓手机的时候,总有一些小技巧能让你玩得更溜?别小看了这些小细节,它们可是能让你的手...
安卓系统提示音替换 你知道吗?手机里那个时不时响起的提示音,有时候真的能让人心情大好,有时候又让人抓狂不已。今天,就让我...
安卓开机不了系统更新 手机突然开不了机,系统更新还卡在那里,这可真是让人头疼的问题啊!你是不是也遇到了这种情况?别急,今天...
安卓系统中微信视频,安卓系统下... 你有没有发现,现在用手机聊天,视频通话简直成了标配!尤其是咱们安卓系统的小伙伴们,微信视频功能更是用...
安卓系统是服务器,服务器端的智... 你知道吗?在科技的世界里,安卓系统可是个超级明星呢!它不仅仅是个手机操作系统,竟然还能成为服务器的得...
pc电脑安卓系统下载软件,轻松... 你有没有想过,你的PC电脑上安装了安卓系统,是不是瞬间觉得世界都大不一样了呢?没错,就是那种“一机在...
电影院购票系统安卓,便捷观影新... 你有没有想过,在繁忙的生活中,一部好电影就像是一剂强心针,能瞬间让你放松心情?而我今天要和你分享的,...
安卓系统可以写程序? 你有没有想过,安卓系统竟然也能写程序呢?没错,你没听错!这个我们日常使用的智能手机操作系统,竟然有着...
安卓系统架构书籍推荐,权威书籍... 你有没有想过,想要深入了解安卓系统架构,却不知道从何下手?别急,今天我就要给你推荐几本超级实用的书籍...
安卓系统看到的炸弹,技术解析与... 安卓系统看到的炸弹——揭秘手机中的隐形威胁在数字化时代,智能手机已经成为我们生活中不可或缺的一部分。...
鸿蒙系统有安卓文件,畅享多平台... 你知道吗?最近在科技圈里,有个大新闻可是闹得沸沸扬扬的,那就是鸿蒙系统竟然有了安卓文件!是不是觉得有...
宝马安卓车机系统切换,驾驭未来... 你有没有发现,现在的汽车越来越智能了?尤其是那些豪华品牌,比如宝马,它们的内饰里那个大屏幕,简直就像...
p30退回安卓系统 你有没有听说最近P30的用户们都在忙活一件大事?没错,就是他们的手机要退回安卓系统啦!这可不是一个简...
oppoa57安卓原生系统,原... 你有没有发现,最近OPPO A57这款手机在安卓原生系统上的表现真是让人眼前一亮呢?今天,就让我带你...
安卓系统输入法联想,安卓系统输... 你有没有发现,手机上的输入法真的是个神奇的小助手呢?尤其是安卓系统的输入法,简直就是智能生活的点睛之...
怎么进入安卓刷机系统,安卓刷机... 亲爱的手机控们,你是否曾对安卓手机的刷机系统充满好奇?想要解锁手机潜能,体验全新的系统魅力?别急,今...
安卓系统程序有病毒 你知道吗?在这个数字化时代,手机已经成了我们生活中不可或缺的好伙伴。但是,你知道吗?即使是安卓系统,...
奥迪中控安卓系统下载,畅享智能... 你有没有发现,现在汽车的中控系统越来越智能了?尤其是奥迪这种豪华品牌,他们的中控系统简直就是科技与艺...