Merge pull request #129438 from pacoxu/apiserver-probe-etcd

add etcd server overrides to etcd probe factory for healthz and readyz

Kubernetes-commit: b569406b792fef24bb5613f1263ea354755bbae0
This commit is contained in:
Kubernetes Publisher 2025-06-19 11:30:52 -07:00
commit d16c916aad
5 changed files with 157 additions and 31 deletions

4
go.mod
View File

@ -49,8 +49,8 @@ require (
gopkg.in/go-jose/go-jose.v2 v2.6.3 gopkg.in/go-jose/go-jose.v2 v2.6.3
gopkg.in/natefinch/lumberjack.v2 v2.2.1 gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e
k8s.io/apimachinery v0.0.0-20250617032454-d2d60b8eb21f k8s.io/apimachinery v0.0.0-20250618223650-ae7698643b3d
k8s.io/client-go v0.0.0-20250617175059-212c32462b4e k8s.io/client-go v0.0.0-20250619112734-41574813b1be
k8s.io/component-base v0.0.0-20250617034450-dc0881cd03da k8s.io/component-base v0.0.0-20250617034450-dc0881cd03da
k8s.io/klog/v2 v2.130.1 k8s.io/klog/v2 v2.130.1
k8s.io/kms v0.0.0-20250527175117-e6cd4d7331a4 k8s.io/kms v0.0.0-20250527175117-e6cd4d7331a4

8
go.sum
View File

@ -295,10 +295,10 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e h1:YVWWCL8/51Adbyo/pps3kAK1Asi7W8RSFVU9JbJJAkU= k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e h1:YVWWCL8/51Adbyo/pps3kAK1Asi7W8RSFVU9JbJJAkU=
k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e/go.mod h1:+9QbMyXTXctHAXg3fdhJbuZgyzhYgprCn43M5NqoJzw= k8s.io/api v0.0.0-20250616192729-f4a3fcd2245e/go.mod h1:+9QbMyXTXctHAXg3fdhJbuZgyzhYgprCn43M5NqoJzw=
k8s.io/apimachinery v0.0.0-20250617032454-d2d60b8eb21f h1:+TyAxkvb3oaCIN4Ev+o9FvyFVd1VDcWHHrEKB5V6zRE= k8s.io/apimachinery v0.0.0-20250618223650-ae7698643b3d h1:9qxYMmJsUps3r/rHWWQ4G/1Ny3Z7MafzXfdslORyPmg=
k8s.io/apimachinery v0.0.0-20250617032454-d2d60b8eb21f/go.mod h1:/kMnP8WowZRse4LjwOCxh+RR0W03jl2f/gCRYqeYMZA= k8s.io/apimachinery v0.0.0-20250618223650-ae7698643b3d/go.mod h1:/kMnP8WowZRse4LjwOCxh+RR0W03jl2f/gCRYqeYMZA=
k8s.io/client-go v0.0.0-20250617175059-212c32462b4e h1:5Ogeq5djCwGHvwM9iMLzNbE2niwzrdpBg+j3UtPYtcI= k8s.io/client-go v0.0.0-20250619112734-41574813b1be h1:1q8TEC0FSoYbSKvQXw0hDZX9eYiko816DT/s0Lcd5Bc=
k8s.io/client-go v0.0.0-20250617175059-212c32462b4e/go.mod h1:oP97QArDmRQMXTG+rljxoy/wvK5GEl6w8Dp6Y9mloSA= k8s.io/client-go v0.0.0-20250619112734-41574813b1be/go.mod h1:svnXN0zcYP88CRMk6Sg32RnUVb5a8uqJG1QOrMsPYBk=
k8s.io/component-base v0.0.0-20250617034450-dc0881cd03da h1:JIslv7EMZ/AS8fKVL2ttpIbKiLKiMc4FEBiFepcaHCk= k8s.io/component-base v0.0.0-20250617034450-dc0881cd03da h1:JIslv7EMZ/AS8fKVL2ttpIbKiLKiMc4FEBiFepcaHCk=
k8s.io/component-base v0.0.0-20250617034450-dc0881cd03da/go.mod h1:iMJjQa+o/mr6ZuRxeZJyv/X6qV7SlKwQtmX4TSYQais= k8s.io/component-base v0.0.0-20250617034450-dc0881cd03da/go.mod h1:iMJjQa+o/mr6ZuRxeZJyv/X6qV7SlKwQtmX4TSYQais=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=

View File

@ -43,6 +43,11 @@ type HealthChecker interface {
Check(req *http.Request) error Check(req *http.Request) error
} }
type GroupedHealthChecker interface {
HealthChecker
GroupName() string
}
// PingHealthz returns true automatically when checked // PingHealthz returns true automatically when checked
var PingHealthz HealthChecker = ping{} var PingHealthz HealthChecker = ping{}
@ -151,6 +156,14 @@ func NamedCheck(name string, check func(r *http.Request) error) HealthChecker {
return &healthzCheck{name, check} return &healthzCheck{name, check}
} }
// NamedGroupedCheck returns a healthz checker for the given name and function.
func NamedGroupedCheck(name string, groupName string, check func(r *http.Request) error) HealthChecker {
return &groupedHealthzCheck{
groupName: groupName,
HealthChecker: &healthzCheck{name, check},
}
}
// InstallHandler registers handlers for health checking on the path // InstallHandler registers handlers for health checking on the path
// "/healthz" to mux. *All handlers* for mux must be specified in // "/healthz" to mux. *All handlers* for mux must be specified in
// exactly one call to InstallHandler. Calling InstallHandler more // exactly one call to InstallHandler. Calling InstallHandler more
@ -232,6 +245,17 @@ func (c *healthzCheck) Check(r *http.Request) error {
return c.check(r) return c.check(r)
} }
type groupedHealthzCheck struct {
groupName string
HealthChecker
}
var _ GroupedHealthChecker = &groupedHealthzCheck{}
func (c *groupedHealthzCheck) GroupName() string {
return c.groupName
}
// getExcludedChecks extracts the health check names to be excluded from the query param // getExcludedChecks extracts the health check names to be excluded from the query param
func getExcludedChecks(r *http.Request) sets.String { func getExcludedChecks(r *http.Request) sets.String {
checks, found := r.URL.Query()["exclude"] checks, found := r.URL.Query()["exclude"]
@ -246,6 +270,7 @@ func handleRootHealth(name string, firstTimeHealthy func(), checks ...HealthChec
var notifyOnce sync.Once var notifyOnce sync.Once
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
excluded := getExcludedChecks(r) excluded := getExcludedChecks(r)
unknownExcluded := excluded.Clone()
// failedVerboseLogOutput is for output to the log. It indicates detailed failed output information for the log. // failedVerboseLogOutput is for output to the log. It indicates detailed failed output information for the log.
var failedVerboseLogOutput bytes.Buffer var failedVerboseLogOutput bytes.Buffer
var failedChecks []string var failedChecks []string
@ -253,10 +278,16 @@ func handleRootHealth(name string, firstTimeHealthy func(), checks ...HealthChec
for _, check := range checks { for _, check := range checks {
// no-op the check if we've specified we want to exclude the check // no-op the check if we've specified we want to exclude the check
if excluded.Has(check.Name()) { if excluded.Has(check.Name()) {
excluded.Delete(check.Name()) unknownExcluded.Delete(check.Name())
fmt.Fprintf(&individualCheckOutput, "[+]%s excluded: ok\n", check.Name()) fmt.Fprintf(&individualCheckOutput, "[+]%s excluded: ok\n", check.Name())
continue continue
} }
// no-op the check if it is a grouped check and we want to exclude the group
if check, ok := check.(GroupedHealthChecker); ok && excluded.Has(check.GroupName()) {
unknownExcluded.Delete(check.GroupName())
fmt.Fprintf(&individualCheckOutput, "[+]%s excluded with %s: ok\n", check.Name(), check.GroupName())
continue
}
if err := check.Check(r); err != nil { if err := check.Check(r); err != nil {
slis.ObserveHealthcheck(context.Background(), check.Name(), name, slis.Error) slis.ObserveHealthcheck(context.Background(), check.Name(), name, slis.Error)
// don't include the error since this endpoint is public. If someone wants more detail // don't include the error since this endpoint is public. If someone wants more detail
@ -270,10 +301,10 @@ func handleRootHealth(name string, firstTimeHealthy func(), checks ...HealthChec
fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", check.Name()) fmt.Fprintf(&individualCheckOutput, "[+]%s ok\n", check.Name())
} }
} }
if excluded.Len() > 0 { if unknownExcluded.Len() > 0 {
fmt.Fprintf(&individualCheckOutput, "warn: some health checks cannot be excluded: no matches for %s\n", formatQuoted(excluded.List()...)) fmt.Fprintf(&individualCheckOutput, "warn: some health checks cannot be excluded: no matches for %s\n", formatQuoted(unknownExcluded.List()...))
klog.V(6).Infof("cannot exclude some health checks, no health checks are installed matching %s", klog.V(6).Infof("cannot exclude some health checks, no health checks are installed matching %s",
formatQuoted(excluded.List()...)) formatQuoted(unknownExcluded.List()...))
} }
// always be verbose on failure // always be verbose on failure
if len(failedChecks) > 0 { if len(failedChecks) > 0 {

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -106,19 +107,8 @@ func (s *EtcdOptions) Validate() []error {
allErrors = append(allErrors, fmt.Errorf("--storage-backend invalid, allowed values: %s. If not specified, it will default to 'etcd3'", strings.Join(storageTypes.List(), ", "))) allErrors = append(allErrors, fmt.Errorf("--storage-backend invalid, allowed values: %s. If not specified, it will default to 'etcd3'", strings.Join(storageTypes.List(), ", ")))
} }
for _, override := range s.EtcdServersOverrides { if _, err := ParseEtcdServersOverrides(s.EtcdServersOverrides); err != nil {
tokens := strings.Split(override, "#") allErrors = append(allErrors, err)
if len(tokens) != 2 {
allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated"))
continue
}
apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
allErrors = append(allErrors, fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated"))
continue
}
} }
if len(s.EncryptionProviderConfigFilepath) == 0 && s.EncryptionProviderConfigAutomaticReload { if len(s.EncryptionProviderConfigFilepath) == 0 && s.EncryptionProviderConfigAutomaticReload {
@ -364,7 +354,7 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
if err != nil { if err != nil {
return err return err
} }
c.AddHealthChecks(healthz.NamedCheck("etcd", func(r *http.Request) error { c.AddHealthChecks(healthz.NamedGroupedCheck("etcd", "etcd", func(r *http.Request) error {
return healthCheck() return healthCheck()
})) }))
@ -372,10 +362,51 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
if err != nil { if err != nil {
return err return err
} }
c.AddReadyzChecks(healthz.NamedCheck("etcd-readiness", func(r *http.Request) error { c.AddReadyzChecks(healthz.NamedGroupedCheck("etcd-readiness", "etcd-readiness", func(r *http.Request) error {
return readyCheck() return readyCheck()
})) }))
if len(s.EtcdServersOverrides) != 0 {
// multi overrides servers may in different order
// example: ["apps/deployments#s2.example.com;s1.example.com","apps/replicasets#s1.example.com;s2.example.com"]
overrides, err := ParseEtcdServersOverrides(s.EtcdServersOverrides)
if err != nil {
return err
}
// multi overrides may point to the same servers
// example: ["apps/deployments#s1.example.com;s2.example.com","apps/replicasets#s1.example.com;s2.example.com"]
serversSets := sets.NewString()
for _, override := range overrides {
sortedServers := make([]string, len(override.Servers))
// use a copied slice to avoid modifying the original slice for client in SetEtcdLocation
copy(sortedServers, override.Servers)
sort.Strings(sortedServers)
serversKeyStr := strings.Join(sortedServers, ";")
if serversSets.Has(serversKeyStr) {
continue
}
serversSets.Insert(serversKeyStr)
sc := s.StorageConfig
sc.Transport.ServerList = override.Servers
healthCheck, err := storagefactory.CreateHealthCheck(sc, c.DrainedNotify())
if err != nil {
return err
}
c.AddHealthChecks(healthz.NamedGroupedCheck(fmt.Sprintf("etcd-override-%d", len(serversSets)-1), "etcd", func(r *http.Request) error {
return healthCheck()
}))
readyCheck, err := storagefactory.CreateReadyCheck(sc, c.DrainedNotify())
if err != nil {
return err
}
c.AddReadyzChecks(healthz.NamedGroupedCheck(fmt.Sprintf("etcd-override-readiness-%d", len(serversSets)-1), "etcd-readiness", func(r *http.Request) error {
return readyCheck()
}))
}
}
return nil return nil
} }
@ -519,3 +550,35 @@ func (t *transformerStorageFactory) Configs() []storagebackend.Config {
func (t *transformerStorageFactory) Backends() []serverstorage.Backend { func (t *transformerStorageFactory) Backends() []serverstorage.Backend {
return t.delegate.Backends() return t.delegate.Backends()
} }
type EtcdServerOverride struct {
GroupResource schema.GroupResource
Servers []string
}
var errOverridesInvalid = fmt.Errorf("--etcd-servers-overrides invalid, must be of format: group/resource#servers, where servers are URLs, semicolon separated")
func ParseEtcdServersOverrides(etcdServersOverrides []string) ([]EtcdServerOverride, error) {
var overrides []EtcdServerOverride
for _, override := range etcdServersOverrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {
return nil, errOverridesInvalid
}
apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
return nil, errOverridesInvalid
}
servers := strings.Split(tokens[1], ";")
for _, server := range servers {
if len(server) == 0 {
return nil, errOverridesInvalid
}
}
overrides = append(overrides, EtcdServerOverride{
GroupResource: schema.GroupResource{Group: apiresource[0], Resource: apiresource[1]},
Servers: servers,
})
}
return overrides, nil
}

View File

@ -375,11 +375,12 @@ func TestKMSHealthzEndpoint(t *testing.T) {
func TestReadinessCheck(t *testing.T) { func TestReadinessCheck(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
wantReadyzChecks []string wantReadyzChecks []string
wantHealthzChecks []string wantHealthzChecks []string
wantLivezChecks []string wantLivezChecks []string
skipHealth bool skipHealth bool
etcdServersOverrides []string
}{ }{
{ {
name: "Readyz should have etcd-readiness check", name: "Readyz should have etcd-readiness check",
@ -394,6 +395,37 @@ func TestReadinessCheck(t *testing.T) {
wantLivezChecks: nil, wantLivezChecks: nil,
skipHealth: true, skipHealth: true,
}, },
{
name: "Health checks should not have duplicated servers from etcd-servers-overrides",
wantReadyzChecks: []string{"etcd", "etcd-readiness", "etcd-override-0", "etcd-override-readiness-0"},
wantHealthzChecks: []string{"etcd", "etcd-override-0"},
wantLivezChecks: []string{"etcd", "etcd-override-0"},
etcdServersOverrides: []string{"/r1#s1.com;s2.com", "/r2#s1.com;s2.com"},
},
{
name: "Health checks should not have duplicated servers from etcd-servers-overrides " +
"if servers are provided in different orders",
wantReadyzChecks: []string{"etcd", "etcd-readiness", "etcd-override-0", "etcd-override-readiness-0"},
wantHealthzChecks: []string{"etcd", "etcd-override-0"},
wantLivezChecks: []string{"etcd", "etcd-override-0"},
etcdServersOverrides: []string{"/r1#s1.com;s2.com", "/r2#s2.com;s1.com"},
},
{
name: "Health checks should allow multiple overrides in etcd-servers-overrides",
wantReadyzChecks: []string{"etcd", "etcd-readiness", "etcd-override-0", "etcd-override-readiness-0",
"etcd-override-1", "etcd-override-readiness-1"},
wantHealthzChecks: []string{"etcd", "etcd-override-0", "etcd-override-1"},
wantLivezChecks: []string{"etcd", "etcd-override-0", "etcd-override-1"},
etcdServersOverrides: []string{"/r1#s1.com;s2.com", "/r2#s3.com;s4.com"},
},
{
name: "Health checks should allow multiple overrides in etcd-servers-overrides if servers overlap between overrides",
wantReadyzChecks: []string{"etcd", "etcd-readiness", "etcd-override-0", "etcd-override-readiness-0",
"etcd-override-1", "etcd-override-readiness-1"},
wantHealthzChecks: []string{"etcd", "etcd-override-0", "etcd-override-1"},
wantLivezChecks: []string{"etcd", "etcd-override-0", "etcd-override-1"},
etcdServersOverrides: []string{"/r1#s1.com;s2.com", "/r2#s2.com;s3.com"},
},
} }
scheme := runtime.NewScheme() scheme := runtime.NewScheme()
@ -402,7 +434,7 @@ func TestReadinessCheck(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
serverConfig := server.NewConfig(codecs) serverConfig := server.NewConfig(codecs)
etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth} etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth, EtcdServersOverrides: tc.etcdServersOverrides}
if err := etcdOptions.ApplyTo(serverConfig); err != nil { if err := etcdOptions.ApplyTo(serverConfig); err != nil {
t.Fatalf("Failed to add healthz error: %v", err) t.Fatalf("Failed to add healthz error: %v", err)
} }