kubectl wait: split condition functions into separate files
Kubernetes-commit: 6eec9d6b21316833bc66b1586207d5b2326b35fe
This commit is contained in:
parent
eaa3175fb1
commit
1c0cdd03d9
|
@ -0,0 +1,197 @@
|
|||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package wait
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubectl/pkg/util/interrupt"
|
||||
)
|
||||
|
||||
// ConditionalWait hold information to check an API status condition
|
||||
type ConditionalWait struct {
|
||||
conditionName string
|
||||
conditionStatus string
|
||||
// errOut is written to if an error occurs
|
||||
errOut io.Writer
|
||||
}
|
||||
|
||||
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
|
||||
func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
||||
return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
|
||||
}
|
||||
|
||||
func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
|
||||
conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !found {
|
||||
return false, nil
|
||||
}
|
||||
for _, conditionUncast := range conditions {
|
||||
condition := conditionUncast.(map[string]interface{})
|
||||
name, found, err := unstructured.NestedString(condition, "type")
|
||||
if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
|
||||
continue
|
||||
}
|
||||
status, found, err := unstructured.NestedString(condition, "status")
|
||||
if !found || err != nil {
|
||||
continue
|
||||
}
|
||||
generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
|
||||
if found {
|
||||
observedGeneration, found := getObservedGeneration(obj, condition)
|
||||
if found && observedGeneration < generation {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return strings.EqualFold(status, w.conditionStatus), nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
|
||||
if event.Type == watch.Error {
|
||||
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||
// the server
|
||||
err := apierrors.FromObject(event.Object)
|
||||
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
// this will chain back out, result in another get and an return false back up the chain
|
||||
return false, nil
|
||||
}
|
||||
obj := event.Object.(*unstructured.Unstructured)
|
||||
return w.checkCondition(obj)
|
||||
}
|
||||
|
||||
type isCondMetFunc func(event watch.Event) (bool, error)
|
||||
type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
|
||||
|
||||
// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
|
||||
// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
|
||||
func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
|
||||
if len(info.Name) == 0 {
|
||||
return info.Object, false, fmt.Errorf("resource name must be provided")
|
||||
}
|
||||
|
||||
endTime := time.Now().Add(o.Timeout)
|
||||
timeout := time.Until(endTime)
|
||||
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
|
||||
if o.Timeout == 0 {
|
||||
// If timeout is zero we will fetch the object(s) once only and check
|
||||
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
|
||||
if initObjGetErr != nil {
|
||||
return nil, false, initObjGetErr
|
||||
}
|
||||
if gottenObj == nil {
|
||||
return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
||||
}
|
||||
conditionCheck, err := check(gottenObj)
|
||||
if err != nil {
|
||||
return gottenObj, false, err
|
||||
}
|
||||
if !conditionCheck {
|
||||
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
||||
}
|
||||
return gottenObj, true, nil
|
||||
}
|
||||
if timeout < 0 {
|
||||
// we're out of time
|
||||
return info.Object, false, errWaitTimeoutWithName
|
||||
}
|
||||
|
||||
mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
|
||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
|
||||
},
|
||||
}
|
||||
|
||||
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
|
||||
preconditionFunc := func(store cache.Store) (bool, error) {
|
||||
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
if !exists {
|
||||
return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
intrCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
var result runtime.Object
|
||||
intr := interrupt.New(nil, cancel)
|
||||
err := intr.Run(func() error {
|
||||
ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
|
||||
if ev != nil {
|
||||
result = ev.Object
|
||||
}
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return errWaitTimeoutWithName
|
||||
}
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
|
||||
return result, false, errWaitTimeoutWithName
|
||||
}
|
||||
return result, false, err
|
||||
}
|
||||
|
||||
return result, true, nil
|
||||
}
|
||||
|
||||
func extendErrWaitTimeout(err error, info *resource.Info) error {
|
||||
return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
|
||||
}
|
||||
|
||||
func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
|
||||
conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
|
||||
if found {
|
||||
return conditionObservedGeneration, true
|
||||
}
|
||||
statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
|
||||
return statusObservedGeneration, found
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package wait
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubectl/pkg/util/interrupt"
|
||||
)
|
||||
|
||||
// IsDeleted is a condition func for waiting for something to be deleted
|
||||
func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
||||
if len(info.Name) == 0 {
|
||||
return info.Object, false, fmt.Errorf("resource name must be provided")
|
||||
}
|
||||
|
||||
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(ctx, info.Name, metav1.GetOptions{})
|
||||
if apierrors.IsNotFound(initObjGetErr) {
|
||||
return info.Object, true, nil
|
||||
}
|
||||
if initObjGetErr != nil {
|
||||
// TODO this could do something slightly fancier if we wish
|
||||
return info.Object, false, initObjGetErr
|
||||
}
|
||||
resourceLocation := ResourceLocation{
|
||||
GroupResource: info.Mapping.Resource.GroupResource(),
|
||||
Namespace: gottenObj.GetNamespace(),
|
||||
Name: gottenObj.GetName(),
|
||||
}
|
||||
if uid, ok := o.UIDMap[resourceLocation]; ok {
|
||||
if gottenObj.GetUID() != uid {
|
||||
return gottenObj, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
endTime := time.Now().Add(o.Timeout)
|
||||
timeout := time.Until(endTime)
|
||||
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
|
||||
if o.Timeout == 0 {
|
||||
// If timeout is zero check if the object exists once only
|
||||
if gottenObj == nil {
|
||||
return nil, true, nil
|
||||
}
|
||||
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
||||
}
|
||||
if timeout < 0 {
|
||||
// we're out of time
|
||||
return info.Object, false, errWaitTimeoutWithName
|
||||
}
|
||||
|
||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(ctx, options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(ctx, options)
|
||||
},
|
||||
}
|
||||
|
||||
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
|
||||
preconditionFunc := func(store cache.Store) (bool, error) {
|
||||
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
if !exists {
|
||||
// since we're looking for it to disappear we just return here if it no longer exists
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
intrCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
intr := interrupt.New(nil, cancel)
|
||||
err := intr.Run(func() error {
|
||||
_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return errWaitTimeoutWithName
|
||||
}
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
|
||||
return gottenObj, false, errWaitTimeoutWithName
|
||||
}
|
||||
return gottenObj, false, err
|
||||
}
|
||||
|
||||
return gottenObj, true, nil
|
||||
}
|
||||
|
||||
// Wait has helper methods for handling watches, including error handling.
|
||||
type Wait struct {
|
||||
errOut io.Writer
|
||||
}
|
||||
|
||||
// IsDeleted returns true if the object is deleted. It prints any errors it encounters.
|
||||
func (w Wait) IsDeleted(event watch.Event) (bool, error) {
|
||||
switch event.Type {
|
||||
case watch.Error:
|
||||
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||
// the server if the error is unrecoverable.
|
||||
err := apierrors.FromObject(event.Object)
|
||||
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
|
||||
return false, nil
|
||||
case watch.Deleted:
|
||||
return true, nil
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package wait
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/util/jsonpath"
|
||||
)
|
||||
|
||||
// JSONPathWait holds a JSONPath Parser which has the ability
|
||||
// to check for the JSONPath condition and compare with the API server provided JSON output.
|
||||
type JSONPathWait struct {
|
||||
matchAnyValue bool
|
||||
jsonPathValue string
|
||||
jsonPathParser *jsonpath.JSONPath
|
||||
// errOut is written to if an error occurs
|
||||
errOut io.Writer
|
||||
}
|
||||
|
||||
// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
|
||||
func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
||||
return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition)
|
||||
}
|
||||
|
||||
// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
|
||||
// which check the watch event and check if a JSONPathWait condition is met
|
||||
func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) {
|
||||
if event.Type == watch.Error {
|
||||
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||
// the server
|
||||
err := apierrors.FromObject(event.Object)
|
||||
fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
// this will chain back out, result in another get and an return false back up the chain
|
||||
return false, nil
|
||||
}
|
||||
// event runtime Object can be safely asserted to Unstructed
|
||||
// because we are working with dynamic client
|
||||
obj := event.Object.(*unstructured.Unstructured)
|
||||
return j.checkCondition(obj)
|
||||
}
|
||||
|
||||
// checkCondition uses JSONPath parser to parse the JSON received from the API server
|
||||
// and check if it matches the desired condition
|
||||
func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
|
||||
queryObj := obj.UnstructuredContent()
|
||||
parseResults, err := j.jsonPathParser.FindResults(queryObj)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(parseResults) == 0 || len(parseResults[0]) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
if err := verifyParsedJSONPath(parseResults); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if j.matchAnyValue {
|
||||
return true, nil
|
||||
}
|
||||
isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return isConditionMet, nil
|
||||
}
|
||||
|
||||
// verifyParsedJSONPath verifies the JSON received from the API server is valid.
|
||||
// It will only accept a single JSON
|
||||
func verifyParsedJSONPath(results [][]reflect.Value) error {
|
||||
if len(results) > 1 {
|
||||
return errors.New("given jsonpath expression matches more than one list")
|
||||
}
|
||||
if len(results[0]) > 1 {
|
||||
return errors.New("given jsonpath expression matches more than one value")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// compareResults will compare the reflect.Value from the result parsed by the
|
||||
// JSONPath parser with the expected value given by the value
|
||||
//
|
||||
// Since this is coming from an unstructured this can only ever be a primitive,
|
||||
// map[string]interface{}, or []interface{}.
|
||||
// We do not support the last two and rely on fmt to handle conversion to string
|
||||
// and compare the result with user input
|
||||
func compareResults(r reflect.Value, expectedVal string) (bool, error) {
|
||||
switch r.Interface().(type) {
|
||||
case map[string]interface{}, []interface{}:
|
||||
return false, errors.New("jsonpath leads to a nested object or list which is not supported")
|
||||
}
|
||||
s := fmt.Sprintf("%v", r.Interface())
|
||||
return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil
|
||||
}
|
|
@ -21,33 +21,25 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||
"k8s.io/cli-runtime/pkg/genericiooptions"
|
||||
"k8s.io/cli-runtime/pkg/printers"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/client-go/util/jsonpath"
|
||||
cmdget "k8s.io/kubectl/pkg/cmd/get"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
"k8s.io/kubectl/pkg/util/i18n"
|
||||
"k8s.io/kubectl/pkg/util/interrupt"
|
||||
"k8s.io/kubectl/pkg/util/templates"
|
||||
)
|
||||
|
||||
|
@ -352,356 +344,3 @@ func (o *WaitOptions) RunWait() error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// IsDeleted is a condition func for waiting for something to be deleted
|
||||
func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
||||
if len(info.Name) == 0 {
|
||||
return info.Object, false, fmt.Errorf("resource name must be provided")
|
||||
}
|
||||
|
||||
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
|
||||
if apierrors.IsNotFound(initObjGetErr) {
|
||||
return info.Object, true, nil
|
||||
}
|
||||
if initObjGetErr != nil {
|
||||
// TODO this could do something slightly fancier if we wish
|
||||
return info.Object, false, initObjGetErr
|
||||
}
|
||||
resourceLocation := ResourceLocation{
|
||||
GroupResource: info.Mapping.Resource.GroupResource(),
|
||||
Namespace: gottenObj.GetNamespace(),
|
||||
Name: gottenObj.GetName(),
|
||||
}
|
||||
if uid, ok := o.UIDMap[resourceLocation]; ok {
|
||||
if gottenObj.GetUID() != uid {
|
||||
return gottenObj, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
endTime := time.Now().Add(o.Timeout)
|
||||
timeout := time.Until(endTime)
|
||||
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
|
||||
if o.Timeout == 0 {
|
||||
// If timeout is zero check if the object exists once only
|
||||
if gottenObj == nil {
|
||||
return nil, true, nil
|
||||
}
|
||||
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
||||
}
|
||||
if timeout < 0 {
|
||||
// we're out of time
|
||||
return info.Object, false, errWaitTimeoutWithName
|
||||
}
|
||||
|
||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
|
||||
},
|
||||
}
|
||||
|
||||
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
|
||||
preconditionFunc := func(store cache.Store) (bool, error) {
|
||||
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
if !exists {
|
||||
// since we're looking for it to disappear we just return here if it no longer exists
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
intrCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
intr := interrupt.New(nil, cancel)
|
||||
err := intr.Run(func() error {
|
||||
_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return errWaitTimeoutWithName
|
||||
}
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
if err == wait.ErrWaitTimeout {
|
||||
return gottenObj, false, errWaitTimeoutWithName
|
||||
}
|
||||
return gottenObj, false, err
|
||||
}
|
||||
|
||||
return gottenObj, true, nil
|
||||
}
|
||||
|
||||
// Wait has helper methods for handling watches, including error handling.
|
||||
type Wait struct {
|
||||
errOut io.Writer
|
||||
}
|
||||
|
||||
// IsDeleted returns true if the object is deleted. It prints any errors it encounters.
|
||||
func (w Wait) IsDeleted(event watch.Event) (bool, error) {
|
||||
switch event.Type {
|
||||
case watch.Error:
|
||||
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||
// the server if the error is unrecoverable.
|
||||
err := apierrors.FromObject(event.Object)
|
||||
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
|
||||
return false, nil
|
||||
case watch.Deleted:
|
||||
return true, nil
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
type isCondMetFunc func(event watch.Event) (bool, error)
|
||||
type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
|
||||
|
||||
// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
|
||||
// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
|
||||
func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
|
||||
if len(info.Name) == 0 {
|
||||
return info.Object, false, fmt.Errorf("resource name must be provided")
|
||||
}
|
||||
|
||||
endTime := time.Now().Add(o.Timeout)
|
||||
timeout := time.Until(endTime)
|
||||
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
|
||||
if o.Timeout == 0 {
|
||||
// If timeout is zero we will fetch the object(s) once only and check
|
||||
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
|
||||
if initObjGetErr != nil {
|
||||
return nil, false, initObjGetErr
|
||||
}
|
||||
if gottenObj == nil {
|
||||
return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
||||
}
|
||||
conditionCheck, err := check(gottenObj)
|
||||
if err != nil {
|
||||
return gottenObj, false, err
|
||||
}
|
||||
if conditionCheck == false {
|
||||
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
||||
}
|
||||
return gottenObj, true, nil
|
||||
}
|
||||
if timeout < 0 {
|
||||
// we're out of time
|
||||
return info.Object, false, errWaitTimeoutWithName
|
||||
}
|
||||
|
||||
mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
|
||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
|
||||
},
|
||||
}
|
||||
|
||||
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
|
||||
preconditionFunc := func(store cache.Store) (bool, error) {
|
||||
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
if !exists {
|
||||
return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
intrCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
var result runtime.Object
|
||||
intr := interrupt.New(nil, cancel)
|
||||
err := intr.Run(func() error {
|
||||
ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
|
||||
if ev != nil {
|
||||
result = ev.Object
|
||||
}
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return errWaitTimeoutWithName
|
||||
}
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
if err == wait.ErrWaitTimeout {
|
||||
return result, false, errWaitTimeoutWithName
|
||||
}
|
||||
return result, false, err
|
||||
}
|
||||
|
||||
return result, true, nil
|
||||
}
|
||||
|
||||
// ConditionalWait hold information to check an API status condition
|
||||
type ConditionalWait struct {
|
||||
conditionName string
|
||||
conditionStatus string
|
||||
// errOut is written to if an error occurs
|
||||
errOut io.Writer
|
||||
}
|
||||
|
||||
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
|
||||
func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
||||
return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
|
||||
}
|
||||
|
||||
func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
|
||||
conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !found {
|
||||
return false, nil
|
||||
}
|
||||
for _, conditionUncast := range conditions {
|
||||
condition := conditionUncast.(map[string]interface{})
|
||||
name, found, err := unstructured.NestedString(condition, "type")
|
||||
if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
|
||||
continue
|
||||
}
|
||||
status, found, err := unstructured.NestedString(condition, "status")
|
||||
if !found || err != nil {
|
||||
continue
|
||||
}
|
||||
generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
|
||||
if found {
|
||||
observedGeneration, found := getObservedGeneration(obj, condition)
|
||||
if found && observedGeneration < generation {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return strings.EqualFold(status, w.conditionStatus), nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
|
||||
if event.Type == watch.Error {
|
||||
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||
// the server
|
||||
err := apierrors.FromObject(event.Object)
|
||||
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
// this will chain back out, result in another get and an return false back up the chain
|
||||
return false, nil
|
||||
}
|
||||
obj := event.Object.(*unstructured.Unstructured)
|
||||
return w.checkCondition(obj)
|
||||
}
|
||||
|
||||
func extendErrWaitTimeout(err error, info *resource.Info) error {
|
||||
return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
|
||||
}
|
||||
|
||||
func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
|
||||
conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
|
||||
if found {
|
||||
return conditionObservedGeneration, true
|
||||
}
|
||||
statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
|
||||
return statusObservedGeneration, found
|
||||
}
|
||||
|
||||
// JSONPathWait holds a JSONPath Parser which has the ability
|
||||
// to check for the JSONPath condition and compare with the API server provided JSON output.
|
||||
type JSONPathWait struct {
|
||||
matchAnyValue bool
|
||||
jsonPathValue string
|
||||
jsonPathParser *jsonpath.JSONPath
|
||||
// errOut is written to if an error occurs
|
||||
errOut io.Writer
|
||||
}
|
||||
|
||||
// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
|
||||
func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
||||
return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition)
|
||||
}
|
||||
|
||||
// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
|
||||
// which check the watch event and check if a JSONPathWait condition is met
|
||||
func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) {
|
||||
if event.Type == watch.Error {
|
||||
// keep waiting in the event we see an error - we expect the watch to be closed by
|
||||
// the server
|
||||
err := apierrors.FromObject(event.Object)
|
||||
fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
// this will chain back out, result in another get and an return false back up the chain
|
||||
return false, nil
|
||||
}
|
||||
// event runtime Object can be safely asserted to Unstructed
|
||||
// because we are working with dynamic client
|
||||
obj := event.Object.(*unstructured.Unstructured)
|
||||
return j.checkCondition(obj)
|
||||
}
|
||||
|
||||
// checkCondition uses JSONPath parser to parse the JSON received from the API server
|
||||
// and check if it matches the desired condition
|
||||
func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
|
||||
queryObj := obj.UnstructuredContent()
|
||||
parseResults, err := j.jsonPathParser.FindResults(queryObj)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(parseResults) == 0 || len(parseResults[0]) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
if err := verifyParsedJSONPath(parseResults); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if j.matchAnyValue {
|
||||
return true, nil
|
||||
}
|
||||
isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return isConditionMet, nil
|
||||
}
|
||||
|
||||
// verifyParsedJSONPath verifies the JSON received from the API server is valid.
|
||||
// It will only accept a single JSON
|
||||
func verifyParsedJSONPath(results [][]reflect.Value) error {
|
||||
if len(results) > 1 {
|
||||
return errors.New("given jsonpath expression matches more than one list")
|
||||
}
|
||||
if len(results[0]) > 1 {
|
||||
return errors.New("given jsonpath expression matches more than one value")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// compareResults will compare the reflect.Value from the result parsed by the
|
||||
// JSONPath parser with the expected value given by the value
|
||||
//
|
||||
// Since this is coming from an unstructured this can only ever be a primitive,
|
||||
// map[string]interface{}, or []interface{}.
|
||||
// We do not support the last two and rely on fmt to handle conversion to string
|
||||
// and compare the result with user input
|
||||
func compareResults(r reflect.Value, expectedVal string) (bool, error) {
|
||||
switch r.Interface().(type) {
|
||||
case map[string]interface{}, []interface{}:
|
||||
return false, errors.New("jsonpath leads to a nested object or list which is not supported")
|
||||
}
|
||||
s := fmt.Sprintf("%v", r.Interface())
|
||||
return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue