From 2bff7444978dc48d9eccf68c67987c7d48d1e81b Mon Sep 17 00:00:00 2001 From: Paco Xu Date: Tue, 31 Dec 2024 16:29:35 +0800 Subject: [PATCH 1/2] add etcd server overrides to etcd probe factory for healthz and readyz Kubernetes-commit: 8bc7e6c10e0a4dceb738318a3a34f6af94e06158 --- pkg/server/options/etcd.go | 40 ++++++++++++++++++++++++++++ pkg/storage/storagebackend/config.go | 35 ++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/pkg/server/options/etcd.go b/pkg/server/options/etcd.go index 8c3fcfeff..101d3f5ee 100644 --- a/pkg/server/options/etcd.go +++ b/pkg/server/options/etcd.go @@ -376,6 +376,46 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { return readyCheck() })) + if len(s.EtcdServersOverrides) != 0 { + if err := s.addOverrideEtcdHealthEndpoint(c); err != nil { + return err + } + } + + return nil +} + +func (s *EtcdOptions) addOverrideEtcdHealthEndpoint(c *server.Config) error { + sc := s.StorageConfig + serverSet := sets.Set[string]{} + + for _, override := range s.EtcdServersOverrides { + tokens := strings.Split(override, "#") + servers := strings.Split(tokens[1], ";") + for _, server := range servers { + serverSet.Insert(server) + } + } + if serverSet.Len() != 0 { + sc.Transport.ServerList = serverSet.UnsortedList() + } + + healthCheck, err := storagefactory.CreateHealthCheck(sc, c.DrainedNotify()) + if err != nil { + return err + } + c.AddHealthChecks(healthz.NamedCheck("etcd-override", func(r *http.Request) error { + return healthCheck() + })) + + readyCheck, err := storagefactory.CreateReadyCheck(sc, c.DrainedNotify()) + if err != nil { + return err + } + c.AddReadyzChecks(healthz.NamedCheck("etcd-override-readiness", func(r *http.Request) error { + return readyCheck() + })) + return nil } diff --git a/pkg/storage/storagebackend/config.go b/pkg/storage/storagebackend/config.go index c948d6411..297d0cfc0 100644 --- a/pkg/storage/storagebackend/config.go +++ b/pkg/storage/storagebackend/config.go @@ -125,3 
+125,38 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { Transport: TransportConfig{TracerProvider: noopoteltrace.NewTracerProvider()}, } } + +// DeepCopy returns a deep copy of the Config. +func (config *Config) DeepCopy() *Config { + if config == nil { + return nil + } + // Deep copy the transport config + transport := config.Transport + transportCopy := TransportConfig{ + ServerList: make([]string, len(transport.ServerList)), + KeyFile: transport.KeyFile, + CertFile: transport.CertFile, + TrustedCAFile: transport.TrustedCAFile, + EgressLookup: transport.EgressLookup, + TracerProvider: transport.TracerProvider, + } + copy(transportCopy.ServerList, transport.ServerList) + + return &Config{ + Type: config.Type, + Prefix: config.Prefix, + Transport: transportCopy, + Codec: config.Codec, + EncodeVersioner: config.EncodeVersioner, + Transformer: config.Transformer, + CompactionInterval: config.CompactionInterval, + CountMetricPollPeriod: config.CountMetricPollPeriod, + DBMetricPollInterval: config.DBMetricPollInterval, + EventsHistoryWindow: config.EventsHistoryWindow, + HealthcheckTimeout: config.HealthcheckTimeout, + ReadycheckTimeout: config.ReadycheckTimeout, + LeaseManagerConfig: config.LeaseManagerConfig, + StorageObjectCountTracker: config.StorageObjectCountTracker, + } +} From 9388b76b2182998beeb33afd9aef458dce7124e9 Mon Sep 17 00:00:00 2001 From: Paco Xu Date: Sun, 26 Jan 2025 15:05:59 +0800 Subject: [PATCH 2/2] add separate health check/probe for multi etcd override servers - grouping health checks for exclusion purposes & add exclude integration test Signed-off-by: Paco Xu Kubernetes-commit: 891e7fec6e17382389b01f590888375a29491afc --- pkg/server/healthz/healthz.go | 39 ++++++++- pkg/server/options/etcd.go | 117 ++++++++++++++++----------- pkg/server/options/etcd_test.go | 44 ++++++++-- pkg/storage/storagebackend/config.go | 35 -------- 4 files changed, 143 insertions(+), 92 deletions(-) diff --git a/pkg/server/healthz/healthz.go 
b/pkg/server/healthz/healthz.go index 730929331..2783b8cb2 100644 --- a/pkg/server/healthz/healthz.go +++ b/pkg/server/healthz/healthz.go @@ -43,6 +43,11 @@ type HealthChecker interface { Check(req *http.Request) error } +type GroupedHealthChecker interface { + HealthChecker + GroupName() string +} + // PingHealthz returns true automatically when checked var PingHealthz HealthChecker = ping{} @@ -151,6 +156,14 @@ func NamedCheck(name string, check func(r *http.Request) error) HealthChecker { return &healthzCheck{name, check} } +// NamedGroupedCheck returns a healthz checker for the given name and function that also belongs to the group groupName, so it can be excluded by its own name or by its group name. +func NamedGroupedCheck(name string, groupName string, check func(r *http.Request) error) HealthChecker { + return &groupedHealthzCheck{ + groupName: groupName, + HealthChecker: &healthzCheck{name, check}, + } +} + // InstallHandler registers handlers for health checking on the path // "/healthz" to mux. *All handlers* for mux must be specified in // exactly one call to InstallHandler. Calling InstallHandler more @@ -232,6 +245,17 @@ func (c *healthzCheck) Check(r *http.Request) error { return c.check(r) } +type groupedHealthzCheck struct { + groupName string + HealthChecker +} + +var _ GroupedHealthChecker = &groupedHealthzCheck{} + +func (c *groupedHealthzCheck) GroupName() string { + return c.groupName +} + // getExcludedChecks extracts the health check names to be excluded from the query param func getExcludedChecks(r *http.Request) sets.String { checks, found := r.URL.Query()["exclude"] @@ -246,6 +270,7 @@ func handleRootHealth(name string, firstTimeHealthy func(), checks ...HealthChec var notifyOnce sync.Once return func(w http.ResponseWriter, r *http.Request) { excluded := getExcludedChecks(r) + unknownExcluded := excluded.Clone() // failedVerboseLogOutput is for output to the log. It indicates detailed failed output information for the log. 
var failedVerboseLogOutput bytes.Buffer var failedChecks []string @@ -253,10 +278,16 @@ func handleRootHealth(name string, firstTimeHealthy func(), checks ...HealthChec for _, check := range checks { // no-op the check if we've specified we want to exclude the check if excluded.Has(check.Name()) { - excluded.Delete(check.Name()) + unknownExcluded.Delete(check.Name()) fmt.Fprintf(&individualCheckOutput, "[+]%s excluded: ok\n", check.Name()) continue } + // no-op the check if it is a grouped check and we want to exclude the group + if check, ok := check.(GroupedHealthChecker); ok && excluded.Has(check.GroupName()) { + unknownExcluded.Delete(check.GroupName()) + fmt.Fprintf(&individualCheckOutput, "[+]%s excluded with %s: ok\n", check.Name(), check.GroupName()) + continue + } if err := check.Check(r); err != nil { slis.ObserveHealthcheck(context.Background(), check.Name(), name, slis.Error) // don't include the error since this endpoint is public. If someone wants more detail @@ -270,10 +301,10 @@ func handleRootHealth(name string, firstTimeHealthy func(), checks ...HealthChec fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", check.Name()) } } - if excluded.Len() > 0 { - fmt.Fprintf(&individualCheckOutput, "warn: some health checks cannot be excluded: no matches for %s\n", formatQuoted(excluded.List()...)) + if unknownExcluded.Len() > 0 { + fmt.Fprintf(&individualCheckOutput, "warn: some health checks cannot be excluded: no matches for %s\n", formatQuoted(unknownExcluded.List()...)) klog.V(6).Infof("cannot exclude some health checks, no health checks are installed matching %s", - formatQuoted(excluded.List()...)) + formatQuoted(unknownExcluded.List()...)) } // always be verbose on failure if len(failedChecks) > 0 { diff --git a/pkg/server/options/etcd.go b/pkg/server/options/etcd.go index 101d3f5ee..8f73749c5 100644 --- a/pkg/server/options/etcd.go +++ b/pkg/server/options/etcd.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "sort" "strconv" "strings" 
"time" @@ -106,19 +107,8 @@ func (s *EtcdOptions) Validate() []error { allErrors = append(allErrors, fmt.Errorf("--storage-backend invalid, allowed values: %s. If not specified, it will default to 'etcd3'", strings.Join(storageTypes.List(), ", "))) } - for _, override := range s.EtcdServersOverrides { - tokens := strings.Split(override, "#") - if len(tokens) != 2 { - allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated")) - continue - } - - apiresource := strings.Split(tokens[0], "/") - if len(apiresource) != 2 { - allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated")) - continue - } - + if _, err := ParseEtcdServersOverrides(s.EtcdServersOverrides); err != nil { + allErrors = append(allErrors, err) } if len(s.EncryptionProviderConfigFilepath) == 0 && s.EncryptionProviderConfigAutomaticReload { @@ -364,7 +354,7 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { if err != nil { return err } - c.AddHealthChecks(healthz.NamedCheck("etcd", func(r *http.Request) error { + c.AddHealthChecks(healthz.NamedGroupedCheck("etcd", "etcd", func(r *http.Request) error { return healthCheck() })) @@ -372,50 +362,51 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { if err != nil { return err } - c.AddReadyzChecks(healthz.NamedCheck("etcd-readiness", func(r *http.Request) error { + c.AddReadyzChecks(healthz.NamedGroupedCheck("etcd-readiness", "etcd-readiness", func(r *http.Request) error { return readyCheck() })) if len(s.EtcdServersOverrides) != 0 { - if err := s.addOverrideEtcdHealthEndpoint(c); err != nil { + // multiple overrides may list the same servers in a different order + // example: ["apps/deployments#s2.example.com;s1.example.com","apps/replicasets#s1.example.com;s2.example.com"] + overrides, err := 
ParseEtcdServersOverrides(s.EtcdServersOverrides) + if err != nil { return err } - } + // multiple overrides may point to the same servers + // example: ["apps/deployments#s1.example.com;s2.example.com","apps/replicasets#s1.example.com;s2.example.com"] + serversSets := sets.NewString() + for _, override := range overrides { + sortedServers := make([]string, len(override.Servers)) + // sort a copy so that the original slice, used for the client in SetEtcdLocation, is left unmodified + copy(sortedServers, override.Servers) + sort.Strings(sortedServers) + serversKeyStr := strings.Join(sortedServers, ";") + if serversSets.Has(serversKeyStr) { + continue + } + serversSets.Insert(serversKeyStr) - return nil -} + sc := s.StorageConfig + sc.Transport.ServerList = override.Servers -func (s *EtcdOptions) addOverrideEtcdHealthEndpoint(c *server.Config) error { - sc := s.StorageConfig - serverSet := sets.Set[string]{} + healthCheck, err := storagefactory.CreateHealthCheck(sc, c.DrainedNotify()) + if err != nil { + return err + } + c.AddHealthChecks(healthz.NamedGroupedCheck(fmt.Sprintf("etcd-override-%d", len(serversSets)-1), "etcd", func(r *http.Request) error { + return healthCheck() + })) - for _, override := range s.EtcdServersOverrides { - tokens := strings.Split(override, "#") - servers := strings.Split(tokens[1], ";") - for _, server := range servers { - serverSet.Insert(server) + readyCheck, err := storagefactory.CreateReadyCheck(sc, c.DrainedNotify()) + if err != nil { + return err + } + c.AddReadyzChecks(healthz.NamedGroupedCheck(fmt.Sprintf("etcd-override-readiness-%d", len(serversSets)-1), "etcd-readiness", func(r *http.Request) error { + return readyCheck() + })) } } - if serverSet.Len() != 0 { - sc.Transport.ServerList = serverSet.UnsortedList() - } - - healthCheck, err := storagefactory.CreateHealthCheck(sc, c.DrainedNotify()) - if err != nil { - return err - } - c.AddHealthChecks(healthz.NamedCheck("etcd-override", func(r *http.Request) error { - return 
healthCheck() - })) - 
- readyCheck, err := storagefactory.CreateReadyCheck(sc, c.DrainedNotify()) - if err != nil { - return err - } - c.AddReadyzChecks(healthz.NamedCheck("etcd-override-readiness", func(r *http.Request) error { - return readyCheck() - })) - return nil } @@ -559,3 +550,35 @@ func (t *transformerStorageFactory) Configs() []storagebackend.Config { func (t *transformerStorageFactory) Backends() []serverstorage.Backend { return t.delegate.Backends() } + +type EtcdServerOverride struct { + GroupResource schema.GroupResource + Servers []string +} + +var errOverridesInvalid = fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated") + +func ParseEtcdServersOverrides(etcdServersOverrides []string) ([]EtcdServerOverride, error) { + var overrides []EtcdServerOverride + for _, override := range etcdServersOverrides { + tokens := strings.Split(override, "#") + if len(tokens) != 2 { + return nil, errOverridesInvalid + } + apiresource := strings.Split(tokens[0], "/") + if len(apiresource) != 2 { + return nil, errOverridesInvalid + } + servers := strings.Split(tokens[1], ";") + for _, server := range servers { + if len(server) == 0 { + return nil, errOverridesInvalid + } + } + overrides = append(overrides, EtcdServerOverride{ + GroupResource: schema.GroupResource{Group: apiresource[0], Resource: apiresource[1]}, + Servers: servers, + }) + } + return overrides, nil +} diff --git a/pkg/server/options/etcd_test.go b/pkg/server/options/etcd_test.go index ea4464b24..11008c6cf 100644 --- a/pkg/server/options/etcd_test.go +++ b/pkg/server/options/etcd_test.go @@ -375,11 +375,12 @@ func TestKMSHealthzEndpoint(t *testing.T) { func TestReadinessCheck(t *testing.T) { testCases := []struct { - name string - wantReadyzChecks []string - wantHealthzChecks []string - wantLivezChecks []string - skipHealth bool + name string + wantReadyzChecks []string + wantHealthzChecks []string + wantLivezChecks []string + skipHealth bool + 
etcdServersOverrides []string }{ { name: "Readyz should have etcd-readiness check", @@ -394,6 +395,37 @@ func TestReadinessCheck(t *testing.T) { wantLivezChecks: nil, skipHealth: true, }, + { + name: "Health checks should not have duplicated servers from etcd-servers-overrides", + wantReadyzChecks: []string{"etcd", "etcd-readiness", "etcd-override-0", "etcd-override-readiness-0"}, + wantHealthzChecks: []string{"etcd", "etcd-override-0"}, + wantLivezChecks: []string{"etcd", "etcd-override-0"}, + etcdServersOverrides: []string{"/r1#s1.com;s2.com", "/r2#s1.com;s2.com"}, + }, + { + name: "Health checks should not have duplicated servers from etcd-servers-overrides " + + "if servers are provided in different orders", + wantReadyzChecks: []string{"etcd", "etcd-readiness", "etcd-override-0", "etcd-override-readiness-0"}, + wantHealthzChecks: []string{"etcd", "etcd-override-0"}, + wantLivezChecks: []string{"etcd", "etcd-override-0"}, + etcdServersOverrides: []string{"/r1#s1.com;s2.com", "/r2#s2.com;s1.com"}, + }, + { + name: "Health checks should allow multiple overrides in etcd-servers-overrides", + wantReadyzChecks: []string{"etcd", "etcd-readiness", "etcd-override-0", "etcd-override-readiness-0", + "etcd-override-1", "etcd-override-readiness-1"}, + wantHealthzChecks: []string{"etcd", "etcd-override-0", "etcd-override-1"}, + wantLivezChecks: []string{"etcd", "etcd-override-0", "etcd-override-1"}, + etcdServersOverrides: []string{"/r1#s1.com;s2.com", "/r2#s3.com;s4.com"}, + }, + { + name: "Health checks should allow multiple overrides in etcd-servers-overrides if servers overlap between overrides", + wantReadyzChecks: []string{"etcd", "etcd-readiness", "etcd-override-0", "etcd-override-readiness-0", + "etcd-override-1", "etcd-override-readiness-1"}, + wantHealthzChecks: []string{"etcd", "etcd-override-0", "etcd-override-1"}, + wantLivezChecks: []string{"etcd", "etcd-override-0", "etcd-override-1"}, + etcdServersOverrides: []string{"/r1#s1.com;s2.com", 
"/r2#s2.com;s3.com"}, + }, } scheme := runtime.NewScheme() @@ -402,7 +434,7 @@ func TestReadinessCheck(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { serverConfig := server.NewConfig(codecs) - etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth} + etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth, EtcdServersOverrides: tc.etcdServersOverrides} if err := etcdOptions.ApplyTo(serverConfig); err != nil { t.Fatalf("Failed to add healthz error: %v", err) } diff --git a/pkg/storage/storagebackend/config.go b/pkg/storage/storagebackend/config.go index 297d0cfc0..c948d6411 100644 --- a/pkg/storage/storagebackend/config.go +++ b/pkg/storage/storagebackend/config.go @@ -125,38 +125,3 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { Transport: TransportConfig{TracerProvider: noopoteltrace.NewTracerProvider()}, } } - -// DeepCopy returns a deep copy of the Config. -func (config *Config) DeepCopy() *Config { - if config == nil { - return nil - } - // Deep copy the transport config - transport := config.Transport - transportCopy := TransportConfig{ - ServerList: make([]string, len(transport.ServerList)), - KeyFile: transport.KeyFile, - CertFile: transport.CertFile, - TrustedCAFile: transport.TrustedCAFile, - EgressLookup: transport.EgressLookup, - TracerProvider: transport.TracerProvider, - } - copy(transportCopy.ServerList, transport.ServerList) - - return &Config{ - Type: config.Type, - Prefix: config.Prefix, - Transport: transportCopy, - Codec: config.Codec, - EncodeVersioner: config.EncodeVersioner, - Transformer: config.Transformer, - CompactionInterval: config.CompactionInterval, - CountMetricPollPeriod: config.CountMetricPollPeriod, - DBMetricPollInterval: config.DBMetricPollInterval, - EventsHistoryWindow: config.EventsHistoryWindow, - HealthcheckTimeout: config.HealthcheckTimeout, - ReadycheckTimeout: config.ReadycheckTimeout, - LeaseManagerConfig: config.LeaseManagerConfig, - 
StorageObjectCountTracker: config.StorageObjectCountTracker, - } -}