Support pod ip and service cluster ip lookups in the destination service (#3595)

Fixes #3444 
Fixes #3443 

## Background and Behavior

This change adds support for the destination service to resolve Get requests which contain a service cluster IP or pod IP as the `Path` parameter.  It returns the stream of endpoints, just as if `Get` had been called with the service's authority.  This lays the groundwork for allowing the proxy to secure TCP connections with TLS, by allowing the proxy to do destination lookups for the SO_ORIG_DST of TCP connections.  When that IP address corresponds to a service cluster IP or pod IP, the destination service will return the endpoints stream, including the pod metadata required to establish identity.

Prior to this change, attempting to look up an IP address in the destination service would result in an `InvalidArgument` error.

Updating the `GetProfile` method to support ip address lookups is out of scope and attempts to look up an ip address with the `GetProfile` method will result in `InvalidArgument`.

## Implementation

We do this by creating an `IPWatcher` which wraps the `EndpointsWatcher` and supports lookups by IP.   `IPWatcher` maintains a mapping of cluster IPs to service ids and translates a subscription to an IP address into a subscription to the service id using the underlying `EndpointsWatcher`.

Since the service name is no longer always inferable directly from the input parameters, we restructure `EndpointTranslator` and `PodSet` so that we propagate the service name from the endpoints API response.

## Testing

This can be tested by running the destination service locally, using the current kube context to connect to a Kubernetes cluster:

```
go run controller/cmd/main.go destination -kubeconfig ~/.kube/config
```

Then lookups can be issued using the destination client:

```
go run controller/script/destination-client/main.go -path 192.168.54.78:80 -method get -addr localhost:8086
```

Service cluster ips and pod ips can be used as the `path` argument.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2019-12-19 09:25:12 -08:00 committed by GitHub
parent 33b3544874
commit 03762cc526
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 954 additions and 61 deletions

View File

@ -14,6 +14,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -136,6 +136,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -136,6 +136,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -128,6 +128,9 @@ rules:
- apiGroups: ["apps"]
resources: ["replicasets"]
verbs: ["list", "get", "watch"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["list", "get", "watch"]
- apiGroups: [""]
resources: ["pods", "endpoints", "services"]
verbs: ["list", "get", "watch"]

View File

@ -19,7 +19,6 @@ type endpointTranslator struct {
controllerNS string
identityTrustDomain string
enableH2Upgrade bool
labels map[string]string
stream pb.Destination_GetServer
log *logging.Entry
}
@ -28,7 +27,7 @@ func newEndpointTranslator(
controllerNS string,
identityTrustDomain string,
enableH2Upgrade bool,
service watcher.ServiceID,
service string,
stream pb.Destination_GetServer,
log *logging.Entry,
) *endpointTranslator {
@ -37,16 +36,12 @@ func newEndpointTranslator(
"service": service,
})
labels := map[string]string{
"namespace": service.Namespace,
"service": service.Name,
}
return &endpointTranslator{controllerNS, identityTrustDomain, enableH2Upgrade, labels, stream, log}
return &endpointTranslator{controllerNS, identityTrustDomain, enableH2Upgrade, stream, log}
}
func (et *endpointTranslator) Add(set watcher.PodSet) {
addrs := []*pb.WeightedAddr{}
for _, address := range set {
for _, address := range set.Pods {
var (
wa *pb.WeightedAddr
err error
@ -63,7 +58,6 @@ func (et *endpointTranslator) Add(set watcher.PodSet) {
Weight: defaultWeight,
}
}
if err != nil {
et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err)
continue
@ -74,7 +68,7 @@ func (et *endpointTranslator) Add(set watcher.PodSet) {
add := &pb.Update{Update: &pb.Update_Add{
Add: &pb.WeightedAddrSet{
Addrs: addrs,
MetricLabels: et.labels,
MetricLabels: set.Labels,
},
}}
@ -86,7 +80,7 @@ func (et *endpointTranslator) Add(set watcher.PodSet) {
func (et *endpointTranslator) Remove(set watcher.PodSet) {
addrs := []*net.TcpAddress{}
for _, address := range set {
for _, address := range set.Pods {
tcpAddr, err := et.toAddr(address)
if err != nil {
et.log.Errorf("Failed to translate endpoints to addr: %s", err)

View File

@ -100,7 +100,7 @@ func makeEndpointTranslator(t *testing.T) (*mockDestinationGetServer, *endpointT
"linkerd",
"trust.domain",
false,
watcher.ServiceID{Name: "service-name", Namespace: "service-ns"},
"service-name.service-ns",
mockGetServer,
logging.WithField("test", t.Name),
)
@ -239,10 +239,13 @@ func TestEndpointTranslator(t *testing.T) {
}
func mkPodSet(pods ...watcher.Address) watcher.PodSet {
set := make(watcher.PodSet)
set := watcher.PodSet{
Pods: make(map[watcher.PodID]watcher.Address),
Labels: map[string]string{"service": "service-name", "namespace": "service-ns"},
}
for _, p := range pods {
id := watcher.PodID{Name: p.Pod.Name, Namespace: p.Pod.Namespace}
set[id] = p
set.Pods[id] = p
}
return set
}

View File

@ -22,6 +22,7 @@ type (
endpoints *watcher.EndpointsWatcher
profiles *watcher.ProfileWatcher
trafficSplits *watcher.TrafficSplitWatcher
ips *watcher.IPWatcher
enableH2Upgrade bool
controllerNS string
@ -61,11 +62,13 @@ func NewServer(
endpoints := watcher.NewEndpointsWatcher(k8sAPI, log)
profiles := watcher.NewProfileWatcher(k8sAPI, log)
trafficSplits := watcher.NewTrafficSplitWatcher(k8sAPI, log)
ips := watcher.NewIPWatcher(k8sAPI, endpoints, log)
srv := server{
endpoints,
profiles,
trafficSplits,
ips,
enableH2Upgrade,
controllerNS,
identityTrustDomain,
@ -88,6 +91,15 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
}
log.Debugf("Get %s", dest.GetPath())
translator := newEndpointTranslator(
s.controllerNS,
s.identityTrustDomain,
s.enableH2Upgrade,
dest.GetPath(),
stream,
log,
)
// The host must be fully-qualified or be an IP address.
host, port, err := getHostAndPort(dest.GetPath())
if err != nil {
@ -96,36 +108,32 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
}
if ip := net.ParseIP(host); ip != nil {
// TODO handle lookup by IP address
log.Debug("Lookup of IP addresses is not yet supported")
return status.Errorf(codes.InvalidArgument, "Cannot resolve IP addresses")
}
err := s.ips.Subscribe(host, port, translator)
if err != nil {
log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err)
return err
}
defer s.ips.Unsubscribe(host, port, translator)
service, instanceID, err := parseK8sServiceName(host, s.clusterDomain)
if err != nil {
log.Debugf("Invalid service %s", dest.GetPath())
return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
}
} else {
translator := newEndpointTranslator(
s.controllerNS,
s.identityTrustDomain,
s.enableH2Upgrade,
service,
stream,
log,
)
err = s.endpoints.Subscribe(service, port, instanceID, translator)
if err != nil {
if _, ok := err.(watcher.InvalidService); ok {
service, instanceID, err := parseK8sServiceName(host, s.clusterDomain)
if err != nil {
log.Debugf("Invalid service %s", dest.GetPath())
return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
}
log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err)
return err
err = s.endpoints.Subscribe(service, port, instanceID, translator)
if err != nil {
if _, ok := err.(watcher.InvalidService); ok {
log.Debugf("Invalid service %s", dest.GetPath())
return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
}
log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err)
return err
}
defer s.endpoints.Unsubscribe(service, port, instanceID, translator)
}
defer s.endpoints.Unsubscribe(service, port, instanceID, translator)
select {
case <-s.shutdown:

View File

@ -104,11 +104,13 @@ spec:
endpoints := watcher.NewEndpointsWatcher(k8sAPI, log)
profiles := watcher.NewProfileWatcher(k8sAPI, log)
trafficSplits := watcher.NewTrafficSplitWatcher(k8sAPI, log)
ips := watcher.NewIPWatcher(k8sAPI, endpoints, log)
return &server{
endpoints,
profiles,
trafficSplits,
ips,
false,
"linkerd",
"trust.domain",

View File

@ -34,7 +34,10 @@ type (
}
// PodSet is a set of pods, indexed by IP.
PodSet = map[PodID]Address
PodSet struct {
Pods map[PodID]Address
Labels map[string]string
}
portAndHostname struct {
port Port
@ -389,17 +392,17 @@ func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus
func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) {
newPods := pp.endpointsToAddresses(endpoints)
if len(newPods) == 0 {
if len(newPods.Pods) == 0 {
for _, listener := range pp.listeners {
listener.NoEndpoints(true)
}
} else {
add, remove := diffPods(pp.pods, newPods)
for _, listener := range pp.listeners {
if len(remove) > 0 {
if len(remove.Pods) > 0 {
listener.Remove(remove)
}
if len(add) > 0 {
if len(add.Pods) > 0 {
listener.Add(add)
}
}
@ -408,12 +411,12 @@ func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) {
pp.pods = newPods
pp.metrics.incUpdates()
pp.metrics.setPods(len(pp.pods))
pp.metrics.setPods(len(pp.pods.Pods))
pp.metrics.setExists(true)
}
func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) PodSet {
pods := make(PodSet)
pods := make(map[PodID]Address)
for _, subset := range endpoints.Subsets {
resolvedPort := pp.resolveTargetPort(subset)
for _, endpoint := range subset.Addresses {
@ -456,7 +459,10 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) PodSe
}
}
}
return pods
return PodSet{
Pods: pods,
Labels: map[string]string{"service": endpoints.Name, "namespace": endpoints.Namespace},
}
}
func (pp *portPublisher) resolveTargetPort(subset corev1.EndpointSubset) Port {
@ -485,7 +491,7 @@ func (pp *portPublisher) updatePort(targetPort namedPort) {
func (pp *portPublisher) noEndpoints(exists bool) {
pp.exists = exists
pp.pods = make(PodSet)
pp.pods = PodSet{}
for _, listener := range pp.listeners {
listener.NoEndpoints(exists)
}
@ -497,7 +503,7 @@ func (pp *portPublisher) noEndpoints(exists bool) {
func (pp *portPublisher) subscribe(listener EndpointUpdateListener) {
if pp.exists {
if len(pp.pods) > 0 {
if len(pp.pods.Pods) > 0 {
listener.Add(pp.pods)
} else {
listener.NoEndpoints(true)
@ -556,17 +562,24 @@ func diffPods(oldPods, newPods PodSet) (add, remove PodSet) {
// TODO: this detects pods which have been added or removed, but does not
// detect pods which have been modified. A modified pod should trigger
// an add of the new version.
add = make(PodSet)
remove = make(PodSet)
for id, pod := range newPods {
if _, ok := oldPods[id]; !ok {
add[id] = pod
addPods := make(map[PodID]Address)
removePods := make(map[PodID]Address)
for id, pod := range newPods.Pods {
if _, ok := oldPods.Pods[id]; !ok {
addPods[id] = pod
}
}
for id, pod := range oldPods {
if _, ok := newPods[id]; !ok {
remove[id] = pod
for id, pod := range oldPods.Pods {
if _, ok := newPods.Pods[id]; !ok {
removePods[id] = pod
}
}
add = PodSet{
Pods: addPods,
Labels: newPods.Labels,
}
remove = PodSet{
Pods: removePods,
}
return
}

View File

@ -29,13 +29,13 @@ func addressString(address Address) string {
}
func (bel *bufferingEndpointListener) Add(set PodSet) {
for _, address := range set {
for _, address := range set.Pods {
bel.added = append(bel.added, addressString(address))
}
}
func (bel *bufferingEndpointListener) Remove(set PodSet) {
for _, address := range set {
for _, address := range set.Pods {
bel.removed = append(bel.removed, addressString(address))
}
}

View File

@ -0,0 +1,362 @@
package watcher
import (
"fmt"
"sync"
"github.com/linkerd/linkerd2/controller/k8s"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
type (
	// IPWatcher wraps a EndpointsWatcher and allows subscriptions by
	// IP address. It watches all services in the cluster to keep an index
	// of service by cluster IP and translates subscriptions by IP address into
	// subscriptions on the EndpointWatcher by service name.
	IPWatcher struct {
		// publishers maps an IP address (service cluster IP or pod IP) to the
		// subscription state for that IP.
		publishers map[string]*serviceSubscriptions
		endpoints  *EndpointsWatcher
		k8sAPI     *k8s.API
		log        *logging.Entry

		sync.RWMutex // This mutex protects modification of the map itself.
	}

	// serviceSubscriptions tracks the listeners subscribed to a single IP
	// address together with whatever currently owns that IP.
	serviceSubscriptions struct {
		clusterIP string
		// At most one of service or pod may be non-zero.
		service ServiceID
		pod     PodSet
		// listeners maps each subscribed listener to the port it subscribed with.
		listeners map[EndpointUpdateListener]Port
		endpoints *EndpointsWatcher
		log       *logging.Entry
		// All access to the servicePublisher and its portPublishers is explicitly synchronized by
		// this mutex.
		sync.Mutex
	}
)
// NewIPWatcher creates an IPWatcher and begins watching the k8sAPI for service
// and pod changes.
func NewIPWatcher(k8sAPI *k8s.API, endpoints *EndpointsWatcher, log *logging.Entry) *IPWatcher {
	iw := &IPWatcher{
		publishers: make(map[string]*serviceSubscriptions),
		endpoints:  endpoints,
		k8sAPI:     k8sAPI,
		log: log.WithFields(logging.Fields{
			"component": "ip-watcher",
		}),
	}

	// Index services by cluster IP so that getOrNewServiceSubscriptions can
	// look up the service owning an IP directly.
	err := k8sAPI.Svc().Informer().AddIndexers(cache.Indexers{podIPIndex: func(obj interface{}) ([]string, error) {
		if svc, ok := obj.(*corev1.Service); ok {
			return []string{svc.Spec.ClusterIP}, nil
		}
		// Return no keys on error: a non-service object must not be indexed
		// (previously an empty-string key was returned alongside the error).
		return nil, fmt.Errorf("object is not a service")
	}})
	if err != nil {
		// AddIndexers fails if the informer has already started or the index
		// name conflicts; surface it instead of silently dropping it.
		iw.log.Errorf("failed to add cluster IP index: %s", err)
	}

	k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    iw.addService,
		DeleteFunc: iw.deleteService,
		UpdateFunc: func(before interface{}, after interface{}) {
			// An update that moves a cluster IP is handled as delete+add.
			iw.deleteService(before)
			iw.addService(after)
		},
	})

	k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    iw.addPod,
		DeleteFunc: iw.deletePod,
		UpdateFunc: func(_ interface{}, obj interface{}) {
			iw.addPod(obj)
		},
	})

	return iw
}
////////////////////////
/// IPWatcher ///
////////////////////////
// Subscribe to an authority.
// The provided listener will be updated each time the address set for the
// given authority is changed.
func (iw *IPWatcher) Subscribe(clusterIP string, port Port, listener EndpointUpdateListener) error {
	iw.log.Infof("Establishing watch on service cluster ip [%s:%d]", clusterIP, port)
	return iw.getOrNewServiceSubscriptions(clusterIP).subscribe(port, listener)
}
// Unsubscribe removes a listener from the subscribers list for this authority.
func (iw *IPWatcher) Unsubscribe(clusterIP string, port Port, listener EndpointUpdateListener) {
	iw.log.Infof("Stopping watch on service cluster ip [%s:%d]", clusterIP, port)
	if ss, ok := iw.getServiceSubscriptions(clusterIP); ok {
		ss.unsubscribe(port, listener)
		return
	}
	iw.log.Errorf("Cannot unsubscribe from unknown service ip [%s:%d]", clusterIP, port)
}
// addService registers (or refreshes) the subscription state for a service's
// cluster IP when the service appears in the informer.
func (iw *IPWatcher) addService(obj interface{}) {
	svc := obj.(*corev1.Service)
	// Headless services ("None") have no cluster IP to track; kube-system is skipped.
	if svc.Namespace == kubeSystem || svc.Spec.ClusterIP == "None" {
		return
	}
	iw.getOrNewServiceSubscriptions(svc.Spec.ClusterIP).updateService(svc)
}
// deleteService tears down the service association for a cluster IP when the
// service disappears from the informer.
func (iw *IPWatcher) deleteService(obj interface{}) {
	svc := obj.(*corev1.Service)
	if svc.Namespace == kubeSystem {
		return
	}
	if ss, ok := iw.getServiceSubscriptions(svc.Spec.ClusterIP); ok {
		ss.deleteService()
	}
}
// addPod registers (or refreshes) the subscription state for a pod's IP when
// the pod appears in the informer.
func (iw *IPWatcher) addPod(obj interface{}) {
	pod := obj.(*corev1.Pod)
	if pod.Namespace == kubeSystem {
		return
	}
	// Pod has not yet been assigned an IP address.
	if pod.Status.PodIP == "" {
		return
	}
	set := iw.podToPodSet(pod)
	iw.getOrNewServiceSubscriptions(pod.Status.PodIP).updatePod(set)
}
// deletePod tears down the pod association for an IP when the pod disappears
// from the informer.
func (iw *IPWatcher) deletePod(obj interface{}) {
	pod := obj.(*corev1.Pod)
	if pod.Namespace == kubeSystem {
		return
	}
	if ss, ok := iw.getServiceSubscriptions(pod.Status.PodIP); ok {
		ss.deletePod()
	}
}
// Returns the serviceSubscriptions for the given clusterIP if it exists. Otherwise,
// create a new one and return it.
func (iw *IPWatcher) getOrNewServiceSubscriptions(clusterIP string) *serviceSubscriptions {
	iw.Lock()
	defer iw.Unlock()

	// If the service doesn't yet exist, create a stub for it so the listener can
	// be registered.
	ss, ok := iw.publishers[clusterIP]
	if !ok {
		ss = &serviceSubscriptions{
			clusterIP: clusterIP,
			listeners: make(map[EndpointUpdateListener]Port),
			endpoints: iw.endpoints,
			log:       iw.log.WithField("clusterIP", clusterIP),
		}

		// Seed the stub with the service currently owning this cluster IP, if any.
		objs, err := iw.k8sAPI.Svc().Informer().GetIndexer().ByIndex(podIPIndex, clusterIP)
		if err != nil {
			iw.log.Error(err)
		} else {
			if len(objs) > 1 {
				iw.log.Errorf("Service cluster IP conflict: %v, %v", objs[0], objs[1])
			}
			if len(objs) == 1 {
				if svc, ok := objs[0].(*corev1.Service); ok {
					ss.service = ServiceID{
						Namespace: svc.Namespace,
						Name:      svc.Name,
					}
				}
			}
		}

		// Seed the stub with the pod currently owning this IP, if any.
		// NOTE(review): this assumes the pod informer carries the podIPIndex
		// indexer (added elsewhere; only the service informer is indexed in
		// this file) — confirm against the k8s API setup.
		objs, err = iw.k8sAPI.Pod().Informer().GetIndexer().ByIndex(podIPIndex, clusterIP)
		if err != nil {
			iw.log.Error(err)
		} else {
			pods := []*corev1.Pod{}
			for _, obj := range objs {
				if pod, ok := obj.(*corev1.Pod); ok {
					// Skip pods with HostNetwork.
					if !pod.Spec.HostNetwork {
						pods = append(pods, pod)
					}
				}
			}
			if len(pods) > 1 {
				// Log the actual conflicting pods, not the raw indexer objects
				// (objs may include host-network pods filtered out above).
				iw.log.Errorf("Pod IP conflict: %v, %v", pods[0], pods[1])
			}
			if len(pods) == 1 {
				ss.pod = iw.podToPodSet(pods[0])
			}
		}

		iw.publishers[clusterIP] = ss
	}
	return ss
}
// getServiceSubscriptions looks up the subscription state for an IP without
// creating it; ok reports whether an entry exists.
func (iw *IPWatcher) getServiceSubscriptions(clusterIP string) (*serviceSubscriptions, bool) {
	iw.RLock()
	defer iw.RUnlock()
	ss, found := iw.publishers[clusterIP]
	return ss, found
}
// podToPodSet builds a single-entry PodSet for the given pod. Host-network
// pods yield an empty set, since their IP is the node's, not their own.
func (iw *IPWatcher) podToPodSet(pod *corev1.Pod) PodSet {
	if pod.Spec.HostNetwork {
		return PodSet{}
	}

	kind, name := iw.k8sAPI.GetOwnerKindAndName(pod, true)
	id := PodID{
		Name:      pod.Name,
		Namespace: pod.Namespace,
	}
	addr := Address{
		IP:        pod.Status.PodIP,
		Port:      0, // Will be set by individual subscriptions
		Pod:       pod,
		OwnerName: name,
		OwnerKind: kind,
	}
	return PodSet{
		Pods:   map[PodID]Address{id: addr},
		Labels: map[string]string{"namespace": pod.Namespace},
	}
}
////////////////////////
/// serviceSubscriptions ///
////////////////////////
// updateService points all current listeners at the service that now owns
// this cluster IP. If the IP has moved to a different service, each listener
// is unsubscribed from the old service's endpoints and resubscribed to the
// new one's.
func (ss *serviceSubscriptions) updateService(service *corev1.Service) {
	ss.Lock()
	defer ss.Unlock()

	id := ServiceID{
		Namespace: service.Namespace,
		Name:      service.Name,
	}
	if id != ss.service {
		for listener, port := range ss.listeners {
			ss.endpoints.Unsubscribe(ss.service, port, "", listener)
			listener.NoEndpoints(true) // Clear out previous endpoints.
			err := ss.endpoints.Subscribe(id, port, "", listener)
			if err != nil {
				ss.log.Warnf("failed to subscribe to %s: %s", id, err)
				listener.NoEndpoints(true) // Clear out previous endpoints.
				// Fall back to addressing the bare cluster IP directly.
				listener.Add(singletonAddress(ss.clusterIP, port))
			}
		}
		ss.service = id
		// The IP now belongs to a service, so any previously-tracked pod for
		// this IP is dropped (at most one of service/pod may be non-zero).
		ss.pod = PodSet{}
	}
}
// deleteService handles removal of the service owning this cluster IP: each
// listener is unsubscribed from the service's endpoints and falls back to a
// singleton address of the bare cluster IP.
func (ss *serviceSubscriptions) deleteService() {
	ss.Lock()
	defer ss.Unlock()

	for listener, port := range ss.listeners {
		ss.endpoints.Unsubscribe(ss.service, port, "", listener)
		listener.NoEndpoints(true) // Clear out previous endpoints.
		listener.Add(singletonAddress(ss.clusterIP, port))
	}
	ss.service = ServiceID{}
}
// updatePod replaces the tracked pod behind this IP and replays the new
// address (with each listener's subscribed port filled in) to all listeners.
func (ss *serviceSubscriptions) updatePod(podSet PodSet) {
	ss.Lock()
	defer ss.Unlock()

	for listener, port := range ss.listeners {
		listener.NoEndpoints(true) // Clear out previous endpoints.
		// An empty set (e.g. a host-network pod, see podToPodSet) leaves the
		// listener with no endpoints rather than a bogus address.
		if len(podSet.Pods) != 0 {
			podSetWithPort := withPort(podSet, port)
			listener.Add(podSetWithPort)
		}
	}
	ss.pod = podSet
}
// deletePod handles removal of the pod behind this IP: listeners fall back to
// a singleton address of the bare IP.
func (ss *serviceSubscriptions) deletePod() {
	ss.Lock()
	defer ss.Unlock()

	for listener, port := range ss.listeners {
		listener.NoEndpoints(true) // Clear out previous endpoints.
		listener.Add(singletonAddress(ss.clusterIP, port))
	}
	ss.pod = PodSet{}
}
// subscribe registers a listener for this IP at the given port. Depending on
// what currently owns the IP, the listener is subscribed to the service's
// endpoints, handed the tracked pod's address, or handed a singleton address
// of the bare IP. Returns the EndpointsWatcher's error, if any (e.g. the
// service is invalid), in which case the listener is not registered.
func (ss *serviceSubscriptions) subscribe(port Port, listener EndpointUpdateListener) error {
	ss.Lock()
	defer ss.Unlock()

	if (ss.service != ServiceID{}) {
		err := ss.endpoints.Subscribe(ss.service, port, "", listener)
		if err != nil {
			return err
		}
	} else if len(ss.pod.Pods) != 0 {
		podSetWithPort := withPort(ss.pod, port)
		listener.Add(podSetWithPort)
	} else {
		// Nothing known at this IP yet: report the raw IP itself.
		listener.Add(singletonAddress(ss.clusterIP, port))
	}
	ss.listeners[listener] = port
	return nil
}
// unsubscribe removes the listener from this IP's subscriber set, dropping
// its endpoints subscription if a service currently owns the IP.
func (ss *serviceSubscriptions) unsubscribe(port Port, listener EndpointUpdateListener) {
	ss.Lock()
	defer ss.Unlock()

	delete(ss.listeners, listener)
	if (ss.service != ServiceID{}) {
		ss.endpoints.Unsubscribe(ss.service, port, "", listener)
	}
}
// withPort returns a copy of the given PodSet in which every address carries
// the given port. Labels are shared with the input set.
func withPort(pods PodSet, port Port) PodSet {
	result := PodSet{
		Pods:   make(map[PodID]Address, len(pods.Pods)),
		Labels: pods.Labels,
	}
	for id, addr := range pods.Pods {
		addr.Port = port
		result.Pods[id] = addr
	}
	return result
}
// singletonAddress builds a PodSet holding exactly one bare ip:port address
// with no pod metadata and no labels.
func singletonAddress(ip string, port Port) PodSet {
	addr := Address{
		IP:   ip,
		Port: port,
	}
	return PodSet{
		Pods:   map[PodID]Address{{}: addr},
		Labels: map[string]string{},
	}
}

View File

@ -0,0 +1,462 @@
package watcher
import (
"sort"
"testing"
"github.com/linkerd/linkerd2/controller/k8s"
logging "github.com/sirupsen/logrus"
)
func TestIPWatcher(t *testing.T) {
for _, tt := range []struct {
serviceType string
k8sConfigs []string
host string
port Port
expectedAddresses []string
expectedNoEndpoints bool
expectedNoEndpointsServiceExists bool
expectedError bool
}{
{
serviceType: "local services",
k8sConfigs: []string{`
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
clusterIP: 192.168.210.92
ports:
- port: 8989`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: name1
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.12
targetRef:
kind: Pod
name: name1-1
namespace: ns
- ip: 172.17.0.19
targetRef:
kind: Pod
name: name1-2
namespace: ns
- ip: 172.17.0.20
targetRef:
kind: Pod
name: name1-3
namespace: ns
- ip: 172.17.0.21
ports:
- port: 8989`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.12`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-2
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.19`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-3
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.20`,
},
host: "192.168.210.92",
port: 8989,
expectedAddresses: []string{
"172.17.0.12:8989",
"172.17.0.19:8989",
"172.17.0.20:8989",
"172.17.0.21:8989",
},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
},
{
// Test for the issue described in linkerd/linkerd2#1405.
serviceType: "local NodePort service with unnamed port",
k8sConfigs: []string{`
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: NodePort
clusterIP: 192.168.210.92
ports:
- port: 8989
targetPort: port1`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: name1
namespace: ns
subsets:
- addresses:
- ip: 10.233.66.239
targetRef:
kind: Pod
name: name1-f748fb6b4-hpwpw
namespace: ns
- ip: 10.233.88.244
targetRef:
kind: Pod
name: name1-f748fb6b4-6vcmw
namespace: ns
ports:
- port: 8990
protocol: TCP`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-f748fb6b4-hpwpw
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
podIp: 10.233.66.239
phase: Running`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-f748fb6b4-6vcmw
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
podIp: 10.233.88.244
phase: Running`,
},
host: "192.168.210.92",
port: 8989,
expectedAddresses: []string{
"10.233.66.239:8990",
"10.233.88.244:8990",
},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
},
{
// Test for the issue described in linkerd/linkerd2#1853.
serviceType: "local service with named target port and differently-named service port",
k8sConfigs: []string{`
apiVersion: v1
kind: Service
metadata:
name: world
namespace: ns
spec:
clusterIP: 192.168.210.92
type: ClusterIP
ports:
- name: app
port: 7778
targetPort: http`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: world
namespace: ns
subsets:
- addresses:
- ip: 10.1.30.135
targetRef:
kind: Pod
name: world-575bf846b4-tp4hw
namespace: ns
ports:
- name: app
port: 7779
protocol: TCP`,
`
apiVersion: v1
kind: Pod
metadata:
name: world-575bf846b4-tp4hw
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
podIp: 10.1.30.135
phase: Running`,
},
host: "192.168.210.92",
port: 7778,
expectedAddresses: []string{
"10.1.30.135:7779",
},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
},
{
serviceType: "local services with missing pods",
k8sConfigs: []string{`
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
clusterIP: 192.168.210.92
ports:
- port: 8989`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: name1
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.23
targetRef:
kind: Pod
name: name1-1
namespace: ns
- ip: 172.17.0.24
targetRef:
kind: Pod
name: name1-2
namespace: ns
- ip: 172.17.0.25
targetRef:
kind: Pod
name: name1-3
namespace: ns
ports:
- port: 8989`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-3
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.25`,
},
host: "192.168.210.92",
port: 8989,
expectedAddresses: []string{
"172.17.0.25:8989",
},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
},
{
serviceType: "local services with no endpoints",
k8sConfigs: []string{`
apiVersion: v1
kind: Service
metadata:
name: name2
namespace: ns
spec:
type: LoadBalancer
clusterIP: 192.168.210.92
ports:
- port: 7979`,
},
host: "192.168.210.92",
port: 7979,
expectedAddresses: []string{},
expectedNoEndpoints: true,
expectedNoEndpointsServiceExists: true,
expectedError: false,
},
{
serviceType: "external name services",
k8sConfigs: []string{`
apiVersion: v1
kind: Service
metadata:
name: name3
namespace: ns
spec:
type: ExternalName
clusterIP: 192.168.210.92
externalName: foo`,
},
host: "192.168.210.92",
port: 6969,
expectedAddresses: []string{},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: true,
},
{
serviceType: "services that do not yet exist",
k8sConfigs: []string{},
host: "192.168.210.92",
port: 5959,
expectedAddresses: []string{"192.168.210.92:5959"},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
},
{
serviceType: "pod ip",
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.12`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-2
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.19`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-3
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.20`,
},
host: "172.17.0.12",
port: 8989,
expectedAddresses: []string{
"172.17.0.12:8989",
},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
},
{
serviceType: "pod with hostNetwork",
k8sConfigs: []string{`
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
spec:
hostNetwork: true
status:
phase: Running
podIP: 172.17.0.12`,
},
host: "172.17.0.12",
port: 8989,
expectedAddresses: []string{
"172.17.0.12:8989",
},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
expectedError: false,
},
} {
tt := tt // pin
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
endpoints := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name))
watcher := NewIPWatcher(k8sAPI, endpoints, logging.WithField("test", t.Name))
k8sAPI.Sync()
listener := newBufferingEndpointListener()
err = watcher.Subscribe(tt.host, tt.port, listener)
if tt.expectedError && err == nil {
t.Fatal("Expected error but was ok")
}
if !tt.expectedError && err != nil {
t.Fatalf("Expected no error, got [%s]", err)
}
actualAddresses := make([]string, 0)
actualAddresses = append(actualAddresses, listener.added...)
sort.Strings(actualAddresses)
testCompare(t, tt.expectedAddresses, actualAddresses)
if listener.noEndpointsCalled != tt.expectedNoEndpoints {
t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
tt.expectedNoEndpoints, listener.noEndpointsCalled)
}
if listener.noEndpointsExists != tt.expectedNoEndpointsServiceExists {
t.Fatalf("Expected noEndpointsExists to be [%t], got [%t]",
tt.expectedNoEndpointsServiceExists, listener.noEndpointsExists)
}
})
}
}

View File

@ -37,7 +37,7 @@ func Main(args []string) {
k8sAPI, err := k8s.InitializeAPI(
*kubeConfigPath,
k8s.Endpoint, k8s.Pod, k8s.RS, k8s.Svc, k8s.SP, k8s.TS,
k8s.Endpoint, k8s.Pod, k8s.RS, k8s.Svc, k8s.SP, k8s.TS, k8s.Job,
)
if err != nil {
log.Fatalf("Failed to initialize K8s API: %s", err)

View File

@ -59,9 +59,10 @@ var (
// Linkerd commonly logs these errors during testing, remove these once
// they're addressed: https://github.com/linkerd/linkerd2/issues/2453
knownControllerErrorsRegex = regexp.MustCompile(strings.Join([]string{
`.* linkerd-controller-.*-.* tap time=".*" level=error msg="\[.*\] encountered an error: rpc error: code = Canceled desc = context canceled"`,
`.* linkerd-web-.*-.* web time=".*" level=error msg="Post http://linkerd-controller-api\..*\.svc\.cluster\.local:8085/api/v1/Version: context canceled"`,
`.* linkerd-proxy-injector-.*-.* proxy-injector time=".*" level=warning msg="failed to retrieve replicaset from indexer, retrying with get request .*-smoke-test.*/smoke-test-.*-.*: replicaset\.apps \\"smoke-test-.*-.*\\" not found"`,
`.*linkerd-controller-.*-.* tap time=".*" level=error msg="\[.*\] encountered an error: rpc error: code = Canceled desc = context canceled"`,
`.*linkerd-web-.*-.* web time=".*" level=error msg="Post http://linkerd-controller-api\..*\.svc\.cluster\.local:8085/api/v1/Version: context canceled"`,
`.*linkerd-proxy-injector-.*-.* proxy-injector time=".*" level=warning msg="failed to retrieve replicaset from indexer .*-smoke-test.*/smoke-test-.*-.*: replicaset\.apps \\"smoke-test-.*-.*\\" not found"`,
`.*linkerd-destination-.* destination time=".*" level=warning msg="failed to retrieve replicaset from indexer .* not found"`,
}, "|"))
knownProxyErrorsRegex = regexp.MustCompile(strings.Join([]string{