restrict the access permissions of kruise-daemon to secrets (#1482)

* restrict the access permissions of kruise-daemon to secrets

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>

* util meta ut

Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>

---------

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
Co-authored-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
This commit is contained in:
berg 2024-01-04 17:34:19 +08:00 committed by liheng.zms
parent c5b63fa2df
commit c0384d675e
19 changed files with 598 additions and 28 deletions

View File

@ -76,10 +76,11 @@ deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | kubectl apply -f -
echo -e "resources:\n- manager.yaml" > config/manager/kustomization.yaml
$(KUSTOMIZE) build config/daemonconfig | kubectl apply -f -
undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config.
$(KUSTOMIZE) build config/default | kubectl delete -f -
$(KUSTOMIZE) build config/daemonconfig | kubectl delete -f -
CONTROLLER_GEN = $(shell pwd)/bin/controller-gen
controller-gen: ## Download controller-gen locally if necessary.

View File

@ -24,6 +24,12 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
const (
// ProtectionFinalizer is designed to ensure the GC of resources.
ProtectionFinalizer = "apps.kruise.io/deletion-protection"
)
// SetDefaults_SidecarSet set default values for SidecarSet.
@ -351,7 +357,7 @@ func SetDefaultsImageTagPullPolicy(obj *v1alpha1.ImageTagPullPolicy) {
}
// SetDefaults_ImagePullJob set default values for ImagePullJob.
func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob) {
func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob, addProtection bool) {
if obj.Spec.CompletionPolicy.Type == "" {
obj.Spec.CompletionPolicy.Type = v1alpha1.Always
}
@ -364,4 +370,7 @@ func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob) {
if obj.Spec.PullPolicy.BackoffLimit == nil {
obj.Spec.PullPolicy.BackoffLimit = utilpointer.Int32Ptr(3)
}
if addProtection {
controllerutil.AddFinalizer(obj, ProtectionFinalizer)
}
}

View File

@ -0,0 +1,3 @@
resources:
- namespace.yaml
- rbac.yaml

View File

@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: kruise-daemon-config

View File

@ -0,0 +1,29 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
creationTimestamp: null
name: kruise-daemon-secret-role
namespace: kruise-daemon-config
rules:
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: kruise-daemon-secret-rolebinding
namespace: kruise-daemon-config
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: kruise-daemon-secret-role
subjects:
- kind: ServiceAccount
name: kruise-daemon
namespace: kruise-system

View File

@ -0,0 +1,8 @@
namespace: kruise-daemon-config
# Value of this field is prepended to the
# names of all resources, e.g. a deployment named
# "wordpress" becomes "alices-wordpress".
# Note that it should also match with the prefix (text before '-') of the namespace
# field above.
bases:
- config

View File

@ -0,0 +1,6 @@
apiVersion: v1
kind: Namespace
metadata:
labels:
control-plane: controller-manager
name: kruise-daemon-config

View File

@ -1,6 +1,3 @@
# Adds namespace to all resources.
namespace: kruise-system
# Value of this field is prepended to the
# names of all resources, e.g. a deployment named
# "wordpress" becomes "alices-wordpress".
@ -12,16 +9,19 @@ namePrefix: kruise-
#commonLabels:
# someName: someValue
resources:
- kruise-daemon-config.yaml
bases:
- ../crd
- ../rbac
- ../manager
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
- ../webhook
# [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'. 'WEBHOOK' components are required.
#- ../certmanager
# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'.
# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'.
#- ../prometheus
patchesStrategicMerge:
@ -30,7 +30,7 @@ patchesStrategicMerge:
# endpoint w/o any authn/z, please comment the following line.
# - manager_auth_proxy_patch.yaml
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
- manager_webhook_patch.yaml

View File

@ -1,2 +1,5 @@
# Adds namespace to all resources.
namespace: kruise-system
resources:
- manager.yaml

View File

@ -53,8 +53,6 @@ rules:
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
@ -64,14 +62,6 @@ rules:
- get
- patch
- update
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
- watch
- apiGroups:
- apps.kruise.io
resources:

