공부/Kubernetes

k8s client-go leader election 코드 메커니즘 분석

토고미 2022. 11. 9. 20:06

개념

leader election이란 말 그대로 리더를 선출하는 것으로, CS에서 자주 쓰이는 개념이다.

kubernetes go client에서는 이러한 leader election 기능을 제공한다.

아래는 k8s go client가 사용하는 leader election 방식에 대한 간단한 그림이다.

 

k8s에서의 일반적인 leader election 방식 (출처 : https://itnext.io/leader-election-in-kubernetes-using-client-go-a19cbe7a9a85)

특정 리소스(그림 상에서 Lock)에 대해서 소유권이 자신에게 있다고 작성하는 사람이 leader이다.

 

일단 leader가 되면 leader는 주기적으로(그림 상에서 leaseDuration) 자신이 아직도 리더임을 갱신한다.(그림 상에서 lastObservedTime)

leader가 되지 못한 나머지는 주기적으로 그 리소스를 확인하다가, 한 주기가 지났는데도 리더가 갱신을 못했을 시 소유권이 자신에게 있다고 작성을 시도한다.

위 과정이 계속 반복되게 되는 것이 k8s go client의 leader eleciton 기본 메커니즘이다.

 

이러한 방식이 가능한 이유는 k8s가 ResourceVersion을 기준으로 Optimistic concurrency(낙관적 동시성 제어) 정책을 가지고 있기 때문이다.

 

코드

그렇다면 코드를 보자.

import(
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, id string) {
	...
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(c context.Context) {
				// 리더가 할 일
			},
			OnStoppedLeading: func() {
				// 리더에서 퇴출되면 할 일
			},
			OnNewLeader: func(current_id string) {
				// 리더가 새롭게 뽑혔을 때 할 일
			},
		},
	})
    ...
}

실제로 사용할 때는 위와 같이 leaderelection.RunOrDie() 함수만 사용하면 된다.

주석에 쓰여 있는 대로

OnStartedLeading은 리더가 할 일,

OnStoppedLeading은 리더에서 퇴출되면 할 일,

OnNewLeader는 리더가 새롭게 뽑혔을 때 할 일이다.

 

내부 메커니즘 파악을 위해서 RunOrDie 함수를 파헤쳐보자.

 

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate. RunOrDie blocks until leader election loop is
// stopped by ctx or it has stopped holding the leader lease
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
	le, err := NewLeaderElector(lec)
	if err != nil {
		panic(err)
	}
	if lec.WatchDog != nil {
		lec.WatchDog.SetLeaderElection(le)
	}
	le.Run(ctx)
}

NewLeaderElector는 올바른 설정을 주었는지 확인하는 함수이다.

lec.WatchDog은 아직 파악을 못했다... 필수는 아니지만 무언가 옵션 값이다.

 

어쨌든 설정이 제대로 돼있는지 확인하는 작업을 해주고

le.Run(ctx)를 실행하게 된다.

 

// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() {
		le.config.Callbacks.OnStoppedLeading()
	}()

	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}

가장 위에 defer 키워드 두 개는 간단하다.

알다시피 defer는 함수가 종료될 때 부르도록 지정하는 예약어이다.

defer runtime.HandleCrash()는 이 함수에서 crash(에러)가 났을 때 처리를 위한 함수이고,

defer .... OnStoppedLeading() 은 이 함수가 종료될 때 OnStoppedLeading() 함수를 실행하겠다는 것이다.

 

즉, 이와 같이 유추할 수 있다.

이 함수 중간에서 leader를 획득하는 과정이 있을 것이고,

이 함수를 종료한다는 것은 leader에서 퇴출당한다는 의미이다!

 

다시 말하면,

	if !le.acquire(ctx) {
		return // ctx signalled done
	}

이 부분이 리소스를 통해 리더를 선출하는 과정을 거치는 것이다.

그리고 위 if문을 통과해야 리더로서 선출이 되며,

	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)

OnStartedLeading() 함수를 수행하고, le.renew() 함수를 통해 계속 자신이 리더임을 갱신하는 것을 확인할 수 있다.

 

그렇다면 핵심 로직으로 예상이 되는 le.acquired() 함수를 확인해보자!

// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	wait.JitterUntil(func() {
		succeeded = le.tryAcquireOrRenew(ctx)
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

주석에도 쓰여있듯이 메인이 되는 로직은 wait.JitterUntil() 함수이며, 특히 그 안에서도 tryAcquireOrRenew() 함수가 실제 리소스를 획득하려는 시도를 하는 함수임을 유추할 수 있다.

 

즉 이 함수는 le.config.RetryPeriod 주기마다 tryAcquireOrRenew()를 시도하며, 리소스 획득에 성공하면 리더가 되고 그렇지 못한 경우 주기적으로 계속 시도한다.

 

그렇다면 마지막으로 tryAcquireOrRenew() 함수를 확인해보자.

// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}

		le.setObservedRecord(&leaderElectionRecord)

		return true
	}

	// 2. Record obtained, check the Identity & Time
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.setObservedRecord(oldLeaderElectionRecord)

		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.setObservedRecord(&leaderElectionRecord)
	return true
}

코드가 약간 길지만, 가장 위에서 서술한 개념이 여기에서 드디어 등장한다.

그 개념으로 보면 간단하게 구성되어있음을 알 수 있다.

차근차근 주석을 따라가 보자.

 

	// 1. obtain or create the ElectionRecord
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}

		le.setObservedRecord(&leaderElectionRecord)

		return true
	}

주석 1에서는 lock 리소스를 Get 해본다.

만약 lock이 없으면 생성하며, 자신이 리더가 된다.

만약 lock이 있다면, 자신이 리더가 아니므로 다음 로직으로 넘어간다.

 

	// 2. Record obtained, check the Identity & Time
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		le.setObservedRecord(oldLeaderElectionRecord)

		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

주석 2에서는 조회한 lock 리소스를 가지고 있는 리더가 갱신을 제때 했는지 확인한다.

만약 갱신이 제시간에 됐다면, false를 return 하며 리더 획득에 실패한다.

만약 갱신이 제시간에 되지 않았다면, 그제야 드디어 lock 획득을 시도하기 위해 다음 로직으로 넘어간다.

 

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.setObservedRecord(&leaderElectionRecord)
	return true

주석 3에서는 마침내 lock 업데이트를 시도한다.

 

만약 업데이트에 성공하면 return true를 하여 마침내 리더가 되고,

업데이트에 실패하면(누군가 먼저 업데이트를 할 경우) return false를 하며 리더가 되는 것에 실패한다.

 

마무리

완벽히 파악하기 위해서는 golang의 context 개념 등을 알고 있어야 하지만,

이렇게 큰 개념을 이해한 상태에서 간단하게나마 파헤쳐보았다.

k8s go client의 leader election을 알고 싶은 사람에게 도움이 되었으면 한다.

 

혹시 틀린 내용이 있으면 댓글로 제보!!

 

 

- 참고

https://itnext.io/leader-election-in-kubernetes-using-client-go-a19cbe7a9a85