在上一个对于 client-go 学习中,通过 sample-controller 的项目了解到 Kubernetes 的 部分 client-go 的学习。继续深入学习 client-go
kubectl 并不适合 Kubernetes 的二次开发者来和 k8s 打交道,Go语言提供了一个专门和 Kubernetes API 交互的 库 Client-go。
Kubernetes 也提供了一个编写控制器的小案例,可以作为 client-go 的入门
Client-go是一个用于与 Kubernetes API 交互的Go库。它提供了广泛的功能,用于与Kubernetes API交互,包括强类型 API、资源客户端、Watch API 和动态客户端。使用 client-go,开发人员可以轻松地在 Kubernetes 中创建、读取、更新和删除资源对象。
上面是 client-go 的 GitHub 仓库,不过这个库是 actions 以每天一次的频率从 Kubernetes/Kubernetes 主仓库中自动同步过来的。所以如果我们想贡献的话找好位置去 Kubernetes 贡献(kubernetes/stagin/src/k8s.io)。
├── discovery # DsicoveryClient客户端,用于发现k8s所支持GVR。
├── dynamic # DynamicClient客户端, 用于访问k8s Resources,也可以访问CRD。
├── informers # k8s中各种Resources的Informer机制的实现。
├── kubernetes # 对RestClient进行了封装,定义多种Client的客户端集合,俗称clientset。
├── listers # 提供对Resources的获取功能。对于Get()和List()而言,listers提供给二者的数据都是从缓存中读取的。
├── pkg
├── plugin # 提供第三方插件。
├── scale # 定义用于Deploy, RS, RC等资源进行的扩、缩容的客户端ScaleClient。
├── tools # 实现client查询和缓存机制,以及定义诸如SharedInformer、Reflector、DealtFIFO和Indexer等常用工具。
├── transport
└── util # 提供诸如WorkQueue、Certificate等常用方法。
📜 对上面的解释:
:该目录包含用于发现和获取Kubernetes API资源的代码。这些资源包括Pod、Service、ReplicationController等。discovery
:该目录包含动态客户端库,用于与Kubernetes API交互,而无需生成代码。这对于构建需要与任意Kubernetes资源交互的通用工具和实用程序非常有用。/kubernetes
:这个包中方的是用 client-gen 自动生成的用来访问 Kubernetes API 的 ClientSet,后面会经常看到 ClientSet 这个工具。/informers
:该目录包含用于与Kubernetes API交互的代码。这些API包括Pod、Service、Namespace等。rest
:该目录包含用于客户端库的辅助功能的代码。这些功能包括对Kubernetes API对象的类型转换、对象比较等。util
可以通过 go get 命令获取 client-go,不过我直接克隆最新源代码,然后构建为可执行文件:
git clone https://github.com/kubernetes/client-go.git
❯ find -name main.go
这些文件可以帮助我们快速上手 client-go:
:演示如何使用 Kubernetes 的工作队列(Workqueue)实现资源控制器(Controller)。./examples/in-cluster-client-configuration/main.go
:演示如何在 Kubernetes 集群内部使用client-go
访问 Kubernetes API Server。./examples/out-of-cluster-client-configuration/main.go
:演示如何在 Kubernetes 集群外部使用client-go
访问 Kubernetes API Server。./examples/dynamic-create-update-delete-deployment/main.go
:演示如何使用 Kubernetes 的动态客户端库(Dynamic Client)实现对 Deployment 资源对象的增删改查等操作。./examples/create-update-delete-deployment/main.go
实现对 Deployment 资源对象的增删改查等操作。./examples/leader-election/main.go
:演示如何使用 Kubernetes 的 Leader Election 机制,实现资源控制器的高可用性和故障转移。
在 /root/workspaces/client-go/examples/workqueue
❯ ./main --help
Usage of ./main:
-kubeconfig string
absolute path to the kubeconfig file
-master string
master url
指定 kubeconfig (也可以通过设置环境变量 export KUBECONFIG
❯ ./main -kubeconfig ~/.kube/config
举例:使用 kubectl 命令行工具创建一个名为 myresource 的自定义资源,并将其保存到 YAML 文件中。然后,运行
go run ./examples/workqueue/main.go
命令启动控制器。此时,控制器会开始监听 myresource 资源,并在该资源被创建或更新时,异步地处理一些任务。
继续演示 对 Deployment CURD 操作:
❯ cd dynamic-create-update-delete-deployment/
❯ ls
README.md main.go
❯ go build main.go
❯ ./main
Creating deployment...
Created deployment "demo-deployment".
-> Press Return key to continue.
Updating deployment...
Updated deployment...
-> Press Return key to continue.
Listing deployments in namespace "default":
* demo-deployment (1 replicas)
* my-nginx-app (3 replicas)
* nginx-deployment (3 replicas)
-> Press Return key to continue.
Deleting deployment...
Deleted deployment.
:这个示例文件演示如何使用 Kubernetes 的动态客户端库(Dynamic Client)实现对 Deployment 资源对象的增删改查等操作。使用动态客户端库,开发人员可以更加灵活地操作 Kubernetes 资源对象,而不需要手动编写复杂的代码。例如,在这个示例中,开发人员可以使用 DynamicClient 对象,动态地创建、更新和删除 Deployment 资源对象。
举例:使用 kubectl 命令行工具创建一个名为 my-deployment 的 Deployment 对象,并将其保存到 YAML 文件中。然后,运行
go run ./examples/dynamic-create-update-delete-deployment/main.go
命令,使用 DynamicClient 对象,动态地创建、更新和删除 my-deployment Deployment 对象。
❯ k get deployment
demo-deployment 0/1 0 0 4s
my-nginx-app 0/3 3 0 26h
nginx-deployment 3/3 3 3 165m
WorkQueue 一般使用的说就是延迟队列的实现,在 Resopurce Event Handlers 中会完成将对象的 key 放入 WorkQueue 的过程,然后再自己的逻辑代码中从 WorkQueue 中消费这些 Key。
Client-go 的 utile/Workqueue
分别对应的是 普通队列、延时队列和限速队列。对应的源文件分别是:
- queue.go
- delaying_queue.go
- rate_limiting_queue.go
我们先看看 普通队列,这个对应的文件就是 queue.go:
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShuttingDown() bool
看接口就知道了实现了哪些方法,分别是添加元素,元素个数、获取第一个元素、第二个返回值和 channel 类似,标记了队列是否关闭。
Done 标记一个元素是否已经处理完,ShutDown 关闭队列,ShowDownWithDrain 关闭队列,但是等待队列中的元素处理完。ShuttingDown 标记当前的 channel 是否正在关闭。
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
接口就很简单,定义 DelayingInterface 接口在 delaying_queue.go 源文件,名字和 Queue 所使用的interface对称,叫做 DelayingInterface。
这里有一个 Interface,对此我感觉到有一些疑惑,Interface 是普通队列命名的接口,这个设计模式的接口设计的思路蛮有意思的,先别激动,后面有你激动的,看到 Controller 的接口设计,那简直就是 6 ~
最后我们看一下限速队列,这个有意思很多了,同样今天的项目 sample-controller
也用到了这个队列,我们看一下 controller 部分:
type Controller struct {
workqueue workqueue.RateLimitingInterface
我们看到了 限速队列包含了延迟队列的所有接口实现,也包含了 基本队列的所有接口实现。
- AddRateLimited:在速率限制器认可的情况下将项目添加到工作队列中。
- Forget:表示项目已经完成重试。无论是永久性失败还是成功,我们都会停止速率限制器对其进行跟踪。这仅清除了
方法。 - NumRequeues:返回项目被重新排队的次数。
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
在后面的学习中,我们再深入探索这些 WorkQueue 的实现。
DeltaFIFO 也是一个特别重要的组件,在 k8s.io/client-go/tools/cache
- 存储对象的增量更新,包括添加、删除和更新操作。
- 能够按顺序处理更新,以确保它们被正确地应用。
- 支持阻塞式同步,以便在应用所有更新之前等待。
在 fifo.go 中定义了一个 Queue 的接口,我们看一下接口代码:
type Queue interface {
Pop(PopProcessFunc) (interface{}, error)
AddIfNotPresent(interface{}) error
HasSynced() bool
// Close the queue
嵌入了一个 Store 接口:
Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
都是表面的意思了,CURD 操作,各种 Get、List 操作。
Informar 这个单词出镜率很高,我们在很多文章中都可以看到这个 Informer,包括 Kubernetes 源码讲解的 ETCD 和 API Server 中。
Informer 从 DeltaFIFO 中 Pop 相对应的对象,会通过 Indexer 将对象和索引都丢在本地的 cache 中,再触发相应的事件处理函数(Resource Event Handlers) 的运行。
源码位置: k8s.io/client-go/tools/cache
Informar 是通过一个 Controller 对象来进行定义的,我们先看看对象接口:
在 controller.go
文件中能看到 Controller 的定义:
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
看出,最核心的方法是 Run(stopCh <-chan struct{})
Run 这个函数我们点击进去会发现,主要做了两个事情:
- 构造 Reflector 利用 ListerWatcher 的能力将对象事件更新到 DeltaFIFO
- 从 DeltaFIFO 中 Pop 对象后调用 ProcessFunc 来处理。
// New makes a new Controller from the given Config.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
return ctlr
传入了一个 Config 过来,然后初始化,核心逻辑那就在 Config 中了:
// Config contains all the settings for one of these low-level controllers.
type Config struct {
// Something that can list and watch your objects.
// Something that can process a popped Deltas.
Process ProcessFunc
// ObjectType is an example object of the type this controller is
// expected to handle.
ObjectType runtime.Object
// ObjectDescription is the description to use when logging type-specific information about this controller.
ObjectDescription string
// FullResyncPeriod is the period at which ShouldResync is considered.
ShouldResync ShouldResyncFunc
RetryOnError bool
// Called whenever the ListAndWatch drops the connection with an error.
WatchErrorHandler WatchErrorHandler
// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64
SharedInformer 和 Informer 之间的名字就差一个 Shared,在 Operator 开发中,如果不适用 controller-runtime 库(kubebuilder controller-runtime 子项目的Repo),也就是不用 Kubebuilder 等工具生成脚手架,那么应该用到的是 SharedInformerFactory,比如今天主要做的 sample-controller 的 main()
函数,在下面的 sample-controller 源码介绍中会讲解~
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(ctx, kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(), exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
控制器的初始化后面会讲 controller.go
的时候重点讲解,先看看初始化过程和 SharedInformer 的关系。
controller 依赖于 kubeInformerFactory.Apps().V1().Deployments()
和 exampleInformerFactory.Samplecontroller().V1alpha1().Foos()
后者是 sample-controller
自己 pkg 包提供的,前者是 client-go 提供的,那么看看前者:
这里的 Deployment 依赖的是:Deployments() DeploymentInformer
而 DeploymentInformer
// DeploymentInformer provides access to a shared informer and lister for
// Deployments.
type DeploymentInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.DeploymentLister
实现了 Informer
和 Lister
本质上就是一个 ShareIndexInformer
接下来的三个部分,我将详细介绍 sample-controller、kubebuilder、operator-sdk 以及它们之间那微妙的关系。
✴️版权声明 © :本书所有内容遵循CC-BY-SA 3.0协议(署名-相同方式共享)©