diff --git a/pkg/server/healthz/healthz.go b/pkg/server/healthz/healthz.go index f22bbfcad..b2d0007f5 100644 --- a/pkg/server/healthz/healthz.go +++ b/pkg/server/healthz/healthz.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "net/http" + "reflect" "strings" "sync" "sync/atomic" @@ -29,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/server/httplog" - "k8s.io/client-go/informers" "k8s.io/klog/v2" ) @@ -82,16 +82,20 @@ func (l *log) Check(_ *http.Request) error { return fmt.Errorf("logging blocked") } +type cacheSyncWaiter interface { + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + type informerSync struct { - sharedInformerFactory informers.SharedInformerFactory + cacheSyncWaiter cacheSyncWaiter } var _ HealthChecker = &informerSync{} -// NewInformerSyncHealthz returns a new HealthChecker that will pass only if all informers in the given sharedInformerFactory sync. -func NewInformerSyncHealthz(sharedInformerFactory informers.SharedInformerFactory) HealthChecker { +// NewInformerSyncHealthz returns a new HealthChecker that will pass only if all informers in the given cacheSyncWaiter sync. +func NewInformerSyncHealthz(cacheSyncWaiter cacheSyncWaiter) HealthChecker { return &informerSync{ - sharedInformerFactory: sharedInformerFactory, + cacheSyncWaiter: cacheSyncWaiter, } } @@ -104,8 +108,8 @@ func (i *informerSync) Check(_ *http.Request) error { // Close stopCh to force checking if informers are synced now. close(stopCh) - var informersByStarted map[bool][]string - for informerType, started := range i.sharedInformerFactory.WaitForCacheSync(stopCh) { + informersByStarted := make(map[bool][]string) + for informerType, started := range i.cacheSyncWaiter.WaitForCacheSync(stopCh) { informersByStarted[started] = append(informersByStarted[started], informerType.String()) } diff --git a/pkg/server/healthz/healthz_test.go b/pkg/server/healthz/healthz_test.go index 4f99739ae..8356f001f 100644 --- a/pkg/server/healthz/healthz_test.go +++ b/pkg/server/healthz/healthz_test.go @@ -26,6 +26,7 @@ import ( "strings" "testing" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/component-base/metrics/legacyregistry" @@ -273,3 +274,43 @@ func createGetRequestWithUrl(rawUrlString string) *http.Request { URL: url, } } + +func TestInformerSyncHealthChecker(t *testing.T) { + t.Run("test that check returns nil when all informers are started", func(t *testing.T) { + healthChecker := NewInformerSyncHealthz(cacheSyncWaiterStub{ + startedByInformerType: map[reflect.Type]bool{ + reflect.TypeOf(corev1.Pod{}): true, + }, + }) + + err := healthChecker.Check(nil) + if err != nil { + t.Errorf("Got %v, expected no error", err) + } + }) + + t.Run("test that check returns err when there is not started informer", func(t *testing.T) { + healthChecker := NewInformerSyncHealthz(cacheSyncWaiterStub{ + startedByInformerType: map[reflect.Type]bool{ + reflect.TypeOf(corev1.Pod{}): true, + reflect.TypeOf(corev1.Service{}): false, + reflect.TypeOf(corev1.Node{}): true, + }, + }) + + err := healthChecker.Check(nil) + if err == nil { + t.Errorf("expected error, got: %v", err) + } + }) +} + +type cacheSyncWaiterStub struct { + startedByInformerType map[reflect.Type]bool +} + +// WaitForCacheSync is a stub implementation of the corresponding func +// that simply returns the value passed during stub initialization. +func (s cacheSyncWaiterStub) WaitForCacheSync(_ <-chan struct{}) map[reflect.Type]bool { + return s.startedByInformerType +}