Ignore pods with status.phase=Succeeded when watching IP addresses (#5412)

Ignore pods with status.phase=Succeeded when watching IP addresses

When a pod terminates successfully, some CNIs will assign its IP address
to newly created pods. This can lead to duplicate pod IPs in the same
Kubernetes cluster.

Filter out pods which are in a Succeeded phase since they are not 
routable anymore.

Fixes #5394

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
This commit is contained in:
Filip Petkovski 2021-01-12 18:25:37 +01:00 committed by GitHub
parent 5e7586340b
commit 40192e258a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 4 deletions

View File

@ -93,6 +93,24 @@ spec:
value: 0.0.0.0:4143
name: linkerd-proxy`,
`
apiVersion: v1
kind: Pod
metadata:
name: name2-2
namespace: ns
status:
phase: Succeeded
podIP: 172.17.0.13`,
`
apiVersion: v1
kind: Pod
metadata:
name: name2-3
namespace: ns
status:
phase: Failed
podIP: 172.17.0.13`,
`
apiVersion: linkerd.io/v1alpha2
kind: ServiceProfile
metadata:

View File

@ -154,10 +154,23 @@ func (iw *IPWatcher) GetPod(podIP string) (*corev1.Pod, error) {
}
func getResource(ip string, informer cache.SharedIndexInformer) (interface{}, error) {
objs, err := informer.GetIndexer().ByIndex(podIPIndex, ip)
matchingObjs, err := informer.GetIndexer().ByIndex(podIPIndex, ip)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
objs := make([]interface{}, 0)
for _, obj := range matchingObjs {
// Ignore terminated pods,
// their IPs can be reused for new Running pods
pod, ok := obj.(*corev1.Pod)
if ok && podTerminated(pod) {
continue
}
objs = append(objs, obj)
}
if len(objs) > 1 {
return nil, status.Errorf(codes.FailedPrecondition, "IP address conflict: %v, %v", objs[0], objs[1])
}
@ -167,6 +180,11 @@ func getResource(ip string, informer cache.SharedIndexInformer) (interface{}, er
return nil, nil
}
func podTerminated(pod *corev1.Pod) bool {
phase := pod.Status.Phase
return phase == corev1.PodSucceeded || phase == corev1.PodFailed
}
func (iw *IPWatcher) addService(obj interface{}) {
service := obj.(*corev1.Service)
if service.Namespace == kubeSystem || service.Spec.ClusterIP == "None" {
@ -285,10 +303,18 @@ func (iw *IPWatcher) getOrNewServiceSubscriptions(clusterIP string) *serviceSubs
pods := []*corev1.Pod{}
for _, obj := range objs {
if pod, ok := obj.(*corev1.Pod); ok {
// Skip pods with HostNetwork.
if !pod.Spec.HostNetwork {
pods = append(pods, pod)
// Skip pods with HostNetwork
if pod.Spec.HostNetwork {
continue
}
// Ignore terminated pods,
// their IPs can be reused for new Running pods
if podTerminated(pod) {
continue
}
pods = append(pods, pod)
}
}
if len(pods) > 1 {