Destination service returns "Running" pod labels (#781)

When the Destination sees an IP address, it looks up Pods by that IP,
and associates Pod label data to it. If the lookup by IP returned more
than one Pod, it simply picked the first one. This is not correct,
specifically in cases where one pod is in a Running state, and others
are not.

Modify the Destination service to only return label data for Pods in the
Running state.

Fixes #773

Signed-off-by: Andrew Seigner <siggy@buoyant.io>
This commit is contained in:
Andrew Seigner 2018-04-17 14:42:54 -07:00 committed by GitHub
parent 6121afb6f2
commit 9e8cce0838
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 145 additions and 42 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/runconduit/conduit/controller/util"
pkgK8s "github.com/runconduit/conduit/pkg/k8s"
log "github.com/sirupsen/logrus"
coreV1 "k8s.io/api/core/v1"
)
type updateListener interface {
@ -74,28 +75,8 @@ func (l *endpointListener) NoEndpoints(exists bool) {
func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb.WeightedAddrSet {
addrs := make([]*pb.WeightedAddr, 0)
for i, address := range endpoints {
metricLabelsForPod := map[string]string{}
ipAsString := util.IPToString(address.Ip)
resultingPods, err := l.podsByIp.GetPodsByIndex(ipAsString)
if err != nil {
log.Errorf("Error while finding pod for IP [%s], this IP will be sent with no metric labels: %v", ipAsString, err)
} else {
if len(resultingPods) == 0 || resultingPods[0] == nil {
log.Errorf("Could not find pod for IP [%s], this IP will be sent with no metric labels.", ipAsString)
} else {
pod := resultingPods[0]
metricLabelsForPod = pkgK8s.GetOwnerLabels(pod.ObjectMeta)
metricLabelsForPod["pod"] = pod.Name
}
}
addrs = append(addrs, &pb.WeightedAddr{
Addr: &endpoints[i],
Weight: 1,
MetricLabels: metricLabelsForPod,
})
for _, address := range endpoints {
addrs = append(addrs, l.toWeightedAddr(address))
}
return &pb.WeightedAddrSet{
@ -104,6 +85,35 @@ func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb.
}
}
func (l *endpointListener) toWeightedAddr(address common.TcpAddress) *pb.WeightedAddr {
metricLabelsForPod := map[string]string{}
ipAsString := util.IPToString(address.Ip)
resultingPods, err := l.podsByIp.GetPodsByIndex(ipAsString)
if err != nil {
log.Errorf("Error while finding pod for IP [%s], this IP will be sent with no metric labels: %v", ipAsString, err)
} else {
podFound := false
for _, pod := range resultingPods {
if pod.Status.Phase == coreV1.PodRunning {
podFound = true
metricLabelsForPod = pkgK8s.GetOwnerLabels(pod.ObjectMeta)
metricLabelsForPod["pod"] = pod.Name
break
}
}
if !podFound {
log.Errorf("Could not find running pod for IP [%s], this IP will be sent with no metric labels.", ipAsString)
}
}
return &pb.WeightedAddr{
Addr: &address,
Weight: 1,
MetricLabels: metricLabelsForPod,
}
}
func (l *endpointListener) toAddrSet(endpoints []common.TcpAddress) *pb.AddrSet {
addrs := make([]*common.TcpAddress, 0)
for i := range endpoints {

View File

@ -14,6 +14,20 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type podExpected struct {
pod string
namespace string
replicationController string
phase v1.PodPhase
}
type listenerExpected struct {
pods []podExpected
address common.TcpAddress
listenerLabels map[string]string
addressLabels map[string]string
}
func TestEndpointListener(t *testing.T) {
t.Run("Sends one update for add and another for remove", func(t *testing.T) {
mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}}
@ -104,9 +118,12 @@ func TestEndpointListener(t *testing.T) {
pkgK8s.ProxyReplicationControllerLabel: expectedReplicationControllerName,
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 222}}, Port: 22}
podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string]*v1.Pod{ipForAddr1: podForAddedAddress1}}
podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{ipForAddr1: []*v1.Pod{podForAddedAddress1}}}
mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}}
listener := &endpointListener{
@ -136,23 +153,97 @@ func TestEndpointListener(t *testing.T) {
}
})
t.Run("It returns when the underlying context is done", func(t *testing.T) {
context, cancelFn := context.WithCancel(context.Background())
mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}, contextToReturn: context}
listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()}
t.Run("It only returns pods in a running state", func(t *testing.T) {
expectations := []listenerExpected{
listenerExpected{
pods: []podExpected{
podExpected{
pod: "pod1",
namespace: "this-namespace",
replicationController: "rc-name",
phase: v1.PodPending,
},
},
address: common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 666}}, Port: 1},
listenerLabels: map[string]string{
"service": "service-name",
"namespace": "this-namespace",
},
addressLabels: map[string]string{},
},
listenerExpected{
pods: []podExpected{
podExpected{
pod: "pod1",
namespace: "this-namespace",
replicationController: "rc-name",
phase: v1.PodPending,
},
podExpected{
pod: "pod2",
namespace: "this-namespace",
replicationController: "rc-name",
phase: v1.PodRunning,
},
podExpected{
pod: "pod3",
namespace: "this-namespace",
replicationController: "rc-name",
phase: v1.PodSucceeded,
},
},
address: common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 666}}, Port: 1},
listenerLabels: map[string]string{
"service": "service-name",
"namespace": "this-namespace",
},
addressLabels: map[string]string{
"pod": "pod2",
"replication_controller": "rc-name",
},
},
}
completed := make(chan bool)
go func() {
<-listener.Done()
completed <- true
}()
for _, exp := range expectations {
backingMap := map[string][]*v1.Pod{}
cancelFn()
for _, pod := range exp.pods {
ipForAddr := util.IPToString(exp.address.Ip)
podForAddedAddress := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod.pod,
Namespace: pod.namespace,
Labels: map[string]string{
pkgK8s.ProxyReplicationControllerLabel: pod.replicationController,
},
},
Status: v1.PodStatus{
Phase: pod.phase,
},
}
c := <-completed
backingMap[ipForAddr] = append(backingMap[ipForAddr], podForAddedAddress)
}
podIndex := &k8s.InMemoryPodIndex{BackingMap: backingMap}
if !c {
t.Fatalf("Expected function to be completed after the cancel()")
mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}}
listener := &endpointListener{
podsByIp: podIndex,
labels: exp.listenerLabels,
stream: mockGetServer,
}
listener.Update([]common.TcpAddress{exp.address}, nil)
actualGlobalMetricLabels := mockGetServer.updatesReceived[0].GetAdd().MetricLabels
if !reflect.DeepEqual(actualGlobalMetricLabels, exp.listenerLabels) {
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", exp.listenerLabels, actualGlobalMetricLabels)
}
actualAddedAddressMetricLabels := mockGetServer.updatesReceived[0].GetAdd().Addrs[0].MetricLabels
if !reflect.DeepEqual(actualAddedAddressMetricLabels, exp.addressLabels) {
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", exp.addressLabels, actualAddedAddressMetricLabels)
}
}
})
}

