k8s client-go源码解析之informer 一
创始人
2024-05-30 11:13:31
0

Informer(一)

注意:本文内容为学习笔记,内容为个人见解,不保证准确性,但欢迎大家讨论何指教。

本篇为先导篇, 介绍informer的入口工厂函数。
在这里插入图片描述

informer目录结构 (仅展示部分目录,省略的目录相似)

client-go|master⚡ ⇒ tree informers -L 2
informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2
├── core
│   ├── interface.go
│   └── v1
├── doc.go
├── factory.go
├── flowcontrol
├── generic.go
├── node
│   ├── interface.go
│   ├── v1
│   ├── v1alpha1
│   └── v1beta1
└── storage├── interface.go├── v1├── v1alpha1└── v1beta165 directories, 23 files

可以看到,factory.go为工厂函数的文件,作为调用的入口。每个资源类型为单独的文件夹, 按照版本号划分子文件夹。

factory

type sharedInformerFactory struct {client           kubernetes.Interfacenamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunclock             sync.MutexdefaultResync    time.DurationcustomResync     map[reflect.Type]time.Duration// 存放对应资源类型的informerinformers map[reflect.Type]cache.SharedIndexInformer// informer启动状态startedInformers map[reflect.Type]bool// 用于等待多个资源类型的informer启动wg sync.WaitGroupshuttingDown bool
}

对应资源的监听实现,通过InformerFor方法传入并记录。
sharedInformer 将多种资源放在map中保存。
重复监听相同资源的动作是安全的。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()informerType := reflect.TypeOf(obj)informer, exists := f.informers[informerType]// 如果资源已经监听过了,则什么都不做if exists {return informer}resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}informer = newFunc(f.client, resyncPeriod)f.informers[informerType] = informerreturn informer
}

多次调用Start()是安全的

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()if f.shuttingDown {return}for informerType, informer := range f.informers {// 只会run新的资源类型if !f.startedInformers[informerType] {f.wg.Add(1)informer := informergo func() {defer f.wg.Done()informer.Run(stopCh)}()f.startedInformers[informerType] = true}}
}

调用对应资源方法,对应的实现在上面的资源目录

func (f *sharedInformerFactory) Internal() apiserverinternal.Interface {return apiserverinternal.New(f, f.namespace, f.tweakListOptions)
}func (f *sharedInformerFactory) Apps() apps.Interface {return apps.New(f, f.namespace, f.tweakListOptions)
}

resource

以apps目录举例

informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2

apps在当前的存在3个版本,故对应三个文件夹。
interface.go为当前资源入口。

type group struct {// 传入的工厂对象factory          internalinterfaces.SharedInformerFactorynamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunc
}// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {// 当调用factory.Apps()时,工厂对象是传入的,不会创建新的工厂,也就不会创建新的liste/watch连接。// SharedInformer中的shared就是指这个。return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {return v1.New(g.factory, g.namespace, g.tweakListOptions)
}// V1beta1 returns a new v1beta1.Interface.
func (g *group) V1beta1() v1beta1.Interface {return v1beta1.New(g.factory, g.namespace, g.tweakListOptions)
}// V1beta2 returns a new v1beta2.Interface.
func (g *group) V1beta2() v1beta2.Interface {return v1beta2.New(g.factory, g.namespace, g.tweakListOptions)
}

当我们调用factory.Apps().V1().Deployments(),实现文件为:

informers
├── apps
│   ├── interface.go
│   ├── v1├── deployment.go

调用factory.Apps().V1().Deployments().Informer(), 会触发工厂函数的InformerFor()方法监听资源。
重复:InformerFor()方法,重复监听相同资源的动作是安全的。

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

任意资源有自己的监听函数的实现, Deployments的为:

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(// 定义 list/watch规则。// 实际上所有资源类型的informer, 最终都会走到cache.SharedIndexInforme。// 根据不同的ListWatch对象决定监听不同的资源。 这是informer实现的基础。// 这里 ListWatch 监听的是 AppsV1().Deployments&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)},},&appsv1.Deployment{},resyncPeriod,indexers,)
}
// 上面Informer()函数中传入的方法
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

