// linkerd2/controller/api/destination/watcher/ip_watcher.go
//
// 414 lines
// 11 KiB
// Go

package watcher
import (
"fmt"
"sync"
"github.com/linkerd/linkerd2/controller/k8s"
logging "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
// IPWatcher wraps an EndpointsWatcher and allows subscriptions by IP
// address. It watches all services in the cluster to keep an index of
// service by cluster IP and translates subscriptions by IP address into
// subscriptions on the EndpointsWatcher by service name.
type IPWatcher struct {
	// publishers holds one serviceSubscriptions per IP address, keyed by
	// that address.
	publishers map[string]*serviceSubscriptions
	endpoints  *EndpointsWatcher
	k8sAPI     *k8s.API
	log        *logging.Entry

	sync.RWMutex // This mutex protects modification of the map itself.
}

// serviceSubscriptions tracks the listeners subscribed to a single IP
// address together with the service or pod recorded for that address.
type serviceSubscriptions struct {
	clusterIP string

	// At most one of service or pod may be non-zero.
	service ServiceID
	pod     AddressSet

	// listeners maps each subscribed listener to the port it subscribed with.
	listeners map[EndpointUpdateListener]Port
	endpoints *EndpointsWatcher
	log       *logging.Entry

	// The methods of serviceSubscriptions take this mutex before reading or
	// writing the fields above.
	sync.Mutex
}
// NewIPWatcher creates an IPWatcher and begins watching the k8sAPI for service
// and pod changes.
//
// It adds an index to the service informer, under the name podIPIndex, that
// keys every service by its cluster IP, and registers add/update/delete
// handlers on both the service and the pod informers. A failure to add the
// index is logged rather than returned, because the constructor has no error
// result.
func NewIPWatcher(k8sAPI *k8s.API, endpoints *EndpointsWatcher, log *logging.Entry) *IPWatcher {
	iw := &IPWatcher{
		publishers: make(map[string]*serviceSubscriptions),
		endpoints:  endpoints,
		k8sAPI:     k8sAPI,
		log: log.WithFields(logging.Fields{
			"component": "ip-watcher",
		}),
	}

	svcIndexers := cache.Indexers{podIPIndex: func(obj interface{}) ([]string, error) {
		if svc, ok := obj.(*corev1.Service); ok {
			return []string{svc.Spec.ClusterIP}, nil
		}
		return []string{""}, fmt.Errorf("object is not a service")
	}}
	if err := k8sAPI.Svc().Informer().AddIndexers(svcIndexers); err != nil {
		// Lookups of services by cluster IP go through this index, so make
		// the failure visible instead of dropping it.
		iw.log.Errorf("failed to add cluster IP index to service informer: %s", err)
	}

	k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    iw.addService,
		DeleteFunc: iw.deleteService,
		// A service update is handled as a delete of the old object followed
		// by an add of the new one.
		UpdateFunc: func(before interface{}, after interface{}) {
			iw.deleteService(before)
			iw.addService(after)
		},
	})

	k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    iw.addPod,
		DeleteFunc: iw.deletePod,
		// Only the new state of an updated pod is considered.
		UpdateFunc: func(_ interface{}, obj interface{}) {
			iw.addPod(obj)
		},
	})

	return iw
}
////////////////////////
/// IPWatcher ///
////////////////////////
// Subscribe registers listener for updates about the given IP address and
// port. The subscription record for clusterIP is created if it does not exist
// yet. An error is returned when the underlying subscription fails, in which
// case the listener is not registered.
func (iw *IPWatcher) Subscribe(clusterIP string, port Port, listener EndpointUpdateListener) error {
	iw.log.Infof("Establishing watch on service cluster ip [%s:%d]", clusterIP, port)
	return iw.getOrNewServiceSubscriptions(clusterIP).subscribe(port, listener)
}
// Unsubscribe removes listener from the set of subscribers for the given IP
// address. If no subscription record exists for clusterIP, an error is logged
// and nothing else happens.
func (iw *IPWatcher) Unsubscribe(clusterIP string, port Port, listener EndpointUpdateListener) {
	iw.log.Infof("Stopping watch on service cluster ip [%s:%d]", clusterIP, port)

	if subs, found := iw.getServiceSubscriptions(clusterIP); found {
		subs.unsubscribe(port, listener)
		return
	}
	iw.log.Errorf("Cannot unsubscribe from unknown service ip [%s:%d]", clusterIP, port)
}
// GetSvc returns the service that corresponds to an IP address if one exists.
//
// The lookup goes through the service informer's podIPIndex index. It returns
// (nil, nil) when the index has no service for clusterIP, a codes.Unknown
// error when the index lookup itself fails, and a codes.FailedPrecondition
// error when more than one service is indexed under the same IP.
func (iw *IPWatcher) GetSvc(clusterIP string) (*ServiceID, error) {
	matches, err := iw.k8sAPI.Svc().Informer().GetIndexer().ByIndex(podIPIndex, clusterIP)
	if err != nil {
		return nil, status.Error(codes.Unknown, err.Error())
	}

	switch len(matches) {
	case 0:
		// `clusterIP` does not map to a service that the indexer is aware of.
		return nil, nil
	case 1:
		svc, isSvc := matches[0].(*corev1.Service)
		if !isSvc {
			// An indexed object of an unexpected type is treated as "no service".
			return nil, nil
		}
		return &ServiceID{Namespace: svc.Namespace, Name: svc.Name}, nil
	default:
		return nil, status.Errorf(codes.FailedPrecondition, "Service cluster IP conflict: %v, %v", matches[0], matches[1])
	}
}
// addService is the service informer's add handler (and the second half of
// its update handler). It records the service against its cluster IP so that
// listeners subscribed to that IP are pointed at the service.
//
// Services in the kubeSystem namespace are ignored, as are services that have
// no usable cluster IP: headless services ("None") and services whose
// ClusterIP is empty (for example ExternalName services). Registering the
// latter would create a subscription record keyed by the empty string.
func (iw *IPWatcher) addService(obj interface{}) {
	service := obj.(*corev1.Service)
	if service.Namespace == kubeSystem {
		return
	}
	if ip := service.Spec.ClusterIP; ip == "" || ip == "None" {
		return
	}

	ss := iw.getOrNewServiceSubscriptions(service.Spec.ClusterIP)
	ss.updateService(service)
}
// deleteService is the service informer's delete handler (and the first half
// of its update handler). obj is either the deleted *corev1.Service or a
// cache.DeletedFinalStateUnknown tombstone wrapping one; anything else is
// logged and dropped. Services in the kubeSystem namespace are ignored. If a
// subscription record exists for the service's cluster IP, it is told that
// the service is gone.
func (iw *IPWatcher) deleteService(obj interface{}) {
	var service *corev1.Service

	switch v := obj.(type) {
	case *corev1.Service:
		service = v
	case cache.DeletedFinalStateUnknown:
		wrapped, isSvc := v.Obj.(*corev1.Service)
		if !isSvc {
			iw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
			return
		}
		service = wrapped
	default:
		iw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
		return
	}

	if service.Namespace == kubeSystem {
		return
	}

	if subs, found := iw.getServiceSubscriptions(service.Spec.ClusterIP); found {
		subs.deleteService()
	}
}
// addPod is the pod informer's add and update handler. It records the pod
// against its IP address and pushes the pod's address to any listeners
// subscribed to that IP.
//
// Pods are skipped when they live in the kubeSystem namespace, when they have
// not yet been assigned an IP address, or when they use the host network
// (so that watches on host IP addresses are not triggered).
func (iw *IPWatcher) addPod(obj interface{}) {
	pod := obj.(*corev1.Pod)
	podIP := pod.Status.PodIP

	if pod.Namespace == kubeSystem || podIP == "" || pod.Spec.HostNetwork {
		return
	}

	subs := iw.getOrNewServiceSubscriptions(podIP)
	subs.updatePod(iw.podToAddressSet(pod))
}
// deletePod is the pod informer's delete handler. obj is either the deleted
// *corev1.Pod or a cache.DeletedFinalStateUnknown tombstone wrapping one;
// anything else is logged and dropped.
//
// The same pods that addPod skips are skipped here: pods in the kubeSystem
// namespace, pods that were never assigned an IP address, and host-network
// pods. Such pods are never recorded by addPod, so their deletion must not
// trigger updates for listeners watching a host IP address (or the empty
// address).
func (iw *IPWatcher) deletePod(obj interface{}) {
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			iw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*corev1.Pod)
		if !ok {
			iw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Pod %#v", obj)
			return
		}
	}

	if pod.Namespace == kubeSystem {
		return
	}
	if pod.Status.PodIP == "" {
		// Pod was never assigned an IP address, so nothing was recorded for it.
		return
	}
	if pod.Spec.HostNetwork {
		// Don't trigger updates for watches on host IP addresses.
		return
	}

	ss, ok := iw.getServiceSubscriptions(pod.Status.PodIP)
	if ok {
		ss.deletePod()
	}
}
// getOrNewServiceSubscriptions returns the serviceSubscriptions for the given
// clusterIP if it exists. Otherwise, it creates a new one, seeds it from the
// informer indexes, stores it, and returns it.
//
// A new record is seeded with the single service indexed under clusterIP (if
// there is exactly one) and with the single non-host-network pod indexed
// under clusterIP (if there is exactly one). Index lookup failures and
// conflicts (more than one match) are logged, and the corresponding field is
// left at its zero value.
func (iw *IPWatcher) getOrNewServiceSubscriptions(clusterIP string) *serviceSubscriptions {
	iw.Lock()
	defer iw.Unlock()

	if ss, ok := iw.publishers[clusterIP]; ok {
		return ss
	}

	// If the service doesn't yet exist, create a stub for it so the listener
	// can be registered.
	ss := &serviceSubscriptions{
		clusterIP: clusterIP,
		listeners: make(map[EndpointUpdateListener]Port),
		endpoints: iw.endpoints,
		log:       iw.log.WithField("clusterIP", clusterIP),
	}

	svcs, err := iw.k8sAPI.Svc().Informer().GetIndexer().ByIndex(podIPIndex, clusterIP)
	switch {
	case err != nil:
		iw.log.Error(err)
	case len(svcs) > 1:
		iw.log.Errorf("Service cluster IP conflict: %v, %v", svcs[0], svcs[1])
	case len(svcs) == 1:
		if svc, ok := svcs[0].(*corev1.Service); ok {
			ss.service = ServiceID{
				Namespace: svc.Namespace,
				Name:      svc.Name,
			}
		}
	}

	podObjs, err := iw.k8sAPI.Pod().Informer().GetIndexer().ByIndex(podIPIndex, clusterIP)
	if err != nil {
		iw.log.Error(err)
	} else {
		var pods []*corev1.Pod
		for _, obj := range podObjs {
			// Skip pods with HostNetwork.
			if pod, ok := obj.(*corev1.Pod); ok && !pod.Spec.HostNetwork {
				pods = append(pods, pod)
			}
		}
		switch {
		case len(pods) > 1:
			// Report the pods that actually conflict, i.e. entries of the
			// filtered list, not of the raw index result (which may begin
			// with host-network pods that were skipped above).
			iw.log.Errorf("Pod IP conflict: %v, %v", pods[0], pods[1])
		case len(pods) == 1:
			ss.pod = iw.podToAddressSet(pods[0])
		}
	}

	iw.publishers[clusterIP] = ss
	return ss
}
// getServiceSubscriptions looks up the serviceSubscriptions stored for
// clusterIP while holding the read lock. ok is false when no record exists;
// unlike getOrNewServiceSubscriptions, it never creates one.
func (iw *IPWatcher) getServiceSubscriptions(clusterIP string) (ss *serviceSubscriptions, ok bool) {
	iw.RLock()
	ss, ok = iw.publishers[clusterIP]
	iw.RUnlock()
	return ss, ok
}
// podToAddressSet builds an AddressSet holding exactly one address for pod,
// keyed by the pod's name and namespace. The address carries the pod's IP,
// the pod itself, and the owner kind and name reported by the k8s API
// wrapper. The set's labels contain only the pod's namespace.
func (iw *IPWatcher) podToAddressSet(pod *corev1.Pod) AddressSet {
	ownerKind, ownerName := iw.k8sAPI.GetOwnerKindAndName(pod, true)

	id := PodID{
		Name:      pod.Name,
		Namespace: pod.Namespace,
	}
	addr := Address{
		IP:        pod.Status.PodIP,
		Port:      0, // Will be set by individual subscriptions
		Pod:       pod,
		OwnerName: ownerName,
		OwnerKind: ownerKind,
	}

	return AddressSet{
		Addresses: map[PodID]Address{id: addr},
		Labels:    map[string]string{"namespace": pod.Namespace},
	}
}
////////////////////////
/// serviceSubscriptions ///
////////////////////////
// updateService points this IP's subscriptions at the given service. It does
// nothing when the service (by namespace and name) is already the tracked
// one.
//
// Otherwise every listener is moved over: it is unsubscribed from the
// previously tracked service (only if there was one), its endpoints are
// cleared, and it is subscribed to the new service. If that subscription
// fails, a warning is logged and the listener is given the bare IP and port
// instead. Afterwards the new service is recorded and any recorded pod is
// dropped.
func (ss *serviceSubscriptions) updateService(service *corev1.Service) {
	ss.Lock()
	defer ss.Unlock()

	id := ServiceID{
		Namespace: service.Namespace,
		Name:      service.Name,
	}
	if id == ss.service {
		return
	}

	// Listeners are only subscribed on the EndpointsWatcher while a service is
	// tracked, so there is nothing to unsubscribe from when the previous
	// service is the zero value. This mirrors the guard in unsubscribe.
	hadService := ss.service != (ServiceID{})

	for listener, port := range ss.listeners {
		if hadService {
			ss.endpoints.Unsubscribe(ss.service, port, "", listener)
		}
		listener.NoEndpoints(true) // Clear out previous endpoints.
		if err := ss.endpoints.Subscribe(id, port, "", listener); err != nil {
			ss.log.Warnf("failed to subscribe to %s: %s", id, err)
			listener.NoEndpoints(true) // Clear out previous endpoints.
			listener.Add(singletonAddress(ss.clusterIP, port))
		}
	}

	ss.service = id
	ss.pod = AddressSet{}
}
// deleteService detaches this IP's subscriptions from the tracked service.
// Every listener is unsubscribed from the service, has its endpoints cleared,
// and is given the bare IP and port instead; the tracked service is then
// reset to the zero value.
//
// When no service is tracked this is a no-op: listeners are only subscribed
// on the EndpointsWatcher while a service is tracked (see subscribe and
// updateService), so there is nothing to unsubscribe from, and resetting the
// listeners would overwrite any pod address they currently hold.
func (ss *serviceSubscriptions) deleteService() {
	ss.Lock()
	defer ss.Unlock()

	if ss.service == (ServiceID{}) {
		return
	}

	for listener, port := range ss.listeners {
		ss.endpoints.Unsubscribe(ss.service, port, "", listener)
		listener.NoEndpoints(true) // Clear out previous endpoints.
		listener.Add(singletonAddress(ss.clusterIP, port))
	}
	ss.service = ServiceID{}
}
// updatePod records podSet as the pod behind this IP and, when podSet is not
// empty, sends it to every listener with the address port set to the port
// that listener subscribed with.
func (ss *serviceSubscriptions) updatePod(podSet AddressSet) {
	ss.Lock()
	defer ss.Unlock()

	// Pod IP addresses never change. Therefore, we don't need to send a
	// NoEndpoints update to clear out the previous address; sending an
	// Add with the same address will replace the previous one.
	if len(podSet.Addresses) != 0 {
		for listener, port := range ss.listeners {
			listener.Add(withPort(podSet, port))
		}
	}

	ss.pod = podSet
}
// deletePod forgets the pod recorded for this IP. Each listener has its
// endpoints cleared and is then given the bare IP and port it subscribed
// with.
func (ss *serviceSubscriptions) deletePod() {
	ss.Lock()
	defer ss.Unlock()

	for l, p := range ss.listeners {
		// Clear out previous endpoints, then fall back to the plain address.
		l.NoEndpoints(true)
		fallback := singletonAddress(ss.clusterIP, p)
		l.Add(fallback)
	}

	ss.pod = AddressSet{}
}
// subscribe registers listener for this IP on the given port.
//
// If a service is tracked, the listener is subscribed to that service on the
// EndpointsWatcher; should that fail, the error is returned and the listener
// is not registered. Otherwise, if a pod is recorded, the listener receives
// the pod's address with the requested port. Otherwise it receives the bare
// IP and port.
func (ss *serviceSubscriptions) subscribe(port Port, listener EndpointUpdateListener) error {
	ss.Lock()
	defer ss.Unlock()

	switch {
	case ss.service != (ServiceID{}):
		if err := ss.endpoints.Subscribe(ss.service, port, "", listener); err != nil {
			return err
		}
	case len(ss.pod.Addresses) != 0:
		listener.Add(withPort(ss.pod, port))
	default:
		listener.Add(singletonAddress(ss.clusterIP, port))
	}

	ss.listeners[listener] = port
	return nil
}
// unsubscribe removes listener from this IP's subscriptions. When a service
// is tracked, the listener is also unsubscribed from that service on the
// EndpointsWatcher.
func (ss *serviceSubscriptions) unsubscribe(port Port, listener EndpointUpdateListener) {
	ss.Lock()
	defer ss.Unlock()

	var noService ServiceID
	if ss.service != noService {
		ss.endpoints.Unsubscribe(ss.service, port, "", listener)
	}

	delete(ss.listeners, listener)
}
// withPort returns a copy of pods in which every address has its Port set to
// port. The Addresses map is newly allocated, so the input's addresses are
// left untouched; the Labels map is shared with the input rather than copied.
func withPort(pods AddressSet, port Port) AddressSet {
	addrs := make(map[PodID]Address, len(pods.Addresses))
	for id := range pods.Addresses {
		a := pods.Addresses[id]
		a.Port = port
		addrs[id] = a
	}

	return AddressSet{
		Addresses: addrs,
		Labels:    pods.Labels,
	}
}
// singletonAddress returns an AddressSet containing a single address made of
// just ip and port, stored under the zero PodID, with an empty label map.
func singletonAddress(ip string, port Port) AddressSet {
	bare := Address{
		IP:   ip,
		Port: port,
	}

	set := AddressSet{
		Addresses: map[PodID]Address{},
		Labels:    map[string]string{},
	}
	set.Addresses[PodID{}] = bare
	return set
}