diff --git a/go.mod b/go.mod index 330d8b1ae..d32c44294 100644 --- a/go.mod +++ b/go.mod @@ -22,10 +22,12 @@ require ( github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 go.uber.org/atomic v1.7.0 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 + golang.org/x/text v0.3.7 golang.org/x/time v0.0.0-20220609170525-579cf78fd858 golang.org/x/tools v0.1.12 gomodules.xyz/jsonpatch/v2 v2.2.0 google.golang.org/grpc v1.47.0 + gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.25.4 k8s.io/apiextensions-apiserver v0.25.4 k8s.io/apimachinery v0.25.4 @@ -159,7 +161,6 @@ require ( golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect - golang.org/x/text v0.3.7 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect google.golang.org/protobuf v1.28.0 // indirect @@ -168,7 +169,6 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/square/go-jose.v2 v2.2.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect diff --git a/pkg/karmadactl/interpret/execute.go b/pkg/karmadactl/interpret/execute.go index ea8b25594..d8aab86b7 100644 --- a/pkg/karmadactl/interpret/execute.go +++ b/pkg/karmadactl/interpret/execute.go @@ -1,16 +1,156 @@ package interpret import ( + "encoding/json" "fmt" + "io" + "strings" - "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + "k8s.io/cli-runtime/pkg/resource" "k8s.io/kubectl/pkg/cmd/util" + + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/karmadactl/util/genericresource" + "github.com/karmada-io/karmada/pkg/resourceinterpreter/configurableinterpreter" ) -func (o *Options) completeExecute(_ util.Factory, _ *cobra.Command, _ []string) []error { - return nil +func (o *Options) completeExecute(f util.Factory) []error { + var errs []error + if o.DesiredFile != "" { + o.DesiredResult = f.NewBuilder(). + Unstructured(). + FilenameParam(false, &resource.FilenameOptions{Filenames: []string{o.DesiredFile}}). + RequireObject(true). + Local(). + Do() + errs = append(errs, o.DesiredResult.Err()) + } + + if o.ObservedFile != "" { + o.ObservedResult = f.NewBuilder(). + Unstructured(). + FilenameParam(false, &resource.FilenameOptions{Filenames: []string{o.ObservedFile}}). + RequireObject(true). + Local(). + Do() + errs = append(errs, o.ObservedResult.Err()) + } + + if len(o.StatusFile) > 0 { + o.StatusResult = genericresource.NewBuilder(). + Constructor(func() interface{} { return &workv1alpha2.AggregatedStatusItem{} }). + Filename(false, o.StatusFile). + Do() + errs = append(errs, o.StatusResult.Err()) + } + return errs } func (o *Options) runExecute() error { - return fmt.Errorf("not implement") + if o.Operation == "" { + return fmt.Errorf("operation is not set for executing") + } + + customizations, err := o.getCustomizationObject() + if err != nil { + return fmt.Errorf("fail to get customization object: %v", err) + } + + desired, err := getUnstructuredObjectFromResult(o.DesiredResult) + if err != nil { + return fmt.Errorf("fail to get desired object: %v", err) + } + + observed, err := getUnstructuredObjectFromResult(o.ObservedResult) + if err != nil { + return fmt.Errorf("fail to get observed object: %v", err) + } + + status, err := o.getAggregatedStatusItems() + if err != nil { + return fmt.Errorf("fail to get status items: %v", err) + } + + args := ruleArgs{ + Desired: desired, + Observed: observed, + Status: status, + Replica: int64(o.DesiredReplica), + } + + interpreter := configurableinterpreter.NewConfigurableInterpreter(nil) + interpreter.LoadConfig(customizations) + + r := o.Rules.GetByOperation(o.Operation) + if r == nil { + // Shall never occur, because we validate it before. + return fmt.Errorf("operation %s is not supported. Use one of: %s", o.Operation, strings.Join(o.Rules.Names(), ", ")) + } + result := r.Run(interpreter, args) + printExecuteResult(o.Out, o.ErrOut, r.Name(), result) + return nil +} + +func printExecuteResult(w, errOut io.Writer, name string, result *ruleResult) { + if result.Err != nil { + fmt.Fprintf(errOut, "Execute %s error: %v\n", name, result.Err) + return + } + + for i, res := range result.Results { + func() { + fmt.Fprintln(w, "---") + fmt.Fprintf(w, "# [%v/%v] %s:\n", i+1, len(result.Results), res.Name) + if err := printObjectYaml(w, res.Value); err != nil { + fmt.Fprintf(errOut, "ERROR: %v\n", err) + } + }() + } +} + +// MarshalJSON doesn't work for yaml encoder, so unstructured.Unstructured and runtime.RawExtension objects +// will be encoded into unexpected data. +// Example1: +// +// &unstructured.Unstructured{ +// Object: map[string]interface{}{ +// "foo": "bar" +// }, +// } +// +// will be encoded into: +// +// Object: +// foo: bar +// +// Example2: +// +// &runtime.RawExtension{ +// Raw: []byte("{}"), +// } +// +// will be encoded into: +// +// raw: +// - 123 +// - 125 +// +// Inspired from https://github.com/kubernetes/kubernetes/blob/8fb423bfabe0d53934cc94c154c7da2dc3ce1332/staging/src/k8s.io/kubectl/pkg/cmd/get/get.go#L781-L786 +// we convert it to map[string]interface{} by json, then encode the converted object to yaml. +func printObjectYaml(w io.Writer, obj interface{}) error { + data, err := json.Marshal(obj) + if err != nil { + return err + } + + var converted interface{} + err = json.Unmarshal(data, &converted) + if err != nil { + return err + } + + encoder := yaml.NewEncoder(w) + defer encoder.Close() + return encoder.Encode(converted) } diff --git a/pkg/karmadactl/interpret/interpret.go b/pkg/karmadactl/interpret/interpret.go index 12360c22b..21223c414 100644 --- a/pkg/karmadactl/interpret/interpret.go +++ b/pkg/karmadactl/interpret/interpret.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -13,9 +14,12 @@ import ( "k8s.io/kubectl/pkg/util/templates" configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/karmadactl/options" "github.com/karmada-io/karmada/pkg/karmadactl/util" + "github.com/karmada-io/karmada/pkg/karmadactl/util/genericresource" "github.com/karmada-io/karmada/pkg/util/gclient" + "github.com/karmada-io/karmada/pkg/util/helper" ) var ( @@ -32,18 +36,30 @@ var ( interpretExample = templates.Examples(` # Check the customizations in file %[1]s interpret -f customization.json --check - # Execute the retention rule for + + # Execute the retention rule %[1]s interpret -f customization.yml --operation retain --desired-file desired.yml --observed-file observed.yml - # Execute the replicaRevision rule for + + # Execute the replicaResource rule + %[1]s interpret -f customization.yml --operation interpretReplica --observed-file observed.yml + + # Execute the replicaRevision rule %[1]s interpret -f customization.yml --operation reviseReplica --observed-file observed.yml --desired-replica 2 - # Execute the statusReflection rule for + + # Execute the statusReflection rule %[1]s interpret -f customization.yml --operation interpretStatus --observed-file observed.yml + # Execute the healthInterpretation rule %[1]s interpret -f customization.yml --operation interpretHealth --observed-file observed.yml + # Execute the dependencyInterpretation rule %[1]s interpret -f customization.yml --operation interpretDependency --observed-file observed.yml + # Execute the statusAggregation rule - %[1]s interpret -f customization.yml --operation aggregateStatus --status-file status1.yml --status-file status2.yml + %[1]s interpret -f customization.yml --operation aggregateStatus --observed-file observed.yml --status-file status.yml + + # Fetch observed object from url, and status items from stdin (specified with -) + %[1]s interpret -f customization.yml --operation aggregateStatus --observed-file https://example.com/observed.yml --status-file - `) ) @@ -81,7 +97,7 @@ func NewCmdInterpret(f util.Factory, parentCommand string, streams genericcliopt flags.BoolVar(&o.Check, "check", false, "Validates the given ResourceInterpreterCustomization configuration(s)") flags.StringVar(&o.DesiredFile, "desired-file", o.DesiredFile, "Filename, directory, or URL to files identifying the resource to use as desiredObj argument in rule script.") flags.StringVar(&o.ObservedFile, "observed-file", o.ObservedFile, "Filename, directory, or URL to files identifying the resource to use as observedObj argument in rule script.") - flags.StringSliceVar(&o.StatusFile, "status-file", o.StatusFile, "Filename, directory, or URL to files identifying the resource to use as statusItems argument in rule script.") + flags.StringVar(&o.StatusFile, "status-file", o.StatusFile, "Filename, directory, or URL to files identifying the resource to use as statusItems argument in rule script.") flags.Int32Var(&o.DesiredReplica, "desired-replica", o.DesiredReplica, "The desiredReplica argument in rule script.") cmdutil.AddJsonFilenameFlag(flags, &o.FilenameOptions.Filenames, "Filename, directory, or URL to files containing the customizations") flags.BoolVarP(&o.FilenameOptions.Recursive, "recursive", "R", false, "Process the directory used in -f, --filename recursively. Useful when you want to manage related manifests organized within the same directory.") @@ -99,10 +115,13 @@ type Options struct { // args DesiredFile string ObservedFile string - StatusFile []string + StatusFile string DesiredReplica int32 CustomizationResult *resource.Result + DesiredResult *resource.Result + ObservedResult *resource.Result + StatusResult *genericresource.Result Rules Rules @@ -122,12 +141,18 @@ func (o *Options) Complete(f util.Factory, cmd *cobra.Command, args []string) er var errs []error errs = append(errs, o.CustomizationResult.Err()) - errs = append(errs, o.completeExecute(f, cmd, args)...) + errs = append(errs, o.completeExecute(f)...) return errors.NewAggregate(errs) } -// Validate checks the EditOptions to see if there is sufficient information to run the command. +// Validate validates Options. func (o *Options) Validate() error { + if o.Operation != "" { + r := o.Rules.GetByOperation(o.Operation) + if r == nil { + return fmt.Errorf("operation %s is not supported. Use one of: %s", o.Operation, strings.Join(o.Rules.Names(), ", ")) + } + } return nil } @@ -141,6 +166,56 @@ func (o *Options) Run() error { } } +func (o *Options) getCustomizationObject() ([]*configv1alpha1.ResourceInterpreterCustomization, error) { + infos, err := o.CustomizationResult.Infos() + if err != nil { + return nil, err + } + + customizations := make([]*configv1alpha1.ResourceInterpreterCustomization, len(infos)) + for i, info := range infos { + c, err := asResourceInterpreterCustomization(info.Object) + if err != nil { + return nil, err + } + customizations[i] = c + } + return customizations, nil +} + +func (o *Options) getAggregatedStatusItems() ([]workv1alpha2.AggregatedStatusItem, error) { + if o.StatusResult == nil { + return nil, nil + } + + objs, err := o.StatusResult.Objects() + if err != nil { + return nil, err + } + items := make([]workv1alpha2.AggregatedStatusItem, len(objs)) + for i, obj := range objs { + items[i] = *(obj.(*workv1alpha2.AggregatedStatusItem)) + } + return items, nil +} + +func getUnstructuredObjectFromResult(result *resource.Result) (*unstructured.Unstructured, error) { + if result == nil { + return nil, nil + } + + infos, err := result.Infos() + if err != nil { + return nil, err + } + + if len(infos) > 1 { + return nil, fmt.Errorf("get %v objects, expect one at most", len(infos)) + } + + return helper.ToUnstructured(infos[0].Object) +} + func asResourceInterpreterCustomization(o runtime.Object) (*configv1alpha1.ResourceInterpreterCustomization, error) { c, ok := o.(*configv1alpha1.ResourceInterpreterCustomization) if !ok { diff --git a/pkg/karmadactl/interpret/rule.go b/pkg/karmadactl/interpret/rule.go index 65b03d263..b439a50b3 100644 --- a/pkg/karmadactl/interpret/rule.go +++ b/pkg/karmadactl/interpret/rule.go @@ -2,6 +2,7 @@ package interpret import ( "fmt" + "strings" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -218,14 +219,19 @@ func (s *statusAggregationRule) Run(interpreter *configurableinterpreter.Configu if err != nil { return newRuleResultWithError(err) } - aggregateStatus, enabled, err := interpreter.AggregateStatus(obj, args.Status) + + status := args.Status + if status == nil { + status = []workv1alpha2.AggregatedStatusItem{} + } + aggregateStatus, enabled, err := interpreter.AggregateStatus(obj, status) if err != nil { return newRuleResultWithError(err) } if !enabled { return newRuleResultWithError(fmt.Errorf("rule is not enabled")) } - return newRuleResult().add("aggregateStatus", aggregateStatus) + return newRuleResult().add("aggregatedStatus", aggregateStatus) } type healthInterpretationRule struct { @@ -334,6 +340,21 @@ func (r Rules) Names() []string { return names } +// GetByOperation returns the matched rule by operation name, ignoring case. Return nil if none is matched. +func (r Rules) GetByOperation(operation string) Rule { + if operation == "" { + return nil + } + operation = strings.ToLower(operation) + for _, rule := range r { + ruleName := strings.ToLower(rule.Name()) + if ruleName == operation { + return rule + } + } + return nil +} + // Get returns the rule with the name. If not found, return nil. func (r Rules) Get(name string) Rule { for _, rr := range r { @@ -367,10 +388,10 @@ func (r ruleArgs) getObservedObjectOrError() (*unstructured.Unstructured, error) func (r ruleArgs) getObjectOrError() (*unstructured.Unstructured, error) { if r.Desired == nil && r.Observed == nil { - return nil, fmt.Errorf("desired, desired-file, observed, observed-file options are not set") + return nil, fmt.Errorf("desired-file, observed-file options are not set") } if r.Desired != nil && r.Observed != nil { - return nil, fmt.Errorf("you can not specify multiple object by desired, desired-file, observed, observed-file options") + return nil, fmt.Errorf("you can not specify both desired-file and observed-file options") } if r.Desired != nil { return r.Desired, nil diff --git a/pkg/karmadactl/util/genericresource/builder.go b/pkg/karmadactl/util/genericresource/builder.go new file mode 100644 index 000000000..d6806950c --- /dev/null +++ b/pkg/karmadactl/util/genericresource/builder.go @@ -0,0 +1,167 @@ +package genericresource + +import ( + "fmt" + "net/url" + "os" + "path/filepath" + "strings" + + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/cli-runtime/pkg/resource" +) + +const defaultHTTPGetAttempts int = 3 + +var defaultNewFunc = func() interface{} { + return map[string]interface{}{} +} + +var errMissingResource = fmt.Errorf(`you must provide one or more resources`) + +// Builder provides convenience functions for taking arguments and parameters +// from the command line and converting them to a list of resources to iterate +// over using the Visitor interface. +type Builder struct { + errs []error + paths []Visitor + stdinInUse bool + mapper *mapper + schema resource.ContentValidator +} + +// NewBuilder returns a Builder. +func NewBuilder() *Builder { + return &Builder{ + mapper: &mapper{ + newFunc: defaultNewFunc, + }, + } +} + +// Schema set the schema to validate data in files. +func (b *Builder) Schema(schema resource.ContentValidator) *Builder { + b.schema = schema + return b +} + +// Constructor tells wanted type of object. +func (b *Builder) Constructor(newFunc func() interface{}) *Builder { + b.mapper.newFunc = newFunc + return b +} + +// Filename groups input in two categories: URLs and files (files, directories, STDIN) +func (b *Builder) Filename(recursive bool, filenames ...string) *Builder { + for _, s := range filenames { + switch { + case s == "-": + b.Stdin() + case strings.HasPrefix(s, "http://") || strings.HasPrefix(s, "https://"): + u, err := url.Parse(s) + if err != nil { + b.errs = append(b.errs, fmt.Errorf("the URL passed to filename %q is not valid: %v", s, err)) + continue + } + b.URL(defaultHTTPGetAttempts, u) + default: + matches, err := expandIfFilePattern(s) + if err != nil { + b.errs = append(b.errs, err) + continue + } + b.Path(recursive, matches...) + } + } + return b +} + +// Stdin will read objects from the standard input. +func (b *Builder) Stdin() *Builder { + if b.stdinInUse { + b.errs = append(b.errs, resource.StdinMultiUseError) + } + b.stdinInUse = true + b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.schema)) + return b +} + +// URL accepts a number of URLs directly. +func (b *Builder) URL(httpAttemptCount int, urls ...*url.URL) *Builder { + for _, u := range urls { + b.paths = append(b.paths, NewURLVisitor(b.mapper, httpAttemptCount, u, b.schema)) + } + return b +} + +// Path accepts a set of paths that may be files, directories (all can contain +// one or more resources). Creates a FileVisitor for each file and then each +// FileVisitor is streaming the content to a StreamVisitor. +func (b *Builder) Path(recursive bool, paths ...string) *Builder { + for _, p := range paths { + _, err := os.Stat(p) + if os.IsNotExist(err) { + b.errs = append(b.errs, fmt.Errorf("the path %q does not exist", p)) + continue + } + if err != nil { + b.errs = append(b.errs, fmt.Errorf("the path %q cannot be accessed: %v", p, err)) + continue + } + + visitors, err := ExpandPathsToFileVisitors(b.mapper, p, recursive, resource.FileExtensions, b.schema) + if err != nil { + b.errs = append(b.errs, fmt.Errorf("error reading %q: %v", p, err)) + } + + b.paths = append(b.paths, visitors...) + } + if len(b.paths) == 0 && len(b.errs) == 0 { + b.errs = append(b.errs, fmt.Errorf("error reading %v: recognized file extensions are %v", paths, resource.FileExtensions)) + } + return b +} + +// Do returns a Result object with a Visitor for the resources identified by the Builder. Note that stream +// inputs are consumed by the first execution - use Infos() or Objects() on the Result to capture a list +// for further iteration. +func (b *Builder) Do() *Result { + r := b.visitorResult() + return r +} + +func (b *Builder) visitorResult() *Result { + if len(b.errs) > 0 { + return &Result{err: utilerrors.NewAggregate(b.errs)} + } + + // visit items specified by paths + if len(b.paths) != 0 { + return b.visitByPaths() + } + return &Result{err: errMissingResource} +} + +func (b *Builder) visitByPaths() *Result { + result := &Result{} + + result.visitor = VisitorList(b.paths) + return result +} + +// expandIfFilePattern returns all the filenames that match the input pattern +// or the filename if it is a specific filename and not a pattern. +// If the input is a pattern and it yields no result it will result in an error. +func expandIfFilePattern(pattern string) ([]string, error) { + if _, err := os.Stat(pattern); os.IsNotExist(err) { + matches, err := filepath.Glob(pattern) + if err == nil && len(matches) == 0 { + return nil, fmt.Errorf("the path %q does not exist", pattern) + } + if err == filepath.ErrBadPattern { + return nil, fmt.Errorf("pattern %q is not valid: %v", pattern, err) + } + return matches, err + } + return []string{pattern}, nil +} diff --git a/pkg/karmadactl/util/genericresource/doc.go b/pkg/karmadactl/util/genericresource/doc.go new file mode 100644 index 000000000..ac1c0c321 --- /dev/null +++ b/pkg/karmadactl/util/genericresource/doc.go @@ -0,0 +1,3 @@ +// Package genericresource is modified from "k8s.io/cli-runtime/pkg/resource". +// It can fetch any object (not only runtime.object) from file, http, and stdin. +package genericresource diff --git a/pkg/karmadactl/util/genericresource/interface.go b/pkg/karmadactl/util/genericresource/interface.go new file mode 100644 index 000000000..4dd98aad3 --- /dev/null +++ b/pkg/karmadactl/util/genericresource/interface.go @@ -0,0 +1,13 @@ +package genericresource + +// Visitor lets clients walk a list of resources. +type Visitor interface { + Visit(VisitorFunc) error +} + +// VisitorFunc implements the Visitor interface for a matching function. +// If there was a problem walking a list of resources, the incoming error +// will describe the problem and the function can decide how to handle that error. +// A nil returned indicates to accept an error to continue loops even when errors happen. +// This is useful for ignoring certain kinds of errors or aggregating errors in some way. +type VisitorFunc func(*Info, error) error diff --git a/pkg/karmadactl/util/genericresource/result.go b/pkg/karmadactl/util/genericresource/result.go new file mode 100644 index 000000000..b3129c78c --- /dev/null +++ b/pkg/karmadactl/util/genericresource/result.go @@ -0,0 +1,74 @@ +package genericresource + +import ( + utilerrors "k8s.io/apimachinery/pkg/util/errors" +) + +// Result contains helper methods for dealing with the outcome of a Builder. +type Result struct { + err error + visitor Visitor + + // populated by a call to Infos + info []*Info + + ignoreErrors []utilerrors.Matcher +} + +// Err returns one or more errors (via a util.ErrorList) that occurred prior +// to visiting the elements in the visitor. To see all errors including those +// that occur during visitation, invoke Infos(). +func (r *Result) Err() error { + return r.err +} + +// Objects returns the list of objects of all found resources. +func (r *Result) Objects() ([]interface{}, error) { + infos, err := r.Infos() + if err != nil { + return nil, err + } + + objects := make([]interface{}, len(infos)) + for i, info := range infos { + objects[i] = info.Object + } + return objects, err +} + +// Infos returns an array of all of the resource infos retrieved via traversal. +// Will attempt to traverse the entire set of visitors only once, and will return +// a cached list on subsequent calls. +func (r *Result) Infos() ([]*Info, error) { + if r.err != nil { + return nil, r.err + } + if r.info != nil { + return r.info, nil + } + + var infos []*Info + err := r.visitor.Visit(func(info *Info, err error) error { + if err != nil { + return err + } + infos = append(infos, info) + return nil + }) + err = utilerrors.FilterOut(err, r.ignoreErrors...) + + r.info, r.err = infos, err + return infos, err +} + +// Visit implements the Visitor interface on the items described in the Builder. +// Note that some visitor sources are not traversable more than once, or may +// return different results. If you wish to operate on the same set of resources +// multiple times, use the Infos() method. +func (r *Result) Visit(fn VisitorFunc) error { + if r.err != nil { + return r.err + } + err := r.visitor.Visit(fn) + return utilerrors.FilterOut(err, r.ignoreErrors...) +} diff --git a/pkg/karmadactl/util/genericresource/visitor.go b/pkg/karmadactl/util/genericresource/visitor.go new file mode 100644 index 000000000..00cfa61cd --- /dev/null +++ b/pkg/karmadactl/util/genericresource/visitor.go @@ -0,0 +1,294 @@ +package genericresource + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path/filepath" + "time" + + "golang.org/x/text/encoding/unicode" + "golang.org/x/text/transform" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/cli-runtime/pkg/resource" +) + +const ( + constSTDINstr = "STDIN" +) + +// VisitorList implements Visit for the sub visitors it contains. The first error +// returned from a child Visitor will terminate iteration. +type VisitorList []Visitor + +// Visit implements Visitor +func (l VisitorList) Visit(fn VisitorFunc) error { + for i := range l { + if err := l[i].Visit(fn); err != nil { + return err + } + } + return nil +} + +// URLVisitor downloads the contents of a URL, and if successful, returns +// an info object representing the downloaded object. +type URLVisitor struct { + URL *url.URL + *StreamVisitor + HTTPAttemptCount int +} + +// NewURLVisitor returns a visitor to download from given url. It will max retry "httpAttemptCount" when failed. +func NewURLVisitor(mapper *mapper, httpAttemptCount int, u *url.URL, schema resource.ContentValidator) *URLVisitor { + return &URLVisitor{ + URL: u, + StreamVisitor: NewStreamVisitor(nil, mapper, u.String(), schema), + HTTPAttemptCount: httpAttemptCount, + } +} + +// Visit down object from url. +func (v *URLVisitor) Visit(fn VisitorFunc) error { + body, err := readHTTPWithRetries(httpgetImpl, time.Second, v.URL.String(), v.HTTPAttemptCount) + if err != nil { + return err + } + defer body.Close() + v.StreamVisitor.Reader = body + return v.StreamVisitor.Visit(fn) +} + +func ignoreFile(path string, extensions []string) bool { + if len(extensions) == 0 { + return false + } + ext := filepath.Ext(path) + for _, s := range extensions { + if s == ext { + return false + } + } + return true +} + +// FileVisitorForSTDIN return a special FileVisitor just for STDIN +func FileVisitorForSTDIN(mapper *mapper, schema resource.ContentValidator) Visitor { + return &FileVisitor{ + Path: constSTDINstr, + StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, schema), + } +} + +// ExpandPathsToFileVisitors will return a slice of FileVisitors that will handle files from the provided path. +// After FileVisitors open the files, they will pass an io.Reader to a StreamVisitor to do the reading. (stdin +// is also taken care of). Paths argument also accepts a single file, and will return a single visitor +func ExpandPathsToFileVisitors(mapper *mapper, paths string, recursive bool, extensions []string, schema resource.ContentValidator) ([]Visitor, error) { + var visitors []Visitor + err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + + if fi.IsDir() { + if path != paths && !recursive { + return filepath.SkipDir + } + return nil + } + // Don't check extension if the filepath was passed explicitly + if path != paths && ignoreFile(path, extensions) { + return nil + } + + visitor := &FileVisitor{ + Path: path, + StreamVisitor: NewStreamVisitor(nil, mapper, path, schema), + } + + visitors = append(visitors, visitor) + return nil + }) + + if err != nil { + return nil, err + } + return visitors, nil +} + +// FileVisitor is wrapping around a StreamVisitor, to handle open/close files +type FileVisitor struct { + Path string + *StreamVisitor +} + +// Visit in a FileVisitor is just taking care of opening/closing files +func (v *FileVisitor) Visit(fn VisitorFunc) error { + var f *os.File + if v.Path == constSTDINstr { + f = os.Stdin + } else { + var err error + f, err = os.Open(v.Path) + if err != nil { + return err + } + defer f.Close() + } + + // TODO: Consider adding a flag to force to UTF16, apparently some + // Windows tools don't write the BOM + utf16bom := unicode.BOMOverride(unicode.UTF8.NewDecoder()) + v.StreamVisitor.Reader = transform.NewReader(f, utf16bom) + + return v.StreamVisitor.Visit(fn) +} + +// StreamVisitor reads objects from an io.Reader and walks them. A stream visitor can only be +// visited once. +// Unmarshal stream to object by json. +type StreamVisitor struct { + io.Reader + *mapper + + Source string + Schema resource.ContentValidator +} + +// NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same. +func NewStreamVisitor(r io.Reader, mapper *mapper, source string, schema resource.ContentValidator) *StreamVisitor { + return &StreamVisitor{ + Reader: r, + mapper: mapper, + Source: source, + Schema: schema, + } +} + +// Visit implements Visitor over a stream. StreamVisitor is able to distinct multiple resources in one stream. +func (v *StreamVisitor) Visit(fn VisitorFunc) error { + d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096) + for { + ext := runtime.RawExtension{} + if err := d.Decode(&ext); err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("error parsing %s: %v", v.Source, err) + } + // TODO: This needs to be able to handle object in other encodings and schemas. + ext.Raw = bytes.TrimSpace(ext.Raw) + if len(ext.Raw) == 0 || bytes.Equal(ext.Raw, []byte("null")) { + continue + } + if err := resource.ValidateSchema(ext.Raw, v.Schema); err != nil { + return fmt.Errorf("error validating %q: %v", v.Source, err) + } + info, err := v.infoForData(ext.Raw, v.Source) + if err != nil { + if fnErr := fn(info, err); fnErr != nil { + return fnErr + } + continue + } + if err = fn(info, nil); err != nil { + return err + } + } +} + +type mapper struct { + newFunc func() interface{} +} + +func (m *mapper) infoForData(data []byte, source string) (*Info, error) { + info := &Info{ + Source: source, + Data: data, + Object: m.newFunc(), + } + + err := json.Unmarshal(data, info.Object) + if err != nil { + return nil, err + } + + return info, nil +} + +// httpget Defines function to retrieve a url and return the results. Exists for unit test stubbing. +type httpget func(url string) (int, string, io.ReadCloser, error) + +// httpgetImpl Implements a function to retrieve a url and return the results. +func httpgetImpl(url string) (int, string, io.ReadCloser, error) { + // nolint:gosec + resp, err := http.Get(url) + if err != nil { + return 0, "", nil, err + } + return resp.StatusCode, resp.Status, resp.Body, nil +} + +// readHTTPWithRetries tries to http.Get the v.URL retries times before giving up. +func readHTTPWithRetries(get httpget, duration time.Duration, u string, attempts int) (io.ReadCloser, error) { + var err error + if attempts <= 0 { + return nil, fmt.Errorf("http attempts must be greater than 0, was %d", attempts) + } + for i := 0; i < attempts; i++ { + var ( + statusCode int + status string + body io.ReadCloser + ) + if i > 0 { + time.Sleep(duration) + } + + // Try to get the URL + statusCode, status, body, err = get(u) + + // Retry Errors + if err != nil { + continue + } + + if statusCode == http.StatusOK { + return body, nil + } + body.Close() + // Error - Set the error condition from the StatusCode + err = fmt.Errorf("unable to read URL %q, server reported %s, status code=%d", u, status, statusCode) + + if statusCode >= 500 && statusCode < 600 { + // Retry 500's + continue + } else { + // Don't retry other StatusCodes + break + } + } + return nil, err +} + +// Info contains temporary info to execute a REST call, or show the results +// of an already completed REST call. +type Info struct { + // Optional, Source is the filename or URL to template file (.json or .yaml), + // or stdin to use to handle the resource + Source string + // Optional, this is the most recent value returned by the server if available. It will + // typically be in unstructured or internal forms, depending on how the Builder was + // defined. If retrieved from the server, the Builder expects the mapping client to + // decide the final form. Use the AsVersioned, AsUnstructured, and AsInternal helpers + // to alter the object versions. + // If Subresource is specified, this will be the object for the subresource. + Data []byte + + Object interface{} +} diff --git a/pkg/resourceinterpreter/configurableinterpreter/configmanager/manager.go b/pkg/resourceinterpreter/configurableinterpreter/configmanager/manager.go index 404e174d6..ec8c45fff 100644 --- a/pkg/resourceinterpreter/configurableinterpreter/configmanager/manager.go +++ b/pkg/resourceinterpreter/configurableinterpreter/configmanager/manager.go @@ -27,6 +27,7 @@ var resourceInterpreterCustomizationsGVR = schema.GroupVersionResource{ type ConfigManager interface { LuaScriptAccessors() map[schema.GroupVersionKind]CustomAccessor HasSynced() bool + LoadConfig(customizations []*configv1alpha1.ResourceInterpreterCustomization) } // interpreterConfigManager collects the resource interpreter customization. @@ -61,19 +62,24 @@ func (configManager *interpreterConfigManager) HasSynced() bool { // NewInterpreterConfigManager watches ResourceInterpreterCustomization and organizes // the configurations in the cache. -func NewInterpreterConfigManager(inform genericmanager.SingleClusterInformerManager) ConfigManager { +func NewInterpreterConfigManager(informer genericmanager.SingleClusterInformerManager) ConfigManager { manager := &interpreterConfigManager{ - lister: inform.Lister(resourceInterpreterCustomizationsGVR), initialSynced: &atomic.Value{}, configuration: &atomic.Value{}, } manager.configuration.Store(make(map[schema.GroupVersionKind]CustomAccessor)) manager.initialSynced.Store(false) - configHandlers := fedinformer.NewHandlerOnEvents( - func(_ interface{}) { manager.updateConfiguration() }, - func(_, _ interface{}) { manager.updateConfiguration() }, - func(_ interface{}) { manager.updateConfiguration() }) - inform.ForResource(resourceInterpreterCustomizationsGVR, configHandlers) + + // In interpret command, rules are not loaded from server, so we don't start informer for it. + if informer != nil { + manager.lister = informer.Lister(resourceInterpreterCustomizationsGVR) + configHandlers := fedinformer.NewHandlerOnEvents( + func(_ interface{}) { manager.updateConfiguration() }, + func(_, _ interface{}) { manager.updateConfiguration() }, + func(_ interface{}) { manager.updateConfiguration() }) + informer.ForResource(resourceInterpreterCustomizationsGVR, configHandlers) + } + return manager } @@ -94,6 +100,10 @@ func (configManager *interpreterConfigManager) updateConfiguration() { configs[index] = config } + configManager.LoadConfig(configs) +} + +func (configManager *interpreterConfigManager) LoadConfig(configs []*configv1alpha1.ResourceInterpreterCustomization) { sort.Slice(configs, func(i, j int) bool { return configs[i].Name < configs[j].Name }) diff --git a/pkg/resourceinterpreter/configurableinterpreter/configurable.go b/pkg/resourceinterpreter/configurableinterpreter/configurable.go index c8a3eea71..5b88c643f 100644 --- a/pkg/resourceinterpreter/configurableinterpreter/configurable.go +++ b/pkg/resourceinterpreter/configurableinterpreter/configurable.go @@ -145,3 +145,8 @@ func (c *ConfigurableInterpreter) getInterpreter(kind schema.GroupVersionKind, o } return script, len(script) > 0 } + +// LoadConfig loads and stores rules from customizations +func (c *ConfigurableInterpreter) LoadConfig(customizations []*configv1alpha1.ResourceInterpreterCustomization) { + c.configManager.LoadConfig(customizations) +}