Compare commits

...

3 Commits

Author SHA1 Message Date
sh 4aee7f6b58 fix: cloneset revision update (#1549)
Signed-off-by: suhe <suhe@njust.edu.cn>
2024-04-03 10:30:14 +08:00
berg fe0b27730f restrict the access permissions of kruise-daemon to secrets (#1482)
* restrict the access permissions of kruise-daemon to secrets

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>

* util meta ut

Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>

---------

Signed-off-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
Co-authored-by: mingzhou.swx <mingzhou.swx@alibaba-inc.com>
2024-01-26 11:27:04 +08:00
berg 914146d9a4 disable feature-gate PreDownloadImageForInPlaceUpdate by default (#1244)
Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
2023-04-07 09:50:03 +08:00
24 changed files with 616 additions and 36 deletions

View File

@@ -8,7 +8,7 @@
> No, really, you must read this before you upgrade
- Enable following feature-gates by default: PreDownloadImageForInPlaceUpdate, ResourcesDeletionProtection, WorkloadSpread, PodUnavailableBudgetDeleteGate, InPlaceUpdateEnvFromMetadata,
- Enable following feature-gates by default: ResourcesDeletionProtection, WorkloadSpread, PodUnavailableBudgetDeleteGate, InPlaceUpdateEnvFromMetadata,
StatefulSetAutoDeletePVC, PodProbeMarkerGate. ([#1214](https://github.com/openkruise/kruise/pull/1214), [@zmberg](https://github.com/zmberg))
- Change Kruise leader election from configmap to configmapsleases, this is a smooth upgrade with no disruption to OpenKruise service. ([#1184](https://github.com/openkruise/kruise/pull/1184), [@YTGhost](https://github.com/YTGhost))

View File

@@ -76,10 +76,11 @@ deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | kubectl apply -f -
echo -e "resources:\n- manager.yaml" > config/manager/kustomization.yaml
$(KUSTOMIZE) build config/daemonconfig | kubectl apply -f -
undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config.
$(KUSTOMIZE) build config/default | kubectl delete -f -
$(KUSTOMIZE) build config/daemonconfig | kubectl delete -f -
CONTROLLER_GEN = $(shell pwd)/bin/controller-gen
controller-gen: ## Download controller-gen locally if necessary.
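With the daemonconfig step added, the deploy/undeploy targets are invoked the same way as before; a typical session looks like this (the image tag is illustrative and requires a configured cluster in ~/.kube/config):

```
# Deploy the controller with a custom image; IMG overrides the Makefile default
# and now also applies config/daemonconfig (the kruise-daemon-config namespace and RBAC).
make deploy IMG=openkruise/kruise-manager:test

# Tear everything down again, including the daemonconfig resources.
make undeploy
```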

View File

@@ -35,6 +35,7 @@ OpenKruise (官网: [https://openkruise.io](https://openkruise.io)) 是CNCF([Clo
- [**SidecarSet** for defining and upgrading your sidecar containers](https://openkruise.io/zh/docs/user-manuals/sidecarset)
- [**Container Launch Priority** to control the sidecar startup order](https://openkruise.io/zh/docs/user-manuals/containerlaunchpriority)
- [**Sidecar Job Terminator** terminates sidecar containers once the main containers of Job-type Pods have exited](https://openkruise.io/zh/docs/user-manuals/jobsidecarterminator)
- **Multi-domain Management**
@@ -49,6 +50,9 @@ OpenKruise (官网: [https://openkruise.io](https://openkruise.io)) 是CNCF([Clo
- [Restart containers in a running pod in place](https://openkruise.io/zh/docs/user-manuals/containerrecreaterequest)
- [Pull images on a specified set of nodes](https://openkruise.io/zh/docs/user-manuals/imagepulljob)
- [**ResourceDistribution** supports distributing Secret and ConfigMap resources across namespaces](https://openkruise.io/zh/docs/user-manuals/resourcedistribution)
- [**PersistentPodState** persists Pod states, such as "fixed IP scheduling"](https://openkruise.io/zh/docs/user-manuals/persistentpodstate)
- [**PodProbeMarker** provides the ability to customize Pod probes](https://openkruise.io/zh/docs/user-manuals/podprobemarker)
- **Application Protection**
@@ -73,7 +77,8 @@ OpenKruise (官网: [https://openkruise.io](https://openkruise.io)) 是CNCF([Clo
- Spectro Cloud, 艾佳生活, Arkane Systems, 滴普科技, 火花思维
- OPPO, 苏宁, 欢聚时代, 汇量科技, 深圳凤凰木网络有限公司
- 小米, 网易, 美团金融, 虾皮购物, e签宝
- LinkedIn, 雪球, 兴盛优选, Wholee
- LinkedIn, 雪球, 兴盛优选, Wholee, LilithGames, Baidu
- Bilibili, 冠赢互娱, MeiTuan, 同城
## Contributing

View File

@@ -35,6 +35,7 @@ It consists of several controllers which extend and complement the [Kubernetes c
- [**SidecarSet** for defining and upgrading your own sidecars](https://openkruise.io/docs/user-manuals/sidecarset)
- [**Container Launch Priority** to control the container startup orders](https://openkruise.io/docs/user-manuals/containerlaunchpriority)
- [**Sidecar Job Terminator** terminates sidecar containers for job-type Pods once their main containers have completed](https://openkruise.io/docs/user-manuals/jobsidecarterminator)
- **Multi-domain Management**
@@ -48,8 +49,11 @@ It consists of several controllers which extend and complement the [Kubernetes c
- **Enhanced Operations**
- [Restart containers in a running pod](https://openkruise.io/docs/user-manuals/containerrecreaterequest)
- [Download images on specific nodes](https://openkruise.io/docs/user-manuals/imagepulljob)
- [**ContainerRecreateRequest** provides a way to let users restart/recreate containers in a running pod](https://openkruise.io/docs/user-manuals/containerrecreaterequest)
- [**ImagePullJob** pre-download images on specific nodes](https://openkruise.io/docs/user-manuals/imagepulljob)
- [**ResourceDistribution** supports Secret & ConfigMap resource distribution across namespaces](https://openkruise.io/docs/user-manuals/resourcedistribution)
- [**PersistentPodState** is able to persist states of the Pod, such as "IP Retention"](https://openkruise.io/docs/user-manuals/persistentpodstate)
- [**PodProbeMarker** provides the ability to customize the Probe and return the result to the Pod](https://openkruise.io/docs/user-manuals/podprobemarker)
- **Application Protection**
@@ -74,7 +78,8 @@ Registration: [Who is using Kruise](https://github.com/openkruise/kruise/issues/
- Spectro Cloud, ihomefnt, Arkane Systems, Deepexi, 火花思维
- OPPO, Suning.cn, joyy, Mobvista, 深圳凤凰木网络有限公司
- xiaomi, Netease, MeiTuan Finance, Shopee, Esign
- LinkedIn, 雪球, 兴盛优选, Wholee
- LinkedIn, 雪球, 兴盛优选, Wholee, LilithGames, Baidu
- Bilibili, 冠赢互娱, MeiTuan, 同城
## Contributing

View File

@@ -24,6 +24,12 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
const (
// ProtectionFinalizer is designed to ensure the GC of resources.
ProtectionFinalizer = "apps.kruise.io/deletion-protection"
)
// SetDefaults_SidecarSet set default values for SidecarSet.
@@ -351,7 +357,7 @@ func SetDefaultsImageTagPullPolicy(obj *v1alpha1.ImageTagPullPolicy) {
}
// SetDefaults_ImagePullJob set default values for ImagePullJob.
func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob) {
func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob, addProtection bool) {
if obj.Spec.CompletionPolicy.Type == "" {
obj.Spec.CompletionPolicy.Type = v1alpha1.Always
}
@@ -364,4 +370,7 @@ func SetDefaultsImagePullJob(obj *v1alpha1.ImagePullJob) {
if obj.Spec.PullPolicy.BackoffLimit == nil {
obj.Spec.PullPolicy.BackoffLimit = utilpointer.Int32Ptr(3)
}
if addProtection {
controllerutil.AddFinalizer(obj, ProtectionFinalizer)
}
}
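The new `addProtection` parameter means the protection finalizer is only appended when the webhook asks for it (on Create, per the handler change at the end of this diff). A minimal standalone sketch of that defaulting shape — the `job` struct and `setDefaults` below are illustrative stand-ins, not the actual API types:

```go
package main

import "fmt"

const protectionFinalizer = "apps.kruise.io/deletion-protection" // value from the diff above

// job is a stand-in for the real ImagePullJob; only the fields this sketch needs.
type job struct {
	Finalizers   []string
	BackoffLimit *int32
}

// setDefaults mirrors the shape of SetDefaultsImagePullJob: fill optional
// fields, and append the protection finalizer only when asked to.
func setDefaults(j *job, addProtection bool) {
	if j.BackoffLimit == nil {
		v := int32(3)
		j.BackoffLimit = &v
	}
	if addProtection {
		for _, f := range j.Finalizers {
			if f == protectionFinalizer {
				return // already present; keep the call idempotent
			}
		}
		j.Finalizers = append(j.Finalizers, protectionFinalizer)
	}
}

func main() {
	j := &job{}
	setDefaults(j, true)
	fmt.Println(*j.BackoffLimit, j.Finalizers) // 3 [apps.kruise.io/deletion-protection]
}
```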

View File

@@ -0,0 +1,3 @@
resources:
- namespace.yaml
- rbac.yaml

View File

@@ -0,0 +1,4 @@
apiVersion: v1
kind: Namespace
metadata:
name: kruise-daemon-config

View File

@@ -0,0 +1,29 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
creationTimestamp: null
name: kruise-daemon-secret-role
namespace: kruise-daemon-config
rules:
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: kruise-daemon-secret-rolebinding
namespace: kruise-daemon-config
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: kruise-daemon-secret-role
subjects:
- kind: ServiceAccount
name: kruise-daemon
namespace: kruise-system
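Assuming a cluster with these manifests applied, the narrowed scope can be spot-checked with kubectl's access-review helper; given the Role above, the first command should report yes and the second no:

```
# kruise-daemon may read secrets only inside kruise-daemon-config...
kubectl auth can-i get secrets \
  --as=system:serviceaccount:kruise-system:kruise-daemon \
  -n kruise-daemon-config

# ...but not in any other namespace.
kubectl auth can-i get secrets \
  --as=system:serviceaccount:kruise-system:kruise-daemon \
  -n default
```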

View File

@@ -0,0 +1,8 @@
namespace: kruise-daemon-config
# Value of this field is prepended to the
# names of all resources, e.g. a deployment named
# "wordpress" becomes "alices-wordpress".
# Note that it should also match with the prefix (text before '-') of the namespace
# field above.
bases:
- config

View File

@@ -0,0 +1,6 @@
apiVersion: v1
kind: Namespace
metadata:
labels:
control-plane: controller-manager
name: kruise-daemon-config

View File

@@ -1,6 +1,3 @@
# Adds namespace to all resources.
namespace: kruise-system
# Value of this field is prepended to the
# names of all resources, e.g. a deployment named
# "wordpress" becomes "alices-wordpress".
@@ -12,16 +9,19 @@ namePrefix: kruise-
#commonLabels:
# someName: someValue
resources:
- kruise-daemon-config.yaml
bases:
- ../crd
- ../rbac
- ../manager
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
- ../webhook
# [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'. 'WEBHOOK' components are required.
#- ../certmanager
# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'.
# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'.
#- ../prometheus
patchesStrategicMerge:
@@ -30,7 +30,7 @@ patchesStrategicMerge:
# endpoint w/o any authn/z, please comment the following line.
# - manager_auth_proxy_patch.yaml
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
- manager_webhook_patch.yaml

View File

@@ -1,2 +1,5 @@
# Adds namespace to all resources.
namespace: kruise-system
resources:
- manager.yaml

View File

@@ -53,8 +53,6 @@ rules:
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
@@ -64,14 +62,6 @@ rules:
- get
- patch
- update
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
- watch
- apiGroups:
- apps.kruise.io
resources:

View File

@@ -1,3 +1,6 @@
# Adds namespace to all resources.
namespace: kruise-system
resources:
- role.yaml
- role_binding.yaml

View File

@@ -1,3 +1,6 @@
# Adds namespace to all resources.
namespace: kruise-system
resources:
- manifests.yaml
- service.yaml

View File

@@ -100,8 +100,8 @@ func (r *realStatusUpdater) calculateStatus(cs *appsv1alpha1.CloneSet, newStatus
newStatus.UpdatedReadyReplicas++
}
}
// Consider the update revision as stable if revisions of all pods are consistent to it, no need to wait all of them ready
if newStatus.UpdatedReplicas == newStatus.Replicas {
// Consider the update revision as stable if revisions of all pods are consistent with it and the replica count matches the expected number; no need to wait for all of them to be ready
if newStatus.UpdatedReplicas == newStatus.Replicas && newStatus.Replicas == *cs.Spec.Replicas {
newStatus.CurrentRevision = newStatus.UpdateRevision
}
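The tightened condition above can be seen in isolation in a minimal sketch — the `status` struct and `revisionStable` helper below are illustrative stand-ins, not the actual CloneSet types:

```go
package main

import "fmt"

// Minimal stand-in for the CloneSet status fields the check reads. Before the
// fix, UpdatedReplicas == Replicas could hold transiently while the CloneSet
// was still scaling, flipping CurrentRevision too early.
type status struct {
	Replicas        int32
	UpdatedReplicas int32
}

func revisionStable(s status, specReplicas int32) bool {
	// The fix adds the comparison against spec.Replicas, so the update
	// revision is only promoted once the expected replica count exists.
	return s.UpdatedReplicas == s.Replicas && s.Replicas == specReplicas
}

func main() {
	// Mid scale-up: 2 pods exist and both are updated, but 3 are desired.
	fmt.Println(revisionStable(status{Replicas: 2, UpdatedReplicas: 2}, 3)) // false
	// Steady state: all 3 desired pods exist and are updated.
	fmt.Println(revisionStable(status{Replicas: 3, UpdatedReplicas: 3}, 3)) // true
}
```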

View File

@@ -23,6 +23,7 @@ import (
"sort"
"time"
"github.com/openkruise/kruise/apis/apps/defaults"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/features"
@@ -38,10 +39,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -56,11 +59,22 @@ var (
concurrentReconciles = 3
controllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("ImagePullJob")
resourceVersionExpectations = expectations.NewResourceVersionExpectation()
scaleExpectations = expectations.NewScaleExpectations()
)
const (
defaultParallelism = 1
minRequeueTime = time.Second
// SourceSecretKeyAnno is an annotation instead of a label
// because the length of the key may be more than 64 characters.
SourceSecretKeyAnno = "imagepulljobs.kruise.io/source-key"
// SourceSecretUIDLabelKey is designed to select target via source secret.
SourceSecretUIDLabelKey = "imagepulljobs.kruise.io/source-uid"
// TargetOwnerReferencesAnno records the keys of the ImagePullJobs that refer
// to the target secret. If TargetOwnerReferencesAnno is empty, the target
// secret should be deleted.
TargetOwnerReferencesAnno = "imagepulljobs.kruise.io/references"
)
// Add creates a new ImagePullJob Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
@@ -107,6 +121,12 @@ func add(mgr manager.Manager, r *ReconcileImagePullJob) error {
return err
}
// Watch for secret for jobs that have pullSecrets
err = c.Watch(&source.Kind{Type: &v1.Secret{}}, &secretEventHandler{Reader: mgr.GetCache()})
if err != nil {
return err
}
return nil
}
@@ -151,6 +171,16 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
return reconcile.Result{}, err
}
// If scale expectations have not satisfied yet, just skip this reconcile
if scaleSatisfied, unsatisfiedDuration, dirtyData := scaleExpectations.SatisfiedExpectations(request.String()); !scaleSatisfied {
if unsatisfiedDuration >= expectations.ExpectationTimeout {
klog.Warningf("ImagePullJob: expectation unsatisfied overtime for %v, dirtyData=%v, overtime=%v", request.String(), dirtyData, unsatisfiedDuration)
return reconcile.Result{}, nil
}
klog.V(4).Infof("ImagePullJob: not satisfied scale for %v, dirtyData=%v", request.String(), dirtyData)
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}
// If resourceVersion expectations have not satisfied yet, just skip this reconcile
resourceVersionExpectations.Observe(job)
if isSatisfied, unsatisfiedDuration := resourceVersionExpectations.IsSatisfied(job); !isSatisfied {
@@ -163,11 +193,17 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
}
if job.DeletionTimestamp != nil {
return reconcile.Result{}, nil
// ensure the GC of secrets and remove protection finalizer
return reconcile.Result{}, r.finalize(job)
}
// The Job has been finished
if job.Status.CompletionTime != nil {
// ensure the GC of secrets and remove protection finalizer
if err = r.finalize(job); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to remove finalizer: %v", err)
}
var leftTime time.Duration
if job.Spec.CompletionPolicy.TTLSecondsAfterFinished != nil {
leftTime = time.Duration(*job.Spec.CompletionPolicy.TTLSecondsAfterFinished)*time.Second - time.Since(job.Status.CompletionTime.Time)
@@ -182,6 +218,11 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
return reconcile.Result{RequeueAfter: leftTime}, nil
}
// add protection finalizer to ensure the GC of secrets
if err = r.addProtectionFinalizer(job); err != nil {
return reconcile.Result{}, err
}
// Get all NodeImage related to this ImagePullJob
nodeImages, err := utilimagejob.GetNodeImagesForJob(r.Client, job)
if err != nil {
@@ -201,14 +242,20 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
}
}
// sync secret to kruise-daemon-config namespace before pulling
secrets, err := r.syncSecrets(job)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to sync secrets: %v", err)
}
// Calculate the new status for this job
newStatus, notSyncedNodeImages, err := r.calculateStatus(job, nodeImages)
newStatus, notSyncedNodeImages, err := r.calculateStatus(job, nodeImages, secrets)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to calculate status: %v", err)
}
// Sync image to more NodeImages
if err = r.syncNodeImages(job, newStatus, notSyncedNodeImages); err != nil {
if err = r.syncNodeImages(job, newStatus, notSyncedNodeImages, secrets); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to sync NodeImages: %v", err)
}
@@ -231,7 +278,28 @@ func (r *ReconcileImagePullJob) Reconcile(_ context.Context, request reconcile.R
return reconcile.Result{}, nil
}
func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, newStatus *appsv1alpha1.ImagePullJobStatus, notSyncedNodeImages []string) error {
func (r *ReconcileImagePullJob) syncSecrets(job *appsv1alpha1.ImagePullJob) ([]appsv1alpha1.ReferenceObject, error) {
if job.Namespace == util.GetKruiseDaemonConfigNamespace() {
return getSecrets(job), nil // Ignore this special case.
}
targetMap, deleteMap, err := r.getTargetSecretMap(job)
if err != nil {
return nil, err
}
if err = r.releaseTargetSecrets(deleteMap, job); err != nil {
return nil, err
}
if job.DeletionTimestamp != nil || job.Status.CompletionTime != nil {
return nil, r.releaseTargetSecrets(targetMap, job)
}
if err = r.checkNamespaceExists(util.GetKruiseDaemonConfigNamespace()); err != nil {
return nil, fmt.Errorf("failed to check kruise-daemon-config namespace: %v", err)
}
return r.syncTargetSecrets(job, targetMap)
}
func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, newStatus *appsv1alpha1.ImagePullJobStatus, notSyncedNodeImages []string, secrets []appsv1alpha1.ReferenceObject) error {
if len(notSyncedNodeImages) == 0 {
return nil
}
@@ -251,7 +319,6 @@ func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, n
}
ownerRef := getOwnerRef(job)
secrets := getSecrets(job)
pullPolicy := getImagePullPolicy(job)
now := metav1.NewTime(r.clock.Now())
imageName, imageTag, _ := daemonutil.NormalizeImageRefToNameTag(job.Spec.Image)
@@ -335,7 +402,116 @@ func (r *ReconcileImagePullJob) syncNodeImages(job *appsv1alpha1.ImagePullJob, n
return nil
}
func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, nodeImages []*appsv1alpha1.NodeImage) (*appsv1alpha1.ImagePullJobStatus, []string, error) {
func (r *ReconcileImagePullJob) getTargetSecretMap(job *appsv1alpha1.ImagePullJob) (map[string]*v1.Secret, map[string]*v1.Secret, error) {
options := client.ListOptions{
Namespace: util.GetKruiseDaemonConfigNamespace(),
}
targetLister := &v1.SecretList{}
if err := r.List(context.TODO(), targetLister, &options, utilclient.DisableDeepCopy); err != nil {
return nil, nil, err
}
jobKey := keyFromObject(job)
sourceReferences := getSecrets(job)
deleteMap := make(map[string]*v1.Secret)
targetMap := make(map[string]*v1.Secret, len(targetLister.Items))
for i := range targetLister.Items {
target := &targetLister.Items[i]
if target.DeletionTimestamp != nil {
continue
}
keySet := referenceSetFromTarget(target)
if !keySet.Contains(jobKey) {
continue
}
sourceNs, sourceName, err := cache.SplitMetaNamespaceKey(target.Annotations[SourceSecretKeyAnno])
if err != nil {
klog.Warningf("Failed to parse source key from target %s annotations: %s", target.Name, err)
}
if containsObject(sourceReferences, appsv1alpha1.ReferenceObject{Namespace: sourceNs, Name: sourceName}) {
targetMap[target.Labels[SourceSecretUIDLabelKey]] = target
} else {
deleteMap[target.Labels[SourceSecretUIDLabelKey]] = target
}
}
return targetMap, deleteMap, nil
}
func (r *ReconcileImagePullJob) releaseTargetSecrets(targetMap map[string]*v1.Secret, job *appsv1alpha1.ImagePullJob) error {
if len(targetMap) == 0 {
return nil
}
jobKey := keyFromObject(job)
for _, secret := range targetMap {
if secret == nil {
continue
}
keySet := referenceSetFromTarget(secret)
// Remove the reference to this job from the target. We use Update instead of
// Patch to make sure we do not delete any target that is still referred to,
// because a target may be newly referred in this reconcile round.
if keySet.Contains(keyFromObject(job)) {
keySet.Delete(jobKey)
secret = secret.DeepCopy()
secret.Annotations[TargetOwnerReferencesAnno] = keySet.String()
if err := r.Update(context.TODO(), secret); err != nil {
return err
}
resourceVersionExpectations.Expect(secret)
}
// The target is still referred to by other jobs, do not delete it.
if !keySet.IsEmpty() {
continue
}
// Just delete it if no one refers it anymore.
if err := r.Delete(context.TODO(), secret); err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
}
func (r *ReconcileImagePullJob) syncTargetSecrets(job *appsv1alpha1.ImagePullJob, targetMap map[string]*v1.Secret) ([]appsv1alpha1.ReferenceObject, error) {
sourceReferences := getSecrets(job)
targetReferences := make([]appsv1alpha1.ReferenceObject, 0, len(sourceReferences))
for _, sourceRef := range sourceReferences {
source := &v1.Secret{}
if err := r.Get(context.TODO(), keyFromRef(sourceRef), source); err != nil {
if errors.IsNotFound(err) {
continue
}
return nil, err
}
target := targetMap[string(source.UID)]
switch action := computeTargetSyncAction(source, target, job); action {
case create:
referenceKeys := makeReferenceSet(keyFromObject(job))
target = targetFromSource(source, referenceKeys)
scaleExpectations.ExpectScale(keyFromObject(job).String(), expectations.Create, string(source.UID))
if err := r.Create(context.TODO(), target); err != nil {
scaleExpectations.ObserveScale(keyFromObject(job).String(), expectations.Create, string(source.UID))
return nil, err
}
case update:
referenceKeys := referenceSetFromTarget(target).Insert(keyFromObject(job))
target = updateTarget(target, source, referenceKeys)
if err := r.Update(context.TODO(), target); err != nil {
return nil, err
}
resourceVersionExpectations.Expect(target)
}
targetReferences = append(targetReferences, appsv1alpha1.ReferenceObject{Namespace: target.Namespace, Name: target.Name})
}
return targetReferences, nil
}
func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob, nodeImages []*appsv1alpha1.NodeImage, secrets []appsv1alpha1.ReferenceObject) (*appsv1alpha1.ImagePullJobStatus, []string, error) {
newStatus := appsv1alpha1.ImagePullJobStatus{
StartTime: job.Status.StartTime,
Desired: int32(len(nodeImages)),
@@ -353,7 +529,20 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob,
var notSynced, pulling, succeeded, failed []string
for _, nodeImage := range nodeImages {
var tagVersion int64 = -1
secretSynced := true
if imageSpec, ok := nodeImage.Spec.Images[imageName]; ok {
for _, secret := range secrets {
if !containsObject(imageSpec.PullSecrets, secret) {
secretSynced = false
break
}
}
if !secretSynced {
notSynced = append(notSynced, nodeImage.Name)
continue
}
for _, tagSpec := range imageSpec.Tags {
if tagSpec.Tag != imageTag {
continue
@@ -371,6 +560,7 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob,
tagVersion = tagSpec.Version
}
}
if tagVersion < 0 {
notSynced = append(notSynced, nodeImage.Name)
continue
@@ -426,3 +616,29 @@ func (r *ReconcileImagePullJob) calculateStatus(job *appsv1alpha1.ImagePullJob,
sort.Strings(newStatus.FailedNodes)
return &newStatus, notSynced, nil
}
func (r *ReconcileImagePullJob) checkNamespaceExists(nsName string) error {
namespace := v1.Namespace{}
return r.Get(context.TODO(), types.NamespacedName{Name: nsName}, &namespace)
}
// addProtectionFinalizer ensures the GC of secrets in the kruise-daemon-config ns
func (r *ReconcileImagePullJob) addProtectionFinalizer(job *appsv1alpha1.ImagePullJob) error {
if controllerutil.ContainsFinalizer(job, defaults.ProtectionFinalizer) {
return nil
}
job.Finalizers = append(job.Finalizers, defaults.ProtectionFinalizer)
return r.Update(context.TODO(), job)
}
// finalize also ensures the GC of secrets in the kruise-daemon-config ns
func (r *ReconcileImagePullJob) finalize(job *appsv1alpha1.ImagePullJob) error {
if !controllerutil.ContainsFinalizer(job, defaults.ProtectionFinalizer) {
return nil
}
if _, err := r.syncSecrets(job); err != nil {
return err
}
controllerutil.RemoveFinalizer(job, defaults.ProtectionFinalizer)
return r.Update(context.TODO(), job)
}
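The scale-expectations guard added to Reconcile above follows a common controller pattern: record an expected Create before issuing it, observe it from the event handler, and skip reconciles until the ledger is balanced. A minimal standalone sketch — the `scaleExpectations` type below is a simplification; the real `pkg/util/expectations` implementation also tracks timestamps for the timeout branch:

```go
package main

import "fmt"

// scaleExpectations maps a controller key (e.g. "ns/job-a") to the set of
// item UIDs whose creation has been issued but not yet observed.
type scaleExpectations map[string]map[string]struct{}

// ExpectScale records a pending create before the controller calls Create.
func (e scaleExpectations) ExpectScale(controllerKey, uid string) {
	if e[controllerKey] == nil {
		e[controllerKey] = map[string]struct{}{}
	}
	e[controllerKey][uid] = struct{}{}
}

// ObserveScale is called from the event handler once the object is seen.
func (e scaleExpectations) ObserveScale(controllerKey, uid string) {
	delete(e[controllerKey], uid)
}

// Satisfied reports whether the controller may proceed with a reconcile.
func (e scaleExpectations) Satisfied(controllerKey string) bool {
	return len(e[controllerKey]) == 0
}

func main() {
	exp := scaleExpectations{}
	exp.ExpectScale("ns/job-a", "secret-uid-1") // before r.Create of the target secret
	fmt.Println(exp.Satisfied("ns/job-a"))      // false: reconcile is skipped
	exp.ObserveScale("ns/job-a", "secret-uid-1") // from the secret event handler
	fmt.Println(exp.Satisfied("ns/job-a"))      // true: reconcile proceeds
}
```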

View File

@@ -17,10 +17,14 @@ limitations under the License.
package imagepulljob
import (
"context"
"reflect"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
kruiseutil "github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/expectations"
utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
@@ -187,6 +191,102 @@ func (e *podEventHandler) handleUpdate(pod, oldPod *v1.Pod, q workqueue.RateLimi
}
}
type secretEventHandler struct {
client.Reader
}
var _ handler.EventHandler = &secretEventHandler{}
func (e *secretEventHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
obj := evt.Object.(*v1.Secret)
e.handle(obj, q)
}
func (e *secretEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
newObj := evt.ObjectNew.(*v1.Secret)
oldObj := evt.ObjectOld.(*v1.Secret)
e.handleUpdate(newObj, oldObj, q)
}
func (e *secretEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
}
func (e *secretEventHandler) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {
}
func (e *secretEventHandler) handle(secret *v1.Secret, q workqueue.RateLimitingInterface) {
if secret != nil && secret.Namespace == kruiseutil.GetKruiseDaemonConfigNamespace() {
jobKeySet := referenceSetFromTarget(secret)
klog.V(4).Infof("Observe secret %s/%s created, uid: %s, refs: %s", secret.Namespace, secret.Name, secret.UID, jobKeySet.String())
for key := range jobKeySet {
scaleExpectations.ObserveScale(key.String(), expectations.Create, secret.Labels[SourceSecretUIDLabelKey])
}
return
}
if secret == nil || secret.DeletionTimestamp != nil {
return
}
// Get jobs related to this Secret
jobKeys, err := e.getActiveJobKeysForSecret(secret)
if err != nil {
klog.Errorf("Failed to get jobs for Secret %s/%s: %v", secret.Namespace, secret.Name, err)
}
for _, jKey := range jobKeys {
q.Add(reconcile.Request{NamespacedName: jKey})
}
}
func (e *secretEventHandler) handleUpdate(secretNew, secretOld *v1.Secret, q workqueue.RateLimitingInterface) {
if secretNew != nil && secretNew.Namespace == kruiseutil.GetKruiseDaemonConfigNamespace() {
jobKeySet := referenceSetFromTarget(secretNew)
for key := range jobKeySet {
scaleExpectations.ObserveScale(key.String(), expectations.Create, secretNew.Labels[SourceSecretUIDLabelKey])
}
return
}
if secretOld == nil || secretNew == nil || secretNew.DeletionTimestamp != nil ||
(reflect.DeepEqual(secretNew.Data, secretOld.Data) && reflect.DeepEqual(secretNew.StringData, secretOld.StringData)) {
return
}
// Get jobs related to this Secret
jobKeys, err := e.getActiveJobKeysForSecret(secretNew)
if err != nil {
klog.Errorf("Failed to get jobs for Secret %s/%s: %v", secretNew.Namespace, secretNew.Name, err)
}
for _, jKey := range jobKeys {
q.Add(reconcile.Request{NamespacedName: jKey})
}
}
func (e *secretEventHandler) getActiveJobKeysForSecret(secret *v1.Secret) ([]types.NamespacedName, error) {
jobLister := &appsv1alpha1.ImagePullJobList{}
if err := e.List(context.TODO(), jobLister, client.InNamespace(secret.Namespace), utilclient.DisableDeepCopy); err != nil {
return nil, err
}
var jobKeys []types.NamespacedName
for i := range jobLister.Items {
job := &jobLister.Items[i]
if job.DeletionTimestamp != nil {
continue
}
if jobContainsSecret(job, secret.Name) {
jobKeys = append(jobKeys, keyFromObject(job))
}
}
return jobKeys, nil
}
func jobContainsSecret(job *appsv1alpha1.ImagePullJob, secretName string) bool {
for _, s := range job.Spec.PullSecrets {
if secretName == s {
return true
}
}
return false
}
func diffJobs(newJobs, oldJobs []*appsv1alpha1.ImagePullJob) set {
setNew := make(set, len(newJobs))
setOld := make(set, len(oldJobs))

View File

@@ -19,15 +19,28 @@ package imagepulljob
import (
"fmt"
"math/rand"
"reflect"
"strings"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
defaultTTLSecondsForNever = int32(24 * 3600)
type syncAction string
const (
defaultTTLSecondsForNever = int32(24 * 3600)
defaultActiveDeadlineSecondsForNever = int64(1800)
create syncAction = "create"
update syncAction = "update"
noAction syncAction = "noAction"
)
func getTTLSecondsForAlways(job *appsv1alpha1.ImagePullJob) *int32 {
@@ -118,3 +131,105 @@ func formatStatusMessage(status *appsv1alpha1.ImagePullJobStatus) (ret string) {
}
return fmt.Sprintf("job is running, progress %.1f%%", 100.0*float64(status.Succeeded+status.Failed)/float64(status.Desired))
}
func keyFromRef(ref appsv1alpha1.ReferenceObject) types.NamespacedName {
return types.NamespacedName{
Name: ref.Name,
Namespace: ref.Namespace,
}
}
func keyFromObject(object client.Object) types.NamespacedName {
return types.NamespacedName{
Name: object.GetName(),
Namespace: object.GetNamespace(),
}
}
func targetFromSource(source *v1.Secret, keySet referenceSet) *v1.Secret {
target := source.DeepCopy()
target.ObjectMeta = metav1.ObjectMeta{
Namespace: util.GetKruiseDaemonConfigNamespace(),
GenerateName: fmt.Sprintf("%s-", source.Name),
Labels: map[string]string{
SourceSecretUIDLabelKey: string(source.UID),
},
Annotations: map[string]string{
SourceSecretKeyAnno: keyFromObject(source).String(),
TargetOwnerReferencesAnno: keySet.String(),
},
}
return target
}
func updateTarget(target, source *v1.Secret, keySet referenceSet) *v1.Secret {
target = target.DeepCopy()
target.Data = source.Data
target.StringData = source.StringData
target.Annotations[TargetOwnerReferencesAnno] = keySet.String()
return target
}
func referenceSetFromTarget(target *v1.Secret) referenceSet {
refs := strings.Split(target.Annotations[TargetOwnerReferencesAnno], ",")
keys := makeReferenceSet()
for _, ref := range refs {
namespace, name, err := cache.SplitMetaNamespaceKey(ref)
if err != nil {
klog.Errorf("Failed to parse job key from target secret %s annotations: %v", target.Name, err)
continue
}
keys.Insert(types.NamespacedName{Namespace: namespace, Name: name})
}
return keys
}
func computeTargetSyncAction(source, target *v1.Secret, job *appsv1alpha1.ImagePullJob) syncAction {
if target == nil || len(target.UID) == 0 {
return create
}
keySet := referenceSetFromTarget(target)
if !keySet.Contains(keyFromObject(job)) ||
!reflect.DeepEqual(source.Data, target.Data) ||
!reflect.DeepEqual(source.StringData, target.StringData) {
return update
}
return noAction
}
func makeReferenceSet(items ...types.NamespacedName) referenceSet {
refSet := map[types.NamespacedName]struct{}{}
for _, item := range items {
refSet[item] = struct{}{}
}
return refSet
}
type referenceSet map[types.NamespacedName]struct{}
func (set referenceSet) String() string {
keyList := make([]string, 0, len(set))
for ref := range set {
keyList = append(keyList, ref.String())
}
return strings.Join(keyList, ",")
}
func (set referenceSet) Contains(key types.NamespacedName) bool {
_, exists := set[key]
return exists
}
func (set referenceSet) Insert(key types.NamespacedName) referenceSet {
set[key] = struct{}{}
return set
}
func (set referenceSet) Delete(key types.NamespacedName) referenceSet {
delete(set, key)
return set
}
func (set referenceSet) IsEmpty() bool {
return len(set) == 0
}
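The annotation encoding that `referenceSet` and `referenceSetFromTarget` implement — comma-separated "namespace/name" keys — can be sketched standalone. The `key` type and `parse` helper below are trimmed stand-ins for `types.NamespacedName` and `referenceSetFromTarget`, with sorting added for deterministic output:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// key is a trimmed stand-in for types.NamespacedName.
type key struct{ Namespace, Name string }

func (k key) String() string { return k.Namespace + "/" + k.Name }

type referenceSet map[key]struct{}

// String joins the keys the way TargetOwnerReferencesAnno stores them:
// comma-separated "namespace/name" entries. Sorted here for determinism;
// the real map iteration order is random.
func (s referenceSet) String() string {
	keys := make([]string, 0, len(s))
	for k := range s {
		keys = append(keys, k.String())
	}
	sort.Strings(keys)
	return strings.Join(keys, ",")
}

// parse rebuilds the set from the annotation value, mirroring
// referenceSetFromTarget's tolerance of malformed entries.
func parse(anno string) referenceSet {
	s := referenceSet{}
	for _, ref := range strings.Split(anno, ",") {
		parts := strings.SplitN(ref, "/", 2)
		if len(parts) != 2 {
			continue // skip malformed entries instead of failing
		}
		s[key{Namespace: parts[0], Name: parts[1]}] = struct{}{}
	}
	return s
}

func main() {
	s := referenceSet{
		{Namespace: "default", Name: "job-a"}: {},
		{Namespace: "default", Name: "job-b"}: {},
	}
	anno := s.String()
	fmt.Println(anno)             // default/job-a,default/job-b
	fmt.Println(len(parse(anno))) // 2
}
```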

View File

@@ -110,7 +110,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
CloneSetShortHash: {Default: false, PreRelease: featuregate.Alpha},
KruisePodReadinessGate: {Default: false, PreRelease: featuregate.Alpha},
PreDownloadImageForInPlaceUpdate: {Default: true, PreRelease: featuregate.Alpha},
PreDownloadImageForInPlaceUpdate: {Default: false, PreRelease: featuregate.Alpha},
CloneSetPartitionRollback: {Default: false, PreRelease: featuregate.Alpha},
ResourcesDeletionProtection: {Default: true, PreRelease: featuregate.Alpha},
WorkloadSpread: {Default: true, PreRelease: featuregate.Alpha},
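Since the default flips to false, users relying on image pre-download for in-place update must opt back in at install or upgrade time. With the Helm chart this is done through the featureGates value (chart name and flag shown as commonly documented; verify against your install method):

```
helm upgrade kruise openkruise/kruise \
  --set featureGates="PreDownloadImageForInPlaceUpdate=true"
```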

View File

@@ -1,5 +1,5 @@
/*
Copyright 2021.
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -24,3 +24,10 @@ func GetKruiseNamespace() string {
}
return "kruise-system"
}
func GetKruiseDaemonConfigNamespace() string {
if ns := os.Getenv("KRUISE_DAEMON_CONFIG_NS"); len(ns) > 0 {
return ns
}
return "kruise-daemon-config"
}

pkg/util/meta_test.go (new file, 39 lines)

@@ -0,0 +1,39 @@
/*
Copyright 2022 The Kruise Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
	"os"
	"testing"
)

func TestMetaGetNamespace(t *testing.T) {
	if GetKruiseNamespace() != "kruise-system" {
		t.Fatalf("expect(kruise-system), but get(%s)", GetKruiseNamespace())
	}
	_ = os.Setenv("POD_NAMESPACE", "test")
	if GetKruiseNamespace() != "test" {
		t.Fatalf("expect(test), but get(%s)", GetKruiseNamespace())
	}
	if GetKruiseDaemonConfigNamespace() != "kruise-daemon-config" {
		t.Fatalf("expect(kruise-daemon-config), but get(%s)", GetKruiseDaemonConfigNamespace())
	}
	_ = os.Setenv("KRUISE_DAEMON_CONFIG_NS", "test")
	if GetKruiseDaemonConfigNamespace() != "test" {
		t.Fatalf("expect(test), but get(%s)", GetKruiseDaemonConfigNamespace())
	}
}


@@ -25,6 +25,7 @@ import (
	"github.com/openkruise/kruise/apis/apps/defaults"
	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	"github.com/openkruise/kruise/pkg/util"
+	admissionv1 "k8s.io/api/admission/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
@@ -46,7 +47,7 @@ func (h *ImagePullJobCreateUpdateHandler) Handle(ctx context.Context, req admiss
		return admission.Errored(http.StatusBadRequest, err)
	}
	var copy runtime.Object = obj.DeepCopy()
-	defaults.SetDefaultsImagePullJob(obj)
+	defaults.SetDefaultsImagePullJob(obj, req.AdmissionRequest.Operation == admissionv1.Create)
	if reflect.DeepEqual(obj, copy) {
		return admission.Allowed("")
	}
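The webhook change above makes the defaulter aware of whether it is handling a Create or an Update. A minimal sketch of that pattern, with a hypothetical `pullJob` type and field standing in for the real `ImagePullJob` API (the actual fields defaulted on create are defined in `apis/apps/defaults`):

```go
package main

import "fmt"

// pullJob is a stand-in for the real ImagePullJob type; the field and the
// default value below are hypothetical, chosen only to show the pattern.
type pullJob struct {
	TTLSecondsAfterFinished *int32
}

// setDefaults injects the create-time default only when isCreate is true,
// so that updating a pre-existing object never mutates it retroactively --
// the reason the webhook passes
// req.AdmissionRequest.Operation == admissionv1.Create into the defaulter.
func setDefaults(job *pullJob, isCreate bool) {
	if isCreate && job.TTLSecondsAfterFinished == nil {
		ttl := int32(1800)
		job.TTLSecondsAfterFinished = &ttl
	}
}

func main() {
	created := &pullJob{}
	setDefaults(created, true) // Create: default filled in
	updated := &pullJob{}
	setDefaults(updated, false) // Update of an old object: left untouched
	fmt.Println(created.TTLSecondsAfterFinished != nil) // true
	fmt.Println(updated.TTLSecondsAfterFinished != nil) // false
}
```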


@@ -21,8 +21,13 @@ import (
	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
	"github.com/openkruise/kruise/pkg/controller/imagepulljob"
	"github.com/openkruise/kruise/pkg/util"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/retry"
)
type ImagePullJobTester struct {
@@ -57,3 +62,31 @@ func (tester *ImagePullJobTester) GetJob(job *appsv1alpha1.ImagePullJob) (*appsv

func (tester *ImagePullJobTester) ListJobs(ns string) (*appsv1alpha1.ImagePullJobList, error) {
	return tester.kc.AppsV1alpha1().ImagePullJobs(ns).List(context.TODO(), metav1.ListOptions{})
}

func (tester *ImagePullJobTester) CreateSecret(secret *v1.Secret) (*v1.Secret, error) {
	return tester.c.CoreV1().Secrets(secret.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
}

func (tester *ImagePullJobTester) UpdateSecret(secret *v1.Secret) (*v1.Secret, error) {
	namespace, name := secret.GetNamespace(), secret.GetName()
	var err error
	var newSecret *v1.Secret
	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
		newSecret, err = tester.c.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		newSecret.Data = secret.Data
		newSecret, err = tester.c.CoreV1().Secrets(namespace).Update(context.TODO(), newSecret, metav1.UpdateOptions{})
		return err
	})
	return newSecret, err
}

func (tester *ImagePullJobTester) ListSyncedSecrets(source *v1.Secret) ([]v1.Secret, error) {
	options := metav1.ListOptions{
		LabelSelector: labels.SelectorFromSet(map[string]string{imagepulljob.SourceSecretUIDLabelKey: string(source.UID)}).String(),
	}
	lister, err := tester.c.CoreV1().Secrets(util.GetKruiseDaemonConfigNamespace()).List(context.TODO(), options)
	return lister.Items, err
}
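`ListSyncedSecrets` finds the copies that the controller has synced into the kruise-daemon-config namespace by selecting on a label carrying the source secret's UID. A dependency-free sketch of that selection, with an illustrative label key standing in for `imagepulljob.SourceSecretUIDLabelKey` (the real key string lives in the controller package):

```go
package main

import "fmt"

// sourceSecretUIDLabelKey stands in for imagepulljob.SourceSecretUIDLabelKey;
// this value is illustrative only.
const sourceSecretUIDLabelKey = "imagepulljobs.kruise.io/source-secret-uid"

// fakeSecret carries just the fields the selection needs.
type fakeSecret struct {
	Name   string
	Labels map[string]string
}

// filterBySourceUID mimics the label selector built in ListSyncedSecrets:
// keep only the secrets whose source-UID label equals the given UID.
func filterBySourceUID(all []fakeSecret, uid string) []fakeSecret {
	var out []fakeSecret
	for _, s := range all {
		if s.Labels[sourceSecretUIDLabelKey] == uid {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	all := []fakeSecret{
		{Name: "synced-copy", Labels: map[string]string{sourceSecretUIDLabelKey: "uid-123"}},
		{Name: "unrelated", Labels: map[string]string{"app": "demo"}},
	}
	for _, s := range filterBySourceUID(all, "uid-123") {
		fmt.Println(s.Name) // synced-copy
	}
}
```

The real tester delegates this matching to the API server via `labels.SelectorFromSet(...).String()` in the list options, rather than filtering client-side.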