749 lines
26 KiB
Go
749 lines
26 KiB
Go
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package wait
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/spf13/cobra"
|
|
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/cli-runtime/pkg/genericclioptions"
|
|
"k8s.io/cli-runtime/pkg/genericiooptions"
|
|
"k8s.io/cli-runtime/pkg/printers"
|
|
"k8s.io/cli-runtime/pkg/resource"
|
|
"k8s.io/client-go/dynamic"
|
|
"k8s.io/client-go/tools/cache"
|
|
watchtools "k8s.io/client-go/tools/watch"
|
|
"k8s.io/client-go/util/jsonpath"
|
|
cmdget "k8s.io/kubectl/pkg/cmd/get"
|
|
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
|
"k8s.io/kubectl/pkg/util/i18n"
|
|
"k8s.io/kubectl/pkg/util/interrupt"
|
|
"k8s.io/kubectl/pkg/util/templates"
|
|
)
|
|
|
|
var (
|
|
waitLong = templates.LongDesc(i18n.T(`
|
|
Experimental: Wait for a specific condition on one or many resources.
|
|
|
|
The command takes multiple resources and waits until the specified condition
|
|
is seen in the Status field of every given resource.
|
|
|
|
Alternatively, the command can wait for the given set of resources to be deleted
|
|
by providing the "delete" keyword as the value to the --for flag.
|
|
|
|
A successful message will be printed to stdout indicating when the specified
|
|
condition has been met. You can use -o option to change to output destination.`))
|
|
|
|
waitExample = templates.Examples(i18n.T(`
|
|
# Wait for the pod "busybox1" to contain the status condition of type "Ready"
|
|
kubectl wait --for=condition=Ready pod/busybox1
|
|
|
|
# The default value of status condition is true; you can wait for other targets after an equal delimiter (compared after Unicode simple case folding, which is a more general form of case-insensitivity)
|
|
kubectl wait --for=condition=Ready=false pod/busybox1
|
|
|
|
# Wait for the pod "busybox1" to contain the status phase to be "Running"
|
|
kubectl wait --for=jsonpath='{.status.phase}'=Running pod/busybox1
|
|
|
|
# Wait for pod "busybox1" to be Ready
|
|
kubectl wait --for='jsonpath={.status.conditions[?(@.type=="Ready")].status}=True' pod/busybox1
|
|
|
|
# Wait for the service "loadbalancer" to have ingress.
|
|
kubectl wait --for=jsonpath='{.status.loadBalancer.ingress}' service/loadbalancer
|
|
|
|
# Wait for the pod "busybox1" to be deleted, with a timeout of 60s, after having issued the "delete" command
|
|
kubectl delete pod/busybox1
|
|
kubectl wait --for=delete pod/busybox1 --timeout=60s
|
|
|
|
# Wait for the creation of the service "loadbalancer" in addition to wait to have ingress
|
|
kubectl wait --for=jsonpath='{.status.loadBalancer.ingress}' service/loadbalancer --wait-for-creation`))
|
|
)
|
|
|
|
// errNoMatchingResources is the sentinel error reported when a wait query
// ends without having visited a single resource.
var errNoMatchingResources = errors.New("no matching resources found")
|
|
|
|
// WaitFlags directly reflect the information that CLI is gathering via flags. They will be converted to Options, which
|
|
// reflect the runtime requirements for the command. This structure reduces the transformation to wiring and makes
|
|
// the logic itself easy to unit test
|
|
type WaitFlags struct {
|
|
RESTClientGetter genericclioptions.RESTClientGetter
|
|
PrintFlags *genericclioptions.PrintFlags
|
|
ResourceBuilderFlags *genericclioptions.ResourceBuilderFlags
|
|
|
|
Timeout time.Duration
|
|
ForCondition string
|
|
WaitForCreation bool
|
|
|
|
genericiooptions.IOStreams
|
|
}
|
|
|
|
// NewWaitFlags returns a default WaitFlags
|
|
func NewWaitFlags(restClientGetter genericclioptions.RESTClientGetter, streams genericiooptions.IOStreams) *WaitFlags {
|
|
return &WaitFlags{
|
|
RESTClientGetter: restClientGetter,
|
|
PrintFlags: genericclioptions.NewPrintFlags("condition met"),
|
|
ResourceBuilderFlags: genericclioptions.NewResourceBuilderFlags().
|
|
WithLabelSelector("").
|
|
WithFieldSelector("").
|
|
WithAll(false).
|
|
WithAllNamespaces(false).
|
|
WithLocal(false).
|
|
WithLatest(),
|
|
|
|
Timeout: 30 * time.Second,
|
|
WaitForCreation: true,
|
|
|
|
IOStreams: streams,
|
|
}
|
|
}
|
|
|
|
// NewCmdWait returns a cobra command for waiting
|
|
func NewCmdWait(restClientGetter genericclioptions.RESTClientGetter, streams genericiooptions.IOStreams) *cobra.Command {
|
|
flags := NewWaitFlags(restClientGetter, streams)
|
|
|
|
cmd := &cobra.Command{
|
|
Use: "wait ([-f FILENAME] | resource.group/resource.name | resource.group [(-l label | --all)]) [--for=delete|--for condition=available|--for=jsonpath='{}'[=value]]",
|
|
Short: i18n.T("Experimental: Wait for a specific condition on one or many resources"),
|
|
Long: waitLong,
|
|
Example: waitExample,
|
|
|
|
DisableFlagsInUseLine: true,
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
o, err := flags.ToOptions(args)
|
|
cmdutil.CheckErr(err)
|
|
cmdutil.CheckErr(o.RunWait())
|
|
},
|
|
SuggestFor: []string{"list", "ps"},
|
|
}
|
|
|
|
flags.AddFlags(cmd)
|
|
|
|
return cmd
|
|
}
|
|
|
|
// AddFlags registers flags for a cli
|
|
func (flags *WaitFlags) AddFlags(cmd *cobra.Command) {
|
|
flags.PrintFlags.AddFlags(cmd)
|
|
flags.ResourceBuilderFlags.AddFlags(cmd.Flags())
|
|
|
|
cmd.Flags().DurationVar(&flags.Timeout, "timeout", flags.Timeout, "The length of time to wait before giving up. Zero means check once and don't wait, negative means wait for a week.")
|
|
cmd.Flags().StringVar(&flags.ForCondition, "for", flags.ForCondition, "The condition to wait on: [delete|condition=condition-name[=condition-value]|jsonpath='{JSONPath expression}'=[JSONPath value]]. The default condition-value is true. Condition values are compared after Unicode simple case folding, which is a more general form of case-insensitivity.")
|
|
cmd.Flags().BoolVar(&flags.WaitForCreation, "wait-for-creation", flags.WaitForCreation, "The default value is true. If set to true, also wait for creation of objects if they do not already exist. This flag is ignored in --for=delete")
|
|
}
|
|
|
|
// ToOptions converts from CLI inputs to runtime inputs
|
|
func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {
|
|
printer, err := flags.PrintFlags.ToPrinter()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
builder := flags.ResourceBuilderFlags.ToBuilder(flags.RESTClientGetter, args)
|
|
clientConfig, err := flags.RESTClientGetter.ToRESTConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dynamicClient, err := dynamic.NewForConfig(clientConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
effectiveTimeout := flags.Timeout
|
|
if effectiveTimeout < 0 {
|
|
effectiveTimeout = 168 * time.Hour
|
|
}
|
|
|
|
o := &WaitOptions{
|
|
ResourceFinder: builder,
|
|
DynamicClient: dynamicClient,
|
|
Timeout: effectiveTimeout,
|
|
ForCondition: flags.ForCondition,
|
|
WaitForCreation: flags.WaitForCreation,
|
|
|
|
Printer: printer,
|
|
ConditionFn: conditionFn,
|
|
IOStreams: flags.IOStreams,
|
|
}
|
|
|
|
return o, nil
|
|
}
|
|
|
|
func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) {
|
|
if strings.ToLower(condition) == "delete" {
|
|
return IsDeleted, nil
|
|
}
|
|
if strings.HasPrefix(condition, "condition=") {
|
|
conditionName := condition[len("condition="):]
|
|
conditionValue := "true"
|
|
if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 {
|
|
conditionValue = conditionName[equalsIndex+1:]
|
|
conditionName = conditionName[0:equalsIndex]
|
|
}
|
|
|
|
return ConditionalWait{
|
|
conditionName: conditionName,
|
|
conditionStatus: conditionValue,
|
|
errOut: errOut,
|
|
}.IsConditionMet, nil
|
|
}
|
|
if strings.HasPrefix(condition, "jsonpath=") {
|
|
jsonPathInput := strings.TrimPrefix(condition, "jsonpath=")
|
|
jsonPathExp, jsonPathValue, err := processJSONPathInput(jsonPathInput)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
j, err := newJSONPathParser(jsonPathExp)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return JSONPathWait{
|
|
matchAnyValue: jsonPathValue == "",
|
|
jsonPathValue: jsonPathValue,
|
|
jsonPathParser: j,
|
|
errOut: errOut,
|
|
}.IsJSONPathConditionMet, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("unrecognized condition: %q", condition)
|
|
}
|
|
|
|
// newJSONPathParser will create a new JSONPath parser based on the jsonPathExpression
|
|
func newJSONPathParser(jsonPathExpression string) (*jsonpath.JSONPath, error) {
|
|
j := jsonpath.New("wait").AllowMissingKeys(true)
|
|
if jsonPathExpression == "" {
|
|
return nil, errors.New("jsonpath expression cannot be empty")
|
|
}
|
|
if err := j.Parse(jsonPathExpression); err != nil {
|
|
return nil, err
|
|
}
|
|
return j, nil
|
|
}
|
|
|
|
// processJSONPathInput will parse and process the provided JSONPath input containing a JSON expression and optionally
|
|
// a value for the matching condition.
|
|
func processJSONPathInput(input string) (string, string, error) {
|
|
jsonPathInput := splitJSONPathInput(input)
|
|
if numOfArgs := len(jsonPathInput); numOfArgs < 1 || numOfArgs > 2 {
|
|
return "", "", fmt.Errorf("jsonpath wait format must be --for=jsonpath='{.status.readyReplicas}'=3 or --for=jsonpath='{.status.readyReplicas}'")
|
|
}
|
|
relaxedJSONPathExp, err := cmdget.RelaxedJSONPathExpression(jsonPathInput[0])
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
if len(jsonPathInput) == 1 {
|
|
return relaxedJSONPathExp, "", nil
|
|
}
|
|
jsonPathValue := strings.Trim(jsonPathInput[1], `'"`)
|
|
if jsonPathValue == "" {
|
|
return "", "", errors.New("jsonpath wait has to have a value after equal sign, like --for=jsonpath='{.status.readyReplicas}'=3")
|
|
}
|
|
return relaxedJSONPathExp, jsonPathValue, nil
|
|
}
|
|
|
|
// splitJSONPathInput cuts input at every single '='. A pair "==" is kept
// verbatim in the current element and never acts as a separator.
// E.g., "a.b.c====d.e.f===g.h.i===" becomes ["a.b.c====d.e.f==","g.h.i==",""].
// The result always contains at least one element.
func splitJSONPathInput(input string) []string {
	var (
		parts   []string
		current strings.Builder
	)

	for pos := 0; pos < len(input); {
		ch := input[pos]
		switch {
		case ch != '=':
			current.WriteByte(ch)
			pos++
		case pos+1 < len(input) && input[pos+1] == '=':
			// "==" is consumed as a unit and stays in the element.
			current.WriteString("==")
			pos += 2
		default:
			// A lone '=' closes the current element.
			parts = append(parts, current.String())
			current.Reset()
			pos++
		}
	}

	parts = append(parts, current.String())
	return parts
}
|
|
|
|
// ResourceLocation holds the location of a resource
|
|
type ResourceLocation struct {
|
|
GroupResource schema.GroupResource
|
|
Namespace string
|
|
Name string
|
|
}
|
|
|
|
// UIDMap maps ResourceLocation with UID
|
|
type UIDMap map[ResourceLocation]types.UID
|
|
|
|
// WaitOptions is a set of options that allows you to wait. This is the object reflects the runtime needs of a wait
|
|
// command, making the logic itself easy to unit test with our existing mocks.
|
|
type WaitOptions struct {
|
|
ResourceFinder genericclioptions.ResourceFinder
|
|
// UIDMap maps a resource location to a UID. It is optional, but ConditionFuncs may choose to use it to make the result
|
|
// more reliable. For instance, delete can look for UID consistency during delegated calls.
|
|
UIDMap UIDMap
|
|
DynamicClient dynamic.Interface
|
|
Timeout time.Duration
|
|
ForCondition string
|
|
WaitForCreation bool
|
|
|
|
Printer printers.ResourcePrinter
|
|
ConditionFn ConditionFunc
|
|
genericiooptions.IOStreams
|
|
}
|
|
|
|
// ConditionFunc is the interface for providing condition checks
|
|
type ConditionFunc func(ctx context.Context, info *resource.Info, o *WaitOptions) (finalObject runtime.Object, done bool, err error)
|
|
|
|
// RunWait runs the waiting logic
|
|
func (o *WaitOptions) RunWait() error {
|
|
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
|
|
defer cancel()
|
|
|
|
isForDelete := strings.ToLower(o.ForCondition) == "delete"
|
|
if o.WaitForCreation && o.Timeout == 0 {
|
|
return fmt.Errorf("--wait-for-creation requires a timeout value greater than 0")
|
|
}
|
|
|
|
if o.WaitForCreation && !isForDelete {
|
|
err := func() error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return fmt.Errorf("context deadline is exceeded while waiting for the creation of the resources")
|
|
default:
|
|
err := o.ResourceFinder.Do().Visit(func(info *resource.Info, err error) error {
|
|
// We don't need to do anything after we assure that the resources exist. Because
|
|
// actual logic will be incorporated after we wait all the resources' existence.
|
|
return nil
|
|
})
|
|
// It is verified that all the resources exist.
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
// We specifically wait for the creation of resources and all the errors
|
|
// other than not found means that this is something we cannot handle.
|
|
if !apierrors.IsNotFound(err) {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
visitCount := 0
|
|
visitFunc := func(info *resource.Info, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
visitCount++
|
|
finalObject, success, err := o.ConditionFn(ctx, info, o)
|
|
if success {
|
|
o.Printer.PrintObj(finalObject, o.Out)
|
|
return nil
|
|
}
|
|
if err == nil {
|
|
return fmt.Errorf("%v unsatisified for unknown reason", finalObject)
|
|
}
|
|
return err
|
|
}
|
|
visitor := o.ResourceFinder.Do()
|
|
if visitor, ok := visitor.(*resource.Result); ok && isForDelete {
|
|
visitor.IgnoreErrors(apierrors.IsNotFound)
|
|
}
|
|
|
|
err := visitor.Visit(visitFunc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if visitCount == 0 && !isForDelete {
|
|
return errNoMatchingResources
|
|
}
|
|
return err
|
|
}
|
|
|
|
// IsDeleted is a condition func for waiting for something to be deleted
|
|
func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
|
if len(info.Name) == 0 {
|
|
return info.Object, false, fmt.Errorf("resource name must be provided")
|
|
}
|
|
|
|
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
|
|
if apierrors.IsNotFound(initObjGetErr) {
|
|
return info.Object, true, nil
|
|
}
|
|
if initObjGetErr != nil {
|
|
// TODO this could do something slightly fancier if we wish
|
|
return info.Object, false, initObjGetErr
|
|
}
|
|
resourceLocation := ResourceLocation{
|
|
GroupResource: info.Mapping.Resource.GroupResource(),
|
|
Namespace: gottenObj.GetNamespace(),
|
|
Name: gottenObj.GetName(),
|
|
}
|
|
if uid, ok := o.UIDMap[resourceLocation]; ok {
|
|
if gottenObj.GetUID() != uid {
|
|
return gottenObj, true, nil
|
|
}
|
|
}
|
|
|
|
endTime := time.Now().Add(o.Timeout)
|
|
timeout := time.Until(endTime)
|
|
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
|
|
if o.Timeout == 0 {
|
|
// If timeout is zero check if the object exists once only
|
|
if gottenObj == nil {
|
|
return nil, true, nil
|
|
}
|
|
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
|
}
|
|
if timeout < 0 {
|
|
// we're out of time
|
|
return info.Object, false, errWaitTimeoutWithName
|
|
}
|
|
|
|
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
|
|
lw := &cache.ListWatch{
|
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
|
options.FieldSelector = fieldSelector
|
|
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
|
|
},
|
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
options.FieldSelector = fieldSelector
|
|
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
|
|
},
|
|
}
|
|
|
|
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
|
|
preconditionFunc := func(store cache.Store) (bool, error) {
|
|
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
if !exists {
|
|
// since we're looking for it to disappear we just return here if it no longer exists
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
intrCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
intr := interrupt.New(nil, cancel)
|
|
err := intr.Run(func() error {
|
|
_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
return errWaitTimeoutWithName
|
|
}
|
|
return err
|
|
})
|
|
if err != nil {
|
|
if err == wait.ErrWaitTimeout {
|
|
return gottenObj, false, errWaitTimeoutWithName
|
|
}
|
|
return gottenObj, false, err
|
|
}
|
|
|
|
return gottenObj, true, nil
|
|
}
|
|
|
|
// Wait groups helper methods for handling watch events, including reporting
// the errors they carry.
type Wait struct {
	// errOut receives the diagnostics written by the helper methods.
	errOut io.Writer
}
|
|
|
|
// IsDeleted returns true if the object is deleted. It prints any errors it encounters.
|
|
func (w Wait) IsDeleted(event watch.Event) (bool, error) {
|
|
switch event.Type {
|
|
case watch.Error:
|
|
// keep waiting in the event we see an error - we expect the watch to be closed by
|
|
// the server if the error is unrecoverable.
|
|
err := apierrors.FromObject(event.Object)
|
|
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
|
|
return false, nil
|
|
case watch.Deleted:
|
|
return true, nil
|
|
default:
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
type isCondMetFunc func(event watch.Event) (bool, error)
|
|
type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
|
|
|
|
// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
|
|
// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
|
|
func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
|
|
if len(info.Name) == 0 {
|
|
return info.Object, false, fmt.Errorf("resource name must be provided")
|
|
}
|
|
|
|
endTime := time.Now().Add(o.Timeout)
|
|
timeout := time.Until(endTime)
|
|
errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
|
|
if o.Timeout == 0 {
|
|
// If timeout is zero we will fetch the object(s) once only and check
|
|
gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
|
|
if initObjGetErr != nil {
|
|
return nil, false, initObjGetErr
|
|
}
|
|
if gottenObj == nil {
|
|
return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
|
}
|
|
conditionCheck, err := check(gottenObj)
|
|
if err != nil {
|
|
return gottenObj, false, err
|
|
}
|
|
if conditionCheck == false {
|
|
return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
|
|
}
|
|
return gottenObj, true, nil
|
|
}
|
|
if timeout < 0 {
|
|
// we're out of time
|
|
return info.Object, false, errWaitTimeoutWithName
|
|
}
|
|
|
|
mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
|
|
fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
|
|
lw := &cache.ListWatch{
|
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
|
options.FieldSelector = fieldSelector
|
|
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
|
|
},
|
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
|
options.FieldSelector = fieldSelector
|
|
return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
|
|
},
|
|
}
|
|
|
|
// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
|
|
preconditionFunc := func(store cache.Store) (bool, error) {
|
|
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
if !exists {
|
|
return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
intrCtx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
var result runtime.Object
|
|
intr := interrupt.New(nil, cancel)
|
|
err := intr.Run(func() error {
|
|
ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
|
|
if ev != nil {
|
|
result = ev.Object
|
|
}
|
|
if errors.Is(err, context.DeadlineExceeded) {
|
|
return errWaitTimeoutWithName
|
|
}
|
|
return err
|
|
})
|
|
if err != nil {
|
|
if err == wait.ErrWaitTimeout {
|
|
return result, false, errWaitTimeoutWithName
|
|
}
|
|
return result, false, err
|
|
}
|
|
|
|
return result, true, nil
|
|
}
|
|
|
|
// ConditionalWait holds what is needed to check an API status condition.
type ConditionalWait struct {
	// conditionName is compared against the "type" of each status condition.
	conditionName string
	// conditionStatus is the "status" value the matching condition must have.
	conditionStatus string
	// errOut is written to if an error occurs
	errOut io.Writer
}
|
|
|
|
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
|
|
func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
|
return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
|
|
}
|
|
|
|
func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
|
|
conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !found {
|
|
return false, nil
|
|
}
|
|
for _, conditionUncast := range conditions {
|
|
condition := conditionUncast.(map[string]interface{})
|
|
name, found, err := unstructured.NestedString(condition, "type")
|
|
if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
|
|
continue
|
|
}
|
|
status, found, err := unstructured.NestedString(condition, "status")
|
|
if !found || err != nil {
|
|
continue
|
|
}
|
|
generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
|
|
if found {
|
|
observedGeneration, found := getObservedGeneration(obj, condition)
|
|
if found && observedGeneration < generation {
|
|
return false, nil
|
|
}
|
|
}
|
|
return strings.EqualFold(status, w.conditionStatus), nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
|
|
if event.Type == watch.Error {
|
|
// keep waiting in the event we see an error - we expect the watch to be closed by
|
|
// the server
|
|
err := apierrors.FromObject(event.Object)
|
|
fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
|
|
return false, nil
|
|
}
|
|
if event.Type == watch.Deleted {
|
|
// this will chain back out, result in another get and an return false back up the chain
|
|
return false, nil
|
|
}
|
|
obj := event.Object.(*unstructured.Unstructured)
|
|
return w.checkCondition(obj)
|
|
}
|
|
|
|
func extendErrWaitTimeout(err error, info *resource.Info) error {
|
|
return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
|
|
}
|
|
|
|
func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
|
|
conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
|
|
if found {
|
|
return conditionObservedGeneration, true
|
|
}
|
|
statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
|
|
return statusObservedGeneration, found
|
|
}
|
|
|
|
// JSONPathWait holds a JSONPath Parser which has the ability
|
|
// to check for the JSONPath condition and compare with the API server provided JSON output.
|
|
type JSONPathWait struct {
|
|
matchAnyValue bool
|
|
jsonPathValue string
|
|
jsonPathParser *jsonpath.JSONPath
|
|
// errOut is written to if an error occurs
|
|
errOut io.Writer
|
|
}
|
|
|
|
// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
|
|
func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
|
|
return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition)
|
|
}
|
|
|
|
// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
|
|
// which check the watch event and check if a JSONPathWait condition is met
|
|
func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) {
|
|
if event.Type == watch.Error {
|
|
// keep waiting in the event we see an error - we expect the watch to be closed by
|
|
// the server
|
|
err := apierrors.FromObject(event.Object)
|
|
fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
|
|
return false, nil
|
|
}
|
|
if event.Type == watch.Deleted {
|
|
// this will chain back out, result in another get and an return false back up the chain
|
|
return false, nil
|
|
}
|
|
// event runtime Object can be safely asserted to Unstructed
|
|
// because we are working with dynamic client
|
|
obj := event.Object.(*unstructured.Unstructured)
|
|
return j.checkCondition(obj)
|
|
}
|
|
|
|
// checkCondition uses JSONPath parser to parse the JSON received from the API server
|
|
// and check if it matches the desired condition
|
|
func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
|
|
queryObj := obj.UnstructuredContent()
|
|
parseResults, err := j.jsonPathParser.FindResults(queryObj)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if len(parseResults) == 0 || len(parseResults[0]) == 0 {
|
|
return false, nil
|
|
}
|
|
if err := verifyParsedJSONPath(parseResults); err != nil {
|
|
return false, err
|
|
}
|
|
if j.matchAnyValue {
|
|
return true, nil
|
|
}
|
|
isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return isConditionMet, nil
|
|
}
|
|
|
|
// verifyParsedJSONPath verifies that the JSONPath results describe at most a
// single value. It returns an error when the expression matched more than one
// list, or more than one value within the first list. Empty results contain
// nothing to reject and are accepted.
func verifyParsedJSONPath(results [][]reflect.Value) error {
	// Guard the results[0] access below against an empty result set.
	if len(results) == 0 {
		return nil
	}
	if len(results) > 1 {
		return errors.New("given jsonpath expression matches more than one list")
	}
	if len(results[0]) > 1 {
		return errors.New("given jsonpath expression matches more than one value")
	}
	return nil
}
|
|
|
|
// compareResults compares the value found by the JSONPath parser with the
// value the user expects.
//
// Since the value comes from an unstructured object it can only be a
// primitive, a map[string]interface{} or a []interface{}. Maps and lists are
// rejected with an error. A primitive is formatted with fmt's %v verb and
// compared with expectedVal after trimming surrounding whitespace from both.
func compareResults(r reflect.Value, expectedVal string) (bool, error) {
	actual := r.Interface()

	if _, isMap := actual.(map[string]interface{}); isMap {
		return false, errors.New("jsonpath leads to a nested object or list which is not supported")
	}
	if _, isList := actual.([]interface{}); isList {
		return false, errors.New("jsonpath leads to a nested object or list which is not supported")
	}

	got := strings.TrimSpace(fmt.Sprintf("%v", actual))
	want := strings.TrimSpace(expectedVal)
	return got == want, nil
}
|