Support customizing lease name and identity (#1602)

* add fields

* address comment

* use lease lock

* remove func

* add comment
This commit is contained in:
Yanwei Guo 2020-08-27 02:09:06 +00:00 committed by GitHub
parent c56f5e203b
commit 21dcafbfa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 7 deletions

View File

@ -34,8 +34,8 @@ import (
const (
configMapNameEnv = "CONFIG_LEADERELECTION_NAME"
// KnativeResourceLock is the only supported lock mechanism for Knative.
KnativeResourceLock = resourcelock.LeasesResourceLock
// knativeResourceLock is the only supported lock mechanism for Knative.
knativeResourceLock = resourcelock.LeasesResourceLock
)
// MaxBuckets is the maximum number of buckets to allow users to define.
@ -112,6 +112,18 @@ type ComponentConfig struct {
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration
// LeaseName is a function to customize the lease name given the index i.
// If not present, a name in format {Component}.{queue-name}.{i}-of-{Buckets}
// will be use.
// Autoscaler need to know the Lease names to filter out Leases which are not
// used for Autoscaler. Instead of exposing the names from leadelection package,
// we let Autoscaler to pass them in.
LeaseName func(i uint32) string
// Identity is the unique string identifying a resource lock holder across
// all participants in an election. If not present, a new unique string will
// be generated to be used as identity for each BuildElector call.
// Autoscaler uses the pod IP as identity.
Identity string
}
// statefulSetID is a envconfig Decodable controller ordinal and name.

View File

@ -109,9 +109,13 @@ func (b *standardBuilder) buildElector(ctx context.Context, la reconciler.Leader
queueName string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) {
logger := logging.FromContext(ctx)
id, err := UniqueID()
if err != nil {
return nil, err
id := b.lec.Identity
if id == "" {
uid, err := UniqueID()
if err != nil {
return nil, err
}
id = uid
}
bkts := newStandardBuckets(queueName, b.lec)
@ -120,7 +124,7 @@ func (b *standardBuilder) buildElector(ctx context.Context, la reconciler.Leader
// Use a local var which won't change across the for loop since it is
// used in a callback asynchronously.
bkt := bkt
rl, err := resourcelock.New(KnativeResourceLock,
rl, err := resourcelock.New(knativeResourceLock,
system.Namespace(), // use namespace we are running in
bkt.Name(),
b.kc.CoreV1(),
@ -169,9 +173,15 @@ func (b *standardBuilder) buildElector(ctx context.Context, la reconciler.Leader
}
func newStandardBuckets(queueName string, cc ComponentConfig) []reconciler.Bucket {
ln := cc.LeaseName
if ln == nil {
ln = func(i uint32) string {
return standardBucketName(i, queueName, cc)
}
}
names := make(sets.String, cc.Buckets)
for i := uint32(0); i < cc.Buckets; i++ {
names.Insert(standardBucketName(i, queueName, cc))
names.Insert(ln(i))
}
return hash.NewBucketSet(names).Buckets()

View File

@ -21,6 +21,7 @@ package leaderelection
import (
"context"
"fmt"
"os"
"sort"
"testing"
@ -166,6 +167,60 @@ func TestWithBuilder(t *testing.T) {
}
}
func TestBuilderWithCustomizedLeaseName(t *testing.T) {
const buckets = 3
cc := ComponentConfig{
Component: "the-component",
Buckets: buckets,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
LeaseName: func(i uint32) string {
return fmt.Sprintf("bucket-%02d", i)
},
}
kc := fakekube.NewSimpleClientset()
ctx := context.Background()
gotNames := make(sets.String, buckets)
promoted := make(chan string)
laf := &reconciler.LeaderAwareFuncs{
PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
promoted <- bkt.Name()
return nil
},
}
enq := func(reconciler.Bucket, types.NamespacedName) {}
ctx = WithDynamicLeaderElectorBuilder(ctx, kc, cc)
le, err := BuildElector(ctx, laf, "name", enq)
if err != nil {
t.Fatalf("BuildElector() = %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go le.Run(ctx)
// We expect to have been promoted 3 times.
for i := 0; i < buckets; i++ {
select {
case s := <-promoted:
gotNames.Insert(s)
case <-time.After(time.Second):
t.Fatal("Timed out waiting for promotion.")
}
}
want := sets.NewString(
"bucket-00",
"bucket-01",
"bucket-02",
)
if !gotNames.Equal(want) {
t.Errorf("BucketSet.BucketList() = %q, want: %q", gotNames, want)
}
}
func TestNewStatefulSetBucketAndSet(t *testing.T) {
wantNames := []string{
"http://as-0.autoscaler.knative-testing.svc.cluster.local:80",