Update `GetProfile` clients with policy server updates (#7388)

### What

`GetProfile` clients do not receive destination profiles that consider Server protocol fields the way that `Get` clients do. If a Server exists that marks a `GetProfile` destination's protocol as `opaque`, this information is not passed back to the client.

#7184 added this for `Get` by subscribing clients to Endpoint/EndpointSlice updates. When there is an Endpoint/EndpointSlice update, or a Server update, the endpoints watcher passes this information to the endpoint translator, which sends the update back to the client.

For `GetProfile` the situation is different. As with `Get`, we only consider Servers when dealing with Pod IPs, but for `GetProfile` this only occurs in two situations:

1. The destination is a Pod IP and port
2. The destination is an Instance ID and port

In both of these cases, we need to check whether a Server already selects the endpoint, and we need to subscribe for Server updates in case a Server that selects the endpoint is added or deleted.
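
Both cases go through the same `GetProfile` API. As a sketch, using the destination client script from the testing section below (the pod IP, hostname, and service names here are hypothetical; the instance ID form is the pod hostname qualified by its Service):

```shell
# 1. Pod IP and port
$ go run ./controller/script/destination-client/main.go -method getProfile -path 10.42.0.34:80
# 2. Instance ID (pod hostname) and port, e.g. for a StatefulSet pod
$ go run ./controller/script/destination-client/main.go -method getProfile -path pod-0.my-svc.my-ns.svc.cluster.local:80
```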

### How

First, we check whether a Server already selects the endpoint, so that when the first destination profile is returned, the client knows whether the destination is `opaque` or not.

After sending that first update, we then subscribe the client for any future updates which will come from a Server being added or deleted.

This is handled by the new `ServerWatcher`, which watches for Server updates on the cluster; when an update occurs, it sends that update to the `endpointProfileTranslator`, which translates the protocol update into a `DestinationProfile`.

By introducing the `endpointProfileTranslator`, which only handles protocol updates, we're able to decouple the endpoint logic from `profileTranslator`; its `endpoint` field has been removed now that it only handles ServiceProfile updates for Services.
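
As a condensed sketch of the resulting flow (adapted from the `GetProfile` changes in the diff below; the helper name is hypothetical and error handling is trimmed):

```go
// Sketch only: how GetProfile handles a destination that resolves to a pod endpoint.
func (s *server) subscribeEndpointProfile(pod *corev1.Pod, port uint32, stream pb.Destination_GetProfileServer) error {
	// Annotated opaque ports (or the defaults, when pod is nil).
	opaquePorts, err := getAnnotatedOpaquePorts(pod, s.defaultOpaquePorts)
	if err != nil {
		return err
	}
	var address watcher.Address
	var endpoint *pb.WeightedAddr
	if pod != nil {
		// createAddress consults existing Servers (watcher.SetToServerProtocol),
		// so the first profile already reflects any Server that selects the pod.
		if address, err = s.createAddress(pod, port); err != nil {
			return err
		}
		if endpoint, err = s.createEndpoint(address, opaquePorts); err != nil {
			return err
		}
	}
	translator := newEndpointProfileTranslator(pod, port, endpoint, stream, s.log)
	if _, ok := opaquePorts[port]; ok {
		// An annotated opaque port is always opaque; no Server subscription needed.
		translator.UpdateProtocol(true)
	} else if pod == nil {
		translator.UpdateProtocol(false)
	} else {
		// Send the current protocol, then watch for Servers being added or deleted.
		translator.UpdateProtocol(address.OpaqueProtocol)
		s.servers.Subscribe(pod, port, translator)
		defer s.servers.Unsubscribe(pod, port, translator)
	}
	select {
	case <-s.shutdown:
	case <-stream.Context().Done():
	}
	return nil
}
```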

### Testing

A unit test has been added, and below are manual testing instructions for observing how it interacts with Server updates:

<details>
	<summary>app.yaml</summary>

	```yaml
	apiVersion: v1
	kind: Pod
	metadata:
	  name: pod
	  labels:
		app: pod
	spec:
	  containers:
	  - name: app
		image: nginx
		ports:
		  - name: http
			containerPort: 80
	---
	apiVersion: policy.linkerd.io/v1beta1
	kind: Server
	metadata:
	  name: srv
	  labels:
		policy: srv
	spec:
	  podSelector:
		matchLabels:
		  app: pod
	  port: 80
	  proxyProtocol: opaque
	```
</details>

```shell
$ go run ./controller/cmd/main.go destination
```

```shell
$ linkerd inject app.yaml | kubectl apply -f -
...
$ kubectl get pods -o wide
NAME   READY   STATUS    RESTARTS   AGE   IP           NODE                       NOMINATED NODE   READINESS GATES
pod    2/2     Running   0          53m   10.42.0.34   k3d-k3s-default-server-0   <none>           <none>
$ go run ./controller/script/destination-client/main.go -method getProfile -path 10.42.0.34:80
...
```

You can add/delete `srv` as well as edit its `proxyProtocol` field to observe the correct DestinationProfile updates.
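
For example, with the resources above applied (a sketch; `kubectl patch` on the Server resource is one way to do this):

```shell
# Flip the Server's proxyProtocol and watch the destination-client output update.
$ kubectl patch server srv --type merge -p '{"spec":{"proxyProtocol":"unknown"}}'
$ kubectl patch server srv --type merge -p '{"spec":{"proxyProtocol":"opaque"}}'
# Deleting the Server should also produce a non-opaque update.
$ kubectl delete server srv
```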

Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
@ -0,0 +1,63 @@
package destination
import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)
type endpointProfileTranslator struct {
pod *v1.Pod
port uint32
endpoint *pb.WeightedAddr
stream pb.Destination_GetProfileServer
log *logrus.Entry
}
// newEndpointProfileTranslator translates protocol updates to
// DestinationProfiles for endpoints. When a Server on the cluster is updated
// it is possible that it selects an endpoint that is being watched; if that
// is the case, an update will be sent to the client if the Server has
// changed the endpoint's supported protocol (mainly, whether it is opaque or not).
func newEndpointProfileTranslator(pod *v1.Pod, port uint32, endpoint *pb.WeightedAddr, stream pb.Destination_GetProfileServer, log *logrus.Entry) *endpointProfileTranslator {
return &endpointProfileTranslator{
pod: pod,
port: port,
endpoint: endpoint,
stream: stream,
log: log,
}
}
func (ept *endpointProfileTranslator) UpdateProtocol(opaqueProtocol bool) {
// The protocol for an endpoint should only be updated if there is a pod,
// endpoint, and the endpoint has a protocol hint. If there is an endpoint
// but it does not have a protocol hint, that means we could not determine
// if it has a peer proxy, so opaque traffic would not be supported.
if ept.pod != nil && ept.endpoint != nil && ept.endpoint.ProtocolHint != nil {
if !opaqueProtocol {
ept.endpoint.ProtocolHint.OpaqueTransport = nil
} else if ept.endpoint.ProtocolHint.OpaqueTransport == nil {
port, err := getInboundPort(&ept.pod.Spec)
if err != nil {
ept.log.Error(err)
} else {
ept.endpoint.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
}
}
}
profile := ept.createDefaultProfile(opaqueProtocol)
ept.log.Debugf("sending protocol update: %+v", profile)
ept.stream.Send(profile)
}
func (ept *endpointProfileTranslator) createDefaultProfile(opaqueProtocol bool) *pb.DestinationProfile {
return &pb.DestinationProfile{
RetryBudget: defaultRetryBudget(),
Endpoint: ept.endpoint,
OpaqueProtocol: opaqueProtocol,
}
}


@ -220,11 +220,11 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
err error
)
if address.Pod != nil {
opaquePorts, err = getPodOpaquePorts(address.Pod, et.defaultOpaquePorts)
opaquePorts, err = getAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
if err != nil {
et.log.Errorf("failed to get opaque ports for pod %s/%s: %s", address.Pod.Namespace, address.Pod.Name, err)
}
wa, err = toWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
wa, err = createWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
} else {
var authOverride *pb.AuthorityOverride
if address.AuthorityOverride != "" {
@ -314,7 +314,7 @@ func toAddr(address watcher.Address) (*net.TcpAddress, error) {
}, nil
}
func toWeightedAddr(address watcher.Address, opaquePorts map[uint32]struct{}, enableH2Upgrade bool, identityTrustDomain string, controllerNS string, log *logging.Entry) (*pb.WeightedAddr, error) {
func createWeightedAddr(address watcher.Address, opaquePorts map[uint32]struct{}, enableH2Upgrade bool, identityTrustDomain string, controllerNS string, log *logging.Entry) (*pb.WeightedAddr, error) {
// When converting an address to a weighted addr, it should be backed by a Pod.
if address.Pod == nil {
return nil, fmt.Errorf("endpoint not backed by Pod: %s:%d", address.IP, address.Port)


@ -22,16 +22,14 @@ type profileTranslator struct {
log *logging.Entry
fullyQualifiedName string
port uint32
endpoint *pb.WeightedAddr
}
func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endpoint *pb.WeightedAddr) *profileTranslator {
func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32) *profileTranslator {
return &profileTranslator{
stream: stream,
log: log.WithField("component", "profile-translator"),
fullyQualifiedName: fqn,
port: port,
endpoint: endpoint,
}
}
@ -40,7 +38,7 @@ func (pt *profileTranslator) Update(profile *sp.ServiceProfile) {
pt.stream.Send(pt.defaultServiceProfile())
return
}
destinationProfile, err := pt.toServiceProfile(profile)
destinationProfile, err := pt.createDestinationProfile(profile)
if err != nil {
pt.log.Error(err)
return
@ -54,7 +52,6 @@ func (pt *profileTranslator) defaultServiceProfile() *pb.DestinationProfile {
Routes: []*pb.Route{},
RetryBudget: defaultRetryBudget(),
FullyQualifiedName: pt.fullyQualifiedName,
Endpoint: pt.endpoint,
}
}
@ -78,9 +75,9 @@ func toDuration(d time.Duration) *duration.Duration {
}
}
// toServiceProfile returns a Proxy API DestinationProfile, given a
// createDestinationProfile returns a Proxy API DestinationProfile, given a
// ServiceProfile.
func (pt *profileTranslator) toServiceProfile(profile *sp.ServiceProfile) (*pb.DestinationProfile, error) {
func (pt *profileTranslator) createDestinationProfile(profile *sp.ServiceProfile) (*pb.DestinationProfile, error) {
routes := make([]*pb.Route, 0)
for _, route := range profile.Spec.Routes {
pbRoute, err := toRoute(profile, route)
@ -108,7 +105,6 @@ func (pt *profileTranslator) toServiceProfile(profile *sp.ServiceProfile) (*pb.D
RetryBudget: budget,
DstOverrides: toDstOverrides(profile.Spec.DstOverrides, pt.port),
FullyQualifiedName: pt.fullyQualifiedName,
Endpoint: pt.endpoint,
OpaqueProtocol: opaqueProtocol,
}, nil
}
@ -256,7 +252,7 @@ func toResponseMatch(rspMatch *sp.ResponseMatch) (*pb.ResponseMatch, error) {
}
if len(matches) == 0 {
return nil, errors.New("A response match must have a field set")
return nil, errors.New("a response match must have a field set")
}
if len(matches) == 1 {
return matches[0], nil
@ -350,7 +346,7 @@ func toRequestMatch(reqMatch *sp.RequestMatch) (*pb.RequestMatch, error) {
}
if len(matches) == 0 {
return nil, errors.New("A request match must have a field set")
return nil, errors.New("a request match must have a field set")
}
if len(matches) == 1 {
return matches[0], nil


@ -10,7 +10,6 @@ import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/controller/k8s"
labels "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/prometheus"
@ -31,6 +30,7 @@ type (
endpoints *watcher.EndpointsWatcher
opaquePorts *watcher.OpaquePortsWatcher
profiles *watcher.ProfileWatcher
servers *watcher.ServerWatcher
nodes coreinformers.NodeInformer
enableH2Upgrade bool
@ -82,12 +82,14 @@ func NewServer(
endpoints := watcher.NewEndpointsWatcher(k8sAPI, log, enableEndpointSlices)
opaquePorts := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
profiles := watcher.NewProfileWatcher(k8sAPI, log)
servers := watcher.NewServerWatcher(k8sAPI, log)
srv := server{
pb.UnimplementedDestinationServer{},
endpoints,
opaquePorts,
profiles,
servers,
k8sAPI.Node(),
enableH2Upgrade,
controllerNS,
@ -207,14 +209,35 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
return err
}
// The IP may or may not map to a pod (pod argument can be nil). If
// pod is not nil we will return a single endpoint in the
// DestinationProfile response, otherwise we return a default
// profile response.
err = s.sendEndpointProfile(stream, pod, port)
opaquePorts, err := getAnnotatedOpaquePorts(pod, s.defaultOpaquePorts)
if err != nil {
log.Debugf("Failed to send profile response for endpoint %s:%d: %v", ip.String(), port, err)
return err
return fmt.Errorf("failed to get opaque ports for pod: %s", err)
}
var address watcher.Address
var endpoint *pb.WeightedAddr
if pod != nil {
address, err = s.createAddress(pod, port)
if err != nil {
return fmt.Errorf("failed to create address: %s", err)
}
endpoint, err = s.createEndpoint(address, opaquePorts)
if err != nil {
return fmt.Errorf("failed to create endpoint: %s", err)
}
}
translator := newEndpointProfileTranslator(pod, port, endpoint, stream, s.log)
// If the endpoint's port is annotated as opaque, we don't need to
// subscribe for updates because it will always be opaque
// regardless of any Servers that may select it.
if _, ok := opaquePorts[port]; ok {
translator.UpdateProtocol(true)
} else if pod == nil {
translator.UpdateProtocol(false)
} else {
translator.UpdateProtocol(address.OpaqueProtocol)
s.servers.Subscribe(pod, port, translator)
defer s.servers.Unsubscribe(pod, port, translator)
}
select {
@ -222,7 +245,6 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
case <-stream.Context().Done():
log.Debugf("GetProfile(%+v) cancelled", dest)
}
return nil
}
} else {
@ -239,21 +261,43 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
if hostname != "" {
pod, err := getPodByHostname(s.k8sAPI, hostname, service)
if err != nil {
log.Errorf("Failed to get pod for hostname %s: %v", hostname, err)
log.Errorf("failed to get pod for hostname %s: %v", hostname, err)
}
err = s.sendEndpointProfile(stream, pod, port)
opaquePorts, err := getAnnotatedOpaquePorts(pod, s.defaultOpaquePorts)
if err != nil {
log.Debugf("Failed to send profile response for host %s: %v", hostname, err)
return err
return fmt.Errorf("failed to get opaque ports for pod: %s", err)
}
var address watcher.Address
var endpoint *pb.WeightedAddr
if pod != nil {
address, err = s.createAddress(pod, port)
if err != nil {
return fmt.Errorf("failed to create address: %s", err)
}
endpoint, err = s.createEndpoint(address, opaquePorts)
if err != nil {
return fmt.Errorf("failed to create endpoint: %s", err)
}
}
translator := newEndpointProfileTranslator(pod, port, endpoint, stream, s.log)
// If the endpoint's port is annotated as opaque, we don't need to
// subscribe for updates because it will always be opaque
// regardless of any Servers that may select it.
if _, ok := opaquePorts[port]; ok {
translator.UpdateProtocol(true)
} else if pod == nil {
translator.UpdateProtocol(false)
} else {
translator.UpdateProtocol(address.OpaqueProtocol)
s.servers.Subscribe(pod, port, translator)
defer s.servers.Unsubscribe(pod, port, translator)
}
select {
case <-s.shutdown:
case <-stream.Context().Done():
log.Debugf("GetProfile(%+v) cancelled", dest)
}
return nil
}
@ -263,7 +307,7 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
// We build up the pipeline of profile updaters backwards, starting from
// the translator which takes profile updates, translates them to protobuf
// and pushes them onto the gRPC stream.
translator := newProfileTranslator(stream, log, fqn, port, nil)
translator := newProfileTranslator(stream, log, fqn, port)
// The opaque ports adaptor merges profile updates with service opaque
// port annotation updates; it then publishes the result to the traffic
@ -325,53 +369,33 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
return nil
}
// sendEndpointProfile sends a DestinationProfile response back to the client.
// If the pod argument is provided, the profile sent to the client will
// include an endpoint. Otherwise, the default profile is sent.
func (s *server) sendEndpointProfile(stream pb.Destination_GetProfileServer, pod *corev1.Pod, port uint32) error {
log := s.log
var weightedAddr *pb.WeightedAddr
opaquePorts := make(map[uint32]struct{})
var err error
if pod != nil {
ownerKind, ownerName := s.k8sAPI.GetOwnerKindAndName(context.Background(), pod, true)
address := watcher.Address{
IP: pod.Status.PodIP,
Port: port,
Pod: pod,
OwnerName: ownerName,
OwnerKind: ownerKind,
}
opaquePorts, err = getPodOpaquePorts(pod, s.defaultOpaquePorts)
if err != nil {
log.Errorf("failed to get opaque ports for pod %s/%s: %s", pod.Namespace, pod.Name, err)
}
weightedAddr, err = toWeightedAddr(address, opaquePorts, s.enableH2Upgrade, s.identityTrustDomain, s.controllerNS, s.log)
if err != nil {
return err
}
func (s *server) createAddress(pod *corev1.Pod, port uint32) (watcher.Address, error) {
ownerKind, ownerName := s.k8sAPI.GetOwnerKindAndName(context.Background(), pod, true)
address := watcher.Address{
IP: pod.Status.PodIP,
Port: port,
Pod: pod,
OwnerName: ownerName,
OwnerKind: ownerKind,
}
err := watcher.SetToServerProtocol(s.k8sAPI, &address, port)
if err != nil {
return watcher.Address{}, fmt.Errorf("failed to set address OpaqueProtocol: %s", err)
}
return address, nil
}
// `Get` doesn't include the namespace in the per-endpoint
// metadata, so it needs to be special-cased.
weightedAddr.MetricLabels["namespace"] = pod.Namespace
func (s *server) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
weightedAddr, err := createWeightedAddr(address, opaquePorts, s.enableH2Upgrade, s.identityTrustDomain, s.controllerNS, s.log)
if err != nil {
return nil, err
}
// Send the default profile without subscribing for future updates. The
// profile response will also include an endpoint if the IP (or hostname)
// sent in the profile request maps to a pod.
translator := newProfileTranslator(stream, log, "", port, weightedAddr)
// `Get` doesn't include the namespace in the per-endpoint
// metadata, so it needs to be special-cased.
weightedAddr.MetricLabels["namespace"] = address.Pod.Namespace
// If there are opaque ports then update the profile translator
// with a service profile that has those values
if len(opaquePorts) != 0 {
sp := sp.ServiceProfile{}
sp.Spec.OpaquePorts = opaquePorts
translator.Update(&sp)
} else {
translator.Update(nil)
}
return nil
return weightedAddr, err
}
// getSvcID returns the service that corresponds to a Cluster IP address if one
@ -551,7 +575,7 @@ func profileID(authority string, ctxToken contextToken, clusterDomain string) (w
func getHostAndPort(authority string) (string, watcher.Port, error) {
hostPort := strings.Split(authority, ":")
if len(hostPort) > 2 {
return "", 0, fmt.Errorf("Invalid destination %s", authority)
return "", 0, fmt.Errorf("invalid destination %s", authority)
}
host := hostPort[0]
port := 80
@ -559,7 +583,7 @@ func getHostAndPort(authority string) (string, watcher.Port, error) {
var err error
port, err = strconv.Atoi(hostPort[1])
if err != nil || port <= 0 || port > 65535 {
return "", 0, fmt.Errorf("Invalid port %s", hostPort[1])
return "", 0, fmt.Errorf("invalid port %s", hostPort[1])
}
}
return host, watcher.Port(port), nil
@ -616,7 +640,10 @@ func hasSuffix(slice []string, suffix []string) bool {
return true
}
func getPodOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) (map[uint32]struct{}, error) {
func getAnnotatedOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) (map[uint32]struct{}, error) {
if pod == nil {
return defaultPorts, nil
}
annotation, ok := pod.Annotations[labels.ProxyOpaquePortsAnnotation]
if !ok {
return defaultPorts, nil


@ -25,6 +25,7 @@ const podIP1 = "172.17.0.12"
const podIP2 = "172.17.0.13"
const podIPOpaque = "172.17.0.14"
const podIPSkipped = "172.17.0.15"
const podIPPolicy = "172.17.0.16"
const podIPStatefulSet = "172.17.13.15"
const port uint32 = 8989
const opaquePort uint32 = 4242
@ -313,12 +314,52 @@ status:
podIP: 172.17.13.15`,
}
policyResources := []string{
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
app: policy-test
name: pod-policyResources
namespace: ns
status:
phase: Running
podIP: 172.17.0.16
spec:
containers:
- name: linkerd-proxy
env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
- name: app
image: nginx
ports:
- containerPort: 80
name: http
protocol: TCP`,
`
apiVersion: policy.linkerd.io/v1beta1
kind: Server
metadata:
name: srv
namespace: ns
spec:
podSelector:
matchLabels:
app: policy-test
port: 80
proxyProtocol: opaque`,
}
res := append(meshedPodResources, clientSP...)
res = append(res, unmeshedPod)
res = append(res, meshedOpaquePodResources...)
res = append(res, meshedOpaqueServiceResources...)
res = append(res, meshedSkippedPodResource...)
res = append(res, meshedStatefulSetPodResource...)
res = append(res, policyResources...)
k8sAPI, err := k8s.NewFakeAPI(res...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
@ -341,6 +382,7 @@ status:
endpoints := watcher.NewEndpointsWatcher(k8sAPI, log, false)
opaquePorts := watcher.NewOpaquePortsWatcher(k8sAPI, log, defaultOpaquePorts)
profiles := watcher.NewProfileWatcher(k8sAPI, log)
servers := watcher.NewServerWatcher(k8sAPI, log)
// Sync after creating watchers so that the indexers added get updated
// properly
@ -351,6 +393,7 @@ status:
endpoints,
opaquePorts,
profiles,
servers,
k8sAPI.Node(),
true,
"linkerd",
@ -915,7 +958,7 @@ func TestGetProfiles(t *testing.T) {
if first.Endpoint.ProtocolHint == nil {
t.Fatalf("Expected protocol hint but found none")
}
if first.Endpoint.ProtocolHint.GetOpaqueTransport().InboundPort != 4143 {
if first.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 {
t.Fatalf("Expected pod to support opaque traffic on port 4143")
}
if first.Endpoint.Addr.String() != epAddr.String() {
@ -986,6 +1029,46 @@ func TestGetProfiles(t *testing.T) {
t.Fatalf("Expected TLS identity for %s to be nil but got %+v", path, addr.TlsIdentity)
}
})
t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) {
server := makeServer(t)
stream := &bufferingGetProfileStream{
updates: []*pb.DestinationProfile{},
MockServerStream: util.NewMockServerStream(),
}
stream.Cancel()
_, err := toAddress(podIPPolicy, 80)
if err != nil {
t.Fatalf("Got error: %s", err)
}
err = server.GetProfile(&pb.GetDestination{
Scheme: "k8s",
Path: fmt.Sprintf("%s:%d", podIPPolicy, 80),
}, stream)
if err != nil {
t.Fatalf("Got error: %s", err)
}
// Test that the first update has a destination profile with an
// opaque protocol and opaque transport.
if len(stream.updates) == 0 {
t.Fatalf("Expected at least 1 update but got 0")
}
update := stream.updates[0]
if update.Endpoint == nil {
t.Fatalf("Expected response to have endpoint field")
}
if !update.OpaqueProtocol {
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80)
}
if update.Endpoint.ProtocolHint == nil {
t.Fatalf("Expected protocol hint but found none")
}
if update.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 {
t.Fatalf("Expected pod to support opaque traffic on port 4143")
}
})
}
func TestTokenStructure(t *testing.T) {


@ -25,7 +25,7 @@ func (c *MockAPIClient) Get(ctx context.Context, in *destinationPb.GetDestinatio
// GetProfile provides a mock of a destination API method
func (c *MockAPIClient) GetProfile(ctx context.Context, _ *destinationPb.GetDestination, _ ...grpc.CallOption) (destinationPb.Destination_GetProfileClient, error) {
// Not implemented through this client. The proxies use the gRPC server directly instead.
return nil, errors.New("Not implemented")
return nil, errors.New("not implemented")
}
// MockDestinationGetClient satisfies the Destination_GetClient gRPC interface.


@ -416,6 +416,8 @@ func (ew *EndpointsWatcher) getServicePublisher(id ServiceID) (sp *servicePublis
}
func (ew *EndpointsWatcher) addServer(obj interface{}) {
ew.Lock()
defer ew.Unlock()
server := obj.(*v1beta1.Server)
for _, sp := range ew.publishers {
sp.updateServer(server, true)
@ -423,6 +425,8 @@ func (ew *EndpointsWatcher) addServer(obj interface{}) {
}
func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
ew.Lock()
defer ew.Unlock()
server := obj.(*v1beta1.Server)
for _, sp := range ew.publishers {
sp.updateServer(server, false)
@ -770,7 +774,7 @@ func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) A
pp.log.Errorf("Unable to create new address:%v", err)
continue
}
err = setToServerProtocol(pp.k8sAPI, &address, resolvedPort)
err = SetToServerProtocol(pp.k8sAPI, &address, resolvedPort)
if err != nil {
pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
continue
@ -823,7 +827,7 @@ func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) Addre
pp.log.Errorf("Unable to create new address:%v", err)
continue
}
err = setToServerProtocol(pp.k8sAPI, &address, resolvedPort)
err = SetToServerProtocol(pp.k8sAPI, &address, resolvedPort)
if err != nil {
pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
continue
@ -1142,7 +1146,9 @@ func isValidSlice(es *discovery.EndpointSlice) bool {
return true
}
func setToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error {
// SetToServerProtocol sets the address's OpaqueProtocol field based off any
// Servers that select it and override the expected protocol.
func SetToServerProtocol(k8sAPI *k8s.API, address *Address, port Port) error {
servers, err := k8sAPI.Srv().Lister().Servers("").List(labels.Everything())
if err != nil {
return fmt.Errorf("failed to list Servers: %s", err)


@ -0,0 +1,151 @@
package watcher
import (
"sync"
"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
"github.com/linkerd/linkerd2/controller/k8s"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/cache"
)
// ServerWatcher watches all the servers in the cluster. When there is an
// update, it only sends updates to listeners if their endpoint's protocol
// is changed by the Server.
type ServerWatcher struct {
subscriptions map[podPort][]ServerUpdateListener
k8sAPI *k8s.API
log *logging.Entry
sync.RWMutex
}
type podPort struct {
pod *corev1.Pod
port Port
}
// ServerUpdateListener is the interface that subscribers must implement.
type ServerUpdateListener interface {
// UpdateProtocol takes a bool which is set to true if the endpoint is
// opaque and false otherwise. This value is used to send a
// DestinationProfile update to listeners for that endpoint.
UpdateProtocol(bool)
}
// NewServerWatcher creates a new ServerWatcher.
func NewServerWatcher(k8sAPI *k8s.API, log *logging.Entry) *ServerWatcher {
sw := &ServerWatcher{
subscriptions: make(map[podPort][]ServerUpdateListener),
k8sAPI: k8sAPI,
log: log,
}
k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sw.addServer,
DeleteFunc: sw.deleteServer,
UpdateFunc: func(_, obj interface{}) { sw.addServer(obj) },
})
return sw
}
// Subscribe subscribes a listener for any Server updates that may select the
// endpoint and change its expected protocol.
func (sw *ServerWatcher) Subscribe(pod *corev1.Pod, port Port, listener ServerUpdateListener) {
sw.Lock()
defer sw.Unlock()
pp := podPort{
pod: pod,
port: port,
}
listeners, ok := sw.subscriptions[pp]
if !ok {
sw.subscriptions[pp] = []ServerUpdateListener{listener}
return
}
listeners = append(listeners, listener)
sw.subscriptions[pp] = listeners
}
// Unsubscribe unsubscribes a listener from any Server updates.
func (sw *ServerWatcher) Unsubscribe(pod *corev1.Pod, port Port, listener ServerUpdateListener) {
sw.Lock()
defer sw.Unlock()
pp := podPort{
pod: pod,
port: port,
}
listeners, ok := sw.subscriptions[pp]
if !ok {
sw.log.Errorf("cannot unsubscribe from unknown Pod: %s/%s:%d", pod.Namespace, pod.Name, port)
return
}
for i, l := range listeners {
if l == listener {
n := len(listeners)
listeners[i] = listeners[n-1]
listeners[n-1] = nil
listeners = listeners[:n-1]
}
}
sw.subscriptions[pp] = listeners
}
func (sw *ServerWatcher) addServer(obj interface{}) {
server := obj.(*v1beta1.Server)
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
sw.log.Errorf("failed to create Selector: %s", err)
return
}
sw.updateServer(server, selector, true)
}
func (sw *ServerWatcher) deleteServer(obj interface{}) {
server := obj.(*v1beta1.Server)
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
sw.log.Errorf("failed to create Selector: %s", err)
return
}
sw.updateServer(server, selector, false)
}
func (sw *ServerWatcher) updateServer(server *v1beta1.Server, selector labels.Selector, isAdd bool) {
sw.Lock()
defer sw.Unlock()
for pp, listeners := range sw.subscriptions {
if selector.Matches(labels.Set(pp.pod.Labels)) {
var portMatch bool
switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(pp.port) {
portMatch = true
}
case intstr.String:
for _, c := range pp.pod.Spec.Containers {
for _, p := range c.Ports {
if p.ContainerPort == int32(pp.port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}
}
}
default:
continue
}
if portMatch {
var isOpaque bool
if isAdd && server.Spec.ProxyProtocol == opaqueProtocol {
isOpaque = true
} else {
isOpaque = false
}
for _, listener := range listeners {
listener.UpdateProtocol(isOpaque)
}
}
}
}
}


@ -87,6 +87,8 @@ func NewFakeClientSets(configs ...string) (
discoveryObjs = append(discoveryObjs, obj)
case ServiceProfile:
spObjs = append(spObjs, obj)
case Server:
spObjs = append(spObjs, obj)
default:
objs = append(objs, obj)
}