diff --git a/Gopkg.lock b/Gopkg.lock index bacd9620..4d3f56c2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -966,7 +966,7 @@ [[projects]] branch = "master" - digest = "1:82eaafe1b10a648afe37de603c6d9f4fe69fb5c129ecc832a5997335f3a0ee89" + digest = "1:1829518da17f4803e337ca56334ef7c887a9677f28081f7176bf15e2d3bb64d1" name = "knative.dev/pkg" packages = [ "apis", @@ -986,7 +986,7 @@ "reconciler", ] pruneopts = "T" - revision = "7af3fab62ce2bf2f861fee510eeb15eccb47925e" + revision = "ca35cb8791d737c06c13fc4c972b18f08647f732" [[projects]] branch = "master" @@ -997,7 +997,7 @@ "tools/dep-collector", ] pruneopts = "UT" - revision = "01760e2303d6a17936658f155004da3019d0f43c" + revision = "b3fc723351c34f03515ee548a5c4ff021e7f544d" [[projects]] digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c" diff --git a/vendor/knative.dev/pkg/Gopkg.lock b/vendor/knative.dev/pkg/Gopkg.lock index 8ec3cc89..ae0a987a 100644 --- a/vendor/knative.dev/pkg/Gopkg.lock +++ b/vendor/knative.dev/pkg/Gopkg.lock @@ -980,7 +980,7 @@ version = "kubernetes-1.16.4" [[projects]] - digest = "1:1aaf879947e3abf264929bb0220acced357f85da5ac0f58b10f0f8a5719a41ef" + digest = "1:e49d4f18068a8b4e5ac2ae52ec9363e8509f17de8c1569a41663ef1632fc4aeb" name = "k8s.io/apimachinery" packages = [ "pkg/api/apitesting", @@ -1025,6 +1025,7 @@ "pkg/util/sets", "pkg/util/sets/types", "pkg/util/strategicpatch", + "pkg/util/uuid", "pkg/util/validation", "pkg/util/validation/field", "pkg/util/wait", @@ -1039,7 +1040,7 @@ version = "kubernetes-1.16.4" [[projects]] - digest = "1:b03297e45cd203dee7e9449141555d3973d04d5323ece6b4e20562b80185767e" + digest = "1:b485876cac57a0612f78c907bde4e34c3f327e35e705f7f59d3bd7c5cd78a688" name = "k8s.io/client-go" packages = [ "discovery", @@ -1229,6 +1230,8 @@ "tools/clientcmd/api", "tools/clientcmd/api/latest", "tools/clientcmd/api/v1", + "tools/leaderelection", + "tools/leaderelection/resourcelock", "tools/metrics", "tools/pager", "tools/record", @@ -1457,6 +1460,7 @@ "k8s.io/apimachinery/pkg/util/runtime", "k8s.io/apimachinery/pkg/util/sets", "k8s.io/apimachinery/pkg/util/sets/types", + "k8s.io/apimachinery/pkg/util/uuid", "k8s.io/apimachinery/pkg/util/validation", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/version", @@ -1487,6 +1491,8 @@ "k8s.io/client-go/testing", "k8s.io/client-go/tools/cache", "k8s.io/client-go/tools/clientcmd", + "k8s.io/client-go/tools/leaderelection", + "k8s.io/client-go/tools/leaderelection/resourcelock", "k8s.io/client-go/tools/metrics", "k8s.io/client-go/tools/record", "k8s.io/client-go/util/retry", diff --git a/vendor/knative.dev/pkg/injection/sharedmain/main.go b/vendor/knative.dev/pkg/injection/sharedmain/main.go index 83dd0d25..1b159338 100644 --- a/vendor/knative.dev/pkg/injection/sharedmain/main.go +++ b/vendor/knative.dev/pkg/injection/sharedmain/main.go @@ -32,8 +32,14 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -41,6 +47,7 @@ import ( "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection" + kle "knative.dev/pkg/leaderelection" "knative.dev/pkg/logging" 
"knative.dev/pkg/metrics" "knative.dev/pkg/profiling" @@ -94,6 +101,20 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) { return logging.NewConfigFromConfigMap(loggingConfigMap) } +// GetLeaderElectionConfig gets the leader election config. +func GetLeaderElectionConfig(ctx context.Context) (*kle.Config, error) { + leaderElectionConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(kle.ConfigMapName(), metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return kle.NewConfigFromMap(nil) + } + + return nil, err + } + + return kle.NewConfigFromConfigMap(leaderElectionConfigMap) +} + // Main runs the generic main flow for non-webhook controllers with a new // context. Use WebhookMainWith* if you need to serve webhooks. func Main(component string, ctors ...injection.ControllerConstructor) { @@ -127,36 +148,53 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto defer flush(logger) ctx = logging.WithLogger(ctx, logger) profilingHandler := profiling.NewHandler(logger, false) - - CheckK8sClientMinimumVersionOrDie(ctx, logger) - cmw := SetupConfigMapWatchOrDie(ctx, logger) - controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...) - WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component) - WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component) - - logger.Info("Starting configuration manager...") - if err := cmw.Start(ctx.Done()); err != nil { - logger.Fatalw("Failed to start configuration manager", zap.Error(err)) - } - logger.Info("Starting informers...") - if err := controller.StartInformers(ctx.Done(), informers...); err != nil { - logger.Fatalw("Failed to start informers", zap.Error(err)) - } - logger.Info("Starting controllers...") - go controller.StartAll(ctx.Done(), controllers...) - profilingServer := profiling.NewServer(profilingHandler) eg, egCtx := errgroup.WithContext(ctx) eg.Go(profilingServer.ListenAndServe) + go func() { + // This will block until either a signal arrives or one of the grouped functions + // returns an error. + <-egCtx.Done() - // This will block until either a signal arrives or one of the grouped functions - // returns an error. - <-egCtx.Done() + profilingServer.Shutdown(context.Background()) + if err := eg.Wait(); err != nil && err != http.ErrServerClosed { + logger.Errorw("Error while running server", zap.Error(err)) + } + }() + CheckK8sClientMinimumVersionOrDie(ctx, logger) - profilingServer.Shutdown(context.Background()) - // Don't forward ErrServerClosed as that indicates we're already shutting down. - if err := eg.Wait(); err != nil && err != http.ErrServerClosed { - logger.Errorw("Error while running server", zap.Error(err)) + run := func(ctx context.Context) { + cmw := SetupConfigMapWatchOrDie(ctx, logger) + controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...) + WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component) + WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component) + + logger.Info("Starting configuration manager...") + if err := cmw.Start(ctx.Done()); err != nil { + logger.Fatalw("Failed to start configuration manager", zap.Error(err)) + } + logger.Info("Starting informers...") + if err := controller.StartInformers(ctx.Done(), informers...); err != nil { + logger.Fatalw("Failed to start informers", zap.Error(err)) + } + logger.Info("Starting controllers...") + go controller.StartAll(ctx.Done(), controllers...) 
+ + <-ctx.Done() + } + + // Set up leader election config + leaderElectionConfig, err := GetLeaderElectionConfig(ctx) + if err != nil { + logger.Fatalf("Error loading leader election configuration: %v", err) + } + leConfig := leaderElectionConfig.GetComponentConfig(component) + + if !leConfig.LeaderElect { + logger.Infof("%v will not run in leader-elected mode", component) + run(ctx) + } else { + RunLeaderElected(ctx, logger, run, component, leConfig) } } @@ -186,6 +224,7 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf defer flush(logger) ctx = logging.WithLogger(ctx, logger) profilingHandler := profiling.NewHandler(logger, false) + profilingServer := profiling.NewServer(profilingHandler) CheckK8sClientMinimumVersionOrDie(ctx, logger) cmw := SetupConfigMapWatchOrDie(ctx, logger) @@ -204,8 +243,6 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf logger.Info("Starting controllers...") go controller.StartAll(ctx.Done(), controllers...) - profilingServer := profiling.NewServer(profilingHandler) - eg, egCtx := errgroup.WithContext(ctx) eg.Go(profilingServer.ListenAndServe) @@ -369,3 +406,65 @@ func ControllersAndWebhooksFromCtors(ctx context.Context, return controllers, webhooks } + +// RunLeaderElected runs the given function in leader elected mode. The function +// will be run only once the leader election lock is obtained. +func RunLeaderElected(ctx context.Context, logger *zap.SugaredLogger, run func(context.Context), component string, leConfig kle.ComponentConfig) { + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events(system.Namespace())}), + } + recorder = eventBroadcaster.NewRecorder( + scheme.Scheme, corev1.EventSource{Component: component}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + }() + } + + // Create a unique identifier so that two controllers on the same host don't + // race. + id, err := kle.UniqueID() + if err != nil { + logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err)) + } + logger.Infof("%v will run in leader-elected mode with id %v", component, id) + + // rl is the resource used to hold the leader election lock. + rl, err := resourcelock.New(leConfig.ResourceLock, + system.Namespace(), // use namespace we are running in + component, // component is used as the resource name + kubeclient.Get(ctx).CoreV1(), + kubeclient.Get(ctx).CoordinationV1(), + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorder, + }) + if err != nil { + logger.Fatalw("Error creating lock: %v", err) + } + + // Execute the `run` function when we have the lock. 
+ leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: leConfig.LeaseDuration, + RenewDeadline: leConfig.RenewDeadline, + RetryPeriod: leConfig.RetryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + logger.Fatal("leaderelection lost") + }, + }, + // TODO: use health check watchdog, knative/pkg#1048 + Name: component, + }) +} diff --git a/vendor/knative.dev/pkg/leaderelection/config.go b/vendor/knative.dev/pkg/leaderelection/config.go new file mode 100644 index 00000000..1e547057 --- /dev/null +++ b/vendor/knative.dev/pkg/leaderelection/config.go @@ -0,0 +1,158 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "errors" + "fmt" + "os" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" +) + +const ConfigMapNameEnv = "CONFIG_LEADERELECTION_NAME" + +var ( + errEmptyLeaderElectionConfig = errors.New("empty leader election configuration") + validResourceLocks = sets.NewString("leases", "configmaps", "endpoints") +) + +// NewConfigFromMap returns a Config for the given map, or an error. +func NewConfigFromMap(data map[string]string) (*Config, error) { + config := &Config{ + EnabledComponents: sets.NewString(), + } + + if resourceLock := data["resourceLock"]; !validResourceLocks.Has(resourceLock) { + return nil, fmt.Errorf(`resourceLock: invalid value %q: valid values are "leases","configmaps","endpoints"`, resourceLock) + } else { + config.ResourceLock = resourceLock + } + + if leaseDuration, err := time.ParseDuration(data["leaseDuration"]); err != nil { + return nil, fmt.Errorf("leaseDuration: invalid duration: %q", data["leaseDuration"]) + } else { + config.LeaseDuration = leaseDuration + } + + if renewDeadline, err := time.ParseDuration(data["renewDeadline"]); err != nil { + return nil, fmt.Errorf("renewDeadline: invalid duration: %q", data["renewDeadline"]) + } else { + config.RenewDeadline = renewDeadline + } + + if retryPeriod, err := time.ParseDuration(data["retryPeriod"]); err != nil { + return nil, fmt.Errorf("retryPeriod: invalid duration: %q", data["retryPeriod"]) + } else { + config.RetryPeriod = retryPeriod + } + + // enabledComponents are not validated here, because they are dependent on + // the component. Components should provide additional validation for this + // field. + if enabledComponents, ok := data["enabledComponents"]; ok { + tokens := strings.Split(enabledComponents, ",") + config.EnabledComponents = sets.NewString(tokens...) + } + + return config, nil +} + +// NewConfigFromConfigMap returns a new Config from the given ConfigMap. 
+func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) { + if configMap == nil { + config := defaultConfig() + return config, nil + } + + return NewConfigFromMap(configMap.Data) +} + +// Config represents the leader election config for a set of components +// contained within a single namespace. Typically these will correspond to a +// single source repository, viz: serving or eventing. +type Config struct { + ResourceLock string + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration + EnabledComponents sets.String +} + +func (c *Config) GetComponentConfig(name string) ComponentConfig { + if c.EnabledComponents.Has(name) { + return ComponentConfig{ + LeaderElect: true, + ResourceLock: c.ResourceLock, + LeaseDuration: c.LeaseDuration, + RenewDeadline: c.RenewDeadline, + RetryPeriod: c.RetryPeriod, + } + } + + return defaultComponentConfig() +} + +func defaultConfig() *Config { + return &Config{ + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + EnabledComponents: sets.NewString(), + } +} + +// ComponentConfig represents the leader election config for a single component. +type ComponentConfig struct { + LeaderElect bool + ResourceLock string + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration +} + +func defaultComponentConfig() ComponentConfig { + return ComponentConfig{ + LeaderElect: false, + } +} + +// ConfigMapName returns the name of the configmap to read for leader election +// settings. +func ConfigMapName() string { + cm := os.Getenv(ConfigMapNameEnv) + if cm == "" { + return "config-leader-election" + } + return cm +} + +// UniqueID returns a unique ID for use with a leader elector that prevents from +// pods running on the same host from colliding with one another. +func UniqueID() (string, error) { + id, err := os.Hostname() + if err != nil { + return "", err + } + + return (id + "_" + string(uuid.NewUUID())), nil +} diff --git a/vendor/knative.dev/pkg/metrics/README.md b/vendor/knative.dev/pkg/metrics/README.md index 49d3969c..bbbf2084 100644 --- a/vendor/knative.dev/pkg/metrics/README.md +++ b/vendor/knative.dev/pkg/metrics/README.md @@ -94,8 +94,8 @@ statistics for a short period of time if not. **This is true today.** [Ensure this on an ongoing basis.](https://github.com/knative/pkg/issues/957) - [ ] Google to implement OpenCensus Agent configuration to match what they are - doing for Stackdriver now. (No public issue link because this should be - in Google's vendor-specific configuration.) + doing for Stackdriver now. (No public issue link because this should be in + Google's vendor-specific configuration.) - [ ] Document how to configure OpenCensus/OpenTelemetry Agent + Prometheus to achieve the current level of application visibility, and determine a long-term course for how to maintain this as a "bare minimum" supported diff --git a/vendor/knative.dev/pkg/metrics/testdata/README.md b/vendor/knative.dev/pkg/metrics/testdata/README.md index 12444d6e..7bc8f0c8 100644 --- a/vendor/knative.dev/pkg/metrics/testdata/README.md +++ b/vendor/knative.dev/pkg/metrics/testdata/README.md @@ -6,9 +6,11 @@ The cert files were generated with: openssl req -x509 -nodes -newkey dsa:<(openssl dsaparam 1024) -keyout client-key.pem -out client-cert.pem -days 10000 ``` -Note that there are some manual prompts later in the process. This seemed simpler than generating the certs in Go. 
+Note that there are some manual prompts later in the process. This seemed +simpler than generating the certs in Go. To view the cert: + ```shell openssl x509 -noout -text -in client-cert.pem ```
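Reviewer note: below is a minimal, hedged sketch of how the new `knative.dev/pkg/leaderelection` config surface (vendored above in `leaderelection/config.go`) behaves when fed ConfigMap-style data. The map keys and duration formats come from `NewConfigFromMap` in this diff; the concrete values, the `main` wrapper, and the component names `"controller"` and `"webhook"` are illustrative assumptions, not part of this change.

```go
package main

import (
	"fmt"
	"log"

	kle "knative.dev/pkg/leaderelection"
)

func main() {
	// Keys mirror those read by the vendored NewConfigFromMap; durations are
	// Go time.ParseDuration strings. Values here are examples only.
	cfg, err := kle.NewConfigFromMap(map[string]string{
		"resourceLock":      "leases",
		"leaseDuration":     "15s",
		"renewDeadline":     "10s",
		"retryPeriod":       "2s",
		"enabledComponents": "controller",
	})
	if err != nil {
		log.Fatalf("parsing leader election config: %v", err)
	}

	// Components listed in enabledComponents get LeaderElect=true plus the
	// shared lock/timing settings.
	cc := cfg.GetComponentConfig("controller")
	fmt.Println(cc.LeaderElect, cc.ResourceLock, cc.LeaseDuration) // true leases 15s

	// Anything else falls back to the disabled default, so sharedmain runs it
	// without leader election.
	other := cfg.GetComponentConfig("webhook")
	fmt.Println(other.LeaderElect) // false
}
```

This matches the flow in `sharedmain.MainWithConfig` above: the config is loaded from the ConfigMap named by `kle.ConfigMapName()` (default `config-leader-election`), and only when `GetComponentConfig(component).LeaderElect` is true does the controller go through `RunLeaderElected` and `leaderelection.RunOrDie`.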