
Using leader election in Kubernetes (k8s) with Go

Author: 李蓝  Published: 2024-04-26 17:20:53

Tags: Go, k8s, leader election

1. Background

In the Kubernetes world, many components only need a single running instance, for example controller-manager or a third-party controller. For high availability, however, such a component is deployed with multiple replicas, and a replacement must take over automatically when the active instance fails. Leader election provides exactly this pattern: multiple replicas are deployed, but only one of them is active at a time.

Applications can use external middleware such as ZooKeeper or etcd for leader election. ZooKeeper's implementation is based on ephemeral nodes: an ephemeral node only exists for the duration of the session between the client and ZooKeeper, and is deleted as soon as the session ends. Once it is deleted, the standby instances race to create the ephemeral node, and the client (instance) that succeeds becomes the leader, which guarantees a single active instance across the cluster and enables a fast failover when the leader fails. Etcd is a distributed key-value store that uses the Raft protocol to keep its replicas consistent; its revision mechanism can be used to build a distributed lock, and etcd's concurrency package implements leader election on top of that lock (this article focuses on what Kubernetes itself provides, so etcd's concurrency mechanism is not covered in detail).
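
For reference only, the following is a minimal sketch of leader election with etcd's concurrency package, which this post mentions but does not cover further; the endpoint 127.0.0.1:2379, the key prefix /my-controller/leader and the value "instance-1" are placeholder assumptions, not part of the original example.

package main

import (
  "context"
  "log"
  "time"

  clientv3 "go.etcd.io/etcd/client/v3"
  "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
  cli, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"127.0.0.1:2379"}, // placeholder etcd endpoint
    DialTimeout: 5 * time.Second,
  })
  if err != nil {
    log.Fatal(err)
  }
  defer cli.Close()

  // The session holds an etcd lease that is kept alive in the background; if
  // this process dies, the lease expires and another candidate can win.
  session, err := concurrency.NewSession(cli)
  if err != nil {
    log.Fatal(err)
  }
  defer session.Close()

  election := concurrency.NewElection(session, "/my-controller/leader")
  // Campaign blocks until this instance becomes the leader.
  if err := election.Campaign(context.Background(), "instance-1"); err != nil {
    log.Fatal(err)
  }
  log.Println("became leader, running business logic...")
}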

Kubernetes itself uses etcd as its underlying store, so could we use the Kubernetes API to implement leader election? In fact, the Kubernetes SIGs already provide this capability, mainly by using ConfigMap, Lease or Endpoints resources as the election lock.

2. Official example code

The Kubernetes project provides a usage example; the source code is at github.com/kubernetes/…

During the election, each instance may be in one of the following states:

  • Won the election -> runs the business code

  • Waiting: another instance is the leader. When the leader gives up the lock, an instance in this state may become the new leader

  • Releasing the leader lock: the running business code exits

In a stable environment, once an instance becomes the leader it normally never releases the lock and keeps running, which is good for business stability and lets the controller react quickly to resource changes. Only network instability or accidentally deleting the instance will trigger a new leader election.

The official election example, annotated, is shown below:

package main
import (
 "context"
 "flag"
 "os"
 "os/signal"
 "syscall"
 "time"
 "github.com/google/uuid"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 clientset "k8s.io/client-go/kubernetes"
 "k8s.io/client-go/rest"
 "k8s.io/client-go/tools/clientcmd"
 "k8s.io/client-go/tools/leaderelection"
 "k8s.io/client-go/tools/leaderelection/resourcelock"
 "k8s.io/klog/v2"
)
func buildConfig(kubeconfig string) (*rest.Config, error) {
 if kubeconfig != "" {
   cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
   if err != nil {
     return nil, err
   }
   return cfg, nil
 }
 cfg, err := rest.InClusterConfig()
 if err != nil {
   return nil, err
 }
 return cfg, nil
}
func main() {
 klog.InitFlags(nil)
 var kubeconfig string
 var leaseLockName string
 var leaseLockNamespace string
 var id string
  // kubeconfig is the path to the kubeconfig file of the target Kubernetes cluster
 flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
  // identity of the lock holder; a random UUID is generated if none is provided
 flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
  // name of the lock, i.e. the name of the lock resource in Kubernetes
 flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
  // namespace of the lock
 flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
  // parse command-line flags
 flag.Parse()
 if leaseLockName == "" {
   klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
 }
 if leaseLockNamespace == "" {
   klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
 }
 // leader election uses the Kubernetes API by writing to a
 // lock object, which can be a LeaseLock object (preferred),
 // a ConfigMap, or an Endpoints (deprecated) object.
 // Conflicting writes are detected and each client handles those actions
 // independently.
 config, err := buildConfig(kubeconfig)
 if err != nil {
   klog.Fatal(err)
 }
  // build the client for the Kubernetes cluster; NewForConfigOrDie panics and exits on failure
 client := clientset.NewForConfigOrDie(config)
  // run simulates the controller's business logic
 run := func(ctx context.Context) {
   // complete your controller loop here
   klog.Info("Controller loop...")
    // block forever
   select {}
 }
 // use a Go context so we can tell the leaderelection code when we
 // want to step down
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()
 // listen for interrupts or the Linux SIGTERM signal and cancel
 // our context, which the leader election code will observe and
 // step down
  // handle OS signals: cancel the context on SIGINT/SIGTERM so the election code steps down and the process exits
 ch := make(chan os.Signal, 1)
 signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
 go func() {
   <-ch
   klog.Info("Received termination, signaling shutdown")
   cancel()
 }()
 // we use the Lease lock type since edits to Leases are less common
 // and fewer objects in the cluster watch "all Leases".

  // build the lock from the flags; a Lease resource is used as the lock here
 lock := &resourcelock.LeaseLock{
   LeaseMeta: metav1.ObjectMeta{
     Name:      leaseLockName,
     Namespace: leaseLockNamespace,
   },
    // the client used to talk to the Kubernetes cluster
   Client: client.CoordinationV1(),
   LockConfig: resourcelock.ResourceLockConfig{
     Identity: id,
   },
 }
 // start the leader election code loop

  // note: the election loop is given ctx; if ctx's cancel function is called, the election also stops
 leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    // the lock used for the election
   Lock: lock,
   // IMPORTANT: you MUST ensure that any code you have that
   // is protected by the lease must terminate **before**
   // you call cancel. Otherwise, you could have a background
   // loop still running and another process could
   // get elected before your background loop finished, violating
   // the stated goal of the lease.
    // release the lock voluntarily when ctx is canceled
   ReleaseOnCancel: true,
    LeaseDuration:   60 * time.Second, // if the leader fails to renew within 60s, other candidates may take over the lock and a new leader is elected
    RenewDeadline:   15 * time.Second, // deadline for the acting leader to renew the lease before it gives up leadership
    RetryPeriod:     5 * time.Second,  // interval at which the leader renews and at which non-leaders retry acquiring the lock (multiplied by a 1.2 jitter factor; see the source)

    // register the callbacks
   Callbacks: leaderelection.LeaderCallbacks{

      // called when this instance becomes the leader
     OnStartedLeading: func(ctx context.Context) {
       // we're notified when we start - this is where you would
       // usually put your code
        // run the controller logic
       run(ctx)
     },
     OnStoppedLeading: func() {
       // we can do cleanup here
        // this instance is no longer the leader; clean up and exit
       klog.Infof("leader lost: %s", id)
       os.Exit(0)
     },
     OnNewLeader: func(identity string) {
        // called when a new leader has been elected
       // we're notified when new leader elected
       if identity == id {
         // I just got the lock
         return
       }
       klog.Infof("new leader elected: %s", identity)
     },
   },
 })
}

Start one instance, and watch both the log output and the Lease resource in the Kubernetes cluster. The start command is:

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1

As the log output below shows, the instance with id=1 acquired the lease:

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1
I1023 17:00:21.670298 94227 leaderelection.go:248] attempting to acquire leader lease default/example...
I1023 17:00:21.784234 94227 leaderelection.go:258] successfully acquired lease default/example
I1023 17:00:21.784316 94227 main.go:78] Controller loop...

On the Kubernetes cluster we can see the corresponding Lease resource, currently held by id=1 (the original post shows a screenshot here).
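
To inspect the lock yourself, the Lease object created by the example (its namespace and name match the flags used above) can be dumped with kubectl; the spec of a coordination.k8s.io Lease records holderIdentity, leaseDurationSeconds, acquireTime, renewTime and leaseTransitions:

kubectl -n default get lease example -o yaml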

Next we start a second instance with id=2; its log output is:

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2
I1023 17:05:00.555145 95658 leaderelection.go:248] attempting to acquire leader lease default/example...
I1023 17:05:00.658202 95658 main.go:151] new leader elected: 1

As we can see, the instance with id=2 did not acquire the lock, and it observed that the lock is held by the instance with id=1. Next, we stop the id=1 instance and watch whether id=2 becomes the new leader (the original post shows a screenshot of the result here).

3. How the lock is implemented

Kubernetes resources all support Get/Create/Update operations, so in theory any resource could serve as the backing object for the lock. client-go provides Lease, ConfigMap and Endpoints as lock backends.
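
For illustration, client-go's resourcelock package is the usual way to pick the backing resource type. The helper below is a hypothetical sketch, not part of the original post:

package election

import (
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// newLock builds a resource lock backed by a Lease object. Passing
// resourcelock.ConfigMapsLeasesResourceLock or
// resourcelock.EndpointsLeasesResourceLock instead changes the backing
// resource; the older plain ConfigMap/Endpoints lock types are deprecated in
// recent client-go versions.
func newLock(client *kubernetes.Clientset, ns, name, id string) (resourcelock.Interface, error) {
  return resourcelock.New(
    resourcelock.LeasesResourceLock,
    ns,
    name,
    client.CoreV1(),
    client.CoordinationV1(),
    resourcelock.ResourceLockConfig{Identity: id},
  )
}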

The lock's state transitions are shown in a diagram in the original post (omitted here).

A lock has to implement the following interface:

type Interface interface {
 // Get returns the LeaderElectionRecord
 Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
 // Create attempts to create a LeaderElectionRecord
 Create(ctx context.Context, ler LeaderElectionRecord) error
 // Update will update and existing LeaderElectionRecord
 Update(ctx context.Context, ler LeaderElectionRecord) error
 // RecordEvent is used to record events
 RecordEvent(string)
 // Identity will return the locks Identity
 Identity() string
 // Describe is used to convert details on current resource lock
 // into a string
 Describe() string
}

In principle, the three methods Get/Create/Update are enough to implement a lock, provided that Create and Update are atomic, and that is exactly what the Kubernetes API guarantees through resourceVersion-based optimistic concurrency.
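
The following minimal sketch (not from the original post; the helper tryTakeOver and the default/example Lease name are illustrative) shows how that guarantee surfaces to a client: an Update built from a stale read is rejected with a Conflict error.

package election

import (
  "context"

  apierrors "k8s.io/apimachinery/pkg/api/errors"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/kubernetes"
)

// tryTakeOver naively writes itself in as the holder of the Lease. The Update
// carries the resourceVersion of the Lease we just read, so if another
// instance updated it in between, the apiserver rejects our write.
func tryTakeOver(ctx context.Context, client *kubernetes.Clientset, id string) (bool, error) {
  lease, err := client.CoordinationV1().Leases("default").Get(ctx, "example", metav1.GetOptions{})
  if err != nil {
    return false, err
  }
  lease.Spec.HolderIdentity = &id
  _, err = client.CoordinationV1().Leases("default").Update(ctx, lease, metav1.UpdateOptions{})
  if apierrors.IsConflict(err) {
    // another instance modified the Lease after we read it: we lost the race
    return false, nil
  }
  return err == nil, err
}

Back to the official example: leaderelection.RunOrDie is simply a wrapper that calls Run, and the Run implementation is quite short: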

func (le *LeaderElector) Run(ctx context.Context) {
 defer runtime.HandleCrash()
 defer func() {
   le.config.Callbacks.OnStoppedLeading()
 }()
  // try to acquire the lock; this blocks and keeps retrying until it succeeds or ctx is done
 if !le.acquire(ctx) {
   return // ctx signalled done
 }
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()
  // once the lock is acquired, call the OnStartedLeading callback to run the controller code
 go le.config.Callbacks.OnStartedLeading(ctx)

  // while holding the lock, keep renewing it
 le.renew(ctx)
}

The key operations of LeaderElector are acquire and renew; their implementations are shown below:

func (le *LeaderElector) acquire(ctx context.Context) bool {
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()
 succeeded := false
 desc := le.config.Lock.Describe()
 klog.Infof("attempting to acquire leader lease %v...", desc)
  // this call blocks: it retries acquiring the lock on a timer and only returns once it succeeds or ctx is canceled
 wait.JitterUntil(func() {
    // try to acquire (or renew) the lock
   succeeded = le.tryAcquireOrRenew(ctx)
   le.maybeReportTransition()
   if !succeeded {
     klog.V(4).Infof("failed to acquire lease %v", desc)
     return
   }
   le.config.Lock.RecordEvent("became leader")
   le.metrics.leaderOn(le.config.Name)
   klog.Infof("successfully acquired lease %v", desc)
   cancel()
 }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
 return succeeded
}
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()
  // the renew loop: on success the closure simply returns without calling cancel, so wait.Until keeps looping
 wait.Until(func() {
    // RenewDeadline is enforced here: if a renew takes longer than RenewDeadline it fails and the leader steps down
   timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
   defer timeoutCancel()
   err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
      // renew the lock
     return le.tryAcquireOrRenew(timeoutCtx), nil
   }, timeoutCtx.Done())
   le.maybeReportTransition()
   desc := le.config.Lock.Describe()
   if err == nil {
     klog.V(5).Infof("successfully renewed lease %v", desc)
      // renew succeeded
     return
   }
   le.config.Lock.RecordEvent("stopped leading")
   le.metrics.leaderOff(le.config.Name)
   klog.Infof("failed to renew lease %v: %v", desc, err)
   cancel()
 }, le.config.RetryPeriod, ctx.Done())
 // if we hold the lease, give it up
 if le.config.ReleaseOnCancel {
   le.release()
 }
}
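
As a side note, the loops above are built on two helpers from k8s.io/apimachinery/pkg/util/wait. The tiny standalone sketch below is not part of client-go; the 1.2 factor mirrors the JitterFactor that the leaderelection package uses.

package main

import (
  "context"
  "fmt"
  "time"

  "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
  ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  defer cancel()

  // JitterUntil runs the function immediately and then roughly every
  // period*(1+rand*jitterFactor), i.e. between 5s and 11s here, until ctx is
  // done. acquire() uses this so competing candidates do not retry in
  // lockstep; renew() uses wait.Until, which has the same shape but no jitter.
  wait.JitterUntil(func() {
    fmt.Println("attempting to acquire/renew the lock...")
  }, 5*time.Second, 1.2, true, ctx.Done())
}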

The key implementation is tryAcquireOrRenew, which completes the core logic by driving the lock's state transitions:

func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
 now := metav1.Now()
 leaderElectionRecord := rl.LeaderElectionRecord{
   HolderIdentity:       le.config.Lock.Identity(),
   LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
   RenewTime:            now,
   AcquireTime:          now,
 }
 // 1. obtain or create the ElectionRecord
  // check whether the lock record already exists
 oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
 if err != nil {
    // Get failed; if the lock resource simply does not exist yet, create it
   if !errors.IsNotFound(err) {
     klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
     return false
   }
   if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
     klog.Errorf("error initially creating leader election record: %v", err)
     return false
   }
    // record that this instance has become the leader
   le.setObservedRecord(&leaderElectionRecord)
   return true
 }
 // 2. Record obtained, check the Identity & Time
 if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
    // important: as long as the leader keeps renewing the lock normally, oldLeaderElectionRawRecord keeps changing, and each change refreshes le.observedTime
   le.setObservedRecord(oldLeaderElectionRecord)
   le.observedRawRecord = oldLeaderElectionRawRecord
 }
  // if the lease has not expired yet and this instance is not the leader (another instance holds it), give up
 if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
   le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
   !le.IsLeader() {
   klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
   return false
 }
 // 3. We're going to try to update. The leaderElectionRecord is set to it's default
 // here. Let's correct it before updating.
  // if we are the leader, keep AcquireTime/LeaderTransitions and only refresh RenewTime, so that followers can see the leader is still alive
 if le.IsLeader() {
   leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
   leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
 } else {
    // we were not the leader, so leadership is being transferred; bump LeaderTransitions
   leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
 }
  // update the lock itself
 if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
   klog.Errorf("Failed to update lock: %v", err)
   return false
 }
 le.setObservedRecord(&leaderElectionRecord)
 return true
}

Source: https://juejin.cn/post/7157648925078323207
