diff --git a/leaderelection/config.go b/leaderelection/config.go index 44378d30c..8b7c02058 100644 --- a/leaderelection/config.go +++ b/leaderelection/config.go @@ -126,6 +126,14 @@ type ComponentConfig struct { RetryPeriod time.Duration } +// StatefulSetConfig represents the required information for a StatefulSet service. +type StatefulSetConfig struct { + StatefulSetName string + ServiceName string + Port string + Protocol string +} + func defaultComponentConfig(name string) ComponentConfig { return ComponentConfig{ Component: name, diff --git a/leaderelection/context.go b/leaderelection/context.go index a0d8555f9..d273a572e 100644 --- a/leaderelection/context.go +++ b/leaderelection/context.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "knative.dev/pkg/logging" + "knative.dev/pkg/network" "knative.dev/pkg/reconciler" "knative.dev/pkg/system" ) @@ -42,6 +43,16 @@ func WithStandardLeaderElectorBuilder(ctx context.Context, kc kubernetes.Interfa }) } +// WithStatefulSetLeaderElectorBuilder infuses a context with the ability to build +// Electors which are assigned leadership based on the StatefulSet ordinal from +// the provided component configuration. +func WithStatefulSetLeaderElectorBuilder(ctx context.Context, cc ComponentConfig, ssc StatefulSetConfig) context.Context { + return context.WithValue(ctx, builderKey{}, &statefulSetBuilder{ + lec: cc, + ssc: ssc, + }) +} + // HasLeaderElection returns whether there is leader election configuration // associated with the context func HasLeaderElection(ctx context.Context) bool { @@ -61,8 +72,9 @@ func BuildElector(ctx context.Context, la reconciler.LeaderAware, name string, e switch builder := val.(type) { case *standardBuilder: return builder.BuildElector(ctx, la, name, enq) + case *statefulSetBuilder: + return builder.BuildElector(ctx, la, enq) } - // TODO(mattmoor): Add a flavor of builder that relies on StatefulSet to partition the key space. } return &unopposedElector{ @@ -90,10 +102,11 @@ func (b *standardBuilder) BuildElector(ctx context.Context, la reconciler.Leader buckets := make([]Elector, 0, b.lec.Buckets) for i := uint32(0); i < b.lec.Buckets; i++ { bkt := &bucket{ - component: b.lec.Component, - name: name, - index: i, - total: b.lec.Buckets, + // The resource name is the lowercase: + // {component}.{workqueue}.{index}-of-{total} + name: strings.ToLower(fmt.Sprintf("%s.%s.%02d-of-%02d", b.lec.Component, name, i, b.lec.Buckets)), + index: i, + total: b.lec.Buckets, } rl, err := resourcelock.New(b.lec.ResourceLock, @@ -145,6 +158,35 @@ func (b *standardBuilder) BuildElector(ctx context.Context, la reconciler.Leader return &runAll{les: buckets}, nil } +type statefulSetBuilder struct { + lec ComponentConfig + ssc StatefulSetConfig +} + +func (b *statefulSetBuilder) BuildElector(ctx context.Context, la reconciler.LeaderAware, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) { + logger := logging.FromContext(ctx) + + ordinal, err := ControllerOrdinal() + if err != nil { + return nil, err + } + + logger.Infof("%s will run in StatefulSet ordinal assignement mode with ordinal %d", b.lec.Component, ordinal) + + return &unopposedElector{ + bkt: &bucket{ + // The name is the full pod DNS of the owner pod of this bucket. + name: fmt.Sprintf("%s://%s-%d.%s.%s.svc.%s:%s", b.ssc.Protocol, + b.ssc.StatefulSetName, ordinal, b.ssc.ServiceName, + system.Namespace(), network.GetClusterDomainName(), b.ssc.Port), + index: uint32(ordinal), + total: b.lec.Buckets, + }, + la: la, + enq: enq, + }, nil +} + // unopposedElector promotes when run without needing to be elected. type unopposedElector struct { bkt reconciler.Bucket @@ -197,8 +239,7 @@ func (ruc *runUntilCancelled) Run(ctx context.Context) { } type bucket struct { - component string - name string + name string // We are bucket {index} of {total} index uint32 @@ -209,9 +250,7 @@ var _ reconciler.Bucket = (*bucket)(nil) // Name implements reconciler.Bucket func (b *bucket) Name() string { - // The resource name is the lowercase: - // {component}.{workqueue}.{index}-of-{total} - return strings.ToLower(fmt.Sprintf("%s.%s.%02d-of-%02d", b.component, b.name, b.index, b.total)) + return b.name } // Has implements reconciler.Bucket diff --git a/leaderelection/context_test.go b/leaderelection/context_test.go index 5e9bc6ce0..76e75caae 100644 --- a/leaderelection/context_test.go +++ b/leaderelection/context_test.go @@ -21,6 +21,7 @@ package leaderelection import ( "context" + "os" "testing" "time" @@ -95,7 +96,7 @@ func TestWithBuilder(t *testing.T) { le, err := BuildElector(ctx, laf, "name", enq) if err != nil { - t.Errorf("BuildElector() = %v", err) + t.Fatalf("BuildElector() = %v", err) } // We shouldn't see leases until we Run the elector. @@ -144,3 +145,71 @@ func TestWithBuilder(t *testing.T) { t.Fatal("Timed out waiting for demotion.") } } + +func TestWithStatefulSetBuilder(t *testing.T) { + cc := ComponentConfig{ + Component: "component", + LeaderElect: true, + Buckets: 1, + } + podDNS := "ws://as-0.autoscaler.knative-testing.svc.cluster.local:8080" + ctx := context.Background() + + promoted := make(chan struct{}) + laf := &reconciler.LeaderAwareFuncs{ + PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { + close(promoted) + return nil + }, + } + enq := func(reconciler.Bucket, types.NamespacedName) {} + + ctx = WithStatefulSetLeaderElectorBuilder(ctx, cc, StatefulSetConfig{ + ServiceName: "autoscaler", + StatefulSetName: "as", + Protocol: "ws", + Port: "8080", + }) + if !HasLeaderElection(ctx) { + t.Error("HasLeaderElection() = false, wanted true") + } + + le, err := BuildElector(ctx, laf, "name", enq) + if err == nil { + // controller ordinal env not set + t.Error("expected BuildElector() returns error but got none") + } + + os.Setenv(controllerOrdinalEnv, "as-0") + defer os.Unsetenv(controllerOrdinalEnv) + le, err = BuildElector(ctx, laf, "name", enq) + if err != nil { + t.Fatalf("BuildElector() = %v", err) + } + + ule, ok := le.(*unopposedElector) + if !ok { + t.Fatalf("BuildElector() = %T, wanted an unopposedElector", le) + } + if got, want := ule.bkt.Name(), podDNS; got != want { + t.Errorf("bkt.Name() = %s, wanted %s", got, want) + } + + // Shouldn't be promoted until we Run the elector. + select { + case <-promoted: + t.Error("Got promoted, want no actions.") + default: + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go le.Run(ctx) + + select { + case <-promoted: + // We expect to have been promoted. + case <-time.After(1 * time.Second): + t.Fatal("Timed out waiting for promotion.") + } +} diff --git a/leaderelection/ordinal.go b/leaderelection/ordinal.go new file mode 100644 index 000000000..94a3775a0 --- /dev/null +++ b/leaderelection/ordinal.go @@ -0,0 +1,39 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "fmt" + "os" + "strconv" + "strings" +) + +// If run a process on Kubernetes, the value of this environment variable +// should be set to the pod name via the downward API. +const controllerOrdinalEnv = "CONTROLLER_ORDINAL" + +// ControllerOrdinal tries to get ordinal from the pod name of a StatefulSet, +// which is provided from the environment variable CONTROLLER_ORDINAL. +func ControllerOrdinal() (uint64, error) { + v := os.Getenv(controllerOrdinalEnv) + if i := strings.LastIndex(v, "-"); i != -1 { + return strconv.ParseUint(v[i+1:], 10, 64) + } + + return 0, fmt.Errorf("ordinal not found in %s=%s", controllerOrdinalEnv, v) +} diff --git a/leaderelection/ordinal_test.go b/leaderelection/ordinal_test.go new file mode 100644 index 000000000..ce300a33b --- /dev/null +++ b/leaderelection/ordinal_test.go @@ -0,0 +1,72 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "fmt" + "os" + "testing" +) + +func TestControllerOrdinal(t *testing.T) { + testCases := []struct { + testname string + podName string + want uint64 + err error + }{{ + testname: "NotSet", + err: fmt.Errorf("ordinal not found in %s=", controllerOrdinalEnv), + }, { + testname: "NoHyphen", + podName: "as", + err: fmt.Errorf("ordinal not found in %s=as", controllerOrdinalEnv), + }, { + testname: "InvalidOrdinal", + podName: "as-invalid", + err: fmt.Errorf(`strconv.ParseUint: parsing "invalid": invalid syntax`), + }, { + testname: "ValidName", + podName: "as-0", + }, { + testname: "ValidName", + podName: "as-1", + want: 1, + }} + + defer os.Unsetenv(controllerOrdinalEnv) + for _, tt := range testCases { + t.Run(tt.testname, func(t *testing.T) { + if tt.podName != "" { + if os.Setenv(controllerOrdinalEnv, tt.podName) != nil { + t.Fatalf("fail to set env var %s=%s", controllerOrdinalEnv, tt.podName) + } + } + + got, gotErr := ControllerOrdinal() + if tt.err != nil { + if gotErr == nil || gotErr.Error() != tt.err.Error() { + t.Errorf("got %v, want = %v, ", gotErr, tt.err) + } + } else if gotErr != nil { + t.Error("ControllerOrdinal() =", gotErr) + } else if got != tt.want { + t.Errorf("ControllerOrdinal() = %d, want = %d", got, tt.want) + } + }) + } +}