diff --git a/controller/api/destination/watcher/workload_watcher.go b/controller/api/destination/watcher/workload_watcher.go index d4c13da2e..23bb5716e 100644 --- a/controller/api/destination/watcher/workload_watcher.go +++ b/controller/api/destination/watcher/workload_watcher.go @@ -20,6 +20,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" ) @@ -92,8 +94,8 @@ func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logg } _, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: ww.updateServers, - DeleteFunc: ww.updateServers, + AddFunc: ww.addOrDeleteServer, + DeleteFunc: ww.addOrDeleteServer, UpdateFunc: ww.updateServer, }) if err != nil { @@ -310,27 +312,51 @@ func (ww *WorkloadWatcher) updateServer(oldObj interface{}, newObj interface{}) oldUpdated := latestUpdated(oldServer.ManagedFields) updated := latestUpdated(newServer.ManagedFields) + if !updated.IsZero() && updated != oldUpdated { delta := time.Since(updated) serverInformerLag.Observe(delta.Seconds()) } - ww.updateServers(newObj) + ww.updateServers(oldServer, newServer) +} + +func (ww *WorkloadWatcher) addOrDeleteServer(obj interface{}) { + server, ok := obj.(*v1beta2.Server) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj) + return + } + server, ok = tombstone.Obj.(*v1beta2.Server) + if !ok { + ww.log.Errorf("DeletedFinalStateUnknown contained object that is not a Server %#v", obj) + return + } + } + ww.updateServers(server) } // updateServer triggers an Update() call to the listeners of the workloadPublishers -// whose pod matches the Server's podSelector or whose externalworkload matches -// the Server's externalworkload selection. This function is an event handler -// so it cannot block. -func (ww *WorkloadWatcher) updateServers(_ any) { +// whose pod matches the any of the Servers' podSelector or whose +// externalworkload matches any of the Servers' externalworkload selection. This +// function is an event handler so it cannot block. +func (ww *WorkloadWatcher) updateServers(servers ...*v1beta2.Server) { ww.mu.RLock() defer ww.mu.RUnlock() for _, wp := range ww.publishers { var opaquePorts map[uint32]struct{} if wp.pod != nil { + if !ww.isPodSelectedByAny(wp.pod, servers...) { + continue + } opaquePorts = GetAnnotatedOpaquePorts(wp.pod, ww.defaultOpaquePorts) } else if wp.externalWorkload != nil { + if !ww.isExternalWorkloadSelectedByAny(wp.externalWorkload, servers...) { + continue + } opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.externalWorkload, ww.defaultOpaquePorts) } else { continue @@ -369,6 +395,34 @@ func (ww *WorkloadWatcher) updateServers(_ any) { } } +func (ww *WorkloadWatcher) isPodSelectedByAny(pod *corev1.Pod, servers ...*v1beta2.Server) bool { + for _, s := range servers { + selector, err := metav1.LabelSelectorAsSelector(s.Spec.PodSelector) + if err != nil { + ww.log.Errorf("failed to parse PodSelector of Server %s.%s: %q", s.GetName(), s.GetNamespace(), err) + continue + } + if selector.Matches(labels.Set(pod.Labels)) { + return true + } + } + return false +} + +func (ww *WorkloadWatcher) isExternalWorkloadSelectedByAny(ew *ext.ExternalWorkload, servers ...*v1beta2.Server) bool { + for _, s := range servers { + selector, err := metav1.LabelSelectorAsSelector(s.Spec.ExternalWorkloadSelector) + if err != nil { + ww.log.Errorf("failed to parse ExternalWorkloadSelector of Server %s.%s: %q", s.GetName(), s.GetNamespace(), err) + continue + } + if selector.Matches(labels.Set(ew.Labels)) { + return true + } + } + return false +} + // getOrNewWorkloadPublisher returns the workloadPublisher for the given target if it // exists. Otherwise, it creates a new one and returns it. func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostname, ip string, port Port) (*workloadPublisher, error) {