From 9e8cce08389afd3ee0168a2bf61c86ae77b89d8e Mon Sep 17 00:00:00 2001 From: Andrew Seigner Date: Tue, 17 Apr 2018 14:42:54 -0700 Subject: [PATCH] Destination service returns "Running" pod labels (#781) When the Destination sees an IP address, it looks up Pods by that IP, and associates Pod label data to it. If the lookup by IP returned more than one Pod, it simply picked the first one. This is not correct, specifically in cases where one pod is in a Running state, and others are not. Modify the Destination service to only return label data for Pods in the Running state. Fixes #773 Signed-off-by: Andrew Seigner --- controller/destination/listener.go | 54 ++++++----- controller/destination/listener_test.go | 119 +++++++++++++++++++++--- controller/k8s/test_helper.go | 14 +-- 3 files changed, 145 insertions(+), 42 deletions(-) diff --git a/controller/destination/listener.go b/controller/destination/listener.go index 0de81e2a6..34f447b49 100644 --- a/controller/destination/listener.go +++ b/controller/destination/listener.go @@ -7,6 +7,7 @@ import ( "github.com/runconduit/conduit/controller/util" pkgK8s "github.com/runconduit/conduit/pkg/k8s" log "github.com/sirupsen/logrus" + coreV1 "k8s.io/api/core/v1" ) type updateListener interface { @@ -74,28 +75,8 @@ func (l *endpointListener) NoEndpoints(exists bool) { func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb.WeightedAddrSet { addrs := make([]*pb.WeightedAddr, 0) - for i, address := range endpoints { - metricLabelsForPod := map[string]string{} - - ipAsString := util.IPToString(address.Ip) - resultingPods, err := l.podsByIp.GetPodsByIndex(ipAsString) - if err != nil { - log.Errorf("Error while finding pod for IP [%s], this IP will be sent with no metric labels: %v", ipAsString, err) - } else { - if len(resultingPods) == 0 || resultingPods[0] == nil { - log.Errorf("Could not find pod for IP [%s], this IP will be sent with no metric labels.", ipAsString) - } else { - pod := resultingPods[0] - metricLabelsForPod = pkgK8s.GetOwnerLabels(pod.ObjectMeta) - metricLabelsForPod["pod"] = pod.Name - } - } - - addrs = append(addrs, &pb.WeightedAddr{ - Addr: &endpoints[i], - Weight: 1, - MetricLabels: metricLabelsForPod, - }) + for _, address := range endpoints { + addrs = append(addrs, l.toWeightedAddr(address)) } return &pb.WeightedAddrSet{ @@ -104,6 +85,35 @@ func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb. } } +func (l *endpointListener) toWeightedAddr(address common.TcpAddress) *pb.WeightedAddr { + metricLabelsForPod := map[string]string{} + ipAsString := util.IPToString(address.Ip) + + resultingPods, err := l.podsByIp.GetPodsByIndex(ipAsString) + if err != nil { + log.Errorf("Error while finding pod for IP [%s], this IP will be sent with no metric labels: %v", ipAsString, err) + } else { + podFound := false + for _, pod := range resultingPods { + if pod.Status.Phase == coreV1.PodRunning { + podFound = true + metricLabelsForPod = pkgK8s.GetOwnerLabels(pod.ObjectMeta) + metricLabelsForPod["pod"] = pod.Name + break + } + } + if !podFound { + log.Errorf("Could not find running pod for IP [%s], this IP will be sent with no metric labels.", ipAsString) + } + } + + return &pb.WeightedAddr{ + Addr: &address, + Weight: 1, + MetricLabels: metricLabelsForPod, + } +} + func (l *endpointListener) toAddrSet(endpoints []common.TcpAddress) *pb.AddrSet { addrs := make([]*common.TcpAddress, 0) for i := range endpoints { diff --git a/controller/destination/listener_test.go b/controller/destination/listener_test.go index 5d95c7a38..699cd3644 100644 --- a/controller/destination/listener_test.go +++ b/controller/destination/listener_test.go @@ -14,6 +14,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type podExpected struct { + pod string + namespace string + replicationController string + phase v1.PodPhase +} + +type listenerExpected struct { + pods []podExpected + address common.TcpAddress + listenerLabels map[string]string + addressLabels map[string]string +} + func TestEndpointListener(t *testing.T) { t.Run("Sends one update for add and another for remove", func(t *testing.T) { mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} @@ -104,9 +118,12 @@ func TestEndpointListener(t *testing.T) { pkgK8s.ProxyReplicationControllerLabel: expectedReplicationControllerName, }, }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, } addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 222}}, Port: 22} - podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string]*v1.Pod{ipForAddr1: podForAddedAddress1}} + podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{ipForAddr1: []*v1.Pod{podForAddedAddress1}}} mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} listener := &endpointListener{ @@ -136,23 +153,97 @@ func TestEndpointListener(t *testing.T) { } }) - t.Run("It returns when the underlying context is done", func(t *testing.T) { - context, cancelFn := context.WithCancel(context.Background()) - mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}, contextToReturn: context} - listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()} + t.Run("It only returns pods in a running state", func(t *testing.T) { + expectations := []listenerExpected{ + listenerExpected{ + pods: []podExpected{ + podExpected{ + pod: "pod1", + namespace: "this-namespace", + replicationController: "rc-name", + phase: v1.PodPending, + }, + }, + address: common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 666}}, Port: 1}, + listenerLabels: map[string]string{ + "service": "service-name", + "namespace": "this-namespace", + }, + addressLabels: map[string]string{}, + }, + listenerExpected{ + pods: []podExpected{ + podExpected{ + pod: "pod1", + namespace: "this-namespace", + replicationController: "rc-name", + phase: v1.PodPending, + }, + podExpected{ + pod: "pod2", + namespace: "this-namespace", + replicationController: "rc-name", + phase: v1.PodRunning, + }, + podExpected{ + pod: "pod3", + namespace: "this-namespace", + replicationController: "rc-name", + phase: v1.PodSucceeded, + }, + }, + address: common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 666}}, Port: 1}, + listenerLabels: map[string]string{ + "service": "service-name", + "namespace": "this-namespace", + }, + addressLabels: map[string]string{ + "pod": "pod2", + "replication_controller": "rc-name", + }, + }, + } - completed := make(chan bool) - go func() { - <-listener.Done() - completed <- true - }() + for _, exp := range expectations { + backingMap := map[string][]*v1.Pod{} - cancelFn() + for _, pod := range exp.pods { + ipForAddr := util.IPToString(exp.address.Ip) + podForAddedAddress := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.pod, + Namespace: pod.namespace, + Labels: map[string]string{ + pkgK8s.ProxyReplicationControllerLabel: pod.replicationController, + }, + }, + Status: v1.PodStatus{ + Phase: pod.phase, + }, + } - c := <-completed + backingMap[ipForAddr] = append(backingMap[ipForAddr], podForAddedAddress) + } + podIndex := &k8s.InMemoryPodIndex{BackingMap: backingMap} - if !c { - t.Fatalf("Expected function to be completed after the cancel()") + mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} + listener := &endpointListener{ + podsByIp: podIndex, + labels: exp.listenerLabels, + stream: mockGetServer, + } + + listener.Update([]common.TcpAddress{exp.address}, nil) + + actualGlobalMetricLabels := mockGetServer.updatesReceived[0].GetAdd().MetricLabels + if !reflect.DeepEqual(actualGlobalMetricLabels, exp.listenerLabels) { + t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", exp.listenerLabels, actualGlobalMetricLabels) + } + + actualAddedAddressMetricLabels := mockGetServer.updatesReceived[0].GetAdd().Addrs[0].MetricLabels + if !reflect.DeepEqual(actualAddedAddressMetricLabels, exp.addressLabels) { + t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", exp.addressLabels, actualAddedAddressMetricLabels) + } } }) } diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 9204ceb8d..b4f0b5088 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -30,21 +30,23 @@ func (m *MockEndpointsWatcher) Run() error { func (m *MockEndpointsWatcher) Stop() {} type InMemoryPodIndex struct { - BackingMap map[string]*v1.Pod + BackingMap map[string][]*v1.Pod } func (i *InMemoryPodIndex) GetPod(key string) (*v1.Pod, error) { - return i.BackingMap[key], nil + return i.BackingMap[key][0], nil } func (i *InMemoryPodIndex) GetPodsByIndex(key string) ([]*v1.Pod, error) { - return []*v1.Pod{i.BackingMap[key]}, nil + return i.BackingMap[key], nil } func (i *InMemoryPodIndex) List() ([]*v1.Pod, error) { var pods []*v1.Pod - for _, value := range i.BackingMap { - pods = append(pods, value) + for _, byIndex := range i.BackingMap { + for _, pod := range byIndex { + pods = append(pods, pod) + } } return pods, nil @@ -53,5 +55,5 @@ func (i *InMemoryPodIndex) Run() error { return nil } func (i *InMemoryPodIndex) Stop() {} func NewEmptyPodIndex() PodIndex { - return &InMemoryPodIndex{BackingMap: map[string]*v1.Pod{}} + return &InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{}} }