总结

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8VqWCMFv-1678153878970)(/Users/chenyang/Projects/github.com/MyNotes/images/informer-1.png)]
informers/factory.go为工厂方法实现文件。
“工厂”产出不同资源类型的informer。
资源通过InformerFor记录到“工厂”中,通过Start方法启动监听。这两个方法重复调用均为安全操作。
最终的数据处理由cache.SharedIndexInforme实现。

Informer(一)

注意:本文内容为学习笔记,内容为个人见解,不保证准确性,但欢迎大家讨论何指教。

本篇为先导篇, 介绍informer的入口工厂函数。

informer目录结构 (仅展示部分目录,省略的目录相似)

client-go|master⚡ ⇒ tree informers -L 2
informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2
├── core
│   ├── interface.go
│   └── v1
├── doc.go
├── factory.go
├── flowcontrol
├── generic.go
├── node
│   ├── interface.go
│   ├── v1
│   ├── v1alpha1
│   └── v1beta1
└── storage├── interface.go├── v1├── v1alpha1└── v1beta165 directories, 23 files

可以看到,factory.go为工厂函数的文件,作为调用的入口。每个资源类型为单独的文件夹, 按照版本号划分子文件夹。

factory

type sharedInformerFactory struct {client           kubernetes.Interfacenamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunclock             sync.MutexdefaultResync    time.DurationcustomResync     map[reflect.Type]time.Duration// 存放对应资源类型的informerinformers map[reflect.Type]cache.SharedIndexInformer// informer启动状态startedInformers map[reflect.Type]bool// 用于等待多个资源类型的informer启动wg sync.WaitGroupshuttingDown bool
}

对应资源的监听实现,通过InformerFor方法传入并记录。
sharedInformer 将多种资源放在map中保存。
重复监听相同资源的动作是安全的。

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()informerType := reflect.TypeOf(obj)informer, exists := f.informers[informerType]// 如果资源已经监听过了,则什么都不做if exists {return informer}resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}informer = newFunc(f.client, resyncPeriod)f.informers[informerType] = informerreturn informer
}

多次调用Start()是安全的

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()if f.shuttingDown {return}for informerType, informer := range f.informers {// 只会run新的资源类型if !f.startedInformers[informerType] {f.wg.Add(1)informer := informergo func() {defer f.wg.Done()informer.Run(stopCh)}()f.startedInformers[informerType] = true}}
}

调用对应资源方法,对应的实现在上面的资源目录

func (f *sharedInformerFactory) Internal() apiserverinternal.Interface {return apiserverinternal.New(f, f.namespace, f.tweakListOptions)
}func (f *sharedInformerFactory) Apps() apps.Interface {return apps.New(f, f.namespace, f.tweakListOptions)
}

resource

以apps目录举例

informers
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2

apps在当前的存在3个版本,故对应三个文件夹。
interface.go为当前资源入口。

type group struct {// 传入的工厂对象factory          internalinterfaces.SharedInformerFactorynamespace        stringtweakListOptions internalinterfaces.TweakListOptionsFunc
}// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {// 当调用factory.Apps()时,工厂对象是传入的,不会创建新的工厂,也就不会创建新的liste/watch连接。// SharedInformer中的shared就是指这个。return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {return v1.New(g.factory, g.namespace, g.tweakListOptions)
}// V1beta1 returns a new v1beta1.Interface.
func (g *group) V1beta1() v1beta1.Interface {return v1beta1.New(g.factory, g.namespace, g.tweakListOptions)
}// V1beta2 returns a new v1beta2.Interface.
func (g *group) V1beta2() v1beta2.Interface {return v1beta2.New(g.factory, g.namespace, g.tweakListOptions)
}

当我们调用factory.Apps().V1().Deployments(),实现文件为:

informers
├── apps
│   ├── interface.go
│   ├── v1├── deployment.go

调用factory.Apps().V1().Deployments().Informer(), 会触发工厂函数的InformerFor()方法监听资源。
重复:InformerFor()方法,重复监听相同资源的动作是安全的。

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

任意资源有自己的监听函数的实现, Deployments的为:

func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(// 定义 list/watch规则。// 实际上所有资源类型的informer, 最终都会走到cache.SharedIndexInforme。// 根据不同的ListWatch对象决定监听不同的资源。 这是informer实现的基础。// 这里 ListWatch 监听的是 AppsV1().Deployments&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)},},&appsv1.Deployment{},resyncPeriod,indexers,)
}
// 上面Informer()函数中传入的方法
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

