Add support for stateful sets (#3113)

We add support to the destination service for looking up individual pods in a stateful set. This allows Linkerd to correctly proxy requests that address individual pods. The authority structure for such a request is `<pod-name>.<service>.<namespace>.svc.cluster.local:<port>`.

Fixes #2266 

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2019-07-24 14:09:46 -07:00 committed by GitHub
parent 808fa381f9
commit e538a05ce2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 315 additions and 175 deletions

View File

@ -28,25 +28,20 @@ func newEndpointTranslator(
controllerNS string,
identityTrustDomain string,
enableH2Upgrade bool,
authority string,
service watcher.ServiceID,
stream pb.Destination_GetServer,
log *logging.Entry,
) (*endpointTranslator, error) {
) *endpointTranslator {
log = log.WithFields(logging.Fields{
"component": "endpoint-translator",
"service": authority,
"service": service,
})
service, _, err := watcher.GetServiceAndPort(authority)
if err != nil {
return nil, err
}
labels := map[string]string{
"namespace": service.Namespace,
"service": service.Name,
}
return &endpointTranslator{controllerNS, identityTrustDomain, enableH2Upgrade, labels, stream, log}, nil
return &endpointTranslator{controllerNS, identityTrustDomain, enableH2Upgrade, labels, stream, log}
}
func (et *endpointTranslator) Add(set watcher.PodSet) {

View File

@ -96,17 +96,14 @@ var (
func makeEndpointTranslator(t *testing.T) (*mockDestinationGetServer, *endpointTranslator) {
mockGetServer := &mockDestinationGetServer{updatesReceived: []*pb.Update{}}
translator, err := newEndpointTranslator(
translator := newEndpointTranslator(
"linkerd",
"trust.domain",
false,
"service-name.service-ns.svc.mycluster.local",
watcher.ServiceID{Name: "service-name", Namespace: "service-ns"},
mockGetServer,
logging.WithField("test", t.Name),
)
if err != nil {
t.Fatal(err)
}
return mockGetServer, translator
}

View File

@ -2,6 +2,9 @@ package destination
import (
"context"
"fmt"
"strconv"
"strings"
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
@ -24,6 +27,7 @@ type (
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
clusterDomain string
log *logging.Entry
shutdown <-chan struct{}
@ -48,6 +52,7 @@ func NewServer(
identityTrustDomain string,
enableH2Upgrade bool,
k8sAPI *k8s.API,
clusterDomain string,
shutdown <-chan struct{},
) *grpc.Server {
log := logging.WithFields(logging.Fields{
@ -65,6 +70,7 @@ func NewServer(
enableH2Upgrade,
controllerNS,
identityTrustDomain,
clusterDomain,
log,
shutdown,
}
@ -85,25 +91,27 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
}
log.Debugf("Get %s", dest.GetPath())
translator, err := newEndpointTranslator(
s.controllerNS,
s.identityTrustDomain,
s.enableH2Upgrade,
dest.GetPath(),
stream,
log,
)
service, port, hostname, err := parseServiceAuthority(dest.GetPath(), s.clusterDomain)
if err != nil {
log.Error(err)
log.Errorf("Invalid authority %s", dest.GetPath())
return err
}
err = s.endpoints.Subscribe(dest.GetPath(), translator)
translator := newEndpointTranslator(
s.controllerNS,
s.identityTrustDomain,
s.enableH2Upgrade,
service,
stream,
log,
)
err = s.endpoints.Subscribe(service, port, hostname, translator)
if err != nil {
log.Errorf("Failed to subscribe to %s: %s", dest.GetPath(), err)
return err
}
defer s.endpoints.Unsubscribe(dest.GetPath(), translator)
defer s.endpoints.Unsubscribe(service, port, hostname, translator)
select {
case <-s.shutdown:
@ -127,7 +135,7 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
// and pushes them onto the gRPC stream.
translator := newProfileTranslator(stream, log)
service, port, err := watcher.GetServiceAndPort(dest.GetPath())
service, port, _, err := parseServiceAuthority(dest.GetPath(), s.clusterDomain)
if err != nil {
return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
}
@ -153,20 +161,29 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
// up to the fallbackProfileListener to merge updates from the primary and
// secondary listeners and send the appropriate updates to the stream.
if dest.GetContextToken() != "" {
err := s.profiles.Subscribe(dest.GetPath(), dest.GetContextToken(), primary)
profile, err := profileID(dest.GetPath(), dest.GetContextToken(), s.clusterDomain)
if err != nil {
return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
}
err = s.profiles.Subscribe(profile, primary)
if err != nil {
log.Warnf("Failed to subscribe to profile %s: %s", dest.GetPath(), err)
return err
}
defer s.profiles.Unsubscribe(dest.GetPath(), dest.GetContextToken(), primary)
defer s.profiles.Unsubscribe(profile, primary)
}
err = s.profiles.Subscribe(dest.GetPath(), "", secondary)
profile, err := profileID(dest.GetPath(), "", s.clusterDomain)
if err != nil {
return status.Errorf(codes.InvalidArgument, "Invalid authority: %s", dest.GetPath())
}
err = s.profiles.Subscribe(profile, secondary)
if err != nil {
log.Warnf("Failed to subscribe to profile %s: %s", dest.GetPath(), err)
return err
}
defer s.profiles.Unsubscribe(dest.GetPath(), "", secondary)
defer s.profiles.Unsubscribe(profile, secondary)
select {
case <-s.shutdown:
@ -181,3 +198,99 @@ func (s *server) Endpoints(ctx context.Context, params *discoveryPb.EndpointsPar
s.log.Debugf("serving endpoints request")
return nil, status.Error(codes.Unimplemented, "Not implemented")
}
////////////
/// util ///
////////////
// nsFromToken extracts the namespace from a context token of the form
// "ns:<namespace>". It returns the empty string when the token does not
// have exactly that shape.
func nsFromToken(token string) string {
	segments := strings.Split(token, ":")
	if len(segments) != 2 || segments[0] != "ns" {
		return ""
	}
	return segments[1]
}
// profileID computes the service profile ID for an authority. The profile
// name is the fully qualified DNS name of the authority's service. The
// namespace defaults to the service's namespace, but an "ns:<namespace>"
// context token, when present, overrides it.
func profileID(authority string, contextToken string, clusterDomain string) (watcher.ProfileID, error) {
	service, _, _, err := parseServiceAuthority(authority, clusterDomain)
	if err != nil {
		return watcher.ProfileID{}, err
	}
	namespace := service.Namespace
	if contextNs := nsFromToken(contextToken); contextNs != "" {
		namespace = contextNs
	}
	return watcher.ProfileID{
		Name:      fmt.Sprintf("%s.%s.svc.%s", service.Name, service.Namespace, clusterDomain),
		Namespace: namespace,
	}, nil
}
// getHostAndPort splits an authority of the form "host[:port]" into its
// host and port components. When no port is present the HTTP default (80)
// is returned.
func getHostAndPort(authority string) (string, watcher.Port, error) {
	segments := strings.Split(authority, ":")
	if len(segments) > 2 {
		return "", 0, fmt.Errorf("Invalid destination %s", authority)
	}
	if len(segments) == 1 {
		return segments[0], watcher.Port(80), nil
	}
	p, err := strconv.Atoi(segments[1])
	if err != nil {
		return "", 0, fmt.Errorf("Invalid port %s", segments[1])
	}
	return segments[0], watcher.Port(p), nil
}
// parseServiceAuthority is a utility function that destructures an authority
// into a service, port, and optionally a pod hostname. If the authority does
// not represent a Kubernetes service, an error is returned. If no port is
// specified in the authority, the HTTP default (80) is returned as the port
// number. If the authority is a pod DNS name then the pod hostname is returned
// as the 3rd return value. See https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/.
func parseServiceAuthority(authority string, clusterDomain string) (watcher.ServiceID, watcher.Port, string, error) {
	host, port, err := getHostAndPort(authority)
	if err != nil {
		return watcher.ServiceID{}, 0, "", err
	}
	labels := strings.Split(host, ".")
	suffix := append([]string{"svc"}, strings.Split(clusterDomain, ".")...)
	// The host must end in ".svc.<clusterDomain>" and carry at least a
	// service and a namespace label in front of that suffix.
	if len(labels) < len(suffix)+2 || !hasSuffix(labels, suffix) {
		return watcher.ServiceID{}, 0, "", fmt.Errorf("Invalid k8s service %s", host)
	}
	switch len(labels) - len(suffix) {
	case 2:
		// <service>.<namespace>.<suffix>
		return watcher.ServiceID{Name: labels[0], Namespace: labels[1]}, port, "", nil
	case 3:
		// <hostname>.<service>.<namespace>.<suffix>
		return watcher.ServiceID{Name: labels[1], Namespace: labels[2]}, port, labels[0], nil
	}
	return watcher.ServiceID{}, 0, "", fmt.Errorf("Invalid k8s service %s", host)
}
// hasSuffix reports whether the last len(suffix) elements of slice are
// equal, element-wise, to suffix. A suffix longer than the slice never
// matches; without this guard the slice expression slice[len(slice)-len(suffix):]
// would panic with a negative index. An empty suffix always matches.
func hasSuffix(slice []string, suffix []string) bool {
	if len(suffix) > len(slice) {
		return false
	}
	offset := len(slice) - len(suffix)
	for i, s := range suffix {
		if slice[offset+i] != s {
			return false
		}
	}
	return true
}

View File

@ -111,6 +111,7 @@ spec:
false,
"linkerd",
"trust.domain",
"mycluster.local",
log,
make(<-chan struct{}),
}

View File

@ -35,6 +35,11 @@ type (
// PodSet is a set of pods, indexed by IP.
PodSet = map[PodID]Address
portAndHostname struct {
port Port
hostname string
}
// EndpointsWatcher watches all endpoints and services in the Kubernetes
// cluster. Listeners can subscribe to a particular service and port and
// EndpointsWatcher will publish the address set and all future changes for
@ -47,24 +52,35 @@ type (
sync.RWMutex // This mutex protects modification of the map itself.
}
// servicePublisher represents a service along with a port number. Multiple
// listeners may be subscribed to a servicePublisher. servicePublisher maintains the
// current state of the address set and publishes diffs to all listeners when
// updates come from either the endpoints API or the service API.
// servicePublisher represents a service. It keeps a map of portPublishers
// keyed by port and hostname. This is because each watch on a service
// will have a port and optionally may specify a hostname. The port
// and hostname will influence the endpoint set which is why a separate
// portPublisher is required for each port and hostname combination. The
// service's port mapping will be applied to the requested port and the
// mapped port will be used in the addresses set. If a hostname is
// requested, the address set will be filtered to only include addresses
// with the requested hostname.
servicePublisher struct {
id ServiceID
log *logging.Entry
k8sAPI *k8s.API
ports map[Port]*portPublisher
ports map[portAndHostname]*portPublisher
// All access to the servicePublisher and its portPublishers is explicitly synchronized by
// this mutex.
sync.Mutex
}
// portPublisher represents a service along with a port and optionally a
// hostname. Multiple listeners may be subscribed to a portPublisher.
// portPublisher maintains the current state of the address set and
// publishes diffs to all listeners when updates come from either the
// endpoints API or the service API.
portPublisher struct {
id ServiceID
targetPort namedPort
hostname string
log *logging.Entry
k8sAPI *k8s.API
@ -124,34 +140,33 @@ func NewEndpointsWatcher(k8sAPI *k8s.API, log *logging.Entry) *EndpointsWatcher
// Subscribe to an authority.
// The provided listener will be updated each time the address set for the
// given authority is changed.
func (ew *EndpointsWatcher) Subscribe(authority string, listener EndpointUpdateListener) error {
id, port, err := GetServiceAndPort(authority)
if err != nil {
return err
func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) error {
if hostname == "" {
ew.log.Infof("Establishing watch on endpoint [%s:%d]", id, port)
} else {
ew.log.Infof("Establishing watch on endpoint [%s.%s:%d]", hostname, id, port)
}
ew.log.Infof("Establishing watch on endpoint [%s:%d]", id, port)
sp := ew.getOrNewServicePublisher(id)
sp.subscribe(port, listener)
sp.subscribe(port, hostname, listener)
return nil
}
// Unsubscribe removes a listener from the subscribers list for this authority.
func (ew *EndpointsWatcher) Unsubscribe(authority string, listener EndpointUpdateListener) {
id, port, err := GetServiceAndPort(authority)
if err != nil {
ew.log.Errorf("Invalid service name [%s]", authority)
return
func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) {
if hostname == "" {
ew.log.Infof("Stopping watch on endpoint [%s:%d]", id, port)
} else {
ew.log.Infof("Stopping watch on endpoint [%s.%s:%d]", hostname, id, port)
}
ew.log.Infof("Stopping watch on endpoint [%s:%d]", id, port)
sp, ok := ew.getServicePublisher(id)
if !ok {
ew.log.Errorf("Cannot unsubscribe from unknown service [%s:%d]", id, port)
return
}
sp.unsubscribe(port, listener)
sp.unsubscribe(port, hostname, listener)
}
func (ew *EndpointsWatcher) addService(obj interface{}) {
@ -234,7 +249,7 @@ func (ew *EndpointsWatcher) getOrNewServicePublisher(id ServiceID) *servicePubli
"svc": id.Name,
}),
k8sAPI: ew.k8sAPI,
ports: make(map[Port]*portPublisher),
ports: make(map[portAndHostname]*portPublisher),
}
ew.publishers[id] = sp
}
@ -277,41 +292,49 @@ func (sp *servicePublisher) updateService(newService *corev1.Service) {
defer sp.Unlock()
sp.log.Debugf("Updating service for %s", sp.id)
for srcPort, port := range sp.ports {
newTargetPort := getTargetPort(newService, srcPort)
for key, port := range sp.ports {
newTargetPort := getTargetPort(newService, key.port)
if newTargetPort != port.targetPort {
port.updatePort(newTargetPort)
}
}
}
func (sp *servicePublisher) subscribe(srcPort Port, listener EndpointUpdateListener) {
func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
sp.Lock()
defer sp.Unlock()
port, ok := sp.ports[srcPort]
key := portAndHostname{
port: srcPort,
hostname: hostname,
}
port, ok := sp.ports[key]
if !ok {
port = sp.newPortPublisher(srcPort)
sp.ports[srcPort] = port
port = sp.newPortPublisher(srcPort, hostname)
sp.ports[key] = port
}
port.subscribe(listener)
}
func (sp *servicePublisher) unsubscribe(srcPort Port, listener EndpointUpdateListener) {
func (sp *servicePublisher) unsubscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
sp.Lock()
defer sp.Unlock()
port, ok := sp.ports[srcPort]
key := portAndHostname{
port: srcPort,
hostname: hostname,
}
port, ok := sp.ports[key]
if ok {
port.unsubscribe(listener)
if len(port.listeners) == 0 {
endpointsVecs.unregister(sp.metricsLabels(srcPort))
delete(sp.ports, srcPort)
endpointsVecs.unregister(sp.metricsLabels(srcPort, hostname))
delete(sp.ports, key)
}
}
}
func (sp *servicePublisher) newPortPublisher(srcPort Port) *portPublisher {
func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *portPublisher {
targetPort := intstr.FromInt(int(srcPort))
svc, err := sp.k8sAPI.Svc().Lister().Services(sp.id.Namespace).Get(sp.id.Name)
if err != nil && !apierrors.IsNotFound(err) {
@ -334,10 +357,11 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port) *portPublisher {
port := &portPublisher{
listeners: []EndpointUpdateListener{},
targetPort: targetPort,
hostname: hostname,
exists: exists,
k8sAPI: sp.k8sAPI,
log: log,
metrics: endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort)),
metrics: endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname)),
}
endpoints, err := sp.k8sAPI.Endpoint().Lister().Endpoints(sp.id.Namespace).Get(sp.id.Name)
@ -351,8 +375,8 @@ func (sp *servicePublisher) newPortPublisher(srcPort Port) *portPublisher {
return port
}
func (sp *servicePublisher) metricsLabels(port Port) prometheus.Labels {
return endpointsLabels(sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)))
func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus.Labels {
return endpointsLabels(sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname)
}
/////////////////////
@ -393,6 +417,9 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) PodSe
for _, subset := range endpoints.Subsets {
resolvedPort := pp.resolveTargetPort(subset)
for _, endpoint := range subset.Addresses {
if pp.hostname != "" && pp.hostname != endpoint.Hostname {
continue
}
if endpoint.TargetRef.Kind == "Pod" {
id := PodID{
Name: endpoint.TargetRef.Name,

View File

@ -49,7 +49,9 @@ func TestEndpointsWatcher(t *testing.T) {
for _, tt := range []struct {
serviceType string
k8sConfigs []string
authority string
id ServiceID
hostname string
port Port
expectedAddresses []string
expectedNoEndpoints bool
expectedNoEndpointsServiceExists bool
@ -128,7 +130,8 @@ status:
phase: Running
podIP: 172.17.0.20`,
},
authority: "name1.ns.svc.cluster.local:8989",
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
expectedAddresses: []string{
"172.17.0.12:8989",
"172.17.0.19:8989",
@ -197,7 +200,8 @@ status:
podIp: 10.233.88.244
phase: Running`,
},
authority: "name1.ns.svc.cluster.local:8989",
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
expectedAddresses: []string{
"10.233.66.239:8990",
"10.233.88.244:8990",
@ -250,7 +254,8 @@ status:
podIp: 10.1.30.135
phase: Running`,
},
authority: "world.ns.svc.cluster.local:7778",
id: ServiceID{Name: "world", Namespace: "ns"},
port: 7778,
expectedAddresses: []string{
"10.1.30.135:7779",
},
@ -307,7 +312,8 @@ status:
phase: Running
podIP: 172.17.0.25`,
},
authority: "name1.ns.svc.cluster.local:8989",
id: ServiceID{Name: "name1", Namespace: "ns"},
port: 8989,
expectedAddresses: []string{
"172.17.0.25:8989",
},
@ -327,7 +333,8 @@ spec:
ports:
- port: 7979`,
},
authority: "name2.ns.svc.cluster.local:7979",
id: ServiceID{Name: "name2", Namespace: "ns"},
port: 7979,
expectedAddresses: []string{},
expectedNoEndpoints: true,
expectedNoEndpointsServiceExists: true,
@ -344,7 +351,8 @@ spec:
type: ExternalName
externalName: foo`,
},
authority: "name3.ns.svc.cluster.local:6969",
id: ServiceID{Name: "name3", Namespace: "ns"},
port: 6969,
expectedAddresses: []string{},
expectedNoEndpoints: true,
expectedNoEndpointsServiceExists: false,
@ -352,11 +360,96 @@ spec:
{
serviceType: "services that do not yet exist",
k8sConfigs: []string{},
authority: "name4.ns.svc.cluster.local:5959",
id: ServiceID{Name: "name4", Namespace: "ns"},
port: 5959,
expectedAddresses: []string{},
expectedNoEndpoints: true,
expectedNoEndpointsServiceExists: false,
},
{
serviceType: "stateful sets",
k8sConfigs: []string{`
apiVersion: v1
kind: Service
metadata:
name: name1
namespace: ns
spec:
type: LoadBalancer
ports:
- port: 8989`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: name1
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.12
hostname: name1-1
targetRef:
kind: Pod
name: name1-1
namespace: ns
- ip: 172.17.0.19
hostname: name1-2
targetRef:
kind: Pod
name: name1-2
namespace: ns
- ip: 172.17.0.20
hostname: name1-3
targetRef:
kind: Pod
name: name1-3
namespace: ns
ports:
- port: 8989`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-1
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.12`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-2
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.19`,
`
apiVersion: v1
kind: Pod
metadata:
name: name1-3
namespace: ns
ownerReferences:
- kind: ReplicaSet
name: rs-1
status:
phase: Running
podIP: 172.17.0.20`,
},
id: ServiceID{Name: "name1", Namespace: "ns"},
hostname: "name1-3",
port: 5959,
expectedAddresses: []string{"172.17.0.20:5959"},
expectedNoEndpoints: false,
expectedNoEndpointsServiceExists: false,
},
} {
tt := tt // pin
t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
@ -371,7 +464,7 @@ spec:
listener := newBufferingEndpointListener()
watcher.Subscribe(tt.authority, listener)
watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
actualAddresses := make([]string, 0)
actualAddresses = append(actualAddresses, listener.added...)

View File

@ -2,8 +2,6 @@ package watcher
import (
"fmt"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/util/intstr"
)
@ -29,46 +27,3 @@ type (
// String implements fmt.Stringer, rendering the ID as "<namespace>/<name>".
func (i ID) String() string {
	return fmt.Sprintf("%s/%s", i.Namespace, i.Name)
}
// getHostAndPort splits an authority of the form "host[:port]" into its
// host and port components. When no port is present the HTTP default (80)
// is returned.
func getHostAndPort(authority string) (string, Port, error) {
	segments := strings.Split(authority, ":")
	if len(segments) > 2 {
		return "", 0, fmt.Errorf("Invalid destination %s", authority)
	}
	if len(segments) == 1 {
		return segments[0], Port(80), nil
	}
	p, err := strconv.Atoi(segments[1])
	if err != nil {
		return "", 0, fmt.Errorf("Invalid port %s", segments[1])
	}
	return segments[0], Port(p), nil
}
// GetServiceAndPort is a utility function that destructures an authority into
// a service and port. If the authority does not represent a Kubernetes
// service, an error is returned. If no port is specified in the authority,
// the HTTP default (80) is returned as the port number.
func GetServiceAndPort(authority string) (ServiceID, Port, error) {
	host, port, err := getHostAndPort(authority)
	if err != nil {
		return ServiceID{}, 0, err
	}
	labels := strings.Split(host, ".")
	// A valid k8s service authority has exactly the shape
	// <service>.<namespace>.svc.cluster.local (five DNS labels, with
	// "svc" in the third position).
	if len(labels) != 5 || labels[2] != "svc" {
		return ServiceID{}, 0, fmt.Errorf("Invalid k8s service %s", host)
	}
	return ServiceID{Name: labels[0], Namespace: labels[1]}, port, nil
}

View File

@ -2,7 +2,6 @@ package watcher
import (
"fmt"
"strings"
"sync"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
@ -71,12 +70,7 @@ func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) *ProfileWatcher {
// Subscribe to an authority.
// The provided listener will be updated each time the service profile for the
// given authority is changed.
func (pw *ProfileWatcher) Subscribe(authority string, contextToken string, listener ProfileUpdateListener) error {
id, err := profileID(authority, contextToken)
if err != nil {
return err
}
func (pw *ProfileWatcher) Subscribe(id ProfileID, listener ProfileUpdateListener) error {
pw.log.Infof("Establishing watch on profile %s", id)
publisher := pw.getOrNewProfilePublisher(id, nil)
@ -86,11 +80,7 @@ func (pw *ProfileWatcher) Subscribe(authority string, contextToken string, liste
}
// Unsubscribe removes a listener from the subscribers list for this authority.
func (pw *ProfileWatcher) Unsubscribe(authority string, contextToken string, listener ProfileUpdateListener) error {
id, err := profileID(authority, contextToken)
if err != nil {
return err
}
func (pw *ProfileWatcher) Unsubscribe(id ProfileID, listener ProfileUpdateListener) error {
pw.log.Infof("Stopping watch on profile %s", id)
publisher, ok := pw.getProfilePublisher(id)
@ -219,36 +209,3 @@ func (pp *profilePublisher) update(profile *sp.ServiceProfile) {
pp.profileMetrics.incUpdates()
}
////////////
/// util ///
////////////
// nsFromToken extracts the namespace from a context token of the form
// "ns:<namespace>", returning the empty string for any other token shape.
func nsFromToken(token string) string {
	fields := strings.Split(token, ":")
	if len(fields) != 2 || fields[0] != "ns" {
		return ""
	}
	return fields[1]
}
// profileID computes the service profile ID for an authority. The profile
// name is the authority's host (port stripped); the namespace defaults to
// the service's namespace but is overridden by an "ns:<namespace>" context
// token when one is present.
func profileID(authority string, contextToken string) (ProfileID, error) {
	host, _, err := getHostAndPort(authority)
	if err != nil {
		return ProfileID{}, err
	}
	service, _, err := GetServiceAndPort(authority)
	if err != nil {
		return ProfileID{}, err
	}
	namespace := service.Namespace
	if contextNs := nsFromToken(contextToken); contextNs != "" {
		namespace = contextNs
	}
	return ProfileID{Name: host, Namespace: namespace}, nil
}

View File

@ -12,8 +12,7 @@ func TestProfileWatcher(t *testing.T) {
for _, tt := range []struct {
name string
k8sConfigs []string
authority string
contextToken string
id ProfileID
expectedProfiles []*sp.ServiceProfileSpec
}{
{
@ -34,8 +33,7 @@ spec:
min: 500
isFailure: true`,
},
authority: "foobar.ns.svc.cluster.local",
contextToken: "ns:linkerd",
id: ProfileID{Name: "foobar.ns.svc.cluster.local", Namespace: "linkerd"},
expectedProfiles: []*sp.ServiceProfileSpec{
{
Routes: []*sp.RouteSpec{
@ -61,7 +59,7 @@ spec:
{
name: "service without profile",
k8sConfigs: []string{},
authority: "foobar.ns.svc.cluster.local",
id: ProfileID{Name: "foobar.ns.svc.cluster.local", Namespace: "ns"},
expectedProfiles: []*sp.ServiceProfileSpec{
nil,
},
@ -80,7 +78,7 @@ spec:
listener := NewBufferingProfileListener()
watcher.Subscribe(tt.authority, tt.contextToken, listener)
watcher.Subscribe(tt.id, listener)
actualProfiles := make([]*sp.ServiceProfileSpec, 0)

View File

@ -58,11 +58,12 @@ func newMetricsVecs(name string, labels []string) metricsVecs {
}
}
func endpointsLabels(namespace, service, port string) prometheus.Labels {
// endpointsLabels builds the prometheus label set that scopes endpoints
// metrics to a particular namespace, service, port, and hostname.
func endpointsLabels(namespace, service, port string, hostname string) prometheus.Labels {
	labels := make(prometheus.Labels, 4)
	labels["namespace"] = namespace
	labels["service"] = service
	labels["port"] = port
	labels["hostname"] = hostname
	return labels
}
@ -75,7 +76,7 @@ func labelNames(labels prometheus.Labels) []string {
}
func newEndpointsMetricsVecs() endpointsMetricsVecs {
labels := labelNames(endpointsLabels("", "", ""))
labels := labelNames(endpointsLabels("", "", "", ""))
vecs := newMetricsVecs("endpoints", labels)
pods := promauto.NewGaugeVec(

View File

@ -43,24 +43,27 @@ func main() {
log.Fatalf("Failed to listen on %s: %s", *addr, err)
}
global, err := config.Global(consts.MountPathGlobalConfig)
if err != nil {
log.Fatalf("Failed to load global config: %s", err)
}
trustDomain := ""
if *disableIdentity {
log.Info("Identity is disabled")
} else {
global, err := config.Global(consts.MountPathGlobalConfig)
if err != nil {
log.Fatalf("Failed to load global config: %s", err)
}
trustDomain = global.GetIdentityContext().GetTrustDomain()
}
clusterDomain := global.GetClusterDomain()
server := destination.NewServer(
*addr,
*controllerNamespace,
trustDomain,
*enableH2Upgrade,
k8sAPI,
clusterDomain,
done,
)