linkerd2/controller/api/destination/opaque_ports_adaptor.go

109 lines
3.1 KiB
Go

package destination
import (
"strconv"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/controller/k8s"
pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/util"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
)
// opaquePortsAdaptor implements EndpointUpdateListener so that it can watch
// endpoints for services. When endpoints change, it can check for the opaque
// ports annotation on the pods and update the list of opaque ports.
//
// opaquePortsAdaptor also implements ProfileUpdateListener so that it can
// receive updates from trafficSplitAdaptor and merge the service profile by
// adding the list of opaque ports.
//
// When either of these implemented interfaces has an update,
// opaquePortsAdaptor publishes the new service profile to the
// profileTranslator.
type opaquePortsAdaptor struct {
listener watcher.ProfileUpdateListener
k8sAPI *k8s.API
log *logging.Entry
profile *sp.ServiceProfile
opaquePorts map[uint32]struct{}
}
func newOpaquePortsAdaptor(listener watcher.ProfileUpdateListener, k8sAPI *k8s.API, log *logging.Entry) *opaquePortsAdaptor {
return &opaquePortsAdaptor{
listener: listener,
k8sAPI: k8sAPI,
log: log,
opaquePorts: make(map[uint32]struct{}),
}
}
func (opa *opaquePortsAdaptor) Add(set watcher.AddressSet) {
ports := opa.getOpaquePorts(set)
for port := range ports {
opa.opaquePorts[port] = struct{}{}
}
opa.publish()
}
func (opa *opaquePortsAdaptor) Remove(set watcher.AddressSet) {
ports := opa.getOpaquePorts(set)
for port := range ports {
delete(opa.opaquePorts, port)
}
opa.publish()
}
func (opa *opaquePortsAdaptor) NoEndpoints(exists bool) {
opa.opaquePorts = make(map[uint32]struct{})
opa.publish()
}
func (opa *opaquePortsAdaptor) Update(profile *sp.ServiceProfile) {
opa.profile = profile
opa.publish()
}
func (opa *opaquePortsAdaptor) getOpaquePorts(set watcher.AddressSet) map[uint32]struct{} {
ports := make(map[uint32]struct{})
for _, address := range set.Addresses {
pod := address.Pod
if pod != nil {
override, err := getOpaquePortsAnnotations(pod)
if err != nil {
opa.log.Errorf("Failed to get opaque ports annotation for pod %s: %s", pod, err)
}
for port := range override {
ports[port] = struct{}{}
}
}
}
return ports
}
func (opa *opaquePortsAdaptor) publish() {
merged := sp.ServiceProfile{}
if opa.profile != nil {
merged = *opa.profile
}
merged.Spec.OpaquePorts = opa.opaquePorts
opa.listener.Update(&merged)
}
func getOpaquePortsAnnotations(pod *corev1.Pod) (map[uint32]struct{}, error) {
opaquePorts := make(map[uint32]struct{})
annotation := pod.Annotations[pkgk8s.ProxyOpaquePortsAnnotation]
if annotation != "" {
for _, portStr := range util.ParseOpaquePorts(annotation, pod.Spec.Containers) {
port, err := strconv.ParseUint(portStr, 10, 32)
if err != nil {
return nil, err
}
opaquePorts[uint32(port)] = struct{}{}
}
}
return opaquePorts, nil
}