总结

informers/factory.go为工厂方法实现文件。
“工厂”产出不同资源类型的informer。
资源通过InformerFor记录到“工厂”中,通过Start方法启动监听。这两个方法重复调用均为安全操作。
最终的数据处理由cache.SharedIndexInforme实现。

相关内容

热门资讯

张天灵安卓系统,引领智能生活新... 你知道吗?最近在手机圈里,有个名字可是火得一塌糊涂,那就是张天灵安卓系统。没错,就是那个让无数手机用...
安卓系统使用官方文档,系统架构... 你有没有想过,你的安卓手机里那些神奇的软件和功能,其实都是基于一个强大的系统——安卓系统?没错,就是...
安卓系统哪个系列最好,探索最佳... 你有没有想过,手机里的安卓系统就像是一群各具特色的英雄,每个系列都有它的独门绝技。那么,问题来了,安...
安卓修改系统时间设置,安卓系统... 你有没有发现,有时候手机上的时间总是和你心中的时间不太一样?是不是有时候你明明觉得才刚过中午,一看手...
安卓系统制裁华为,自主创新之路 你知道吗?最近安卓系统对华为下手了,这可真是让人大跌眼镜啊!华为作为我国科技界的佼佼者,一直以来都备...
安卓系统隐藏扣费,揭秘恶意应用... 你知道吗?在安卓系统的世界里,有时候会有一些小秘密,就像隐藏的宝藏一样,让人意想不到。今天,我就要来...
低安卓系统游戏推荐,盘点那些让... 手机里的游戏是不是已经玩腻了?别急,今天就来给你推荐一些适合低安卓系统运行的游戏,让你的手机焕发第二...
真我是安卓系统嘛,揭秘安卓系统... 亲爱的读者,你是否曾好奇过,自己手中的安卓手机,它的“灵魂”究竟是不是安卓系统呢?这个问题听起来可能...
安卓儿童手表换系统,轻松换新体... 你家的安卓儿童手表是不是已经陪伴了孩子好长一段时间了呢?是不是觉得它有点儿“老态龙钟”,想要给它来个...
王者安卓系统如何退钱,快速返还 你是不是在王者荣耀里花了点小钱,现在想退回来呢?别急,今天就来手把手教你如何用王者安卓系统退钱,让你...
安卓系统instagram哪里... 你有没有发现,最近你的手机里少了点什么?没错,就是那个让你每天刷到停不下来的社交神器——Instag...
安卓系统怎样用苹果系统,系统切... 你是不是也和我一样,对安卓系统和苹果系统都情有独钟呢?有时候,手头上的安卓设备用得正得心应手,突然又...
平板安卓系统价格多少,不同档次... 你有没有想过,拥有一台平板电脑,是不是就能随时随地享受大屏幕的观影体验,或者轻松处理工作上的事情呢?...
日历app推荐安卓系统,生活更... 你有没有发现,时间就像那溜走的沙子,不经意间就悄悄溜走了。想要抓住时间的尾巴,一款好用的日历app可...
山水投影删除安卓系统,基于山水... 你有没有想过,家里的电视屏幕上突然出现一幅幅流动的山水画,美得让你仿佛置身于仙境?这可不是梦,而是现...
安卓怎么换系统版本,轻松切换至... 亲爱的安卓用户们,你是否对手机系统版本升级充满了好奇和期待?想要让你的手机焕然一新,体验更流畅的性能...
安卓系统 定时锁屏,智能守护您... 你有没有发现,手机这玩意儿,简直就是现代生活的得力助手,但有时候,它也像个调皮的小家伙,时不时地给你...
小米手机安卓系统rom,功能与... 你有没有发现,最近小米手机的热度又上来了?没错,就是那个以性价比著称的小米。今天,咱们就来聊聊小米手...
安卓8.0系统自动重启,安卓8... 最近你的安卓手机是不是也遇到了一个让人头疼的问题?没错,就是那个让人抓狂的自动重启!是不是每次正在关...
安卓导航进入系统设置,解锁个性... 亲爱的手机控们,你是否曾在某个午后,手捧着你的安卓手机,突然想探索一下它的深处,看看那些隐藏在系统设...