Controller: add more destination labels, fix service label (#731)

* Add more destination labels, fix service label

* Update owner labels to match proxy metrics docs

Signed-off-by: Kevin Lingerfelt <kl@buoyant.io>
This commit is contained in:
Kevin Lingerfelt 2018-04-11 10:44:52 -07:00 committed by GitHub
parent 7f54b5253d
commit e1e1b6b599
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 141 additions and 35 deletions

View File

@ -22,23 +22,32 @@ type k8sResolver struct {
dnsWatcher DnsWatcher
}
type serviceId struct {
namespace string
name string
}
func (s *serviceId) String() string {
return fmt.Sprintf("%s/%s", s.namespace, s.name)
}
func (k *k8sResolver) canResolve(host string, port int) (bool, error) {
name, err := k.localKubernetesServiceIdFromDNSName(host)
id, err := k.localKubernetesServiceIdFromDNSName(host)
if err != nil {
return false, err
}
return name != nil, nil
return id != nil, nil
}
func (k *k8sResolver) streamResolution(host string, port int, listener updateListener) error {
serviceName, err := k.localKubernetesServiceIdFromDNSName(host)
id, err := k.localKubernetesServiceIdFromDNSName(host)
if err != nil {
log.Error(err)
return err
}
if serviceName == nil {
if id == nil {
// TODO: Resolve name using DNS similar to Kubernetes' ClusterFirst
// resolution.
err = fmt.Errorf("cannot resolve service that isn't a local Kubernetes service: %s", host)
@ -46,17 +55,19 @@ func (k *k8sResolver) streamResolution(host string, port int, listener updateLis
return err
}
svc, exists, err := k.endpointsWatcher.GetService(*serviceName)
svc, exists, err := k.endpointsWatcher.GetService(id.String())
if err != nil {
log.Errorf("error retrieving service [%s]: %s", *serviceName, err)
log.Errorf("error retrieving service [%s]: %s", id.String(), err)
return err
}
listener.SetServiceId(id)
if exists && svc.Spec.Type == v1.ServiceTypeExternalName {
return k.resolveExternalName(svc.Spec.ExternalName, listener)
}
return k.resolveKubernetesService(*serviceName, port, listener)
return k.resolveKubernetesService(id.String(), port, listener)
}
func (k *k8sResolver) resolveKubernetesService(id string, port int, listener updateListener) error {
@ -83,7 +94,7 @@ func (k *k8sResolver) resolveExternalName(externalName string, listener updateLi
// "namespace-name/service-name" form if `host` is a DNS name in a form used
// for local Kubernetes services. It returns nil if `host` isn't in such a
// form.
func (k *k8sResolver) localKubernetesServiceIdFromDNSName(host string) (*string, error) {
func (k *k8sResolver) localKubernetesServiceIdFromDNSName(host string) (*serviceId, error) {
hostLabels, err := splitDNSName(host)
if err != nil {
return nil, err
@ -124,11 +135,11 @@ func (k *k8sResolver) localKubernetesServiceIdFromDNSName(host string) (*string,
if len(hostLabels) != 2 {
return nil, fmt.Errorf("not a service: %s", host)
}
service := hostLabels[0]
namespace := hostLabels[1]
id := namespace + "/" + service
return &id, nil
return &serviceId{
namespace: hostLabels[1],
name: hostLabels[0],
}, nil
}
func splitDNSName(dnsName string) ([]string, error) {

View File

@ -321,7 +321,7 @@ func assertIsResolved(t *testing.T, resolver *k8sResolver, nameToExpectedResolve
t.Fatalf("Expected name [%s] to resolve to [%s], but got [%v]", name, expectedResolvedName, resolvedName)
}
if *resolvedName != expectedResolvedName {
if resolvedName.String() != expectedResolvedName {
t.Fatalf("Expected name [%s] to resolve to [%s], but got [%s]", name, expectedResolvedName, *resolvedName)
}
}

View File

@ -5,6 +5,7 @@ import (
pb "github.com/runconduit/conduit/controller/gen/proxy/destination"
"github.com/runconduit/conduit/controller/k8s"
"github.com/runconduit/conduit/controller/util"
pkgK8s "github.com/runconduit/conduit/pkg/k8s"
log "github.com/sirupsen/logrus"
)
@ -12,19 +13,29 @@ type updateListener interface {
Update(add []common.TcpAddress, remove []common.TcpAddress)
Done() <-chan struct{}
NoEndpoints(exists bool)
SetServiceId(id *serviceId)
}
// implements the updateListener interface
type endpointListener struct {
serviceName string
stream pb.Destination_GetServer
podsByIp k8s.PodIndex
stream pb.Destination_GetServer
podsByIp k8s.PodIndex
labels map[string]string
}
func (l *endpointListener) Done() <-chan struct{} {
return l.stream.Context().Done()
}
func (l *endpointListener) SetServiceId(id *serviceId) {
if id != nil {
l.labels = map[string]string{
"namespace": id.namespace,
"service": id.name,
}
}
}
func (l *endpointListener) Update(add []common.TcpAddress, remove []common.TcpAddress) {
if len(add) > 0 {
update := &pb.Update{
@ -62,7 +73,6 @@ func (l *endpointListener) NoEndpoints(exists bool) {
}
func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb.WeightedAddrSet {
var namespace string
addrs := make([]*pb.WeightedAddr, 0)
for i, address := range endpoints {
metricLabelsForPod := map[string]string{}
@ -76,11 +86,8 @@ func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb.
log.Errorf("Could not find pod for IP [%s], this IP will be sent with no metric labels.", ipAsString)
} else {
pod := resultingPods[0]
metricLabelsForPod = map[string]string{
"pod": pod.Name,
}
namespace = pod.Namespace
metricLabelsForPod = pkgK8s.GetOwnerLabels(pod.ObjectMeta)
metricLabelsForPod["pod"] = pod.Name
}
}
@ -91,14 +98,9 @@ func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb.
})
}
globalMetricLabels := map[string]string{
"service": l.serviceName,
"namespace": namespace,
}
return &pb.WeightedAddrSet{
Addrs: addrs,
MetricLabels: globalMetricLabels,
MetricLabels: l.labels,
}
}

View File

@ -9,6 +9,7 @@ import (
pb "github.com/runconduit/conduit/controller/gen/proxy/destination"
"github.com/runconduit/conduit/controller/k8s"
"github.com/runconduit/conduit/controller/util"
pkgK8s "github.com/runconduit/conduit/pkg/k8s"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -91,6 +92,7 @@ func TestEndpointListener(t *testing.T) {
expectedServiceName := "service-name"
expectedPodName := "pod1"
expectedNamespace := "this-namespace"
expectedReplicationControllerName := "rc-name"
addedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 666}}, Port: 1}
ipForAddr1 := util.IPToString(addedAddress1.Ip)
@ -98,6 +100,9 @@ func TestEndpointListener(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: expectedPodName,
Namespace: expectedNamespace,
Labels: map[string]string{
pkgK8s.ProxyReplicationControllerLabel: expectedReplicationControllerName,
},
},
}
addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 222}}, Port: 22}
@ -105,9 +110,12 @@ func TestEndpointListener(t *testing.T) {
mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}}
listener := &endpointListener{
podsByIp: podIndex,
serviceName: expectedServiceName,
stream: mockGetServer,
podsByIp: podIndex,
labels: map[string]string{
"service": expectedServiceName,
"namespace": expectedNamespace,
},
stream: mockGetServer,
}
listener.Update([]common.TcpAddress{addedAddress1, addedAddress2}, nil)
@ -119,7 +127,10 @@ func TestEndpointListener(t *testing.T) {
}
actualAddedAddress1MetricLabels := mockGetServer.updatesReceived[0].GetAdd().Addrs[0].MetricLabels
expectedAddedAddress1MetricLabels := map[string]string{"pod": expectedPodName}
expectedAddedAddress1MetricLabels := map[string]string{
"pod": expectedPodName,
"replication_controller": expectedReplicationControllerName,
}
if !reflect.DeepEqual(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels) {
t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels)
}

View File

@ -107,8 +107,7 @@ func (s *server) Get(dest *common.Destination, stream pb.Destination_GetServer)
}
func (s *server) streamResolutionUsingCorrectResolverFor(host string, port int, stream pb.Destination_GetServer) error {
serviceName := fmt.Sprintf("%s:%d", host, port)
listener := &endpointListener{serviceName: serviceName, stream: stream, podsByIp: s.podsByIp}
listener := &endpointListener{stream: stream, podsByIp: s.podsByIp}
for _, resolver := range s.resolvers {
resolverCanResolve, err := resolver.canResolve(host, port)

View File

@ -24,6 +24,8 @@ func (c *collectUpdateListener) Done() <-chan struct{} {
func (c *collectUpdateListener) NoEndpoints(exists bool) {}
func (c *collectUpdateListener) SetServiceId(id *serviceId) {}
func newCollectUpdateListener() (*collectUpdateListener, context.CancelFunc) {
ctx, cancelFn := context.WithCancel(context.Background())
return &collectUpdateListener{context: ctx}, cancelFn

View File

@ -60,7 +60,7 @@ The following labels are only applicable on `response_*` metrics.
The following labels are only applicable if `direction=outbound`.
* `dst_deployment`: The deployment to which this request is being sent.
* `dst_job`: The job to which this request is being sent.
* `dst_k8s_job`: The job to which this request is being sent.
* `dst_replica_set`: The replica set to which this request is being sent.
* `dst_daemon_set`: The daemon set to which this request is being sent.
* `dst_replication_controller`: The replication controller to which this request

View File

@ -7,8 +7,10 @@ package k8s
import (
"fmt"
"strings"
"github.com/runconduit/conduit/pkg/version"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
@ -57,8 +59,39 @@ const (
ProxyVersionAnnotation = "conduit.io/proxy-version"
)
var proxyLabels = []string{
ProxyDeploymentLabel,
ProxyReplicationControllerLabel,
ProxyReplicaSetLabel,
ProxyJobLabel,
ProxyDaemonSetLabel,
}
// CreatedByAnnotationValue returns the value associated with
// CreatedByAnnotation.
func CreatedByAnnotationValue() string {
return fmt.Sprintf("conduit/cli %s", version.Version)
}
// GetOwnerLabels returns the set of prometheus owner labels that can be
// extracted from the proxy labels that have been added to an injected
// kubernetes resource
func GetOwnerLabels(objectMeta meta.ObjectMeta) map[string]string {
labels := make(map[string]string)
for _, label := range proxyLabels {
if labelValue, ok := objectMeta.Labels[label]; ok {
labels[toOwnerLabel(label)] = labelValue
}
}
return labels
}
// toOwnerLabel converts a proxy label to a prometheus label, following the
// relabel conventions from the prometheus scrape config file
func toOwnerLabel(proxyLabel string) string {
stripped := strings.TrimPrefix(proxyLabel, "conduit.io/proxy-")
if stripped == "job" {
return "k8s_job"
}
return strings.Replace(stripped, "-", "_", -1)
}

48
pkg/k8s/labels_test.go Normal file
View File

@ -0,0 +1,48 @@
package k8s
import (
"reflect"
"testing"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestGetOwnerLabels(t *testing.T) {
t.Run("Maps proxy labels to prometheus labels", func(t *testing.T) {
metadata := meta.ObjectMeta{
Labels: map[string]string{
ProxyDeploymentLabel: "test-deployment",
ProxyReplicationControllerLabel: "test-replication-controller",
ProxyReplicaSetLabel: "test-replica-set",
ProxyJobLabel: "test-job",
ProxyDaemonSetLabel: "test-daemon-set",
},
}
expectedLabels := map[string]string{
"deployment": "test-deployment",
"replication_controller": "test-replication-controller",
"replica_set": "test-replica-set",
"k8s_job": "test-job",
"daemon_set": "test-daemon-set",
}
ownerLabels := GetOwnerLabels(metadata)
if !reflect.DeepEqual(ownerLabels, expectedLabels) {
t.Fatalf("Expected owner labels [%v] but got [%v]", expectedLabels, ownerLabels)
}
})
t.Run("Ignores non-proxy labels", func(t *testing.T) {
metadata := meta.ObjectMeta{
Labels: map[string]string{"app": "foo"},
}
ownerLabels := GetOwnerLabels(metadata)
if len(ownerLabels) != 0 {
t.Fatalf("Expected no owner labels but got [%v]", ownerLabels)
}
})
}