diff --git a/go.mod b/go.mod index 6c4987fa..de5dc92f 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.0.0-20240707022826-2cf4612580d3 k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2 - k8s.io/cli-runtime v0.0.0-20240707030735-f3a07984ce75 + k8s.io/cli-runtime v0.0.0-20240711160938-80033e7bae90 k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99 k8s.io/component-base v0.0.0-20240707024307-c741ec442cc3 k8s.io/component-helpers v0.0.0-20240707024433-5a2d7426dbcc diff --git a/go.sum b/go.sum index d07b21b2..fda59bb9 100644 --- a/go.sum +++ b/go.sum @@ -281,8 +281,8 @@ k8s.io/api v0.0.0-20240707022826-2cf4612580d3 h1:2bHi5SAn7dRf6oJGO29yFEtMyIN9oCD k8s.io/api v0.0.0-20240707022826-2cf4612580d3/go.mod h1:TeZW9JsLR2mmT3LA7JsD/4i5g40HHrREIdaD0QCEP1Y= k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2 h1:NFXoAJM3jIlKAOfDNCKb29pjw8iNnhtT+Z6YOQhXM50= k8s.io/apimachinery v0.0.0-20240711022542-6b362fab6df2/go.mod h1:Et4EUFrefx1K28ZwNXpkHUqq7fSML2FROj79Ku7Lj1w= -k8s.io/cli-runtime v0.0.0-20240707030735-f3a07984ce75 h1:jWSuMI9QUWe40r8V1nEwFZ+AHg3wzTxH6eqClJ19CCE= -k8s.io/cli-runtime v0.0.0-20240707030735-f3a07984ce75/go.mod h1:2Umm5N7cEQTVpw0RI+Ner/0aB0bvFokCZIevJNf+eFM= +k8s.io/cli-runtime v0.0.0-20240711160938-80033e7bae90 h1:VE4DMc3nU/Y945k78wqxjPY+hGdDSEWHuzhfpy0FhXQ= +k8s.io/cli-runtime v0.0.0-20240711160938-80033e7bae90/go.mod h1:yAmTKyXms0bDV26vzKNx2mlvEJV0rylfuXJDbR8LQ8w= k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99 h1:ay9vtgW1GcBryEkZPnfx6pmWmkmLTWLKn3TizDGeTqI= k8s.io/client-go v0.0.0-20240710183246-7f36d816ee99/go.mod h1:M4YkUERkMjSHhNDDe6dYdACCOtPD4td6mRAME6AX7i8= k8s.io/component-base v0.0.0-20240707024307-c741ec442cc3 h1:ipvnLvus3w+bw+jX1TQGeMyet9Gi/RBFgZ4g6dipaXM= diff --git a/pkg/cmd/wait/condition.go b/pkg/cmd/wait/condition.go new file mode 100644 index 00000000..7dec0de2 --- /dev/null +++ b/pkg/cmd/wait/condition.go @@ -0,0 +1,197 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wait + +import ( + "context" + "errors" + "fmt" + "io" + "strings" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" + "k8s.io/kubectl/pkg/util/interrupt" +) + +// ConditionalWait hold information to check an API status condition +type ConditionalWait struct { + conditionName string + conditionStatus string + // errOut is written to if an error occurs + errOut io.Writer +} + +// IsConditionMet is a conditionfunc for waiting on an API condition to be met +func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { + return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition) +} + +func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { + conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions") + if err != nil { + return false, err + } + if !found { + return false, nil + } + for _, conditionUncast := range conditions { + condition := conditionUncast.(map[string]interface{}) + name, found, err := unstructured.NestedString(condition, "type") + if !found || err != nil || !strings.EqualFold(name, w.conditionName) { + continue + } + status, found, err := unstructured.NestedString(condition, "status") + if !found || err != nil { + continue + } + generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation") + if found { + observedGeneration, found := getObservedGeneration(obj, condition) + if found && observedGeneration < generation { + return false, nil + } + } + return strings.EqualFold(status, w.conditionStatus), nil + } + + return false, nil +} + +func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) { + if event.Type == watch.Error { + // keep waiting in the event we see an error - we expect the watch to be closed by + // the server + err := apierrors.FromObject(event.Object) + fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) + return false, nil + } + if event.Type == watch.Deleted { + // this will chain back out, result in another get and an return false back up the chain + return false, nil + } + obj := event.Object.(*unstructured.Unstructured) + return w.checkCondition(obj) +} + +type isCondMetFunc func(event watch.Event) (bool, error) +type checkCondFunc func(obj *unstructured.Unstructured) (bool, error) + +// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function. +// If the condition is not met, it will make a Watch query to the server and pass in the condMet function +func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) { + if len(info.Name) == 0 { + return info.Object, false, fmt.Errorf("resource name must be provided") + } + + endTime := time.Now().Add(o.Timeout) + timeout := time.Until(endTime) + errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019 + if o.Timeout == 0 { + // If timeout is zero we will fetch the object(s) once only and check + gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{}) + if initObjGetErr != nil { + return nil, false, initObjGetErr + } + if gottenObj == nil { + return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName()) + } + conditionCheck, err := check(gottenObj) + if err != nil { + return gottenObj, false, err + } + if !conditionCheck { + return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName()) + } + return gottenObj, true, nil + } + if timeout < 0 { + // we're out of time + return info.Object, false, errWaitTimeoutWithName + } + + mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears + fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector + return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector + return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options) + }, + } + + // this function is used to refresh the cache to prevent timeout waits on resources that have disappeared + preconditionFunc := func(store cache.Store) (bool, error) { + _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) + if err != nil { + return true, err + } + if !exists { + return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name) + } + + return false, nil + } + + intrCtx, cancel := context.WithCancel(ctx) + defer cancel() + var result runtime.Object + intr := interrupt.New(nil, cancel) + err := intr.Run(func() error { + ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet)) + if ev != nil { + result = ev.Object + } + if errors.Is(err, context.DeadlineExceeded) { + return errWaitTimeoutWithName + } + return err + }) + if err != nil { + if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019 + return result, false, errWaitTimeoutWithName + } + return result, false, err + } + + return result, true, nil +} + +func extendErrWaitTimeout(err error, info *resource.Info) error { + return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name) +} + +func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) { + conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration") + if found { + return conditionObservedGeneration, true + } + statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration") + return statusObservedGeneration, found +} diff --git a/pkg/cmd/wait/create.go b/pkg/cmd/wait/create.go new file mode 100644 index 00000000..1d5eca7e --- /dev/null +++ b/pkg/cmd/wait/create.go @@ -0,0 +1,33 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wait + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/cli-runtime/pkg/resource" +) + +// IsCreated is a condition func for waiting for something to be created +func IsCreated(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { + if len(info.Name) == 0 || info.Object == nil { + return nil, false, fmt.Errorf("resource name must be provided") + } + return info.Object, true, nil +} diff --git a/pkg/cmd/wait/delete.go b/pkg/cmd/wait/delete.go new file mode 100644 index 00000000..f20cbb5e --- /dev/null +++ b/pkg/cmd/wait/delete.go @@ -0,0 +1,144 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wait + +import ( + "context" + "errors" + "fmt" + "io" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" + "k8s.io/kubectl/pkg/util/interrupt" +) + +// IsDeleted is a condition func for waiting for something to be deleted +func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { + if len(info.Name) == 0 { + return info.Object, false, fmt.Errorf("resource name must be provided") + } + + gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(ctx, info.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(initObjGetErr) { + return info.Object, true, nil + } + if initObjGetErr != nil { + // TODO this could do something slightly fancier if we wish + return info.Object, false, initObjGetErr + } + resourceLocation := ResourceLocation{ + GroupResource: info.Mapping.Resource.GroupResource(), + Namespace: gottenObj.GetNamespace(), + Name: gottenObj.GetName(), + } + if uid, ok := o.UIDMap[resourceLocation]; ok { + if gottenObj.GetUID() != uid { + return gottenObj, true, nil + } + } + + endTime := time.Now().Add(o.Timeout) + timeout := time.Until(endTime) + errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019 + if o.Timeout == 0 { + // If timeout is zero check if the object exists once only + if gottenObj == nil { + return nil, true, nil + } + return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName()) + } + if timeout < 0 { + // we're out of time + return info.Object, false, errWaitTimeoutWithName + } + + fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() + lw := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.FieldSelector = fieldSelector + return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.FieldSelector = fieldSelector + return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(ctx, options) + }, + } + + // this function is used to refresh the cache to prevent timeout waits on resources that have disappeared + preconditionFunc := func(store cache.Store) (bool, error) { + _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) + if err != nil { + return true, err + } + if !exists { + // since we're looking for it to disappear we just return here if it no longer exists + return true, nil + } + + return false, nil + } + + intrCtx, cancel := context.WithCancel(ctx) + defer cancel() + intr := interrupt.New(nil, cancel) + err := intr.Run(func() error { + _, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted) + if errors.Is(err, context.DeadlineExceeded) { + return errWaitTimeoutWithName + } + return err + }) + if err != nil { + if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019 + return gottenObj, false, errWaitTimeoutWithName + } + return gottenObj, false, err + } + + return gottenObj, true, nil +} + +// Wait has helper methods for handling watches, including error handling. +type Wait struct { + errOut io.Writer +} + +// IsDeleted returns true if the object is deleted. It prints any errors it encounters. +func (w Wait) IsDeleted(event watch.Event) (bool, error) { + switch event.Type { + case watch.Error: + // keep waiting in the event we see an error - we expect the watch to be closed by + // the server if the error is unrecoverable. + err := apierrors.FromObject(event.Object) + fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err) + return false, nil + case watch.Deleted: + return true, nil + default: + return false, nil + } +} diff --git a/pkg/cmd/wait/json.go b/pkg/cmd/wait/json.go new file mode 100644 index 00000000..ac573b6c --- /dev/null +++ b/pkg/cmd/wait/json.go @@ -0,0 +1,120 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wait + +import ( + "context" + "errors" + "fmt" + "io" + "reflect" + "strings" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/util/jsonpath" +) + +// JSONPathWait holds a JSONPath Parser which has the ability +// to check for the JSONPath condition and compare with the API server provided JSON output. +type JSONPathWait struct { + matchAnyValue bool + jsonPathValue string + jsonPathParser *jsonpath.JSONPath + // errOut is written to if an error occurs + errOut io.Writer +} + +// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check +func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { + return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition) +} + +// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet +// which check the watch event and check if a JSONPathWait condition is met +func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) { + if event.Type == watch.Error { + // keep waiting in the event we see an error - we expect the watch to be closed by + // the server + err := apierrors.FromObject(event.Object) + fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) + return false, nil + } + if event.Type == watch.Deleted { + // this will chain back out, result in another get and an return false back up the chain + return false, nil + } + // event runtime Object can be safely asserted to Unstructed + // because we are working with dynamic client + obj := event.Object.(*unstructured.Unstructured) + return j.checkCondition(obj) +} + +// checkCondition uses JSONPath parser to parse the JSON received from the API server +// and check if it matches the desired condition +func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { + queryObj := obj.UnstructuredContent() + parseResults, err := j.jsonPathParser.FindResults(queryObj) + if err != nil { + return false, err + } + if len(parseResults) == 0 || len(parseResults[0]) == 0 { + return false, nil + } + if err := verifyParsedJSONPath(parseResults); err != nil { + return false, err + } + if j.matchAnyValue { + return true, nil + } + isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue) + if err != nil { + return false, err + } + return isConditionMet, nil +} + +// verifyParsedJSONPath verifies the JSON received from the API server is valid. +// It will only accept a single JSON +func verifyParsedJSONPath(results [][]reflect.Value) error { + if len(results) > 1 { + return errors.New("given jsonpath expression matches more than one list") + } + if len(results[0]) > 1 { + return errors.New("given jsonpath expression matches more than one value") + } + return nil +} + +// compareResults will compare the reflect.Value from the result parsed by the +// JSONPath parser with the expected value given by the value +// +// Since this is coming from an unstructured this can only ever be a primitive, +// map[string]interface{}, or []interface{}. +// We do not support the last two and rely on fmt to handle conversion to string +// and compare the result with user input +func compareResults(r reflect.Value, expectedVal string) (bool, error) { + switch r.Interface().(type) { + case map[string]interface{}, []interface{}: + return false, errors.New("jsonpath leads to a nested object or list which is not supported") + } + s := fmt.Sprintf("%v", r.Interface()) + return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil +} diff --git a/pkg/cmd/wait/wait.go b/pkg/cmd/wait/wait.go index e211ccec..9097ea12 100644 --- a/pkg/cmd/wait/wait.go +++ b/pkg/cmd/wait/wait.go @@ -21,33 +21,26 @@ import ( "errors" "fmt" "io" - "reflect" "strings" "time" "github.com/spf13/cobra" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/cli-runtime/pkg/printers" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/dynamic" - "k8s.io/client-go/tools/cache" watchtools "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/jsonpath" cmdget "k8s.io/kubectl/pkg/cmd/get" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/util/i18n" - "k8s.io/kubectl/pkg/util/interrupt" "k8s.io/kubectl/pkg/util/templates" ) @@ -194,11 +187,16 @@ func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) { } func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) { - if strings.ToLower(condition) == "delete" { + lowercaseCond := strings.ToLower(condition) + switch { + case lowercaseCond == "delete": return IsDeleted, nil - } - if strings.HasPrefix(condition, "condition=") { - conditionName := condition[len("condition="):] + + case lowercaseCond == "create": + return IsCreated, nil + + case strings.HasPrefix(lowercaseCond, "condition="): + conditionName := lowercaseCond[len("condition="):] conditionValue := "true" if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 { conditionValue = conditionName[equalsIndex+1:] @@ -210,9 +208,9 @@ func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) conditionStatus: conditionValue, errOut: errOut, }.IsConditionMet, nil - } - if strings.HasPrefix(condition, "jsonpath=") { - jsonPathInput := strings.TrimPrefix(condition, "jsonpath=") + + case strings.HasPrefix(lowercaseCond, "jsonpath="): + jsonPathInput := strings.TrimPrefix(lowercaseCond, "jsonpath=") jsonPathExp, jsonPathValue, err := processJSONPathInput(jsonPathInput) if err != nil { return nil, err @@ -320,6 +318,31 @@ func (o *WaitOptions) RunWait() error { ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) defer cancel() + if strings.ToLower(o.ForCondition) == "create" { + // TODO(soltysh): this is not ideal solution, because we're polling every .5s, + // and we have to use ResourceFinder, which contains the resource name. + // In the long run, we should expose resource information from ResourceFinder, + // or functions from ResourceBuilder for parsing those. Lastly, this poll + // should be replaced with a ListWatch cache. + if err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, o.Timeout, true, func(context.Context) (done bool, err error) { + visitErr := o.ResourceFinder.Do().Visit(func(info *resource.Info, err error) error { + return nil + }) + if apierrors.IsNotFound(visitErr) { + return false, nil + } + if visitErr != nil { + return false, visitErr + } + return true, nil + }); err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("%s", wait.ErrWaitTimeout.Error()) // nolint:staticcheck // SA1019 + } + return err + } + } + visitCount := 0 visitFunc := func(info *resource.Info, err error) error { if err != nil { @@ -352,356 +375,3 @@ func (o *WaitOptions) RunWait() error { } return err } - -// IsDeleted is a condition func for waiting for something to be deleted -func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { - if len(info.Name) == 0 { - return info.Object, false, fmt.Errorf("resource name must be provided") - } - - gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{}) - if apierrors.IsNotFound(initObjGetErr) { - return info.Object, true, nil - } - if initObjGetErr != nil { - // TODO this could do something slightly fancier if we wish - return info.Object, false, initObjGetErr - } - resourceLocation := ResourceLocation{ - GroupResource: info.Mapping.Resource.GroupResource(), - Namespace: gottenObj.GetNamespace(), - Name: gottenObj.GetName(), - } - if uid, ok := o.UIDMap[resourceLocation]; ok { - if gottenObj.GetUID() != uid { - return gottenObj, true, nil - } - } - - endTime := time.Now().Add(o.Timeout) - timeout := time.Until(endTime) - errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) - if o.Timeout == 0 { - // If timeout is zero check if the object exists once only - if gottenObj == nil { - return nil, true, nil - } - return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName()) - } - if timeout < 0 { - // we're out of time - return info.Object, false, errWaitTimeoutWithName - } - - fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = fieldSelector - return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = fieldSelector - return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options) - }, - } - - // this function is used to refresh the cache to prevent timeout waits on resources that have disappeared - preconditionFunc := func(store cache.Store) (bool, error) { - _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) - if err != nil { - return true, err - } - if !exists { - // since we're looking for it to disappear we just return here if it no longer exists - return true, nil - } - - return false, nil - } - - intrCtx, cancel := context.WithCancel(ctx) - defer cancel() - intr := interrupt.New(nil, cancel) - err := intr.Run(func() error { - _, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted) - if errors.Is(err, context.DeadlineExceeded) { - return errWaitTimeoutWithName - } - return err - }) - if err != nil { - if err == wait.ErrWaitTimeout { - return gottenObj, false, errWaitTimeoutWithName - } - return gottenObj, false, err - } - - return gottenObj, true, nil -} - -// Wait has helper methods for handling watches, including error handling. -type Wait struct { - errOut io.Writer -} - -// IsDeleted returns true if the object is deleted. It prints any errors it encounters. -func (w Wait) IsDeleted(event watch.Event) (bool, error) { - switch event.Type { - case watch.Error: - // keep waiting in the event we see an error - we expect the watch to be closed by - // the server if the error is unrecoverable. - err := apierrors.FromObject(event.Object) - fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err) - return false, nil - case watch.Deleted: - return true, nil - default: - return false, nil - } -} - -type isCondMetFunc func(event watch.Event) (bool, error) -type checkCondFunc func(obj *unstructured.Unstructured) (bool, error) - -// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function. -// If the condition is not met, it will make a Watch query to the server and pass in the condMet function -func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) { - if len(info.Name) == 0 { - return info.Object, false, fmt.Errorf("resource name must be provided") - } - - endTime := time.Now().Add(o.Timeout) - timeout := time.Until(endTime) - errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) - if o.Timeout == 0 { - // If timeout is zero we will fetch the object(s) once only and check - gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{}) - if initObjGetErr != nil { - return nil, false, initObjGetErr - } - if gottenObj == nil { - return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName()) - } - conditionCheck, err := check(gottenObj) - if err != nil { - return gottenObj, false, err - } - if conditionCheck == false { - return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName()) - } - return gottenObj, true, nil - } - if timeout < 0 { - // we're out of time - return info.Object, false, errWaitTimeoutWithName - } - - mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears - fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String() - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - options.FieldSelector = fieldSelector - return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - options.FieldSelector = fieldSelector - return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options) - }, - } - - // this function is used to refresh the cache to prevent timeout waits on resources that have disappeared - preconditionFunc := func(store cache.Store) (bool, error) { - _, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name}) - if err != nil { - return true, err - } - if !exists { - return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name) - } - - return false, nil - } - - intrCtx, cancel := context.WithCancel(ctx) - defer cancel() - var result runtime.Object - intr := interrupt.New(nil, cancel) - err := intr.Run(func() error { - ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet)) - if ev != nil { - result = ev.Object - } - if errors.Is(err, context.DeadlineExceeded) { - return errWaitTimeoutWithName - } - return err - }) - if err != nil { - if err == wait.ErrWaitTimeout { - return result, false, errWaitTimeoutWithName - } - return result, false, err - } - - return result, true, nil -} - -// ConditionalWait hold information to check an API status condition -type ConditionalWait struct { - conditionName string - conditionStatus string - // errOut is written to if an error occurs - errOut io.Writer -} - -// IsConditionMet is a conditionfunc for waiting on an API condition to be met -func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { - return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition) -} - -func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { - conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions") - if err != nil { - return false, err - } - if !found { - return false, nil - } - for _, conditionUncast := range conditions { - condition := conditionUncast.(map[string]interface{}) - name, found, err := unstructured.NestedString(condition, "type") - if !found || err != nil || !strings.EqualFold(name, w.conditionName) { - continue - } - status, found, err := unstructured.NestedString(condition, "status") - if !found || err != nil { - continue - } - generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation") - if found { - observedGeneration, found := getObservedGeneration(obj, condition) - if found && observedGeneration < generation { - return false, nil - } - } - return strings.EqualFold(status, w.conditionStatus), nil - } - - return false, nil -} - -func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) { - if event.Type == watch.Error { - // keep waiting in the event we see an error - we expect the watch to be closed by - // the server - err := apierrors.FromObject(event.Object) - fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) - return false, nil - } - if event.Type == watch.Deleted { - // this will chain back out, result in another get and an return false back up the chain - return false, nil - } - obj := event.Object.(*unstructured.Unstructured) - return w.checkCondition(obj) -} - -func extendErrWaitTimeout(err error, info *resource.Info) error { - return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name) -} - -func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) { - conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration") - if found { - return conditionObservedGeneration, true - } - statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration") - return statusObservedGeneration, found -} - -// JSONPathWait holds a JSONPath Parser which has the ability -// to check for the JSONPath condition and compare with the API server provided JSON output. -type JSONPathWait struct { - matchAnyValue bool - jsonPathValue string - jsonPathParser *jsonpath.JSONPath - // errOut is written to if an error occurs - errOut io.Writer -} - -// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check -func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { - return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition) -} - -// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet -// which check the watch event and check if a JSONPathWait condition is met -func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) { - if event.Type == watch.Error { - // keep waiting in the event we see an error - we expect the watch to be closed by - // the server - err := apierrors.FromObject(event.Object) - fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) - return false, nil - } - if event.Type == watch.Deleted { - // this will chain back out, result in another get and an return false back up the chain - return false, nil - } - // event runtime Object can be safely asserted to Unstructed - // because we are working with dynamic client - obj := event.Object.(*unstructured.Unstructured) - return j.checkCondition(obj) -} - -// checkCondition uses JSONPath parser to parse the JSON received from the API server -// and check if it matches the desired condition -func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { - queryObj := obj.UnstructuredContent() - parseResults, err := j.jsonPathParser.FindResults(queryObj) - if err != nil { - return false, err - } - if len(parseResults) == 0 || len(parseResults[0]) == 0 { - return false, nil - } - if err := verifyParsedJSONPath(parseResults); err != nil { - return false, err - } - if j.matchAnyValue { - return true, nil - } - isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue) - if err != nil { - return false, err - } - return isConditionMet, nil -} - -// verifyParsedJSONPath verifies the JSON received from the API server is valid. -// It will only accept a single JSON -func verifyParsedJSONPath(results [][]reflect.Value) error { - if len(results) > 1 { - return errors.New("given jsonpath expression matches more than one list") - } - if len(results[0]) > 1 { - return errors.New("given jsonpath expression matches more than one value") - } - return nil -} - -// compareResults will compare the reflect.Value from the result parsed by the -// JSONPath parser with the expected value given by the value -// -// Since this is coming from an unstructured this can only ever be a primitive, -// map[string]interface{}, or []interface{}. -// We do not support the last two and rely on fmt to handle conversion to string -// and compare the result with user input -func compareResults(r reflect.Value, expectedVal string) (bool, error) { - switch r.Interface().(type) { - case map[string]interface{}, []interface{}: - return false, errors.New("jsonpath leads to a nested object or list which is not supported") - } - s := fmt.Sprintf("%v", r.Interface()) - return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil -} diff --git a/pkg/cmd/wait/wait_test.go b/pkg/cmd/wait/wait_test.go index bb03160f..e4824217 100644 --- a/pkg/cmd/wait/wait_test.go +++ b/pkg/cmd/wait/wait_test.go @@ -24,6 +24,8 @@ import ( "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -76,7 +78,7 @@ spec: memory: 128Mi requests: cpu: 250m - memory: 64Mi + memory: 64Mi terminationMessagePath: /dev/termination-log terminationMessagePolicy: File volumeMounts: @@ -983,6 +985,77 @@ func TestWaitForCondition(t *testing.T) { } } +func TestWaitForCreate(t *testing.T) { + scheme := runtime.NewScheme() + listMapping := map[schema.GroupVersionResource]string{ + {Group: "group", Version: "version", Resource: "theresource"}: "TheKindList", + } + + tests := []struct { + name string + infos []*resource.Info + infosErr error + fakeClient func() *dynamicfakeclient.FakeDynamicClient + timeout time.Duration + + expectedErr string + }{ + { + name: "missing resource, should hit timeout", + infosErr: apierrors.NewNotFound(schema.GroupResource{Group: "group", Resource: "theresource"}, "name-foo"), + fakeClient: func() *dynamicfakeclient.FakeDynamicClient { + return dynamicfakeclient.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping) + }, + timeout: 1 * time.Second, + expectedErr: "timed out waiting for the condition", + }, + { + name: "wait should succeed", + infos: []*resource.Info{ + { + Mapping: &meta.RESTMapping{ + Resource: schema.GroupVersionResource{Group: "group", Version: "version", Resource: "theresource"}, + }, + Object: &corev1.Pod{}, // the resource type is irrelevant here + Name: "name-foo", + Namespace: "ns-foo", + }, + }, + fakeClient: func() *dynamicfakeclient.FakeDynamicClient { + return dynamicfakeclient.NewSimpleDynamicClientWithCustomListKinds(scheme, listMapping) + }, + timeout: 1 * time.Second, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClient := test.fakeClient() + o := &WaitOptions{ + ResourceFinder: genericclioptions.NewSimpleFakeResourceFinder(test.infos...).WithError(test.infosErr), + DynamicClient: fakeClient, + Timeout: test.timeout, + + Printer: printers.NewDiscardingPrinter(), + ConditionFn: IsCreated, + ForCondition: "create", + IOStreams: genericiooptions.NewTestIOStreamsDiscard(), + } + err := o.RunWait() + switch { + case err == nil && len(test.expectedErr) == 0: + case err != nil && len(test.expectedErr) == 0: + t.Fatal(err) + case err == nil && len(test.expectedErr) != 0: + t.Fatalf("missing: %q", test.expectedErr) + case err != nil && len(test.expectedErr) != 0: + if !strings.Contains(err.Error(), test.expectedErr) { + t.Fatalf("expected %q, got %q", test.expectedErr, err.Error()) + } + } + }) + } +} + func TestWaitForDeletionIgnoreNotFound(t *testing.T) { scheme := runtime.NewScheme() listMapping := map[schema.GroupVersionResource]string{