diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index e5f3927fb..2b5021ea6 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -80,10 +80,22 @@ func NewServer( return nil, err } - endpoints := watcher.NewEndpointsWatcher(k8sAPI, log, enableEndpointSlices) - opaquePorts := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts) - profiles := watcher.NewProfileWatcher(k8sAPI, log) - servers := watcher.NewServerWatcher(k8sAPI, log) + endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, log, enableEndpointSlices) + if err != nil { + return nil, err + } + opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts) + if err != nil { + return nil, err + } + profiles, err := watcher.NewProfileWatcher(k8sAPI, log) + if err != nil { + return nil, err + } + servers, err := watcher.NewServerWatcher(k8sAPI, log) + if err != nil { + return nil, err + } srv := server{ pb.UnimplementedDestinationServer{}, diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 117bdfc25..61a4be2af 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -381,10 +381,22 @@ spec: t.Fatalf("initializeIndexers returned an error: %s", err) } - endpoints := watcher.NewEndpointsWatcher(k8sAPI, log, false) - opaquePorts := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts) - profiles := watcher.NewProfileWatcher(k8sAPI, log) - servers := watcher.NewServerWatcher(k8sAPI, log) + endpoints, err := watcher.NewEndpointsWatcher(k8sAPI, log, false) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } + opaquePorts, err := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts) + if err != nil { + t.Fatalf("can't create opaque ports watcher: %s", err) + } + profiles, err := watcher.NewProfileWatcher(k8sAPI, log) + if err != nil { + t.Fatalf("can't create profile watcher: %s", err) + } + servers, err := watcher.NewServerWatcher(k8sAPI, log) + if err != nil { + t.Fatalf("can't create Server watcher: %s", err) + } // Sync after creating watchers so that the the indexers added get updated // properly diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index cfc99b1c6..889a8577d 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -134,7 +134,7 @@ var undefinedEndpointPort = Port(0) // NewEndpointsWatcher creates an EndpointsWatcher and begins watching the // k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will // watch on Endpoints or EndpointSlice resources, depending on cluster configuration. -func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlices bool) *EndpointsWatcher { +func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlices bool) (*EndpointsWatcher, error) { ew := &EndpointsWatcher{ publishers: make(map[ServiceID]*servicePublisher), k8sAPI: k8sAPI, @@ -144,34 +144,46 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlic }), } - k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addService, DeleteFunc: ew.deleteService, UpdateFunc: func(_, obj interface{}) { ew.addService(obj) }, }) + if err != nil { + return nil, err + } - k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addServer, DeleteFunc: ew.deleteServer, UpdateFunc: func(_, obj interface{}) { ew.addServer(obj) }, }) + if err != nil { + return nil, err + } if ew.enableEndpointSlices { ew.log.Debugf("Watching EndpointSlice resources") - k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addEndpointSlice, DeleteFunc: ew.deleteEndpointSlice, UpdateFunc: ew.updateEndpointSlice, }) + if err != nil { + return nil, err + } } else { ew.log.Debugf("Watching Endpoints resources") - k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: ew.addEndpoints, DeleteFunc: ew.deleteEndpoints, UpdateFunc: func(_, obj interface{}) { ew.addEndpoints(obj) }, }) + if err != nil { + return nil, err + } } - return ew + return ew, nil } //////////////////////// diff --git a/controller/api/destination/watcher/endpoints_watcher_test.go b/controller/api/destination/watcher/endpoints_watcher_test.go index 0e2019532..e589a35d6 100644 --- a/controller/api/destination/watcher/endpoints_watcher_test.go +++ b/controller/api/destination/watcher/endpoints_watcher_test.go @@ -660,7 +660,10 @@ status: t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false) + watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } k8sAPI.Sync(nil) @@ -1276,7 +1279,10 @@ status: t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true) + watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } k8sAPI.Sync(nil) @@ -1393,7 +1399,10 @@ status: t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false) + watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } k8sAPI.Sync(nil) @@ -1514,7 +1523,10 @@ status: t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true) + watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), true) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } k8sAPI.Sync(nil) @@ -1741,7 +1753,10 @@ subsets: t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices) + watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } k8sAPI.Sync(nil) @@ -1901,7 +1916,10 @@ subsets: t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false) + watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } k8sAPI.Sync(nil) @@ -2024,7 +2042,10 @@ status: t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false) + watcher, err := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false) + if err != nil { + t.Fatalf("can't create Endpoints watcher: %s", err) + } k8sAPI.Sync(nil) diff --git a/controller/api/destination/watcher/opaque_ports_watcher.go b/controller/api/destination/watcher/opaque_ports_watcher.go index 0b9da47ca..afcde3095 100644 --- a/controller/api/destination/watcher/opaque_ports_watcher.go +++ b/controller/api/destination/watcher/opaque_ports_watcher.go @@ -37,19 +37,23 @@ type ( // NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for // k8sAPI for service changes. -func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) *OpaquePortsWatcher { +func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error) { opw := &OpaquePortsWatcher{ subscriptions: make(map[ServiceID]*svcSubscriptions), k8sAPI: k8sAPI, log: log.WithField("component", "opaque-ports-watcher"), defaultOpaquePorts: opaquePorts, } - k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: opw.addService, DeleteFunc: opw.deleteService, UpdateFunc: func(_, obj interface{}) { opw.addService(obj) }, }) - return opw + if err != nil { + return nil, err + } + + return opw, nil } // Subscribe subscribes a listener to a service; each time the service diff --git a/controller/api/destination/watcher/opaque_ports_watcher_test.go b/controller/api/destination/watcher/opaque_ports_watcher_test.go index bce866d80..9b94cb202 100644 --- a/controller/api/destination/watcher/opaque_ports_watcher_test.go +++ b/controller/api/destination/watcher/opaque_ports_watcher_test.go @@ -218,7 +218,10 @@ func TestOpaquePortsWatcher(t *testing.T) { if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewOpaquePortsWatcher(k8sAPI, logging.WithField("test", t.Name()), defaultOpaquePorts) + watcher, err := NewOpaquePortsWatcher(k8sAPI, logging.WithField("test", t.Name()), defaultOpaquePorts) + if err != nil { + t.Fatalf("can't create opaque ports watcher: %s", err) + } k8sAPI.Sync(nil) listener := newTestOpaquePortsListener() watcher.Subscribe(tt.service, listener) diff --git a/controller/api/destination/watcher/profile_watcher.go b/controller/api/destination/watcher/profile_watcher.go index 859f67a81..ec52f96bd 100644 --- a/controller/api/destination/watcher/profile_watcher.go +++ b/controller/api/destination/watcher/profile_watcher.go @@ -44,22 +44,25 @@ var profileVecs = newMetricsVecs("profile", []string{"namespace", "profile"}) // NewProfileWatcher creates a ProfileWatcher and begins watching the k8sAPI for // service profile changes. -func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) *ProfileWatcher { +func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ProfileWatcher, error) { watcher := &ProfileWatcher{ profileLister: k8sAPI.SP().Lister(), profiles: make(map[ProfileID]*profilePublisher), log: log.WithField("component", "profile-watcher"), } - k8sAPI.SP().Informer().AddEventHandler( + _, err := k8sAPI.SP().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: watcher.addProfile, UpdateFunc: watcher.updateProfile, DeleteFunc: watcher.deleteProfile, }, ) + if err != nil { + return nil, err + } - return watcher + return watcher, nil } ////////////////////// diff --git a/controller/api/destination/watcher/profile_watcher_test.go b/controller/api/destination/watcher/profile_watcher_test.go index fa6b6242f..02af3ca9d 100644 --- a/controller/api/destination/watcher/profile_watcher_test.go +++ b/controller/api/destination/watcher/profile_watcher_test.go @@ -84,7 +84,10 @@ func TestProfileWatcherUpdates(t *testing.T) { t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name())) + watcher, err := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name())) + if err != nil { + t.Fatalf("can't create profile watcher: %s", err) + } k8sAPI.Sync(nil) @@ -136,7 +139,10 @@ func TestProfileWatcherDeletes(t *testing.T) { t.Fatalf("NewFakeAPI returned an error: %s", err) } - watcher := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name())) + watcher, err := NewProfileWatcher(k8sAPI, logging.WithField("test", t.Name())) + if err != nil { + t.Fatalf("can't create profile watcher: %s", err) + } k8sAPI.Sync(nil) listener := NewDeletingProfileListener() diff --git a/controller/api/destination/watcher/server_watcher.go b/controller/api/destination/watcher/server_watcher.go index a8e91ccf8..8611cd875 100644 --- a/controller/api/destination/watcher/server_watcher.go +++ b/controller/api/destination/watcher/server_watcher.go @@ -37,18 +37,22 @@ type ServerUpdateListener interface { } // NewServerWatcher creates a new ServerWatcher. -func NewServerWatcher(k8sAPI *k8s.API, log *logging.Entry) *ServerWatcher { +func NewServerWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ServerWatcher, error) { sw := &ServerWatcher{ subscriptions: make(map[podPort][]ServerUpdateListener), k8sAPI: k8sAPI, log: log, } - k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + _, err := k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: sw.addServer, DeleteFunc: sw.deleteServer, UpdateFunc: func(_, obj interface{}) { sw.addServer(obj) }, }) - return sw + if err != nil { + return nil, err + } + + return sw, nil } // Subscribe subscribes a listener for any Server updates that may select the diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index a609f107a..ca2127426 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -782,7 +782,7 @@ func (rcsw *RemoteClusterServiceWatcher) processEvents(ctx context.Context) { func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { rcsw.remoteAPIClient.Sync(rcsw.stopper) rcsw.eventsQueue.Add(&OrphanedServicesGcTriggered{}) - rcsw.remoteAPIClient.Svc().Informer().AddEventHandler( + _, err := rcsw.remoteAPIClient.Svc().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(svc interface{}) { rcsw.eventsQueue.Add(&OnAddCalled{svc.(*corev1.Service)}) @@ -808,8 +808,11 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { }, }, ) + if err != nil { + return err + } - rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( + _, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ // AddFunc only relevant for exported headless endpoints AddFunc: func(obj interface{}) { @@ -845,14 +848,20 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { }, }, ) + if err != nil { + return err + } - rcsw.localAPIClient.NS().Informer().AddEventHandler( + _, err = rcsw.localAPIClient.NS().Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { rcsw.eventsQueue.Add(&OnLocalNamespaceAdded{obj.(*corev1.Namespace)}) }, }, ) + if err != nil { + return err + } go rcsw.processEvents(ctx)