Merge pull request #125868 from soltysh/wait_for
Add --for=create option to kubectl wait Kubernetes-commit: 37f733a657ef71d66177d00f9b7d47ec507dedd3
This commit is contained in:
		
						commit
						15aefcb085
					
				
							
								
								
									
										2
									
								
								go.mod
								
								
								
								
							
							
						
						
									
										2
									
								
								go.mod
								
								
								
								
							|  | @ -31,7 +31,7 @@ require ( | ||||||
| 	gopkg.in/yaml.v2 v2.4.0 | 	gopkg.in/yaml.v2 v2.4.0 | ||||||
| 	k8s.io/api v0.0.0-20240707022826-2cf4612580d3 | 	k8s.io/api v0.0.0-20240707022826-2cf4612580d3 | ||||||
| 	k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2 | 	k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2 | ||||||
| 	k8s.io/cli-runtime v0.0.0-20240707030735-f3a07984ce75 | 	k8s.io/cli-runtime v0.0.0-20240711160938-80033e7bae90 | ||||||
| 	k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99 | 	k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99 | ||||||
| 	k8s.io/component-base v0.0.0-20240707024307-c741ec442cc3 | 	k8s.io/component-base v0.0.0-20240707024307-c741ec442cc3 | ||||||
| 	k8s.io/component-helpers v0.0.0-20240707024433-5a2d7426dbcc | 	k8s.io/component-helpers v0.0.0-20240707024433-5a2d7426dbcc | ||||||
|  |  | ||||||
							
								
								
									
										4
									
								
								go.sum
								
								
								
								
							
							
						
						
									
										4
									
								
								go.sum
								
								
								
								
							|  | @ -281,8 +281,8 @@ k8s.io/api v0.0.0-20240707022826-2cf4612580d3 h1:2bHi5SAn7dRf6oJGO29yFEtMyIN9oCD | ||||||
| k8s.io/api v0.0.0-20240707022826-2cf4612580d3/go.mod h1:TeZW9JsLR2mmT3LA7JsD/4i5g40HHrREIdaD0QCEP1Y= | k8s.io/api v0.0.0-20240707022826-2cf4612580d3/go.mod h1:TeZW9JsLR2mmT3LA7JsD/4i5g40HHrREIdaD0QCEP1Y= | ||||||
| k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2 h1:NFXoAJM3jIlKAOfDNCKb29pjw8iNnhtT+Z6YOQhXM50= | k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2 h1:NFXoAJM3jIlKAOfDNCKb29pjw8iNnhtT+Z6YOQhXM50= | ||||||
| k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2/go.mod h1:Et4EUFrefx1K28ZwNXpkHUqq7fSML2FROj79Ku7Lj1w= | k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2/go.mod h1:Et4EUFrefx1K28ZwNXpkHUqq7fSML2FROj79Ku7Lj1w= | ||||||
| k8s.io/cli-runtime v0.0.0-20240707030735-f3a07984ce75 h1:jWSuMI9QUWe40r8V1nEwFZ+AHg3wzTxH6eqClJ19CCE= | k8s.io/cli-runtime v0.0.0-20240711160938-80033e7bae90 h1:VE4DMc3nU/Y945k78wqxjPY+hGdDSEWHuzhfpy0FhXQ= | ||||||
| k8s.io/cli-runtime v0.0.0-20240707030735-f3a07984ce75/go.mod h1:2Umm5N7cEQTVpw0RI+Ner/0aB0bvFokCZIevJNf+eFM= | k8s.io/cli-runtime v0.0.0-20240711160938-80033e7bae90/go.mod h1:yAmTKyXms0bDV26vzKNx2mlvEJV0rylfuXJDbR8LQ8w= | ||||||
| k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99 h1:ay9vtgW1GcBryEkZPnfx6pmWmkmLTWLKn3TizDGeTqI= | k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99 h1:ay9vtgW1GcBryEkZPnfx6pmWmkmLTWLKn3TizDGeTqI= | ||||||
| k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99/go.mod h1:M4YkUERkMjSHhNDDe6dYdACCOtPD4td6mRAME6AX7i8= | k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99/go.mod h1:M4YkUERkMjSHhNDDe6dYdACCOtPD4td6mRAME6AX7i8= | ||||||
| k8s.io/component-base v0.0.0-20240707024307-c741ec442cc3 h1:ipvnLvus3w+bw+jX1TQGeMyet9Gi/RBFgZ4g6dipaXM= | k8s.io/component-base v0.0.0-20240707024307-c741ec442cc3 h1:ipvnLvus3w+bw+jX1TQGeMyet9Gi/RBFgZ4g6dipaXM= | ||||||
|  |  | ||||||
|  | @ -0,0 +1,197 @@ | ||||||
|  | /* | ||||||
|  | Copyright 2024 The Kubernetes Authors. | ||||||
|  | 
 | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  | 
 | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0
 | ||||||
|  | 
 | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | package wait | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"strings" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||
|  | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
|  | 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||||
|  | 	"k8s.io/apimachinery/pkg/fields" | ||||||
|  | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
|  | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
|  | 	"k8s.io/apimachinery/pkg/watch" | ||||||
|  | 	"k8s.io/cli-runtime/pkg/resource" | ||||||
|  | 	"k8s.io/client-go/tools/cache" | ||||||
|  | 	watchtools "k8s.io/client-go/tools/watch" | ||||||
|  | 	"k8s.io/kubectl/pkg/util/interrupt" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // ConditionalWait hold information to check an API status condition
 | ||||||
|  | type ConditionalWait struct { | ||||||
|  | 	conditionName   string | ||||||
|  | 	conditionStatus string | ||||||
|  | 	// errOut is written to if an error occurs
 | ||||||
|  | 	errOut io.Writer | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // IsConditionMet is a conditionfunc for waiting on an API condition to be met
 | ||||||
|  | func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { | ||||||
|  | 	return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { | ||||||
|  | 	conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions") | ||||||
|  | 	if err != nil { | ||||||
|  | 		return false, err | ||||||
|  | 	} | ||||||
|  | 	if !found { | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 	for _, conditionUncast := range conditions { | ||||||
|  | 		condition := conditionUncast.(map[string]interface{}) | ||||||
|  | 		name, found, err := unstructured.NestedString(condition, "type") | ||||||
|  | 		if !found || err != nil || !strings.EqualFold(name, w.conditionName) { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		status, found, err := unstructured.NestedString(condition, "status") | ||||||
|  | 		if !found || err != nil { | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation") | ||||||
|  | 		if found { | ||||||
|  | 			observedGeneration, found := getObservedGeneration(obj, condition) | ||||||
|  | 			if found && observedGeneration < generation { | ||||||
|  | 				return false, nil | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return strings.EqualFold(status, w.conditionStatus), nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return false, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) { | ||||||
|  | 	if event.Type == watch.Error { | ||||||
|  | 		// keep waiting in the event we see an error - we expect the watch to be closed by
 | ||||||
|  | 		// the server
 | ||||||
|  | 		err := apierrors.FromObject(event.Object) | ||||||
|  | 		fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 	if event.Type == watch.Deleted { | ||||||
|  | 		// this will chain back out, result in another get and an return false back up the chain
 | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 	obj := event.Object.(*unstructured.Unstructured) | ||||||
|  | 	return w.checkCondition(obj) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type isCondMetFunc func(event watch.Event) (bool, error) | ||||||
|  | type checkCondFunc func(obj *unstructured.Unstructured) (bool, error) | ||||||
|  | 
 | ||||||
|  | // getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
 | ||||||
|  | // If the condition is not met, it will make a Watch query to the server and pass in the condMet function
 | ||||||
|  | func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) { | ||||||
|  | 	if len(info.Name) == 0 { | ||||||
|  | 		return info.Object, false, fmt.Errorf("resource name must be provided") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	endTime := time.Now().Add(o.Timeout) | ||||||
|  | 	timeout := time.Until(endTime) | ||||||
|  | 	errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
 | ||||||
|  | 	if o.Timeout == 0 { | ||||||
|  | 		// If timeout is zero we will fetch the object(s) once only and check
 | ||||||
|  | 		gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{}) | ||||||
|  | 		if initObjGetErr != nil { | ||||||
|  | 			return nil, false, initObjGetErr | ||||||
|  | 		} | ||||||
|  | 		if gottenObj == nil { | ||||||
|  | 			return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName()) | ||||||
|  | 		} | ||||||
|  | 		conditionCheck, err := check(gottenObj) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return gottenObj, false, err | ||||||
|  | 		} | ||||||
|  | 		if !conditionCheck { | ||||||
|  | 			return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName()) | ||||||
|  | 		} | ||||||
|  | 		return gottenObj, true, nil | ||||||
|  | 	} | ||||||
|  | 	if timeout < 0 { | ||||||
|  | 		// we're out of time
 | ||||||
|  | 		return info.Object, false, errWaitTimeoutWithName | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
 | ||||||
|  | 	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() | ||||||
|  | 	lw := &cache.ListWatch{ | ||||||
|  | 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { | ||||||
|  | 			options.FieldSelector = fieldSelector | ||||||
|  | 			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options) | ||||||
|  | 		}, | ||||||
|  | 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { | ||||||
|  | 			options.FieldSelector = fieldSelector | ||||||
|  | 			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options) | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
 | ||||||
|  | 	preconditionFunc := func(store cache.Store) (bool, error) { | ||||||
|  | 		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return true, err | ||||||
|  | 		} | ||||||
|  | 		if !exists { | ||||||
|  | 			return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name) | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	intrCtx, cancel := context.WithCancel(ctx) | ||||||
|  | 	defer cancel() | ||||||
|  | 	var result runtime.Object | ||||||
|  | 	intr := interrupt.New(nil, cancel) | ||||||
|  | 	err := intr.Run(func() error { | ||||||
|  | 		ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet)) | ||||||
|  | 		if ev != nil { | ||||||
|  | 			result = ev.Object | ||||||
|  | 		} | ||||||
|  | 		if errors.Is(err, context.DeadlineExceeded) { | ||||||
|  | 			return errWaitTimeoutWithName | ||||||
|  | 		} | ||||||
|  | 		return err | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
 | ||||||
|  | 			return result, false, errWaitTimeoutWithName | ||||||
|  | 		} | ||||||
|  | 		return result, false, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return result, true, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func extendErrWaitTimeout(err error, info *resource.Info) error { | ||||||
|  | 	return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) { | ||||||
|  | 	conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration") | ||||||
|  | 	if found { | ||||||
|  | 		return conditionObservedGeneration, true | ||||||
|  | 	} | ||||||
|  | 	statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration") | ||||||
|  | 	return statusObservedGeneration, found | ||||||
|  | } | ||||||
|  | @ -0,0 +1,33 @@ | ||||||
|  | /* | ||||||
|  | Copyright 2024 The Kubernetes Authors. | ||||||
|  | 
 | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  | 
 | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0
 | ||||||
|  | 
 | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | package wait | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"fmt" | ||||||
|  | 
 | ||||||
|  | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
|  | 	"k8s.io/cli-runtime/pkg/resource" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // IsCreated is a condition func for waiting for something to be created
 | ||||||
|  | func IsCreated(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { | ||||||
|  | 	if len(info.Name) == 0 || info.Object == nil { | ||||||
|  | 		return nil, false, fmt.Errorf("resource name must be provided") | ||||||
|  | 	} | ||||||
|  | 	return info.Object, true, nil | ||||||
|  | } | ||||||
|  | @ -0,0 +1,144 @@ | ||||||
|  | /* | ||||||
|  | Copyright 2024 The Kubernetes Authors. | ||||||
|  | 
 | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  | 
 | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0
 | ||||||
|  | 
 | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | package wait | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
|  | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||
|  | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
|  | 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||||
|  | 	"k8s.io/apimachinery/pkg/fields" | ||||||
|  | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
|  | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
|  | 	"k8s.io/apimachinery/pkg/watch" | ||||||
|  | 	"k8s.io/cli-runtime/pkg/resource" | ||||||
|  | 	"k8s.io/client-go/tools/cache" | ||||||
|  | 	watchtools "k8s.io/client-go/tools/watch" | ||||||
|  | 	"k8s.io/kubectl/pkg/util/interrupt" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // IsDeleted is a condition func for waiting for something to be deleted
 | ||||||
|  | func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { | ||||||
|  | 	if len(info.Name) == 0 { | ||||||
|  | 		return info.Object, false, fmt.Errorf("resource name must be provided") | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(ctx, info.Name, metav1.GetOptions{}) | ||||||
|  | 	if apierrors.IsNotFound(initObjGetErr) { | ||||||
|  | 		return info.Object, true, nil | ||||||
|  | 	} | ||||||
|  | 	if initObjGetErr != nil { | ||||||
|  | 		// TODO this could do something slightly fancier if we wish
 | ||||||
|  | 		return info.Object, false, initObjGetErr | ||||||
|  | 	} | ||||||
|  | 	resourceLocation := ResourceLocation{ | ||||||
|  | 		GroupResource: info.Mapping.Resource.GroupResource(), | ||||||
|  | 		Namespace:     gottenObj.GetNamespace(), | ||||||
|  | 		Name:          gottenObj.GetName(), | ||||||
|  | 	} | ||||||
|  | 	if uid, ok := o.UIDMap[resourceLocation]; ok { | ||||||
|  | 		if gottenObj.GetUID() != uid { | ||||||
|  | 			return gottenObj, true, nil | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	endTime := time.Now().Add(o.Timeout) | ||||||
|  | 	timeout := time.Until(endTime) | ||||||
|  | 	errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
 | ||||||
|  | 	if o.Timeout == 0 { | ||||||
|  | 		// If timeout is zero check if the object exists once only
 | ||||||
|  | 		if gottenObj == nil { | ||||||
|  | 			return nil, true, nil | ||||||
|  | 		} | ||||||
|  | 		return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName()) | ||||||
|  | 	} | ||||||
|  | 	if timeout < 0 { | ||||||
|  | 		// we're out of time
 | ||||||
|  | 		return info.Object, false, errWaitTimeoutWithName | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() | ||||||
|  | 	lw := &cache.ListWatch{ | ||||||
|  | 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { | ||||||
|  | 			options.FieldSelector = fieldSelector | ||||||
|  | 			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(ctx, options) | ||||||
|  | 		}, | ||||||
|  | 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { | ||||||
|  | 			options.FieldSelector = fieldSelector | ||||||
|  | 			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(ctx, options) | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
 | ||||||
|  | 	preconditionFunc := func(store cache.Store) (bool, error) { | ||||||
|  | 		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return true, err | ||||||
|  | 		} | ||||||
|  | 		if !exists { | ||||||
|  | 			// since we're looking for it to disappear we just return here if it no longer exists
 | ||||||
|  | 			return true, nil | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	intrCtx, cancel := context.WithCancel(ctx) | ||||||
|  | 	defer cancel() | ||||||
|  | 	intr := interrupt.New(nil, cancel) | ||||||
|  | 	err := intr.Run(func() error { | ||||||
|  | 		_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted) | ||||||
|  | 		if errors.Is(err, context.DeadlineExceeded) { | ||||||
|  | 			return errWaitTimeoutWithName | ||||||
|  | 		} | ||||||
|  | 		return err | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
 | ||||||
|  | 			return gottenObj, false, errWaitTimeoutWithName | ||||||
|  | 		} | ||||||
|  | 		return gottenObj, false, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return gottenObj, true, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Wait has helper methods for handling watches, including error handling.
 | ||||||
|  | type Wait struct { | ||||||
|  | 	errOut io.Writer | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // IsDeleted returns true if the object is deleted. It prints any errors it encounters.
 | ||||||
|  | func (w Wait) IsDeleted(event watch.Event) (bool, error) { | ||||||
|  | 	switch event.Type { | ||||||
|  | 	case watch.Error: | ||||||
|  | 		// keep waiting in the event we see an error - we expect the watch to be closed by
 | ||||||
|  | 		// the server if the error is unrecoverable.
 | ||||||
|  | 		err := apierrors.FromObject(event.Object) | ||||||
|  | 		fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err) | ||||||
|  | 		return false, nil | ||||||
|  | 	case watch.Deleted: | ||||||
|  | 		return true, nil | ||||||
|  | 	default: | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | @ -0,0 +1,120 @@ | ||||||
|  | /* | ||||||
|  | Copyright 2024 The Kubernetes Authors. | ||||||
|  | 
 | ||||||
|  | Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
|  | you may not use this file except in compliance with the License. | ||||||
|  | You may obtain a copy of the License at | ||||||
|  | 
 | ||||||
|  |     http://www.apache.org/licenses/LICENSE-2.0
 | ||||||
|  | 
 | ||||||
|  | Unless required by applicable law or agreed to in writing, software | ||||||
|  | distributed under the License is distributed on an "AS IS" BASIS, | ||||||
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
|  | See the License for the specific language governing permissions and | ||||||
|  | limitations under the License. | ||||||
|  | */ | ||||||
|  | 
 | ||||||
|  | package wait | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"reflect" | ||||||
|  | 	"strings" | ||||||
|  | 
 | ||||||
|  | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||
|  | 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||||
|  | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
|  | 	"k8s.io/apimachinery/pkg/watch" | ||||||
|  | 	"k8s.io/cli-runtime/pkg/resource" | ||||||
|  | 	"k8s.io/client-go/util/jsonpath" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | // JSONPathWait holds a JSONPath Parser which has the ability
 | ||||||
|  | // to check for the JSONPath condition and compare with the API server provided JSON output.
 | ||||||
|  | type JSONPathWait struct { | ||||||
|  | 	matchAnyValue  bool | ||||||
|  | 	jsonPathValue  string | ||||||
|  | 	jsonPathParser *jsonpath.JSONPath | ||||||
|  | 	// errOut is written to if an error occurs
 | ||||||
|  | 	errOut io.Writer | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
 | ||||||
|  | func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { | ||||||
|  | 	return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
 | ||||||
|  | // which check the watch event and check if a JSONPathWait condition is met
 | ||||||
|  | func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) { | ||||||
|  | 	if event.Type == watch.Error { | ||||||
|  | 		// keep waiting in the event we see an error - we expect the watch to be closed by
 | ||||||
|  | 		// the server
 | ||||||
|  | 		err := apierrors.FromObject(event.Object) | ||||||
|  | 		fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 	if event.Type == watch.Deleted { | ||||||
|  | 		// this will chain back out, result in another get and an return false back up the chain
 | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 	// event runtime Object can be safely asserted to Unstructed
 | ||||||
|  | 	// because we are working with dynamic client
 | ||||||
|  | 	obj := event.Object.(*unstructured.Unstructured) | ||||||
|  | 	return j.checkCondition(obj) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // checkCondition uses JSONPath parser to parse the JSON received from the API server
 | ||||||
|  | // and check if it matches the desired condition
 | ||||||
|  | func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { | ||||||
|  | 	queryObj := obj.UnstructuredContent() | ||||||
|  | 	parseResults, err := j.jsonPathParser.FindResults(queryObj) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return false, err | ||||||
|  | 	} | ||||||
|  | 	if len(parseResults) == 0 || len(parseResults[0]) == 0 { | ||||||
|  | 		return false, nil | ||||||
|  | 	} | ||||||
|  | 	if err := verifyParsedJSONPath(parseResults); err != nil { | ||||||
|  | 		return false, err | ||||||
|  | 	} | ||||||
|  | 	if j.matchAnyValue { | ||||||
|  | 		return true, nil | ||||||
|  | 	} | ||||||
|  | 	isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return false, err | ||||||
|  | 	} | ||||||
|  | 	return isConditionMet, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // verifyParsedJSONPath verifies the JSON received from the API server is valid.
 | ||||||
|  | // It will only accept a single JSON
 | ||||||
|  | func verifyParsedJSONPath(results [][]reflect.Value) error { | ||||||
|  | 	if len(results) > 1 { | ||||||
|  | 		return errors.New("given jsonpath expression matches more than one list") | ||||||
|  | 	} | ||||||
|  | 	if len(results[0]) > 1 { | ||||||
|  | 		return errors.New("given jsonpath expression matches more than one value") | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // compareResults will compare the reflect.Value from the result parsed by the
 | ||||||
|  | // JSONPath parser with the expected value given by the value
 | ||||||
|  | //
 | ||||||
|  | // Since this is coming from an unstructured this can only ever be a primitive,
 | ||||||
|  | // map[string]interface{}, or []interface{}.
 | ||||||
|  | // We do not support the last two and rely on fmt to handle conversion to string
 | ||||||
|  | // and compare the result with user input
 | ||||||
|  | func compareResults(r reflect.Value, expectedVal string) (bool, error) { | ||||||
|  | 	switch r.Interface().(type) { | ||||||
|  | 	case map[string]interface{}, []interface{}: | ||||||
|  | 		return false, errors.New("jsonpath leads to a nested object or list which is not supported") | ||||||
|  | 	} | ||||||
|  | 	s := fmt.Sprintf("%v", r.Interface()) | ||||||
|  | 	return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil | ||||||
|  | } | ||||||
|  | @ -21,33 +21,26 @@ import ( | ||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"io" | 	"io" | ||||||
| 	"reflect" |  | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"github.com/spf13/cobra" | 	"github.com/spf13/cobra" | ||||||
| 
 | 
 | ||||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |  | ||||||
| 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" |  | ||||||
| 	"k8s.io/apimachinery/pkg/fields" |  | ||||||
| 	"k8s.io/apimachinery/pkg/runtime" | 	"k8s.io/apimachinery/pkg/runtime" | ||||||
| 	"k8s.io/apimachinery/pkg/runtime/schema" | 	"k8s.io/apimachinery/pkg/runtime/schema" | ||||||
| 	"k8s.io/apimachinery/pkg/types" | 	"k8s.io/apimachinery/pkg/types" | ||||||
| 	"k8s.io/apimachinery/pkg/util/wait" | 	"k8s.io/apimachinery/pkg/util/wait" | ||||||
| 	"k8s.io/apimachinery/pkg/watch" |  | ||||||
| 	"k8s.io/cli-runtime/pkg/genericclioptions" | 	"k8s.io/cli-runtime/pkg/genericclioptions" | ||||||
| 	"k8s.io/cli-runtime/pkg/genericiooptions" | 	"k8s.io/cli-runtime/pkg/genericiooptions" | ||||||
| 	"k8s.io/cli-runtime/pkg/printers" | 	"k8s.io/cli-runtime/pkg/printers" | ||||||
| 	"k8s.io/cli-runtime/pkg/resource" | 	"k8s.io/cli-runtime/pkg/resource" | ||||||
| 	"k8s.io/client-go/dynamic" | 	"k8s.io/client-go/dynamic" | ||||||
| 	"k8s.io/client-go/tools/cache" |  | ||||||
| 	watchtools "k8s.io/client-go/tools/watch" | 	watchtools "k8s.io/client-go/tools/watch" | ||||||
| 	"k8s.io/client-go/util/jsonpath" | 	"k8s.io/client-go/util/jsonpath" | ||||||
| 	cmdget "k8s.io/kubectl/pkg/cmd/get" | 	cmdget "k8s.io/kubectl/pkg/cmd/get" | ||||||
| 	cmdutil "k8s.io/kubectl/pkg/cmd/util" | 	cmdutil "k8s.io/kubectl/pkg/cmd/util" | ||||||
| 	"k8s.io/kubectl/pkg/util/i18n" | 	"k8s.io/kubectl/pkg/util/i18n" | ||||||
| 	"k8s.io/kubectl/pkg/util/interrupt" |  | ||||||
| 	"k8s.io/kubectl/pkg/util/templates" | 	"k8s.io/kubectl/pkg/util/templates" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -194,11 +187,16 @@ func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) { | func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) { | ||||||
| 	if strings.ToLower(condition) == "delete" { | 	lowercaseCond := strings.ToLower(condition) | ||||||
|  | 	switch { | ||||||
|  | 	case lowercaseCond == "delete": | ||||||
| 		return IsDeleted, nil | 		return IsDeleted, nil | ||||||
| 	} | 
 | ||||||
| 	if strings.HasPrefix(condition, "condition=") { | 	case lowercaseCond == "create": | ||||||
| 		conditionName := condition[len("condition="):] | 		return IsCreated, nil | ||||||
|  | 
 | ||||||
|  | 	case strings.HasPrefix(lowercaseCond, "condition="): | ||||||
|  | 		conditionName := lowercaseCond[len("condition="):] | ||||||
| 		conditionValue := "true" | 		conditionValue := "true" | ||||||
| 		if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 { | 		if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 { | ||||||
| 			conditionValue = conditionName[equalsIndex+1:] | 			conditionValue = conditionName[equalsIndex+1:] | ||||||
|  | @ -210,9 +208,9 @@ func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) | ||||||
| 			conditionStatus: conditionValue, | 			conditionStatus: conditionValue, | ||||||
| 			errOut:          errOut, | 			errOut:          errOut, | ||||||
| 		}.IsConditionMet, nil | 		}.IsConditionMet, nil | ||||||
| 	} | 
 | ||||||
| 	if strings.HasPrefix(condition, "jsonpath=") { | 	case strings.HasPrefix(lowercaseCond, "jsonpath="): | ||||||
| 		jsonPathInput := strings.TrimPrefix(condition, "jsonpath=") | 		jsonPathInput := strings.TrimPrefix(lowercaseCond, "jsonpath=") | ||||||
| 		jsonPathExp, jsonPathValue, err := processJSONPathInput(jsonPathInput) | 		jsonPathExp, jsonPathValue, err := processJSONPathInput(jsonPathInput) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
|  | @ -320,6 +318,31 @@ func (o *WaitOptions) RunWait() error { | ||||||
| 	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) | 	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) | ||||||
| 	defer cancel() | 	defer cancel() | ||||||
| 
 | 
 | ||||||
|  | 	if strings.ToLower(o.ForCondition) == "create" { | ||||||
|  | 		// TODO(soltysh): this is not ideal solution, because we're polling every .5s,
 | ||||||
|  | 		// and we have to use ResourceFinder, which contains the resource name.
 | ||||||
|  | 		// In the long run, we should expose resource information from ResourceFinder,
 | ||||||
|  | 		// or functions from ResourceBuilder for parsing those. Lastly, this poll
 | ||||||
|  | 		// should be replaced with a ListWatch cache.
 | ||||||
|  | 		if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, o.Timeout, true, func(context.Context) (done bool, err error) { | ||||||
|  | 			visitErr := o.ResourceFinder.Do().Visit(func(info *resource.Info, err error) error { | ||||||
|  | 				return nil | ||||||
|  | 			}) | ||||||
|  | 			if apierrors.IsNotFound(visitErr) { | ||||||
|  | 				return false, nil | ||||||
|  | 			} | ||||||
|  | 			if visitErr != nil { | ||||||
|  | 				return false, visitErr | ||||||
|  | 			} | ||||||
|  | 			return true, nil | ||||||
|  | 		}); err != nil { | ||||||
|  | 			if errors.Is(err, context.DeadlineExceeded) { | ||||||
|  | 				return fmt.Errorf("%s", wait.ErrWaitTimeout.Error()) // nolint:staticcheck // SA1019
 | ||||||
|  | 			} | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	visitCount := 0 | 	visitCount := 0 | ||||||
| 	visitFunc := func(info *resource.Info, err error) error { | 	visitFunc := func(info *resource.Info, err error) error { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|  | @ -352,356 +375,3 @@ func (o *WaitOptions) RunWait() error { | ||||||
| 	} | 	} | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
| 
 |  | ||||||
| // IsDeleted is a condition func for waiting for something to be deleted
 |  | ||||||
| func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { |  | ||||||
| 	if len(info.Name) == 0 { |  | ||||||
| 		return info.Object, false, fmt.Errorf("resource name must be provided") |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{}) |  | ||||||
| 	if apierrors.IsNotFound(initObjGetErr) { |  | ||||||
| 		return info.Object, true, nil |  | ||||||
| 	} |  | ||||||
| 	if initObjGetErr != nil { |  | ||||||
| 		// TODO this could do something slightly fancier if we wish
 |  | ||||||
| 		return info.Object, false, initObjGetErr |  | ||||||
| 	} |  | ||||||
| 	resourceLocation := ResourceLocation{ |  | ||||||
| 		GroupResource: info.Mapping.Resource.GroupResource(), |  | ||||||
| 		Namespace:     gottenObj.GetNamespace(), |  | ||||||
| 		Name:          gottenObj.GetName(), |  | ||||||
| 	} |  | ||||||
| 	if uid, ok := o.UIDMap[resourceLocation]; ok { |  | ||||||
| 		if gottenObj.GetUID() != uid { |  | ||||||
| 			return gottenObj, true, nil |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	endTime := time.Now().Add(o.Timeout) |  | ||||||
| 	timeout := time.Until(endTime) |  | ||||||
| 	errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) |  | ||||||
| 	if o.Timeout == 0 { |  | ||||||
| 		// If timeout is zero check if the object exists once only
 |  | ||||||
| 		if gottenObj == nil { |  | ||||||
| 			return nil, true, nil |  | ||||||
| 		} |  | ||||||
| 		return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName()) |  | ||||||
| 	} |  | ||||||
| 	if timeout < 0 { |  | ||||||
| 		// we're out of time
 |  | ||||||
| 		return info.Object, false, errWaitTimeoutWithName |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() |  | ||||||
| 	lw := &cache.ListWatch{ |  | ||||||
| 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { |  | ||||||
| 			options.FieldSelector = fieldSelector |  | ||||||
| 			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options) |  | ||||||
| 		}, |  | ||||||
| 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { |  | ||||||
| 			options.FieldSelector = fieldSelector |  | ||||||
| 			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options) |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
 |  | ||||||
| 	preconditionFunc := func(store cache.Store) (bool, error) { |  | ||||||
| 		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return true, err |  | ||||||
| 		} |  | ||||||
| 		if !exists { |  | ||||||
| 			// since we're looking for it to disappear we just return here if it no longer exists
 |  | ||||||
| 			return true, nil |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	intrCtx, cancel := context.WithCancel(ctx) |  | ||||||
| 	defer cancel() |  | ||||||
| 	intr := interrupt.New(nil, cancel) |  | ||||||
| 	err := intr.Run(func() error { |  | ||||||
| 		_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted) |  | ||||||
| 		if errors.Is(err, context.DeadlineExceeded) { |  | ||||||
| 			return errWaitTimeoutWithName |  | ||||||
| 		} |  | ||||||
| 		return err |  | ||||||
| 	}) |  | ||||||
| 	if err != nil { |  | ||||||
| 		if err == wait.ErrWaitTimeout { |  | ||||||
| 			return gottenObj, false, errWaitTimeoutWithName |  | ||||||
| 		} |  | ||||||
| 		return gottenObj, false, err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return gottenObj, true, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // Wait has helper methods for handling watches, including error handling.
 |  | ||||||
| type Wait struct { |  | ||||||
| 	errOut io.Writer |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // IsDeleted returns true if the object is deleted. It prints any errors it encounters.
 |  | ||||||
| func (w Wait) IsDeleted(event watch.Event) (bool, error) { |  | ||||||
| 	switch event.Type { |  | ||||||
| 	case watch.Error: |  | ||||||
| 		// keep waiting in the event we see an error - we expect the watch to be closed by
 |  | ||||||
| 		// the server if the error is unrecoverable.
 |  | ||||||
| 		err := apierrors.FromObject(event.Object) |  | ||||||
| 		fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err) |  | ||||||
| 		return false, nil |  | ||||||
| 	case watch.Deleted: |  | ||||||
| 		return true, nil |  | ||||||
| 	default: |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| type isCondMetFunc func(event watch.Event) (bool, error) |  | ||||||
| type checkCondFunc func(obj *unstructured.Unstructured) (bool, error) |  | ||||||
| 
 |  | ||||||
| // getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
 |  | ||||||
| // If the condition is not met, it will make a Watch query to the server and pass in the condMet function
 |  | ||||||
| func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) { |  | ||||||
| 	if len(info.Name) == 0 { |  | ||||||
| 		return info.Object, false, fmt.Errorf("resource name must be provided") |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	endTime := time.Now().Add(o.Timeout) |  | ||||||
| 	timeout := time.Until(endTime) |  | ||||||
| 	errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) |  | ||||||
| 	if o.Timeout == 0 { |  | ||||||
| 		// If timeout is zero we will fetch the object(s) once only and check
 |  | ||||||
| 		gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{}) |  | ||||||
| 		if initObjGetErr != nil { |  | ||||||
| 			return nil, false, initObjGetErr |  | ||||||
| 		} |  | ||||||
| 		if gottenObj == nil { |  | ||||||
| 			return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName()) |  | ||||||
| 		} |  | ||||||
| 		conditionCheck, err := check(gottenObj) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return gottenObj, false, err |  | ||||||
| 		} |  | ||||||
| 		if conditionCheck == false { |  | ||||||
| 			return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName()) |  | ||||||
| 		} |  | ||||||
| 		return gottenObj, true, nil |  | ||||||
| 	} |  | ||||||
| 	if timeout < 0 { |  | ||||||
| 		// we're out of time
 |  | ||||||
| 		return info.Object, false, errWaitTimeoutWithName |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
 |  | ||||||
| 	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() |  | ||||||
| 	lw := &cache.ListWatch{ |  | ||||||
| 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { |  | ||||||
| 			options.FieldSelector = fieldSelector |  | ||||||
| 			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options) |  | ||||||
| 		}, |  | ||||||
| 		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { |  | ||||||
| 			options.FieldSelector = fieldSelector |  | ||||||
| 			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options) |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
 |  | ||||||
| 	preconditionFunc := func(store cache.Store) (bool, error) { |  | ||||||
| 		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return true, err |  | ||||||
| 		} |  | ||||||
| 		if !exists { |  | ||||||
| 			return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name) |  | ||||||
| 		} |  | ||||||
| 
 |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	intrCtx, cancel := context.WithCancel(ctx) |  | ||||||
| 	defer cancel() |  | ||||||
| 	var result runtime.Object |  | ||||||
| 	intr := interrupt.New(nil, cancel) |  | ||||||
| 	err := intr.Run(func() error { |  | ||||||
| 		ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet)) |  | ||||||
| 		if ev != nil { |  | ||||||
| 			result = ev.Object |  | ||||||
| 		} |  | ||||||
| 		if errors.Is(err, context.DeadlineExceeded) { |  | ||||||
| 			return errWaitTimeoutWithName |  | ||||||
| 		} |  | ||||||
| 		return err |  | ||||||
| 	}) |  | ||||||
| 	if err != nil { |  | ||||||
| 		if err == wait.ErrWaitTimeout { |  | ||||||
| 			return result, false, errWaitTimeoutWithName |  | ||||||
| 		} |  | ||||||
| 		return result, false, err |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return result, true, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // ConditionalWait hold information to check an API status condition
 |  | ||||||
| type ConditionalWait struct { |  | ||||||
| 	conditionName   string |  | ||||||
| 	conditionStatus string |  | ||||||
| 	// errOut is written to if an error occurs
 |  | ||||||
| 	errOut io.Writer |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // IsConditionMet is a conditionfunc for waiting on an API condition to be met
 |  | ||||||
| func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { |  | ||||||
| 	return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { |  | ||||||
| 	conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions") |  | ||||||
| 	if err != nil { |  | ||||||
| 		return false, err |  | ||||||
| 	} |  | ||||||
| 	if !found { |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| 	for _, conditionUncast := range conditions { |  | ||||||
| 		condition := conditionUncast.(map[string]interface{}) |  | ||||||
| 		name, found, err := unstructured.NestedString(condition, "type") |  | ||||||
| 		if !found || err != nil || !strings.EqualFold(name, w.conditionName) { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		status, found, err := unstructured.NestedString(condition, "status") |  | ||||||
| 		if !found || err != nil { |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
| 		generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation") |  | ||||||
| 		if found { |  | ||||||
| 			observedGeneration, found := getObservedGeneration(obj, condition) |  | ||||||
| 			if found && observedGeneration < generation { |  | ||||||
| 				return false, nil |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 		return strings.EqualFold(status, w.conditionStatus), nil |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return false, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) { |  | ||||||
| 	if event.Type == watch.Error { |  | ||||||
| 		// keep waiting in the event we see an error - we expect the watch to be closed by
 |  | ||||||
| 		// the server
 |  | ||||||
| 		err := apierrors.FromObject(event.Object) |  | ||||||
| 		fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| 	if event.Type == watch.Deleted { |  | ||||||
| 		// this will chain back out, result in another get and an return false back up the chain
 |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| 	obj := event.Object.(*unstructured.Unstructured) |  | ||||||
| 	return w.checkCondition(obj) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func extendErrWaitTimeout(err error, info *resource.Info) error { |  | ||||||
| 	return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) { |  | ||||||
| 	conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration") |  | ||||||
| 	if found { |  | ||||||
| 		return conditionObservedGeneration, true |  | ||||||
| 	} |  | ||||||
| 	statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration") |  | ||||||
| 	return statusObservedGeneration, found |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // JSONPathWait holds a JSONPath Parser which has the ability
 |  | ||||||
| // to check for the JSONPath condition and compare with the API server provided JSON output.
 |  | ||||||
| type JSONPathWait struct { |  | ||||||
| 	matchAnyValue  bool |  | ||||||
| 	jsonPathValue  string |  | ||||||
| 	jsonPathParser *jsonpath.JSONPath |  | ||||||
| 	// errOut is written to if an error occurs
 |  | ||||||
| 	errOut io.Writer |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
 |  | ||||||
| func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { |  | ||||||
| 	return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
 |  | ||||||
| // which check the watch event and check if a JSONPathWait condition is met
 |  | ||||||
| func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) { |  | ||||||
| 	if event.Type == watch.Error { |  | ||||||
| 		// keep waiting in the event we see an error - we expect the watch to be closed by
 |  | ||||||
| 		// the server
 |  | ||||||
| 		err := apierrors.FromObject(event.Object) |  | ||||||
| 		fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| 	if event.Type == watch.Deleted { |  | ||||||
| 		// this will chain back out, result in another get and an return false back up the chain
 |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| 	// event runtime Object can be safely asserted to Unstructed
 |  | ||||||
| 	// because we are working with dynamic client
 |  | ||||||
| 	obj := event.Object.(*unstructured.Unstructured) |  | ||||||
| 	return j.checkCondition(obj) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // checkCondition uses JSONPath parser to parse the JSON received from the API server
 |  | ||||||
| // and check if it matches the desired condition
 |  | ||||||
| func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { |  | ||||||
| 	queryObj := obj.UnstructuredContent() |  | ||||||
| 	parseResults, err := j.jsonPathParser.FindResults(queryObj) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return false, err |  | ||||||
| 	} |  | ||||||
| 	if len(parseResults) == 0 || len(parseResults[0]) == 0 { |  | ||||||
| 		return false, nil |  | ||||||
| 	} |  | ||||||
| 	if err := verifyParsedJSONPath(parseResults); err != nil { |  | ||||||
| 		return false, err |  | ||||||
| 	} |  | ||||||
| 	if j.matchAnyValue { |  | ||||||
| 		return true, nil |  | ||||||
| 	} |  | ||||||
| 	isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return false, err |  | ||||||
| 	} |  | ||||||
| 	return isConditionMet, nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // verifyParsedJSONPath verifies the JSON received from the API server is valid.
 |  | ||||||
| // It will only accept a single JSON
 |  | ||||||
| func verifyParsedJSONPath(results [][]reflect.Value) error { |  | ||||||
| 	if len(results) > 1 { |  | ||||||
| 		return errors.New("given jsonpath expression matches more than one list") |  | ||||||
| 	} |  | ||||||
| 	if len(results[0]) > 1 { |  | ||||||
| 		return errors.New("given jsonpath expression matches more than one value") |  | ||||||
| 	} |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // compareResults will compare the reflect.Value from the result parsed by the
 |  | ||||||
| // JSONPath parser with the expected value given by the value
 |  | ||||||
| //
 |  | ||||||
| // Since this is coming from an unstructured this can only ever be a primitive,
 |  | ||||||
| // map[string]interface{}, or []interface{}.
 |  | ||||||
| // We do not support the last two and rely on fmt to handle conversion to string
 |  | ||||||
| // and compare the result with user input
 |  | ||||||
| func compareResults(r reflect.Value, expectedVal string) (bool, error) { |  | ||||||
| 	switch r.Interface().(type) { |  | ||||||
| 	case map[string]interface{}, []interface{}: |  | ||||||
| 		return false, errors.New("jsonpath leads to a nested object or list which is not supported") |  | ||||||
| 	} |  | ||||||
| 	s := fmt.Sprintf("%v", r.Interface()) |  | ||||||
| 	return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  | @ -24,6 +24,8 @@ import ( | ||||||
| 
 | 
 | ||||||
| 	"github.com/stretchr/testify/require" | 	"github.com/stretchr/testify/require" | ||||||
| 
 | 
 | ||||||
|  | 	corev1 "k8s.io/api/core/v1" | ||||||
|  | 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||
| 	"k8s.io/apimachinery/pkg/api/meta" | 	"k8s.io/apimachinery/pkg/api/meta" | ||||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
| 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||||
|  | @ -983,6 +985,77 @@ func TestWaitForCondition(t *testing.T) { | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func TestWaitForCreate(t *testing.T) { | ||||||
|  | 	scheme := runtime.NewScheme() | ||||||
|  | 	listMapping := map[schema.GroupVersionResource]string{ | ||||||
|  | 		{Group: "group", Version: "version", Resource: "theresource"}: "TheKindList", | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	tests := []struct { | ||||||
|  | 		name       string | ||||||
|  | 		infos      []*resource.Info | ||||||
|  | 		infosErr   error | ||||||
|  | 		fakeClient func() *dynamicfakeclient.FakeDynamicClient | ||||||
|  | 		timeout    time.Duration | ||||||
|  | 
 | ||||||
|  | 		expectedErr string | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			name:     "missing resource, should hit timeout", | ||||||
|  | 			infosErr: apierrors.NewNotFound(schema.GroupResource{Group: "group", Resource: "theresource"}, "name-foo"), | ||||||
|  | 			fakeClient: func() *dynamicfakeclient.FakeDynamicClient { | ||||||
|  | 				return dynamicfakeclient.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping) | ||||||
|  | 			}, | ||||||
|  | 			timeout:     1 * time.Second, | ||||||
|  | 			expectedErr: "timed out waiting for the condition", | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			name: "wait should succeed", | ||||||
|  | 			infos: []*resource.Info{ | ||||||
|  | 				{ | ||||||
|  | 					Mapping: &meta.RESTMapping{ | ||||||
|  | 						Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "theresource"}, | ||||||
|  | 					}, | ||||||
|  | 					Object:    &corev1.Pod{}, // the resource type is irrelevant here
 | ||||||
|  | 					Name:      "name-foo", | ||||||
|  | 					Namespace: "ns-foo", | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 			fakeClient: func() *dynamicfakeclient.FakeDynamicClient { | ||||||
|  | 				return dynamicfakeclient.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping) | ||||||
|  | 			}, | ||||||
|  | 			timeout: 1 * time.Second, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	for _, test := range tests { | ||||||
|  | 		t.Run(test.name, func(t *testing.T) { | ||||||
|  | 			fakeClient := test.fakeClient() | ||||||
|  | 			o := &WaitOptions{ | ||||||
|  | 				ResourceFinder: genericclioptions.NewSimpleFakeResourceFinder(test.infos...).WithError(test.infosErr), | ||||||
|  | 				DynamicClient:  fakeClient, | ||||||
|  | 				Timeout:        test.timeout, | ||||||
|  | 
 | ||||||
|  | 				Printer:      printers.NewDiscardingPrinter(), | ||||||
|  | 				ConditionFn:  IsCreated, | ||||||
|  | 				ForCondition: "create", | ||||||
|  | 				IOStreams:    genericiooptions.NewTestIOStreamsDiscard(), | ||||||
|  | 			} | ||||||
|  | 			err := o.RunWait() | ||||||
|  | 			switch { | ||||||
|  | 			case err == nil && len(test.expectedErr) == 0: | ||||||
|  | 			case err != nil && len(test.expectedErr) == 0: | ||||||
|  | 				t.Fatal(err) | ||||||
|  | 			case err == nil && len(test.expectedErr) != 0: | ||||||
|  | 				t.Fatalf("missing: %q", test.expectedErr) | ||||||
|  | 			case err != nil && len(test.expectedErr) != 0: | ||||||
|  | 				if !strings.Contains(err.Error(), test.expectedErr) { | ||||||
|  | 					t.Fatalf("expected %q, got %q", test.expectedErr, err.Error()) | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func TestWaitForDeletionIgnoreNotFound(t *testing.T) { | func TestWaitForDeletionIgnoreNotFound(t *testing.T) { | ||||||
| 	scheme := runtime.NewScheme() | 	scheme := runtime.NewScheme() | ||||||
| 	listMapping := map[schema.GroupVersionResource]string{ | 	listMapping := map[schema.GroupVersionResource]string{ | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue