Handle pod lookups for pods that map to a host IP and host port (#5904)

This fixes an issue where pod lookups by host IP and host port fail even though
the cluster has a matching pod.

These lookups usually manifested as `FailedPrecondition` errors, but the error
messages were too long and resulted in HTTP/2 errors. This change depends on
#5893, which fixes that separate issue.

This also changes how often those `FailedPrecondition` errors occur: the
destination service now considers pod host IPs, which should reduce the
frequency of those errors.

Closes #5881 

---

Lookups like this happen when a pod is created with a host IP and host port set
in its spec. The pod still has a pod IP when running, but requests to
`hostIP:hostPort` will also be redirected to the pod. Host IP and host port
combinations are unique to the cluster, and Kubernetes enforces this.
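
For illustration, a pod of this shape could be declared roughly as follows; the
name, image, and addresses below are made-up placeholders, not values from this
change:

```
apiVersion: v1
kind: Pod
metadata:
  name: host-port-example        # hypothetical name
spec:
  containers:
  - name: server                 # hypothetical container
    image: example/server:latest # hypothetical image
    ports:
    - containerPort: 4445
      hostIP: 172.0.0.1          # node IP the port is bound on
      hostPort: 4445             # traffic to 172.0.0.1:4445 is redirected to this pod
```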

Currently, the destination service fails to find pods in this scenario because
we only keep an index of pods by their pod IPs, not by their host IPs. To fix
this, we now also keep an index of pods by their host IPs, but only for pods
that actually have a host IP set.

Now when doing a pod lookup, we consider both the IP and the port, and perform
the following steps:

1. Look the IP up in the pod `podIP` index.
   - If exactly one pod is found, return it.
   - Otherwise, either no pods or more than one pod have that pod IP; continue.
2. Look the IP up in the pod `hostIP` index.
   - If any pods are found, we know the IP maps to a node IP. Search those pods
     for one with a matching host port: if one exists, return it; if not, there
     is no pod that matches `hostIP:port`.
   - If no pods are found, the IP does not map to a host IP; continue.
3. If multiple pods were found in step 1, there are pods with conflicting pod
   IPs and an error is returned.
4. If no pods were found in step 1, there is no pod that matches `IP:port`.

---

Aside from the additional IP watcher test, this can be verified manually with
the following steps (a consolidated sketch of the commands follows the list):

1. Create a kind cluster. kind is required because its pods in `kube-system`
   have the same pod IPs; this is not the case with k3d: `bin/kind create cluster`
2. Install Linkerd with `4445` marked as opaque: `linkerd install --set
   proxy.opaquePorts="4445" | kubectl apply -f -`
3. Get the node IP: `kubectl get -o wide nodes`
4. Pull my fork of `tcp-echo`:

```
$ git clone https://github.com/kleimkuhler/tcp-echo
...
$ cd tcp-echo
$ git checkout --track origin/host-pod-repro
```

5. `helm package .`
6. Install `tcp-echo` with the server not injected and the correct host IP: `helm
   install tcp-echo tcp-echo-0.1.0.tgz --set server.linkerdInject="false" --set
   hostIP="..."`
7. Looking at the client's proxy logs, you should not observe any errors or
   protocol detection timeouts.
8. Looking at the server's logs, you should see all the requests coming through
   correctly.
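
Put together, the steps look roughly like the sketch below. The `NODE_IP`
lookup and the `origin` remote name are assumptions about the local setup; use
whatever node IP your cluster reports:

```
# Rough end-to-end sketch of the steps above
bin/kind create cluster
linkerd install --set proxy.opaquePorts="4445" | kubectl apply -f -

# Use the node's internal IP as the host IP (assumes a single-node kind cluster)
NODE_IP=$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')

git clone https://github.com/kleimkuhler/tcp-echo
cd tcp-echo
git checkout --track origin/host-pod-repro
helm package .
helm install tcp-echo tcp-echo-0.1.0.tgz \
  --set server.linkerdInject="false" \
  --set hostIP="$NODE_IP"
```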

Signed-off-by: Kevin Leimkuhler <kevin@kleimkuhler.com>


@@ -204,7 +204,7 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
fqn = fmt.Sprintf("%s.%s.svc.%s", service.Name, service.Namespace, s.clusterDomain)
} else {
// If the IP does not map to a service, check if it maps to a pod
-pod, err := s.ips.GetPod(ip.String())
+pod, err := s.ips.GetPod(ip.String(), port)
if err != nil {
return err
}


@@ -148,6 +148,13 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry, enableEndpointSlic
k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{podIPIndex: func(obj interface{}) ([]string, error) {
if pod, ok := obj.(*corev1.Pod); ok {
// Pods that run in the host network are indexed by the host IP
// indexer in the IP watcher; they should be skipped by the pod
// IP indexer which is responsible only for indexing pod network
// pods.
if pod.Spec.HostNetwork {
return nil, nil
}
return []string{pod.Status.PodIP}, nil
}
return []string{""}, fmt.Errorf("object is not a pod")


@@ -3,6 +3,7 @@ package watcher
import (
"context"
"fmt"
"strings"
"sync"
"github.com/linkerd/linkerd2/controller/k8s"
@@ -13,6 +14,8 @@ import (
"k8s.io/client-go/tools/cache"
)
const hostIPIndex = "hostIP"
type (
// IPWatcher wraps a EndpointsWatcher and allows subscriptions by
// IP address. It watches all services in the cluster to keep an index
@@ -93,6 +96,27 @@ func NewIPWatcher(k8sAPI *k8s.API, endpoints *EndpointsWatcher, log *logging.Ent
},
})
k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{hostIPIndex: func(obj interface{}) ([]string, error) {
if pod, ok := obj.(*corev1.Pod); ok {
var hostIPPods []string
if pod.Status.HostIP != "" {
// If the pod is reachable from the host network, then for
// each of its containers' ports that exposes a host port, add
// that hostIP:hostPort endpoint to the indexer.
for _, c := range pod.Spec.Containers {
for _, p := range c.Ports {
if p.HostPort != 0 {
addr := fmt.Sprintf("%s:%d", pod.Status.HostIP, p.HostPort)
hostIPPods = append(hostIPPods, addr)
}
}
}
}
return hostIPPods, nil
}
return nil, fmt.Errorf("object is not a pod")
}})
return iw
}
@@ -133,7 +157,12 @@ func (iw *IPWatcher) GetSvcID(clusterIP string) (*ServiceID, error) {
services = append(services, service)
}
if len(services) > 1 {
return nil, status.Errorf(codes.FailedPrecondition, "found %d services with conflicting cluster IP %s; first two: %s/%s, %s/%s", len(services), clusterIP, services[0].Namespace, services[0].Name, services[1].Namespace, services[1].Name)
conflictingServices := []string{}
for _, service := range services {
conflictingServices = append(conflictingServices, fmt.Sprintf("%s:%s", service.Namespace, service.Name))
}
iw.log.Warnf("found conflicting %s cluster IP: %s", clusterIP, strings.Join(conflictingServices, ","))
return nil, status.Errorf(codes.FailedPrecondition, "found %d services with conflicting cluster IP %s", len(services), clusterIP)
}
if len(services) == 0 {
return nil, nil
@@ -145,12 +174,58 @@ func (iw *IPWatcher) GetSvcID(clusterIP string) (*ServiceID, error) {
return service, nil
}
-// GetPod returns the pod that corresponds to an IP address if one exists.
-func (iw *IPWatcher) GetPod(podIP string) (*corev1.Pod, error) {
-objs, err := iw.k8sAPI.Pod().Informer().GetIndexer().ByIndex(podIPIndex, podIP)
// GetPod returns a pod that maps to the given IP address. The pod can either
// be in the host network or the pod network. If the pod is in the host
// network, then it must have a container port that exposes `port` as a host
// port.
func (iw *IPWatcher) GetPod(podIP string, port uint32) (*corev1.Pod, error) {
// First we check if the address maps to a pod in the host network.
addr := fmt.Sprintf("%s:%d", podIP, port)
hostIPPods, err := iw.getIndexedPods(hostIPIndex, addr)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
if len(hostIPPods) == 1 {
iw.log.Debugf("found %s:%d on the host network", podIP, port)
return hostIPPods[0], nil
}
if len(hostIPPods) > 1 {
conflictingPods := []string{}
for _, pod := range hostIPPods {
conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
}
iw.log.Warnf("found conflicting %s:%d endpoint on the host network: %s", podIP, port, strings.Join(conflictingPods, ","))
return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), podIP, port)
}
// The address did not map to a pod in the host network, so now we check
// if the IP maps to a pod IP in the pod network.
podIPPods, err := iw.getIndexedPods(podIPIndex, podIP)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
if len(podIPPods) == 1 {
iw.log.Debugf("found %s on the pod network", podIP)
return podIPPods[0], nil
}
if len(podIPPods) > 1 {
conflictingPods := []string{}
for _, pod := range podIPPods {
conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
}
iw.log.Warnf("found conflicting %s IP on the pod network: %s", podIP, strings.Join(conflictingPods, ","))
return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting pod network IP %s", len(podIPPods), podIP)
}
iw.log.Debugf("no pod found for %s:%d", podIP, port)
return nil, nil
}
func (iw *IPWatcher) getIndexedPods(indexName string, podIP string) ([]*corev1.Pod, error) {
objs, err := iw.k8sAPI.Pod().Informer().GetIndexer().ByIndex(indexName, podIP)
if err != nil {
return nil, fmt.Errorf("failed getting %s indexed pods: %s", indexName, err)
}
pods := make([]*corev1.Pod, 0)
for _, obj := range objs {
pod := obj.(*corev1.Pod)
@@ -159,13 +234,7 @@ func (iw *IPWatcher) GetPod(podIP string) (*corev1.Pod, error) {
}
pods = append(pods, pod)
}
-if len(pods) > 1 {
-return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with conflicting pod IP %s; first two: %s/%s, %s/%s", len(pods), podIP, pods[0].Namespace, pods[0].Name, pods[1].Namespace, pods[1].Name)
-}
-if len(pods) == 0 {
-return nil, nil
-}
-return pods[0], nil
+return pods, nil
}
func podTerminated(pod *corev1.Pod) bool {


@@ -2,6 +2,7 @@ package watcher
import (
"fmt"
"strings"
"testing"
"k8s.io/client-go/tools/cache"
@@ -677,7 +678,7 @@ status:
}
}
-func TestIpWatcherGetSvc(t *testing.T) {
+func TestIpWatcherGetSvcID(t *testing.T) {
name := "service"
namespace := "test"
clusterIP := "10.256.0.1"
@@ -736,3 +737,94 @@ spec:
}
})
}
func TestIpWatcherGetPod(t *testing.T) {
podIP := "10.255.0.1"
hostIP := "172.0.0.1"
var hostPort1 uint32 = 12345
var hostPort2 uint32 = 12346
expectedPodName := "hostPortPod1"
k8sConfigs := []string{`
apiVersion: v1
kind: Pod
metadata:
name: hostPortPod1
namespace: ns
spec:
containers:
- image: test
name: hostPortContainer1
ports:
- containerPort: 12345
hostIP: 172.0.0.1
hostPort: 12345
- image: test
name: hostPortContainer2
ports:
- containerPort: 12346
hostIP: 172.0.0.1
hostPort: 12346
status:
phase: Running
podIP: 10.255.0.1
hostIP: 172.0.0.1`,
`
apiVersion: v1
kind: Pod
metadata:
name: pod
namespace: ns
status:
phase: Running
podIP: 10.255.0.1`,
}
t.Run("get pod by host IP and host port", func(t *testing.T) {
k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
if err != nil {
t.Fatalf("failed to create new fake API: %s", err)
}
endpoints := NewEndpointsWatcher(k8sAPI, logging.WithField("test", t.Name()), false)
watcher := NewIPWatcher(k8sAPI, endpoints, logging.WithField("test", t.Name()))
k8sAPI.Sync(nil)
// Get host IP pod that is mapped to the port `hostPort1`
pod, err := watcher.GetPod(hostIP, hostPort1)
if err != nil {
t.Fatalf("failed to get pod: %s", err)
}
if pod == nil {
t.Fatalf("failed to find pod mapped to %s:%d", hostIP, hostPort1)
}
if pod.Name != expectedPodName {
t.Fatalf("expected pod name to be %s, but got %s", expectedPodName, pod.Name)
}
// Get host IP pod that is mapped to the port `hostPort2`; this tests
// that the indexer properly adds multiple containers from a single
// pod.
pod, err = watcher.GetPod(hostIP, hostPort2)
if err != nil {
t.Fatalf("failed to get pod: %s", err)
}
if pod == nil {
t.Fatalf("failed to find pod mapped to %s:%d", hostIP, hostPort2)
}
if pod.Name != expectedPodName {
t.Fatalf("expected pod name to be %s, but got %s", expectedPodName, pod.Name)
}
// Get host IP pod with unmapped host port
pod, err = watcher.GetPod(hostIP, 12347)
if err != nil {
t.Fatalf("expected no error when getting host IP pod with unmapped host port, but got: %s", err)
}
if pod != nil {
t.Fatal("expected no pod to be found with unmapped host port")
}
// Get pod IP pod and expect an error
_, err = watcher.GetPod(podIP, 12346)
if err == nil {
t.Fatal("expected error when getting by pod IP and unmapped host port, but got none")
}
if !strings.Contains(err.Error(), "pods with a conflicting pod network IP") {
t.Fatalf("expected error to be pod IP address conflict, but got: %s", err)
}
})
}