mirror of https://github.com/linkerd/linkerd2.git
Add flag to enable namespace creation in the service mirror controller (#13137)
When the service mirror controller attempts to mirror a remote service in a namespace that does not exist in the local cluster, it skips mirroring that service since there is no local namespace to put the service in. We make this behavior configurable by adding a link value called `enableNamespaceCreation`. When set to true, the service mirror controller will create namespaces as necessary to mirror services if those namespaces don't already exist locally. When set to false (which is the default), the current behavior is preserved where mirroring of the service will be skipped if the local namespace does not already exist. Namespace creation can be enabled as so: ``` linkerd --context east multicluster link --cluster-name=east --set enableNamespaceCreation=true | kubectl --context=west apply -f - ``` Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
parent
8bbf22856c
commit
72ff2f787f
|
@ -29,6 +29,7 @@ Kubernetes: `>=1.22.0-0`
|
|||
| controllerImage | string | `"cr.l5d.io/linkerd/controller"` | Docker image for the Service mirror component (uses the Linkerd controller image) |
|
||||
| controllerImageVersion | string | `"linkerdVersionValue"` | Tag for the Service Mirror container Docker image |
|
||||
| enableHeadlessServices | bool | `false` | Toggle support for mirroring headless services |
|
||||
| enableNamespaceCreation | bool | `false` | Toggle support for creating namespaces for mirror services when necessary |
|
||||
| enablePSP | bool | `false` | Create RoleBindings to associate ServiceAccount of target cluster Service Mirror to the control plane PSP resource. This requires that `enabledPSP` is set to true on the extension and control plane install. Note PSP has been deprecated since k8s v1.21 |
|
||||
| enablePodAntiAffinity | bool | `false` | Enables Pod Anti Affinity logic to balance the placement of replicas across hosts and zones for High Availability. Enable this only when you have multiple replicas of components. |
|
||||
| gateway.enabled | bool | `true` | Controls whether link will create a probe service for the gateway |
|
||||
|
|
|
@ -14,6 +14,11 @@ rules:
|
|||
- apiGroups: [""]
|
||||
resources: ["namespaces"]
|
||||
verbs: ["list", "get", "watch"]
|
||||
{{- if .Values.enableNamespaceCreation }}
|
||||
- apiGroups: [""]
|
||||
resources: ["namespaces"]
|
||||
verbs: ["create"]
|
||||
{{- end}}
|
||||
---
|
||||
kind: ClusterRoleBinding
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
|
@ -138,6 +143,9 @@ spec:
|
|||
{{- if .Values.enableHeadlessServices }}
|
||||
- -enable-headless-services
|
||||
{{- end }}
|
||||
{{- if .Values.enableNamespaceCreation }}
|
||||
- -enable-namespace-creation
|
||||
{{- end }}
|
||||
- -enable-pprof={{.Values.enablePprof | default false}}
|
||||
- {{.Values.targetClusterName}}
|
||||
{{- if or .Values.serviceMirrorAdditionalEnv .Values.serviceMirrorExperimentalEnv }}
|
||||
|
|
|
@ -14,6 +14,8 @@ podLabels: {}
|
|||
commonLabels: {}
|
||||
# -- Toggle support for mirroring headless services
|
||||
enableHeadlessServices: false
|
||||
# -- Toggle support for creating namespaces for mirror services when necessary
|
||||
enableNamespaceCreation: false
|
||||
# -- Enables Pod Anti Affinity logic to balance the placement of replicas
|
||||
# across hosts and zones for High Availability.
|
||||
# Enable this only when you have multiple replicas of components.
|
||||
|
|
|
@ -52,6 +52,7 @@ func Main(args []string) {
|
|||
namespace := cmd.String("namespace", "", "namespace containing Link and credentials Secret")
|
||||
repairPeriod := cmd.Duration("endpoint-refresh-period", 1*time.Minute, "frequency to refresh endpoint resolution")
|
||||
enableHeadlessSvc := cmd.Bool("enable-headless-services", false, "toggle support for headless service mirroring")
|
||||
enableNamespaceCreation := cmd.Bool("enable-namespace-creation", false, "toggle support for namespace creation")
|
||||
enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
|
||||
|
||||
flags.ConfigureAndParse(cmd, args)
|
||||
|
@ -152,7 +153,7 @@ func Main(args []string) {
|
|||
if err != nil {
|
||||
log.Errorf("Failed to load remote cluster credentials: %s", err)
|
||||
}
|
||||
err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc)
|
||||
err = restartClusterWatcher(ctx, link, *namespace, creds, controllerK8sAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc, *enableNamespaceCreation)
|
||||
if err != nil {
|
||||
// failed to restart cluster watcher; give a bit of slack
|
||||
// and restart the link watch to give it another try
|
||||
|
@ -280,6 +281,7 @@ func restartClusterWatcher(
|
|||
repairPeriod time.Duration,
|
||||
metrics servicemirror.ProbeMetricVecs,
|
||||
enableHeadlessSvc bool,
|
||||
enableNamespaceCreation bool,
|
||||
) error {
|
||||
|
||||
cleanupWorkers()
|
||||
|
@ -313,6 +315,7 @@ func restartClusterWatcher(
|
|||
repairPeriod,
|
||||
ch,
|
||||
enableHeadlessSvc,
|
||||
enableNamespaceCreation,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create cluster watcher: %w", err)
|
||||
|
|
|
@ -39,20 +39,21 @@ type (
|
|||
// it can be requeued up to N times, to ensure that the failure is not due to some temporary network
|
||||
// problems or general glitch in the Matrix.
|
||||
RemoteClusterServiceWatcher struct {
|
||||
serviceMirrorNamespace string
|
||||
link *multicluster.Link
|
||||
remoteAPIClient *k8s.API
|
||||
localAPIClient *k8s.API
|
||||
stopper chan struct{}
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
recorder record.EventRecorder
|
||||
log *logging.Entry
|
||||
eventsQueue workqueue.TypedRateLimitingInterface[any]
|
||||
requeueLimit int
|
||||
repairPeriod time.Duration
|
||||
gatewayAlive bool
|
||||
liveness chan bool
|
||||
headlessServicesEnabled bool
|
||||
serviceMirrorNamespace string
|
||||
link *multicluster.Link
|
||||
remoteAPIClient *k8s.API
|
||||
localAPIClient *k8s.API
|
||||
stopper chan struct{}
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
recorder record.EventRecorder
|
||||
log *logging.Entry
|
||||
eventsQueue workqueue.TypedRateLimitingInterface[any]
|
||||
requeueLimit int
|
||||
repairPeriod time.Duration
|
||||
gatewayAlive bool
|
||||
liveness chan bool
|
||||
headlessServicesEnabled bool
|
||||
namespaceCreationEnabled bool
|
||||
|
||||
informerHandlers
|
||||
}
|
||||
|
@ -168,6 +169,7 @@ func NewRemoteClusterServiceWatcher(
|
|||
repairPeriod time.Duration,
|
||||
liveness chan bool,
|
||||
enableHeadlessSvc bool,
|
||||
enableNamespaceCreation bool,
|
||||
) (*RemoteClusterServiceWatcher, error) {
|
||||
remoteAPI, err := k8s.InitializeAPIForConfig(ctx, cfg, false, clusterName, k8s.Svc, k8s.Endpoint)
|
||||
if err != nil {
|
||||
|
@ -201,11 +203,12 @@ func NewRemoteClusterServiceWatcher(
|
|||
"cluster": clusterName,
|
||||
"apiAddress": cfg.Host,
|
||||
}),
|
||||
eventsQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()),
|
||||
requeueLimit: requeueLimit,
|
||||
repairPeriod: repairPeriod,
|
||||
liveness: liveness,
|
||||
headlessServicesEnabled: enableHeadlessSvc,
|
||||
eventsQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()),
|
||||
requeueLimit: requeueLimit,
|
||||
repairPeriod: repairPeriod,
|
||||
liveness: liveness,
|
||||
headlessServicesEnabled: enableHeadlessSvc,
|
||||
namespaceCreationEnabled: enableNamespaceCreation,
|
||||
// always instantiate the gatewayAlive=true to prevent unexpected service fail fast
|
||||
gatewayAlive: true,
|
||||
}, nil
|
||||
|
@ -275,6 +278,35 @@ func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceAnnotations(remoteSer
|
|||
return annotations
|
||||
}
|
||||
|
||||
func (rcsw *RemoteClusterServiceWatcher) mirrorNamespaceIfNecessary(ctx context.Context, namespace string) error {
|
||||
// if the namespace is already present we do not need to change it.
|
||||
// if we are creating it we want to put a label indicating this is a
|
||||
// mirrored resource
|
||||
if _, err := rcsw.localAPIClient.NS().Lister().Get(namespace); err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
// if the namespace is not found, we can just create it
|
||||
ns := &corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
consts.MirroredResourceLabel: "true",
|
||||
consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
|
||||
},
|
||||
Name: namespace,
|
||||
},
|
||||
}
|
||||
_, err := rcsw.localAPIClient.Client.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
// something went wrong with the create, we can just retry as well
|
||||
return RetryableError{[]error{err}}
|
||||
}
|
||||
} else {
|
||||
// something else went wrong, so we can just retry
|
||||
return RetryableError{[]error{err}}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// This method takes care of port remapping. What it does essentially is get the one gateway port
|
||||
// that we should send traffic to and create endpoint ports that bind to the mirrored service ports
|
||||
// (same name, etc) but send traffic to the gateway port. This way we do not need to do any remapping
|
||||
|
@ -528,15 +560,21 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.
|
|||
serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
|
||||
localServiceName := rcsw.mirroredResourceName(remoteService.Name)
|
||||
|
||||
// Ensure the namespace exists, and skip mirroring if it doesn't
|
||||
if _, err := rcsw.localAPIClient.Client.CoreV1().Namespaces().Get(ctx, remoteService.Namespace, metav1.GetOptions{}); err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
rcsw.recorder.Event(remoteService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: namespace does not exist")
|
||||
rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace)
|
||||
return nil
|
||||
if rcsw.namespaceCreationEnabled {
|
||||
if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Ensure the namespace exists, and skip mirroring if it doesn't
|
||||
if _, err := rcsw.localAPIClient.Client.CoreV1().Namespaces().Get(ctx, remoteService.Namespace, metav1.GetOptions{}); err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
rcsw.recorder.Event(remoteService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: namespace does not exist")
|
||||
rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace)
|
||||
return nil
|
||||
}
|
||||
// something else went wrong, so we can just retry
|
||||
return RetryableError{[]error{err}}
|
||||
}
|
||||
// something else went wrong, so we can just retry
|
||||
return RetryableError{[]error{err}}
|
||||
}
|
||||
|
||||
serviceToCreate := &corev1.Service{
|
||||
|
|
|
@ -199,14 +199,20 @@ func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context
|
|||
serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
|
||||
localServiceName := rcsw.mirroredResourceName(remoteService.Name)
|
||||
|
||||
// Ensure the namespace exists, and skip mirroring if it doesn't
|
||||
if _, err := rcsw.localAPIClient.NS().Lister().Get(remoteService.Namespace); err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace)
|
||||
return &corev1.Service{}, nil
|
||||
if rcsw.namespaceCreationEnabled {
|
||||
if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil {
|
||||
return &corev1.Service{}, err
|
||||
}
|
||||
} else {
|
||||
// Ensure the namespace exists, and skip mirroring if it doesn't
|
||||
if _, err := rcsw.localAPIClient.NS().Lister().Get(remoteService.Namespace); err != nil {
|
||||
if kerrors.IsNotFound(err) {
|
||||
rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace)
|
||||
return &corev1.Service{}, nil
|
||||
}
|
||||
// something else went wrong, so we can just retry
|
||||
return nil, RetryableError{[]error{err}}
|
||||
}
|
||||
// something else went wrong, so we can just retry
|
||||
return nil, RetryableError{[]error{err}}
|
||||
}
|
||||
|
||||
serviceToCreate := &corev1.Service{
|
||||
|
|
Loading…
Reference in New Issue