View File

@ -1,3 +1,6 @@
# Adds namespace to all resources.
namespace: kruise-system
resources:
- role.yaml
- role_binding.yaml

View File

@ -1,3 +1,6 @@
# Adds namespace to all resources.
namespace: kruise-system
resources:
- manifests.yaml
- service.yaml

View File

@ -23,6 +23,7 @@ import (
"sort"
"time"
"github.com/openkruise/kruise/apis/apps/defaults"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/features"
@ -38,10 +39,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@ -56,11 +59,22 @@ var (
concurrentReconciles = 3
controllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("ImagePullJob")
resourceVersionExpectations = expectations.NewResourceVersionExpectation()
scaleExpectations = expectations.NewScaleExpectations()
)
const (
defaultParallelism = 1
minRequeueTime = time.Second
// SourceSecretKeyAnno is an annotations instead of label
// because the length of key may be more than 64.
SourceSecretKeyAnno = "imagepulljobs.kruise.io/source-key"
// SourceSecretUIDLabelKey is designed to select target via source secret.
SourceSecretUIDLabelKey = "imagepulljobs.kruise.io/source-uid"
// TargetOwnerReferencesAnno records the keys of imagePullJobs that refers
// the target secret. If TargetOwnerReferencesAnno is empty, means the target
// secret should be deleted.
TargetOwnerReferencesAnno = "imagepulljobs.kruise.io/references"
)
// Add creates a new ImagePullJob Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
@ -107,6 +121,12 @@ func add(mgr manager.Manager, r *ReconcileImagePullJob) error {
return err
}
// Watch for secret for jobs that have pullSecrets
err = c.Watch(&source.Kind{Type: &v1.Secret{}}, &secretEventHandler{Reader: mgr.GetCache()})
if err != nil {
return err
}
return nil
}
@ -151,6 +171,16 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
return reconcile.Result{}, err
}
// If scale expectations have not satisfied yet, just skip this reconcile
if scaleSatisfied, unsatisfiedDuration, dirtyData := scaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {
if unsatisfiedDuration >= expectations.ExpectationTimeout {
klog.Warningf("ImagePullJob: expectation unsatisfied overtime for %v, dirtyData=%v, overtime=%v", request.String(), dirtyData, unsatisfiedDuration)
return reconcile.Result{}, nil
}
klog.V(4).Infof("ImagePullJob: not satisfied scale for %v, dirtyData=%v", request.String(), dirtyData)
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}
// If resourceVersion expectations have not satisfied yet, just skip this reconcile
resourceVersionExpectations.Observe(job)
if isSatisfied, unsatisfiedDuration := resourceVersionExpectations.IsSatisfied(job); !isSatisfied {
@ -163,11 +193,17 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
}
if job.DeletionTimestamp != nil {
return reconcile.Result{}, nil
// ensure the GC of secrets and remove protection finalizer
return reconcile.Result{}, r.finalize(job)
}
// The Job has been finished
if job.Status.CompletionTime != nil {
// ensure the GC of secrets and remove protection finalizer
if err = r.finalize(job); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to remove finalizer: %v", err)
}
var leftTime time.Duration
if job.Spec.CompletionPolicy.TTLSecondsAfterFinished != nil {
leftTime = time.Duration(*job.Spec.CompletionPolicy.TTLSecondsAfterFinished)*time.Second - time.Since(job.Status.CompletionTime.Time)
@ -182,6 +218,11 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
return reconcile.Result{RequeueAfter: leftTime}, nil
}
// add protection finalizer to ensure the GC of secrets
if err = r.addProtectionFinalizer(job); err != nil {
return reconcile.Result{}, err
}
// Get all NodeImage related to this ImagePullJob
nodeImages, err := utilimagejob.GetNodeImagesForJob(r.Client, job)
if err != nil {
@ -201,14 +242,20 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
}
}
// sync secret to kruise-daemon-config namespace before pulling
secrets, err := r.syncSecrets(job)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to sync secrets: %v", err)
}
// Calculate the new status for this job
newStatus, notSyncedNodeImages, err := r.calculateStatus(job, nodeImages)
newStatus, notSyncedNodeImages, err := r.calculateStatus(job, nodeImages, secrets)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to calculate status: %v", err)
}
// Sync image to more NodeImages
if err = r.syncNodeImages(job, newStatus, notSyncedNodeImages); err != nil {
if err = r.syncNodeImages(job, newStatus, notSyncedNodeImages, secrets); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to sync NodeImages: %v", err)
}
@ -231,7 +278,28 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
return reconcile.Result{}, nil
}
func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, newStatus *appsv1alpha1.ImagePullJobStatus, notSyncedNodeImages []string) error {
func (r *ReconcileImagePullJob) syncSecrets(job *appsv1alpha1.ImagePullJob) ([]appsv1alpha1.ReferenceObject, error) {
if job.Namespace == util.GetKruiseDaemonConfigNamespace() {
return getSecrets(job), nil // Ignore this special case.
}
targetMap, deleteMap, err := r.getTargetSecretMap(job)
if err != nil {
return nil, err
}
if err = r.releaseTargetSecrets(deleteMap, job); err != nil {
return nil, err
}
if job.DeletionTimestamp != nil || job.Status.CompletionTime != nil {
return nil, r.releaseTargetSecrets(targetMap, job)
}
if err = r.checkNamespaceExists(util.GetKruiseDaemonConfigNamespace()); err != nil {
return nil, fmt.Errorf("failed to check kruise-daemon-config namespace: %v", err)
}
return r.syncTargetSecrets(job, targetMap)
}
func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, newStatus *appsv1alpha1.ImagePullJobStatus, notSyncedNodeImages []string, secrets []appsv1alpha1.ReferenceObject) error {
if len(notSyncedNodeImages) == 0 {
return nil
}
@ -251,7 +319,6 @@ func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, n
}
ownerRef := getOwnerRef(job)
secrets := getSecrets(job)
pullPolicy := getImagePullPolicy(job)
now := metav1.NewTime(r.clock.Now())
@ -335,7 +402,116 @@ func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, n
return nil
}
func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, nodeImages []*appsv1alpha1.NodeImage) (*appsv1alpha1.ImagePullJobStatus, []string, error) {
func (r *ReconcileImagePullJob) getTargetSecretMap(job *appsv1alpha1.ImagePullJob) (map[string]*v1.Secret, map[string]*v1.Secret, error) {
options := client.ListOptions{
Namespace: util.GetKruiseDaemonConfigNamespace(),
}
targetLister := &v1.SecretList{}
if err := r.List(context.TODO(), targetLister, &options, utilclient.DisableDeepCopy); err != nil {
return nil, nil, err
}
jobKey := keyFromObject(job)
sourceReferences := getSecrets(job)
deleteMap := make(map[string]*v1.Secret)
targetMap := make(map[string]*v1.Secret, len(targetLister.Items))
for i := range targetLister.Items {
target := &targetLister.Items[i]
if target.DeletionTimestamp != nil {
continue
}
keySet := referenceSetFromTarget(target)
if !keySet.Contains(jobKey) {
continue
}
sourceNs, sourceName, err := cache.SplitMetaNamespaceKey(target.Annotations[SourceSecretKeyAnno])
if err != nil {
klog.Warningf("Failed to parse source key from target %s annotations: %s", target.Name, err)
}
if containsObject(sourceReferences, appsv1alpha1.ReferenceObject{Namespace: sourceNs, Name: sourceName}) {
targetMap[target.Labels[SourceSecretUIDLabelKey]] = target
} else {
deleteMap[target.Labels[SourceSecretUIDLabelKey]] = target
}
}
return targetMap, deleteMap, nil
}
func (r *ReconcileImagePullJob) releaseTargetSecrets(targetMap map[string]*v1.Secret, job *appsv1alpha1.ImagePullJob) error {
if len(targetMap) == 0 {
return nil
}
jobKey := keyFromObject(job)
for _, secret := range targetMap {
if secret == nil {
continue
}
keySet := referenceSetFromTarget(secret)
// Remove the reference to this job from target, we use Update instead of
// Patch to make sure we do not delete any targets that is still referred,
// because a target may be newly referred in this reconcile round.
if keySet.Contains(keyFromObject(job)) {
keySet.Delete(jobKey)
secret = secret.DeepCopy()
secret.Annotations[TargetOwnerReferencesAnno] = keySet.String()
if err := r.Update(context.TODO(), secret); err != nil {
return err
}
resourceVersionExpectations.Expect(secret)
}
// The target is still referred by other jobs, do not delete it.
if !keySet.IsEmpty() {
return nil
}
// Just delete it if no one refers it anymore.
if err := r.Delete(context.TODO(), secret); err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
}
func (r *ReconcileImagePullJob) syncTargetSecrets(job *appsv1alpha1.ImagePullJob, targetMap map[string]*v1.Secret) ([]appsv1alpha1.ReferenceObject, error) {
sourceReferences := getSecrets(job)
targetReferences := make([]appsv1alpha1.ReferenceObject, 0, len(sourceReferences))
for _, sourceRef := range sourceReferences {
source := &v1.Secret{}
if err := r.Get(context.TODO(), keyFromRef(sourceRef), source); err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
target := targetMap[string(source.UID)]
switch action := computeTargetSyncAction(source, target, job); action {
case create:
referenceKeys := makeReferenceSet(keyFromObject(job))
target = targetFromSource(source, referenceKeys)
scaleExpectations.ExpectScale(keyFromObject(job).String(), expectations.Create, string(source.UID))
if err := r.Create(context.TODO(), target); err != nil {
scaleExpectations.ObserveScale(keyFromObject(job).String(), expectations.Create, string(source.UID))
return nil, err
}
case update:
referenceKeys := referenceSetFromTarget(target).Insert(keyFromObject(job))
target = updateTarget(target, source, referenceKeys)
if err := r.Update(context.TODO(), target); err != nil {
return nil, err
}
resourceVersionExpectations.Expect(target)
}
targetReferences = append(targetReferences, appsv1alpha1.ReferenceObject{Namespace: target.Namespace, Name: target.Name})
}
return targetReferences, nil
}
func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, nodeImages []*appsv1alpha1.NodeImage, secrets []appsv1alpha1.ReferenceObject) (*appsv1alpha1.ImagePullJobStatus, []string, error) {
newStatus := appsv1alpha1.ImagePullJobStatus{
StartTime: job.Status.StartTime,
Desired: int32(len(nodeImages)),
@ -353,7 +529,20 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob,
var notSynced, pulling, succeeded, failed []string
for _, nodeImage := range nodeImages {
var tagVersion int64 = -1
var secretSynced bool = true
if imageSpec, ok := nodeImage.Spec.Images[imageName]; ok {
for _, secret := range secrets {
if !containsObject(imageSpec.PullSecrets, secret) {
secretSynced = false
break
}
}
if !secretSynced {
notSynced = append(notSynced, nodeImage.Name)
continue
}
for _, tagSpec := range imageSpec.Tags {
if tagSpec.Tag != imageTag {
continue
@ -371,6 +560,7 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob,
tagVersion = tagSpec.Version
}
}
if tagVersion < 0 {
notSynced = append(notSynced, nodeImage.Name)
continue
@ -426,3 +616,29 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob,
sort.Strings(newStatus.FailedNodes)
return &newStatus, notSynced, nil
}
func (r *ReconcileImagePullJob) checkNamespaceExists(nsName string) error {
namespace := v1.Namespace{}
return r.Get(context.TODO(), types.NamespacedName{Name: nsName}, &namespace)
}
// addProtectionFinalizer ensure the GC of secrets in kruise-daemon-config ns
func (r *ReconcileImagePullJob) addProtectionFinalizer(job *appsv1alpha1.ImagePullJob) error {
if controllerutil.ContainsFinalizer(job, defaults.ProtectionFinalizer) {
return nil
}
job.Finalizers = append(job.Finalizers, defaults.ProtectionFinalizer)
return r.Update(context.TODO(), job)
}
// finalize also ensure the GC of secrets in kruise-daemon-config ns
func (r *ReconcileImagePullJob) finalize(job *appsv1alpha1.ImagePullJob) error {
if !controllerutil.ContainsFinalizer(job, defaults.ProtectionFinalizer) {
return nil
}
if _, err := r.syncSecrets(job); err != nil {
return err
}
controllerutil.RemoveFinalizer(job, defaults.ProtectionFinalizer)
return r.Update(context.TODO(), job)
}

View File

@ -17,10 +17,14 @@ limitations under the License.
package imagepulljob
import (
"context"
"reflect"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
kruiseutil "github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/expectations"
utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
@ -187,6 +191,102 @@ func (e *podEventHandler) handleUpdate(pod, oldPod *v1.Pod, q workqueue.RateLimi
}
}
type secretEventHandler struct {
client.Reader
}
var _ handler.EventHandler = &secretEventHandler{}
func (e *secretEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
obj := evt.Object.(*v1.Secret)
e.handle(obj, q)
}
func (e *secretEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
newObj := evt.ObjectNew.(*v1.Secret)
oldObj := evt.ObjectOld.(*v1.Secret)
e.handleUpdate(newObj, oldObj, q)
}
func (e *secretEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
}
func (e *secretEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
func (e *secretEventHandler) handle(secret *v1.Secret, q workqueue.RateLimitingInterface) {
if secret != nil && secret.Namespace == kruiseutil.GetKruiseDaemonConfigNamespace() {
jobKeySet := referenceSetFromTarget(secret)
klog.V(4).Infof("Observe secret %s/%s created, uid: %s, refs: %s", secret.Namespace, secret.Name, secret.UID, jobKeySet.String())
for key := range jobKeySet {
scaleExpectations.ObserveScale(key.String(), expectations.Create, secret.Labels[SourceSecretUIDLabelKey])
}
return
}
if secret == nil || secret.DeletionTimestamp != nil {
return
}
// Get jobs related to this Secret
jobKeys, err := e.getActiveJobKeysForSecret(secret)
if err != nil {
klog.Errorf("Failed to get jobs for Secret %s/%s: %v", secret.Namespace, secret.Name, err)
}
for _, jKey := range jobKeys {
q.Add(reconcile.Request{NamespacedName: jKey})
}
}
func (e *secretEventHandler) handleUpdate(secretNew, secretOld *v1.Secret, q workqueue.RateLimitingInterface) {
if secretNew != nil && secretNew.Namespace == kruiseutil.GetKruiseDaemonConfigNamespace() {
jobKeySet := referenceSetFromTarget(secretNew)
for key := range jobKeySet {
scaleExpectations.ObserveScale(key.String(), expectations.Create, secretNew.Labels[SourceSecretUIDLabelKey])
}
return
}
if secretOld == nil || secretNew == nil || secretNew.DeletionTimestamp != nil ||
(reflect.DeepEqual(secretNew.Data, secretOld.Data) && reflect.DeepEqual(secretNew.StringData, secretOld.StringData)) {
return
}
// Get jobs related to this Secret
jobKeys, err := e.getActiveJobKeysForSecret(secretNew)
if err != nil {
klog.Errorf("Failed to get jobs for Secret %s/%s: %v", secretNew.Namespace, secretNew.Name, err)
}
for _, jKey := range jobKeys {
q.Add(reconcile.Request{NamespacedName: jKey})
}
}
func (e *secretEventHandler) getActiveJobKeysForSecret(secret *v1.Secret) ([]types.NamespacedName, error) {
jobLister := &appsv1alpha1.ImagePullJobList{}
if err := e.List(context.TODO(), jobLister, client.InNamespace(secret.Namespace), utilclient.DisableDeepCopy); err != nil {
return nil, err
}
var jobKeys []types.NamespacedName
for i := range jobLister.Items {
job := &jobLister.Items[i]
if job.DeletionTimestamp != nil {
continue
}
if jobContainsSecret(job, secret.Name) {
jobKeys = append(jobKeys, keyFromObject(job))
}
}
return jobKeys, nil
}
func jobContainsSecret(job *appsv1alpha1.ImagePullJob, secretName string) bool {
for _, s := range job.Spec.PullSecrets {
if secretName == s {
return true
}
}
return false
}
func diffJobs(newJobs, oldJobs []*appsv1alpha1.ImagePullJob) set {
setNew := make(set, len(newJobs))
setOld := make(set, len(oldJobs))

View File

@ -19,15 +19,28 @@ package imagepulljob
import (
"fmt"
"math/rand"
"reflect"
"strings"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
defaultTTLSecondsForNever = int32(24 * 3600)
type syncAction string
const (
defaultTTLSecondsForNever = int32(24 * 3600)
defaultActiveDeadlineSecondsForNever = int64(1800)
create syncAction = "create"
update syncAction = "update"
noAction syncAction = "noAction"
)
func getTTLSecondsForAlways(job *appsv1alpha1.ImagePullJob) *int32 {
@ -118,3 +131,105 @@ func formatStatusMessage(status *appsv1alpha1.ImagePullJobStatus) (ret string) {
}
return fmt.Sprintf("job is running, progress %.1f%%", 100.0*float64(status.Succeeded+status.Failed)/float64(status.Desired))
}
func keyFromRef(ref appsv1alpha1.ReferenceObject) types.NamespacedName {
return types.NamespacedName{
Name: ref.Name,
Namespace: ref.Namespace,
}
}
func keyFromObject(object client.Object) types.NamespacedName {
return types.NamespacedName{
Name: object.GetName(),
Namespace: object.GetNamespace(),
}
}
func targetFromSource(source *v1.Secret, keySet referenceSet) *v1.Secret {
target := source.DeepCopy()
target.ObjectMeta = metav1.ObjectMeta{
Namespace: util.GetKruiseDaemonConfigNamespace(),
GenerateName: fmt.Sprintf("%s-", source.Name),
Labels: map[string]string{
SourceSecretUIDLabelKey: string(source.UID),
},
Annotations: map[string]string{
SourceSecretKeyAnno: keyFromObject(source).String(),
TargetOwnerReferencesAnno: keySet.String(),
},
}
return target
}
func updateTarget(target, source *v1.Secret, keySet referenceSet) *v1.Secret {
target = target.DeepCopy()
target.Data = source.Data
target.StringData = source.StringData
target.Annotations[TargetOwnerReferencesAnno] = keySet.String()
return target
}
func referenceSetFromTarget(target *v1.Secret) referenceSet {
refs := strings.Split(target.Annotations[TargetOwnerReferencesAnno], ",")
keys := makeReferenceSet()
for _, ref := range refs {
namespace, name, err := cache.SplitMetaNamespaceKey(ref)
if err != nil {
klog.Errorf("Failed to parse job key from target secret %s annotations: %v", target.Name, err)
continue
}
keys.Insert(types.NamespacedName{Namespace: namespace, Name: name})
}
return keys
}
func computeTargetSyncAction(source, target *v1.Secret, job *appsv1alpha1.ImagePullJob) syncAction {
if target == nil || len(target.UID) == 0 {
return create
}
keySet := referenceSetFromTarget(target)
if !keySet.Contains(keyFromObject(job)) ||
!reflect.DeepEqual(source.Data, target.Data) ||
!reflect.DeepEqual(source.StringData, target.StringData) {
return update
}
return noAction
}
func makeReferenceSet(items ...types.NamespacedName) referenceSet {
refSet := map[types.NamespacedName]struct{}{}
for _, item := range items {
refSet[item] = struct{}{}
}
return refSet
}
type referenceSet map[types.NamespacedName]struct{}
func (set referenceSet) String() string {
keyList := make([]string, 0, len(set))
for ref := range set {
keyList = append(keyList, ref.String())
}
return strings.Join(keyList, ",")
}
func (set referenceSet) Contains(key types.NamespacedName) bool {
_, exists := set[key]
return exists
}
func (set referenceSet) Insert(key types.NamespacedName) referenceSet {
set[key] = struct{}{}
return set
}
func (set referenceSet) Delete(key types.NamespacedName) referenceSet {
delete(set, key)
return set
}
func (set referenceSet) IsEmpty() bool {
return len(set) == 0
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2021.
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -24,3 +24,10 @@ func GetKruiseNamespace() string {
}
return "kruise-system"
}
func GetKruiseDaemonConfigNamespace() string {
if ns := os.Getenv("KRUISE_DAEMON_CONFIG_NS"); len(ns) > 0 {
return ns
}
return "kruise-daemon-config"
}

39
pkg/util/meta_test.go Normal file
View File

@ -0,0 +1,39 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"os"
"testing"
)
func TestMetaGetNamespace(t *testing.T) {
if GetKruiseNamespace() != "kruise-system" {
t.Fatalf("expect(kruise-system), but get(%s)", GetKruiseNamespace())
}
_ = os.Setenv("POD_NAMESPACE", "test")
if GetKruiseNamespace() != "test" {
t.Fatalf("expect(test), but get(%s)", GetKruiseNamespace())
}
if GetKruiseDaemonConfigNamespace() != "kruise-daemon-config" {
t.Fatalf("expect(kruise-daemon-config), but get(%s)", GetKruiseDaemonConfigNamespace())
}
_ = os.Setenv("KRUISE_DAEMON_CONFIG_NS", "test")
if GetKruiseDaemonConfigNamespace() != "test" {
t.Fatalf("expect(test), but get(%s)", GetKruiseDaemonConfigNamespace())
}
}

View File

@ -25,6 +25,7 @@ import (
"github.com/openkruise/kruise/apis/apps/defaults"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util"
admissionv1 "k8s.io/api/admission/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
@ -46,7 +47,7 @@ func (h *ImagePullJobCreateUpdateHandler) Handle(ctx context.Context, req admiss
return admission.Errored(http.StatusBadRequest, err)
}
var copy runtime.Object = obj.DeepCopy()
defaults.SetDefaultsImagePullJob(obj)
defaults.SetDefaultsImagePullJob(obj, req.AdmissionRequest.Operation == admissionv1.Create)
if reflect.DeepEqual(obj, copy) {
return admission.Allowed("")
}

View File

@ -21,8 +21,13 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/pkg/controller/imagepulljob"
"github.com/openkruise/kruise/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
)
type ImagePullJobTester struct {
@ -57,3 +62,31 @@ func (tester *ImagePullJobTester) GetJob(job *appsv1alpha1.ImagePullJob) (*appsv
func (tester *ImagePullJobTester) ListJobs(ns string) (*appsv1alpha1.ImagePullJobList, error) {
return tester.kc.AppsV1alpha1().ImagePullJobs(ns).List(context.TODO(), metav1.ListOptions{})
}
func (tester *ImagePullJobTester) CreateSecret(secret *v1.Secret) (*v1.Secret, error) {
return tester.c.CoreV1().Secrets(secret.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
}
func (tester *ImagePullJobTester) UpdateSecret(secret *v1.Secret) (*v1.Secret, error) {
namespace, name := secret.GetNamespace(), secret.GetName()
var err error
var newSecret *v1.Secret
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
newSecret, err = tester.c.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
newSecret.Data = secret.Data
newSecret, err = tester.c.CoreV1().Secrets(namespace).Update(context.TODO(), newSecret, metav1.UpdateOptions{})
return err
})
return newSecret, err
}
func (tester *ImagePullJobTester) ListSyncedSecrets(source *v1.Secret) ([]v1.Secret, error) {
options := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{imagepulljob.SourceSecretUIDLabelKey: string(source.UID)}).String(),
}
lister, err := tester.c.CoreV1().Secrets(util.GetKruiseDaemonConfigNamespace()).List(context.TODO(), options)
return lister.Items, err
}