View File

@ -30,21 +30,23 @@ func (m *MockEndpointsWatcher) Run() error {
func (m *MockEndpointsWatcher) Stop() {}
type InMemoryPodIndex struct {
BackingMap map[string]*v1.Pod
BackingMap map[string][]*v1.Pod
}
func (i *InMemoryPodIndex) GetPod(key string) (*v1.Pod, error) {
return i.BackingMap[key], nil
return i.BackingMap[key][0], nil
}
func (i *InMemoryPodIndex) GetPodsByIndex(key string) ([]*v1.Pod, error) {
return []*v1.Pod{i.BackingMap[key]}, nil
return i.BackingMap[key], nil
}
func (i *InMemoryPodIndex) List() ([]*v1.Pod, error) {
var pods []*v1.Pod
for _, value := range i.BackingMap {
pods = append(pods, value)
for _, byIndex := range i.BackingMap {
for _, pod := range byIndex {
pods = append(pods, pod)
}
}
return pods, nil
@ -53,5 +55,5 @@ func (i *InMemoryPodIndex) Run() error { return nil }
func (i *InMemoryPodIndex) Stop() {}
func NewEmptyPodIndex() PodIndex {
return &InMemoryPodIndex{BackingMap: map[string]*v1.Pod{}}
return &InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{}}
}