Return default opaque ports in the destination service (#5814)

This changes the destination service to always use a default set of opaque ports
for pods and services. This is so that after Linkerd is installed onto a
cluster, users can benefit from common opaque ports without having to annotate
the workloads that serve the applications.

After #5810 merges, the proxy containers will be have the default opaque ports
`25,443,587,3306,5432,11211`. This value on the proxy container does not affect
traffic though; it only configures the proxy.

In order for clients and servers to detect opaque protocols and determine opaque
transports, the pods and services need to have these annotations.

The ports `25,443,587,3306,5432,11211` are now handled opaquely when a pod or
service does not have the opaque ports annotation. If the annotation is present
with a different value, this is used instead of the default. If the annotation
is present but is an empty string, there are no opaque ports for the workload.

Signed-off-by: Kevin Leimkuhler <kevin@kleimkuhler.com>
This commit is contained in:
Kevin Leimkuhler 2021-02-24 14:55:31 -05:00 committed by GitHub
parent 5bd5db6524
commit 51a965e228
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 103 additions and 32 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/pkg/addr"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/opaqueports"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
@ -213,10 +214,15 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
err error
)
if address.Pod != nil {
opaquePorts, getErr := getPodOpaquePortsAnnotations(address.Pod)
opaquePorts, ok, getErr := getPodOpaquePortsAnnotations(address.Pod)
if getErr != nil {
et.log.Errorf("failed getting opaque ports annotation for pod: %s", getErr)
}
// If the opaque ports annotation was not set, then set the
// endpoint's opaque ports to the default value.
if !ok {
opaquePorts = opaqueports.DefaultOpaquePorts
}
wa, err = toWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
} else {
var authOverride *pb.AuthorityOverride

View File

@ -36,8 +36,6 @@ func (opa *opaquePortsAdaptor) publish() {
if opa.profile != nil {
merged = *opa.profile
}
if len(opa.opaquePorts) != 0 {
merged.Spec.OpaquePorts = opa.opaquePorts
}
merged.Spec.OpaquePorts = opa.opaquePorts
opa.listener.Update(&merged)
}

View File

@ -12,6 +12,7 @@ import (
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/controller/k8s"
labels "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/opaqueports"
"github.com/linkerd/linkerd2/pkg/prometheus"
"github.com/linkerd/linkerd2/pkg/util"
logging "github.com/sirupsen/logrus"
@ -203,7 +204,6 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
if err != nil {
return err
}
var endpoint *pb.WeightedAddr
opaquePorts := make(map[uint32]struct{})
if pod != nil {
@ -217,10 +217,16 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr
if err != nil {
log.Errorf("failed to set opaque port annotation on pod: %s", err)
}
opaquePorts, err = getPodOpaquePortsAnnotations(pod)
var ok bool
opaquePorts, ok, err = getPodOpaquePortsAnnotations(pod)
if err != nil {
log.Errorf("failed to get opaque ports annotation for pod: %s", err)
}
// If the opaque ports annotation was not set, then set the
// endpoint's opaque ports to the default value.
if !ok {
opaquePorts = opaqueports.DefaultOpaquePorts
}
endpoint, err = toWeightedAddr(podSet.Addresses[podID], opaquePorts, s.enableH2Upgrade, s.identityTrustDomain, s.controllerNS, log)
if err != nil {
return err
@ -455,17 +461,20 @@ func hasSuffix(slice []string, suffix []string) bool {
return true
}
func getPodOpaquePortsAnnotations(pod *corev1.Pod) (map[uint32]struct{}, error) {
func getPodOpaquePortsAnnotations(pod *corev1.Pod) (map[uint32]struct{}, bool, error) {
annotation, ok := pod.Annotations[labels.ProxyOpaquePortsAnnotation]
if !ok {
return nil, false, nil
}
opaquePorts := make(map[uint32]struct{})
annotation := pod.Annotations[labels.ProxyOpaquePortsAnnotation]
if annotation != "" {
for _, portStr := range util.ParseContainerOpaquePorts(annotation, pod.Spec.Containers) {
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return nil, err
return nil, true, err
}
opaquePorts[uint32(port)] = struct{}{}
}
}
return opaquePorts, nil
return opaquePorts, true, nil
}

View File

@ -7,6 +7,7 @@ import (
"github.com/linkerd/linkerd2-proxy-init/ports"
"github.com/linkerd/linkerd2/controller/k8s"
labels "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/opaqueports"
"github.com/linkerd/linkerd2/pkg/util"
log "github.com/sirupsen/logrus"
logging "github.com/sirupsen/logrus"
@ -68,7 +69,7 @@ func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdat
// and no opaque ports
if !ok {
opw.subscriptions[id] = &svcSubscriptions{
opaquePorts: make(map[uint32]struct{}),
opaquePorts: opaqueports.DefaultOpaquePorts,
listeners: []OpaquePortsUpdateListener{listener},
}
return nil
@ -77,9 +78,7 @@ func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdat
// service listeners. If there are opaque ports for the service, update
// the listener with that value.
ss.listeners = append(ss.listeners, listener)
if len(ss.opaquePorts) != 0 {
listener.UpdateService(ss.opaquePorts)
}
listener.UpdateService(ss.opaquePorts)
return nil
}
@ -114,11 +113,16 @@ func (opw *OpaquePortsWatcher) addService(obj interface{}) {
Namespace: svc.Namespace,
Name: svc.Name,
}
opaquePorts, err := getServiceOpaquePortsAnnotation(svc)
opaquePorts, ok, err := getServiceOpaquePortsAnnotation(svc)
if err != nil {
opw.log.Errorf("failed to get %s service opaque ports annotation: %s", id, err)
return
}
// If the opaque ports annotation was not set, then set the service's
// opaque ports to the default value.
if !ok {
opaquePorts = opaqueports.DefaultOpaquePorts
}
ss, ok := opw.subscriptions[id]
// If there are no subscriptions for this service, create one with the
// opaque ports.
@ -168,9 +172,9 @@ func (opw *OpaquePortsWatcher) deleteService(obj interface{}) {
return
}
old := ss.opaquePorts
ss.opaquePorts = make(map[uint32]struct{})
// Do not send an update if the service already had no opaque ports
if len(old) == 0 {
ss.opaquePorts = opaqueports.DefaultOpaquePorts
// Do not send an update if the service already had the default opaque ports
if portsEqual(old, ss.opaquePorts) {
return
}
for _, listener := range ss.listeners {
@ -178,19 +182,22 @@ func (opw *OpaquePortsWatcher) deleteService(obj interface{}) {
}
}
func getServiceOpaquePortsAnnotation(svc *corev1.Service) (map[uint32]struct{}, error) {
func getServiceOpaquePortsAnnotation(svc *corev1.Service) (map[uint32]struct{}, bool, error) {
annotation, ok := svc.Annotations[labels.ProxyOpaquePortsAnnotation]
if !ok {
return nil, false, nil
}
opaquePorts := make(map[uint32]struct{})
annotation := svc.Annotations[labels.ProxyOpaquePortsAnnotation]
if annotation != "" {
for _, portStr := range parseServiceOpaquePorts(annotation, svc.Spec.Ports) {
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return nil, err
return nil, true, err
}
opaquePorts[uint32(port)] = struct{}{}
}
}
return opaquePorts, nil
return opaquePorts, true, nil
}
func parseServiceOpaquePorts(annotation string, sps []corev1.ServicePort) []string {

View File

@ -53,6 +53,24 @@ metadata:
Ports: []corev1.ServicePort{{Port: 3306}},
},
}
explicitlyNotOpaqueService = `
apiVersion: v1
kind: Service
metadata:
name: svc
namespace: ns
annotations:
config.linkerd.io/opaque-ports: ""`
explicitlyNotOpaqueServiceObject = corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "svc",
Namespace: "ns",
Annotations: map[string]string{"config.linkerd.io/opaque-ports": ""},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{Port: 3306}},
},
}
)
type testOpaquePortsListener struct {
@ -87,9 +105,11 @@ func TestOpaquePortsWatcher(t *testing.T) {
Name: "svc",
Namespace: "ns",
},
// Adding and removing services that do not have the opaque ports
// annotation should not result in any updates being sent.
expectedOpaquePorts: []map[uint32]struct{}{},
// 1. default opaque ports
// 2. svc updated: no update
// 3. svc deleted: no update
// 4. svc created: ?
expectedOpaquePorts: []map[uint32]struct{}{{11211: {}, 25: {}, 3306: {}, 443: {}, 5432: {}, 587: {}}},
},
{
name: "namespace with opaque service",
@ -102,9 +122,9 @@ func TestOpaquePortsWatcher(t *testing.T) {
},
// 1: svc annotation 3306
// 2: svc updated: no update
// 2: svc deleted: update with no ports
// 2: svc deleted: update with default ports
// 3. svc created: update with port 3306
expectedOpaquePorts: []map[uint32]struct{}{{3306: {}}, {}, {3306: {}}},
expectedOpaquePorts: []map[uint32]struct{}{{3306: {}}, {11211: {}, 25: {}, 3306: {}, 443: {}, 5432: {}, 587: {}}, {3306: {}}},
},
{
name: "namespace and service, create opaque service",
@ -115,11 +135,11 @@ func TestOpaquePortsWatcher(t *testing.T) {
Name: "svc",
Namespace: "ns",
},
// 1: no svc annotation
// 1: default opaque ports
// 2: svc updated: update with port 3306
// 3: svc deleted: update with no ports
// 3: svc deleted: update with default ports
// 4. svc created: update with port 3306
expectedOpaquePorts: []map[uint32]struct{}{{3306: {}}, {}, {3306: {}}},
expectedOpaquePorts: []map[uint32]struct{}{{11211: {}, 25: {}, 3306: {}, 443: {}, 5432: {}, 587: {}}, {3306: {}}, {11211: {}, 25: {}, 3306: {}, 443: {}, 5432: {}, 587: {}}, {3306: {}}},
},
{
name: "namespace and opaque service, create base service",
@ -131,10 +151,25 @@ func TestOpaquePortsWatcher(t *testing.T) {
Namespace: "ns",
},
// 1: svc annotation 3306
// 2. svc updated: update with no ports
// 2. svc updated: update with default ports
// 3. svc deleted: no update
// 4. svc added: no update
expectedOpaquePorts: []map[uint32]struct{}{{3306: {}}, {}},
expectedOpaquePorts: []map[uint32]struct{}{{3306: {}}, {11211: {}, 25: {}, 3306: {}, 443: {}, 5432: {}, 587: {}}},
},
{
name: "namespace and explicitly not opaque service, create explicitly not opaque service",
initialState: []string{testNS, explicitlyNotOpaqueService},
nsObject: &testNSObject,
svcObject: &explicitlyNotOpaqueServiceObject,
service: ServiceID{
Name: "svc",
Namespace: "ns",
},
// 1: svc annotation empty
// 2. svc updated: no update
// 3. svc deleted: update with default ports
// 4. svc added: update with no ports
expectedOpaquePorts: []map[uint32]struct{}{{}, {11211: {}, 25: {}, 3306: {}, 443: {}, 5432: {}, 587: {}}, {}},
},
} {
k8sAPI, err := k8s.NewFakeAPI(tt.initialState...)

View File

@ -0,0 +1,16 @@
package opaqueports
// DefaultOpaquePorts is the default list of opaque ports that the destination
// server will use to determine whether a destination is an opaque protocol.
// When a pod or service already has its own annotation, that value will have
// priority of this.
//
// Note: Keep in sync with proxy.opaquePorts in values.yaml
var DefaultOpaquePorts = map[uint32]struct{}{
25: {},
443: {},
587: {},
3306: {},
5432: {},
11211: {},
}